本文最后更新于:2024年5月7日 下午
本文介绍Python多进程模块。
概述
Python中的多进程是通过multiprocessing包来实现的,和多线程的threading.Thread差不多,它可以利用multiprocessing.Process对象来创建一个进程对象。这个进程对象的方法和线程对象的方法差不多也有start(), run(), join()等方法,其中有一个方法不同Thread线程对象中的守护线程方法是setDeamon,而Process进程对象的守护进程是通过设置deamon属性来完成的。
Python多进程实现
方法一
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 from multiprocessing import Processdef fun1 (name ): print ('测试%s多进程' %name)if __name__ == '__main__' : process_list = [] for i in range (5 ): p = Process(target=fun1,args=('Python' ,)) p.start() process_list.append(p) for i in process_list: p.join() print ('结束测试' )
结果
1 2 3 4 5 6 7 8 测试Python多进程 测试Python多进程 测试Python多进程 测试Python多进程 测试Python多进程 结束测试 Process finished with exit code 0
上面的代码开启了5个子进程去执行函数,我们可以观察结果,是同时打印的,这里实现了真正的并行操作,就是多个CPU同时执行任务。我们知道进程是python中最小的资源分配单元,也就是进程中间的数据,内存是不共享的,每启动一个进程,都要独立分配资源和拷贝访问的数据,所以进程的启动和销毁的代价是比较大了,所以在实际中使用多进程,要根据服务器的配置来设定。
方法二
还记得python多线程的第二种实现方法吗?是通过类继承的方法来实现的,python多进程的第二种实现方式也是一样的
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 from multiprocessing import Processclass MyProcess (Process ): def __init__ (self,name ): super (MyProcess,self).__init__() self.name = name def run (self ): print ('测试%s多进程' % self.name)if __name__ == '__main__' : process_list = [] for i in range (5 ): p = MyProcess('Python' ) p.start() process_list.append(p) for i in process_list: p.join() print ('结束测试' )
结果
1 2 3 4 5 6 7 8 测试Python多进程 测试Python多进程 测试Python多进程 测试Python多进程 测试Python多进程 结束测试 Process finished with exit code 0
效果和第一种方式一样。
我们可以看到Python多进程的实现方式和多线程的实现方式几乎一样。
Process类的其他方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 构造方法: Process([group [, target [, name [, args [, kwargs]]]]]) group: 线程组 target: 要执行的方法 name: 进程名 args/kwargs: 要传入方法的参数 实例方法: is_alive():返回进程是否在运行,bool类型。 join([timeout]):阻塞当前上下文环境的进程程,直到调用此方法的进程终止或到达指定的timeout(可选参数)。 start():进程准备就绪,等待CPU调度 run():strat()调用run方法,如果实例进程时未制定传入target,这star执行t默认run()方法。 terminate():不管任务是否完成,立即停止工作进程 属性: daemon:和线程的setDeamon功能一样 name:进程名字 pid:进程号
关于join,daemon的使用和python多线程一样,这里就不在复述了,大家可以看看以前的python多线程系列文章。
Python多线程的通信
进程是系统独立调度核分配系统资源(CPU、内存)的基本单位,进程之间是相互独立的,每启动一个新的进程相当于把数据进行了一次克隆,子进程里的数据修改无法影响到主进程中的数据,不同子进程之间的数据也不能共享,这是多进程在使用中与多线程最明显的区别。但是难道Python多进程中间难道就是孤立的吗?当然不是,python也提供了多种方法实现了多进程中间的通信和数据共享(可以修改一份数据)
进程队列Queue
Queue在多线程中也说到过,在生成者消费者模式中使用,是线程安全的,是生产者和消费者中间的数据管道,那在python多进程中,它其实就是进程之间的数据管道,实现进程通信。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 from multiprocessing import Process,Queuedef fun1 (q,i ): print ('子进程%s 开始put数据' %i) q.put('我是%s 通过Queue通信' %i)if __name__ == '__main__' : q = Queue() process_list = [] for i in range (3 ): p = Process(target=fun1,args=(q,i,)) p.start() process_list.append(p) for i in process_list: p.join() print ('主进程获取Queue数据' ) print (q.get()) print (q.get()) print (q.get()) print ('结束测试' )
结果
1 2 3 4 5 6 7 8 9 10 子进程0 开始put数据 子进程1 开始put数据 子进程2 开始put数据 主进程获取Queue数据 我是0 通过Queue通信 我是1 通过Queue通信 我是2 通过Queue通信 结束测试 Process finished with exit code 0
上面的代码结果可以看到我们主进程中可以通过Queue获取子进程中put的数据,实现进程间的通信。
管道Pipe
管道Pipe和Queue的作用大致差不多,也是实现进程间的通信,下面之间看怎么使用吧
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 from multiprocessing import Process, Pipedef fun1 (conn ): print ('子进程发送消息:' ) conn.send('你好主进程' ) print ('子进程接受消息:' ) print (conn.recv()) conn.close()if __name__ == '__main__' : conn1, conn2 = Pipe() p = Process(target=fun1, args=(conn2,)) p.start() print ('主进程接受消息:' ) print (conn1.recv()) print ('主进程发送消息:' ) conn1.send("你好子进程" ) p.join() print ('结束测试' )
结果
1 2 3 4 5 6 7 8 9 主进程接受消息: 子进程发送消息: 子进程接受消息: 你好主进程 主进程发送消息: 你好子进程 结束测试 Process finished with exit code 0
上面可以看到主进程和子进程可以相互发送消息
Managers
Queue和Pipe只是实现了数据交互,并没实现数据共享,即一个进程去更改另一个进程的数据。那么久要用到Managers
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 from multiprocessing import Process, Managerdef fun1 (dic,lis,index ): dic[index] = 'a' dic['2' ] = 'b' lis.append(index) if __name__ == '__main__' : with Manager() as manager: dic = manager.dict () l = manager.list (range (5 )) process_list = [] for i in range (10 ): p = Process(target=fun1, args=(dic,l,i)) p.start() process_list.append(p) for res in process_list: res.join() print (dic) print (l)
结果:
1 2 {0: 'a', '2': 'b', 3: 'a', 1: 'a', 2: 'a', 4: 'a', 5: 'a', 7: 'a', 6: 'a', 8: 'a', 9: 'a'} [0, 1, 2, 3, 4, 0, 3, 1, 2, 4, 5, 7, 6, 8, 9]
可以看到主进程定义了一个字典和一个列表,在子进程中,可以添加和修改字典的内容,在列表中插入新的数据,实现进程间的数据共享,即可以共同修改同一份数据
进程锁
在多进程程序运行时,可能会有部分内存或硬盘可以同时被多个进程操作,如果不小心管理很可能导致冲突。此时可以用进程锁有效解决问题。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 from multiprocessing import Process, Managerdef fun1 (dic,lis,index, lock ): lock.acquire() dic[index] = 'a' dic['2' ] = 'b' lis.append(index) lock.release() if __name__ == '__main__' : with Manager() as manager: dic = manager.dict () l = manager.list (range (5 )) lock = manager.Lock() process_list = [] for i in range (10 ): p = Process(target=fun1, args=(dic,l,i,lock)) p.start() process_list.append(p) for res in process_list: res.join() print (dic) print (l)
进程池
进程池内部维护一个进程序列,当使用时,则去进程池中获取一个进程,如果进程池序列中没有可供使用的进进程,那么程序就会等待,直到进程池中有可用进程为止。就是固定有几个进程可以使用。
常规进程池
进程池中有两个方法:
apply:同步,一般不使用
apply_async:异步
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 from multiprocessing import Process,Poolimport os, time, randomdef fun1 (name ): print ('Run task %s (%s)...' % (name, os.getpid())) start = time.time() time.sleep(random.random() * 3 ) end = time.time() print ('Task %s runs %0.2f seconds.' % (name, (end - start)))if __name__=='__main__' : pool = Pool(5 ) for i in range (10 ): pool.apply_async(func=fun1, args=(i,)) pool.close() pool.join() print ('结束测试' )
结果
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 Run task 0 (37476)... Run task 1 (4044)... Task 0 runs 0.03 seconds. Run task 2 (37476)... Run task 3 (17252)... Run task 4 (16448)... Run task 5 (24804)... Task 2 runs 0.27 seconds. Run task 6 (37476)... Task 1 runs 0.58 seconds. Run task 7 (4044)... Task 3 runs 0.98 seconds. Run task 8 (17252)... Task 5 runs 1.13 seconds. Run task 9 (24804)... Task 6 runs 1.46 seconds. Task 4 runs 2.73 seconds. Task 8 runs 2.18 seconds. Task 7 runs 2.93 seconds. Task 9 runs 2.93 seconds. 结束测试
对Pool
对象调用join()
方法会等待所有子进程执行完毕,调用join()
之前必须先调用close()
,调用close()
之后就不能继续添加新的Process
了。
进程池map方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 import os import PIL from multiprocessing import Pool from PIL import Image SIZE = (75 ,75 ) SAVE_DIRECTORY = \'thumbs\' def get_image_paths(folder): return (os.path.join(folder, f) for f in os.listdir(folder) if \'jpeg\' in f) def create_thumbnail(filename): im = Image.open(filename) im.thumbnail(SIZE, Image.ANTIALIAS) base, fname = os.path.split(filename) save_path = os.path.join(base, SAVE_DIRECTORY, fname) im.save(save_path) if __name__ == \'__main__\': folder = os.path.abspath( \'11_18_2013_R000_IQM_Big_Sur_Mon__e10d1958e7b766c3e840\') os.mkdir(os.path.join(folder, SAVE_DIRECTORY)) images = get_image_paths(folder) pool = Pool() pool.map(creat_thumbnail, images) #关键点,images是一个可迭代对象 pool.close() pool.join()
上边这段代码的主要工作就是将遍历传入的文件夹中的图片文件,一一生成缩略图,并将这些缩略图保存到特定文件夹中。 map 函数并不支持手动线程管理,反而使得相关的 debug 工作也变得异常简单。
多进程启动方式
multiprocessing 支持三种方式 启动进程
1 2 import multiprocessing as mp mp = mp.get_context('spawn' )
spawn
父进程启动一个新的python解释器进程.子进程只继承运行 Run()方法所需的资源。
来自父进程的不必要的文件描述符和句柄将不会被继承,运行速度比较慢
适用系统:unix and windows
fork
父进程使用os.fork()方法对Python解释器进行fork。子进程开始时实际上与父进程相同。
父进程的所有资源都由子进程继承。请注意,安全的fork多线程的进程是有问题的。
适用系统:unix
forkserver
当程序启动并选择forkserver的启动方法时,将启动服务器进程。
从那时起,每当需要一个新进程时,父进程就会连接到服务器,并请求它fork一个新进程。
fork server进程是单线程的,所以使用os.fork()是安全的。没有任何不必要的资源被继承
适用系统:unix
参考资料
文章链接:
https://www.zywvvd.com/notes/coding/python/multi-processing/multi-processing/