Python并发复习3 - 多进程模块 multiprocessing
Python标准库为我们提供了threading(多线程模块)和multiprocessing(多进程模块)。从Python3.2开始,标准库为我们提供了concurrent.futures模块,它提供了 ThreadPoolExecutor 和 ProcessPoolExecutor 两个类,实现了对threading和multiprocessing的更高级的抽象,对编写线程池/进程池提供了直接的支持。
核心原理是:concurrent.futures会以子进程的形式,平行的运行多个python解释器,从而令python程序可以利用多核CPU来提升执行速度。由于子进程与主解释器相分离,所以他们的全局解释器锁也是相互独立的。每个子进程都能够完整的使用一个CPU内核,可以利用multiprocessing实现真正的平行计算。
一 、进程的调用
1.1 函数式调用
1 from multiprocessing import Process
2 import time
3 def f(name):
4 time.sleep(1)
5 print('hello', name,time.ctime())
6
7 if __name__ == '__main__':
8 p_list=[]
9 for i in range(3):
10 p = Process(target=f, args=('alvin',))
11 p_list.append(p)
12 p.start()
13 for i in p_list:
14 p.join()
15 print('end')
1.2 类调用
from multiprocessing import Process
import time
class MyProcess(Process):
def __init__(self):
super(MyProcess, self).__init__()
#self.name = name
def run(self):
time.sleep(1)
print ('hello', self.name,time.ctime())
if __name__ == '__main__':
p_list=[]
for i in range(3):
p = MyProcess()
p.start()
p_list.append(p)
for p in p_list:
p.join()
print('end')
二 、Process类
构造方法:
Process([group [, target [, name [, args [, kwargs]]]]])
group: 线程组,目前还没有实现,库引用中提示必须是None;
target: 要执行的方法;
name: 进程名;
args/kwargs: 要传入方法的参数。
实例方法:
is_alive():返回进程是否在运行。
join([timeout]):阻塞当前上下文环境的进程程,直到调用此方法的进程终止或到达指定的timeout(可选参数)。
start():进程准备就绪,等待CPU调度
run():strat()调用run方法,如果实例进程时未制定传入target,这star执行t默认run()方法。
terminate():不管任务是否完成,立即停止工作进程
属性:
daemon:和线程的setDeamon功能一样
name:进程名字。
pid:进程号。
三 、进程间通讯
1、进程对列Queue
---------- 一个流水线,各个工人共享主线程流水线产品队列数据
2、 管道pipe
1 from multiprocessing import Process, Pipe
2
3 def func(contact):
4 contact.send("这是管道测试信息")
5 contact.close()
6
7 if __name__ == '__main__':
8 a_con, b_con = Pipe()
9 p = Process(target=func, args=(a_con,))
10 print(b_con.recv())
11 b_con.send("管道返回信息")
3、manage
--- Manager是一种较为高级的多进程通信方式,它能支持Python支持的的任何数据结构,适用于多个进程不是源于同一个父进程的情形。
原理是:先启动一个ManagerServer进程,这个进程是阻塞的,它监听一个socket,然后其他进程(ManagerClient)通过socket来连接到ManagerServer,实现通信。
1 from multiprocessing import Process, Manager 2 from time import sleep 3 4 5 def thread_a_main(sync_data_pool): # A 进程主函数,存入100+的数 6 for ix in range(100, 105): 7 sleep(1) 8 sync_data_pool.append(ix) 9 10 11 def thread_b_main(sync_data_pool): # B 进程主函数,存入300+的数 12 for ix in range(300, 309): 13 sleep(0.6) 14 sync_data_pool.append(ix) 15 16 17 def _test_case_000(): # 测试用例 18 manager = Manager() # multiprocessing 中的 Manager 是一个工厂方法,直接获取一个 SyncManager 的实例 19 sync_data_pool = manager.list() # 利用 SyncManager 的实例来创建同步数据池 20 Process(target=thread_a_main, args=(sync_data_pool, )).start() # 创建并启动 A 进程 21 Process(target=thread_b_main, args=(sync_data_pool, )).start() # 创建并启动 B 进程 22 for ix in range(6): # C 进程(主进程)中实时的去查看数据池中的数据 23 sleep(1) 24 print(sync_data_pool) 25 26 27 if '__main__' == __name__: 28 _test_case_000()
四 、进程同步
1 from multiprocessing import Process, Lock
2
3 def f(l, i):
4
5 with l.acquire():
6 print('hello world %s'%i)
7
8 if __name__ == '__main__':
9 lock = Lock()
10
11 for num in range(10):
12 Process(target=f, args=(lock, num)).start()
