世界杯logo

线程池、进程池、回调函数(add_done_callback)、map

池concurrent.futrues

什么是池?

要在程序开始的时候,还没提交任务,先创建几个线程或者进程放在一个池子里,这就是池

为什么要用池?

如果先开好进程/或线程,那么有任务之后就可以直接使用这个池中的数据了

开好的进程或线程会一直存在池中,可以被多个任务反复利用,这样极大的减少了开启,关闭,调度线程、进程的时间开销

池中的进程/线程个数控制了操作系统需要调度的任务个数,控制池中的单位,有利于提高操作系统的效率, 减轻操作系统的负担

线程池

线程池里面推荐的线程个数(一般根据IO的比例定制):cpu_count*5

代码:

from threading import current_thread

from concurrent.futures import ThreadPoolExecutor

import time

import random

def func():

print(current_thread().ident,'start') #从内部获取开启进程的个数

time.sleep(random.randint(1,4))

print(current_thread().ident, 'end')

tp=ThreadPoolExecutor(4) #起了4个线程

for i in range(10):

tp.submit(func) #submit向池中提交任务

#得:

6876 start

13340 start

11760 start

6080 start

6876 end

6876 start

6080 end

6080 start

11760 end

11760 start

6876 end

6876 start

13340 end

13340 start

11760 end

11760 start

11760 end

6080 end

带参数

from threading import current_thread

from concurrent.futures import ThreadPoolExecutor

import time

import random

def func(a,b):

print(current_thread().ident,'start',a,b) #从内部获取开启进程的个数

time.sleep(random.randint(1,4))

print(current_thread().ident, 'end')

tp=ThreadPoolExecutor(4) #起了4个线程

for i in range(10):

tp.submit(func,i,i+1)

#得:

8624 start 0 1

7884 start 1 2

7844 start 2 3

11832 start 3 4

7844 end

7844 start 4 5

8624 end

8624 start 5 6

7884 end

7884 start 6 7

11832 end

11832 start 7 8

7844 end

7844 start 8 9

7884 end

7884 start 9 10

11832 end

8624 end

7884 end

7844 end

进程池

运用场景:高计算的时候,没有io(没有文件操作,没有数据库操作,没有网络操作,没有input)

进程池里面推荐的进程个数:cpu_count*1<进程个数

代码:

import os

from concurrent.futures import ProcessPoolExecutor

import time

import random

def func(a, b):

print(os.getpid(), 'start', a, b) # 从内部获取开启进程的个数

time.sleep(random.randint(1, 4))

print(os.getpid(), 'end')

if __name__ == '__main__':

tp = ProcessPoolExecutor(4) # 起了4个线程

for i in range(10):

tp.submit(func, i, i + 1)

#得

7836 start 0 1

8888 start 1 2

11916 start 2 3

14660 start 3 4

11916 end

11916 start 4 5

7836 end

7836 start 5 6

14660 end

14660 start 6 7

7836 end

8888 end

7836 start 7 8

8888 start 8 9

14660 end

14660 start 9 10

11916 end

7836 end

8888 end

14660 end

获取任务结果

from concurrent.futures import ProcessPoolExecutor

def func(a, b):

return a*b

if __name__ == '__main__':

tp = ProcessPoolExecutor(4) # 起了4个线程

future_l={}

for i in range(10): #异步非阻塞

ret=tp.submit(func, i, i + 1)

future_l[i]=ret

# print(ret) #Future未来对象

# print(ret.result()) #缺点:慢,效率低

for key in future_l: #同步阻塞

print(key,future_l[key].result())

回调函数(add_done_callback)

回调函数(异步阻塞),给ret对象绑定一个回调函数。等ret对应的任务有了结果之后,立即调用print_func这个函数,就可以 立即对函数进行处理,而不是按照顺序接受结果,处理结果

代码

from concurrent.futures import ProcessPoolExecutor

import os

import time

import random

def func(a, b):

print(os.getpid(), 'start', a, b) # 从内部获取开启进程的个数

time.sleep(random.randint(1, 4))

print(os.getpid(), 'end')

return a*b

def print_func(ret):

print(ret.result())

if __name__ == '__main__':

tp = ProcessPoolExecutor(4) # 起了4个线程

for i in range(10): #异步非阻塞

ret=tp.submit(func, i, i + 1)

ret.add_done_callback(print_func)

#异步阻塞,回调函数,给ret对象绑定一个回调函数。

# 等ret对应的任务有了结果之后,立即调用print_func这个函数

#就可以立即对函数进行处理,而不是按照顺序接受结果,处理结果

#得:

15284 start 0 1

7540 start 1 2

544 start 2 3

6880 start 3 4

7540 end

7540 start 4 5

2

15284 end

544 end

544 start 5 6

6

15284 start 6 7

0

6880 end

6880 start 7 8

12

15284 end

15284 start 8 9

42

7540 end

7540 start 9 10

20

544 end

30

7540 end

90

15284 end

6880 end

72

56

map

map函数只适合传递简单的参数,并且必须是一个可迭代的数据类型作为参数

from concurrent.futures import ProcessPoolExecutor

def func(a):

b=a+1

return a*b

if __name__ == '__main__':

tp = ProcessPoolExecutor(4) # 起了4个线程

tm=tp.map(func,range(20)) #使用map将一个可迭代对象的元素依次传入函数

print(list(tm))

#得:

[0, 2, 6, 12, 20, 30, 42, 56, 72, 90, 110, 132, 156, 182, 210, 240, 272, 306, 342, 380]