2022年 11月 5日

python 多进程一篇学懂

目录

1.python多进程介绍    

        1.1 进程介绍

        1.2 进程与线程的区别:

2.创建一个进程(Process类)

        2.1 建立函数,直接创建进程

       2. 2 通过类继承的方法创建进程

3.进程间的通信(Queue、Pipes、Mangers)

        3.1 进程队列(Queue)通信:

        3.2 进程管道(Pipes)通信:

        3.3 进程的Mangers通信:

4.守护进程

5.进程锁  (Lock)

6.进程池(pool)


1.python多进程介绍    

        1.1 进程介绍

        Python中的多线程无法利用多核优势,如果想要充分地使用多核CPU的资源,在python中大部分情况需要使用多进程。

        进程是python中最小的资源分配单元,进程中间的数据,内存是不共享的,每启动一个进程,都要独立分配资源和拷贝访问的数据,所以进程的启动和销毁的代价是比较大。

        线程是操作系统能够进行运算调度的最小单位,它被包含在进程之中,是进程中的实际运作单位,一条线程指的是进程中一个单一顺序的控制流,一个进程中可以并发多个线程,每条线程并行执行不同的任务。 在同一个进程内的线程的数据是可以进行互相访问的,这点区别于多进程。

        一个进程至少要包含一个线程,每个进程在启动的时候就会自动的启动一个线程,进程里面的第一个线程就是主线程,每次在进程内创建的子线程都是由主线程进程创建和销毁,子线程也可以由主线程创建出来的线程创建和销毁线程。

        1.2 进程与线程的区别:

  1. 线程是执行的指令集,进程是资源的集合;
  2. 线程的启动速度要比进程的启动速度要快;
  3. 两个线程的执行速度是一样的;
  4. 进程与线程的运行速度是没有可比性的;
  5. 线程共享创建它的进程的内存空间,进程的内存是独立的。
  6. 两个线程共享的数据都是同一份数据,两个子进程的数据不是共享的,而且数据是独立的;
  7. 同一个进程的线程之间可以直接交流,同一个主进程的多个子进程之间是不可以进行交流,如果两个进程之间需要通信,就必须要通过一个中间代理来实现;
  8. 一个新的线程很容易被创建,一个新的进程创建需要对父进程进行一次克隆;
  9. 一个线程可以控制和操作同一个进程里的其他线程,线程与线程之间没有隶属关系,但是进程只能操作子进程;
  10. 改变主线程,有可能会影响到其他线程的行为,但是对于父进程的修改是不会影响子进程;

2.创建一个进程(Process类)

        Process(group = None,target =None,name=None, args=[ ], kwargs={ })

        参数说明:

        1 group参数未使用,值始终为None
        2 target表示调用对象,即子进程要执行的任务  
        3 name为子进程的名称
        4 args表示调用对象的位置参数元组,args=(1,2,'anne',)
        5 kwargs表示调用对象的字典,kwargs={'name':'anne','age':18}

        process属性&方法:

authkey 进程的身份验证密钥
daemon 同thread的setDaemon,守护进程
exitcode 进程运行时为None,若为—N,则表示被信号N结束
pid 进程号
name 进程名
is_alive() 返回进程是否正在运行
join([timeout]) 阻塞到线程结束或到timeout值 
start() 进程准备就绪,等待CPU调度
run() start()调用run方法,如果实例进程时未制定传入target,start执行默认run()方法。         
terminate() 不管任务是否完成,立即停止工作进程

        下面马上动手来创建一个进程跑起来。创建进程一般使用下面两种方法:

        2.1 建立函数,直接创建进程

  1. from multiprocessing import Process # 多进程的类
  2. import time
  3. import random
  4. def test_fun(name):
  5. # 随机等待1~5秒
  6. time.sleep(random.randrange(1, 5))
  7. print(f"我是{name}子进程!")
  8. # 进程一定要写在“if __name__ == '__main__':”下面
  9. if __name__ == '__main__':
  10. process_list = [] # 存放开启的进程
  11. for i in range(3):
  12. # 进程中的参数args表示调用对象的位置参数元组.注意:元组中只有一个元素时结尾要加","逗号
  13. p = Process(target=test_fun, args=(f"son{i+1}",))
  14. p.start()
  15. process_list.append(p)
  16. for i in process_list:
  17. i.join() # 阻塞每个子进程,主进程会等待所有子进程结束再结束主进程
  18. print("主进程结束!")

       2. 2 通过类继承的方法创建进程

  1. from multiprocessing import Process # 多进程的类
  2. import time
  3. import random
  4. # 创建一个进程类Pros
  5. class Pros(Process):
  6. def __init__(self, name):
  7. super().__init__()
  8. self.name = name
  9. def run(self):
  10. print('%s 开始运行!' % self.name)
  11. # 随机睡眠1~5秒
  12. time.sleep(random.randrange(1, 5))
  13. print('%s 结束!' % self.name)
  14. if __name__ == "__main__":
  15. pro_list = []
  16. # 创建3个子进程
  17. for i in range(3):
  18. num = "p" + str(i+1)
  19. num = Pros(f"子进程{num}")
  20. num.start() # start方法会自动调用进程类中的run方法
  21. pro_list.append(num)
  22. # 子进程阻塞,主进程会等待所有子进程结束再结束
  23. for i in pro_list:
  24. i.join()
  25. print('主进程结束')

3.进程间的通信(Queue、Pipes、Mangers)

        为了进程安全起见,两个进程之间的数据是不能够互相访问的(默认情况下),进程与进程之间的数据是不可以互相访问的,而且每一个进程的内存是独立的。多进程的资源是独立的,不可以互相访问,如果想多个进程之间实现数据交互就必须通过中间件实现。

         进程间通信方法有Queue、Pipes、Mangers三种:

        3.1 进程队列(Queue)通信:

        Queue([maxsize]):建立一个共享的队列(其实并不是共享的,实际是克隆的,内部维护着数据的共享),多个进程可以向队列里存/取数据。其中,参数是队列最大项数,省略则无限制。

        put方法用以插入数据到队列中,put方法还有两个可选参数:blocked和timeout。如果blocked为True(默认值),并且timeout为正值,该方法会阻塞timeout指定的时间,直到该队列有剩余的空间。如果超时,会抛出Queue.Full异常。如果blocked为False,但该Queue已满,会立即抛出Queue.Full异常。

        get方法可以从队列读取并且删除一个元素。同样,get方法有两个可选参数:blocked和timeout。如果blocked为True(默认值),并且timeout为正值,那么在等待时间内没有取到任何元素,会抛出Queue.Empty异常。如果blocked为False,有两种情况存在,如果Queue有一个值可用,则立即返回该值,否则,如果队列为空,则立即抛出Queue.Empty异常。

  1. from multiprocessing import Queue, Process
  2. def fun(q):
  3. q.put([1, 2, 3])
  4. if __name__ == '__main__':
  5. Q = Queue(5) # 设置进程队列长度
  6. for i in range(2): # 启动两个进程,想队列里put数据
  7. process = Process(target=fun, args=(Q, )) # 创建一个进程,将Q传入,实际上是克隆了Q
  8. process.start()
  9. process.join()
  10. print(Q.qsize())
  11. print(Q.get()) # 在主进程中获取元素
  12. print(Q.get())
  13. print(Q.qsize())
  14. # 结果是:
  15. 2
  16. [1, 2, 3]
  17. [1, 2, 3]
  18. 0

        3.2 进程管道(Pipes)通信:

        进程间的管道内部机制是通过启动 socket 连接来维护两个进程间的通讯。 

        Pipe([duplex]):在进程之间创建一条管道,并返回元组(conn1,conn2),其中conn1,conn2表示管道两端的连接对象,强调一点:必须在产生Process对象之前产生管道。

        Pipe方法有 duplex 参数,如果 duplex参数为 True (默认值),那么这个管道是全双工模式,就是说conn1和conn2均可接收发送。duplex为False时,conn1只负责接受消息,conn2只负责发送消息。

        其中, send方法和recv方法分别是发送和接受消息的方法。例如,在全双工模式下,可以调用conn1.send发送消息,conn1.recv接收消息。如果没有消息可接收,recv方法会一直阻塞。如果管道已经被关闭,那么recv方法会抛出EOFError。

        还有两个常用方法,close() 退出和 poll() 判断子进程是否结束。

  1. from multiprocessing import Pipe, Process
  2. def fun2(q, name):
  3. print(f' 开始运行 {name} 子进程...')
  4. cam = cv.VideoCapture(0, cv.CAP_DSHOW)
  5. # 判断摄像头是否打开成功
  6. if not cam.isOpened():
  7. print("打开摄像头失败,程序退出!")
  8. exit()
  9. # 设置摄像头的使用时间
  10. st = time.time()
  11. # 如果打开摄像头成功,则获取摄像头返回的每一帧图像数据
  12. while (time.time() - st) < 5: # 2秒钟
  13. # 获取返回帧
  14. ret, cam_data = cam.read()
  15. q.send(cam_data)
  16. # 释放捕获器
  17. cam.release()
  18. print(f' {name} 子进程运行结束。')
  19. if __name__ == '__main__':
  20. # Pipe(duplex=False)表示管道是单向的,即,您只能调用parent_conn.recv()和child_conn.send().
  21. # 否则,它是双向的,并且两个连接都支持send / recv
  22. son, father = Pipe()
  23. p = Process(target=fun2, args=(son, "jcy",))
  24. p.start()
  25. son.close() # 设置了close,后面接收不到数据,会抛出EOFError错误,不关闭会一直阻塞
  26. # 在主进程通过队列Queue使用子进程的返回的数据
  27. res = [0]
  28. while p.is_alive():
  29. try:
  30. datas = father.recv() # 进程收取数据,如果没收到会阻塞
  31. except EOFError as e: # 抛出错误后,就不会阻塞在这里了,但仍会阻塞1~3秒
  32. p.terminate() # 阻塞便结束进程
  33. # 当son接收不到输出的时候且输入被关闭的时候,会抛出EORFError,可捕获并且退出子进程
  34. continue
  35. res[0] = datas
  36. cv.imshow('frame', res[0])
  37. if cv.waitKey(1) == ord("q"):
  38. break
  39. cv.destroyAllWindows()
  40. print("主进程结束!")

        Pipe管道仅适用于只有两个进程一读一写的单双工情况,也就是说信息是只向一个方向流动。例如电视、广播。

        3.3 进程的Mangers通信:

        Manager实现了多个进程间的数据共享,支持的数据类型有 list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier, Queue, Value and Array

  1. from multiprocessing import Manager,Process
  2. def fun(lst,dct):
  3. for i in range(3):
  4. lst.append(1)
  5. dct['name'] = 'my'
  6. if __name__ == '__main__':
  7. with Manager() as manager:
  8. L = manager.list() # 定义共享列表
  9. D = manager.dict() # 定义共享字典
  10. p1 = Process(target=fun, args=(L, D)) # 启动一个进程,将定义的list和dict传入
  11. p1.start() # 启动进程
  12. p1.join() # 阻塞进程,等待进程完成
  13. print(L)
  14. print(D)
  15. # 结果是:
  16. [0, 1, 2]
  17. {'name': 'my'}

4.守护进程

        主进程创建守护进程:

        1、守护进程会在主进程代码执行结束后就终止。

        2、守护进程内无法再开启子进程,否则抛出异常:AssertionError: daemonic processes are not allowed to have children。

        注意:进程之间是互相独立的,主进程代码运行结束,守护进程随即终止。但是当有多个子进程时,只有设置了守护进程的子进程会随同主进程结束而结束,其他子进程不受影响。

  1. from multiprocessing import Process
  2. import time
  3. import random
  4. class Run(Process):
  5. def __init__(self, name):
  6. super().__init__()
  7. self.name = name # MyProcess的__init__方法会执行self.name=MyProcess-1,所以加到这里,会覆盖我们的self.name=name,因而self.name = name要写在__init__方法之后
  8. def run(self): # run方法会在start方法调用后自动执行
  9. print('%s 进程开始运行。。。' % self.name)
  10. time.sleep(random.randrange(1, 3))
  11. print('%s 进程运行结束。' % self.name)
  12. if __name__ == '__main__':
  13. p = Run('my')
  14. p.daemon = True # 一定要在p.start()前设置,设置p为守护进程,禁止p创建子进程,并且父进程代码执行结束,p即终止运行
  15. p.start()
  16. time.sleep(1)
  17. print('主进程结束。')
  18. # 结果是:
  19. my 进程开始运行。。。
  20. 主进程结束。

5.进程锁  (Lock)

        进程之间数据不共享,但是共享同一套文件系统,当多个进程需要访问共享资源的时候,加锁处理Lock可以用来避免访问的冲突。

        加锁可以保证多个进程修改同一块数据时,同一时间只能有一个任务可以进行修改,即串行的修改,虽然速度变慢了,但牺牲了速度却保证了数据安全。

        这里加个说明:

        串行:就是程序会按照你编写的代码,一步一步往下进行执行,只有执行完前面的方法,才会进入下一个方法,一个时间点内cpu只执行一个任务。特点:相对于并行的概念,同一个时间点只能执行一个任务,执行完了才能执行下一个任务。

        并发:在操作系统中,指一段时间中有多个程序处在已启动到运行完毕之间,且这些程序都在同一个处理器上运行。其实并发不是真正的‘同时进行“,只是cpu把一个时间段划分成几个时间分段,然后在这几个时间分区之间来回切换,由于cpu的处理速度非常快,只要时间间隔处理得当,用户感觉就是多个应用程序在同时进行。特点:同一个时间段内可以做多个事情;但是多个任务之间是互相抢占cpu资源的。

        并行:当系统有一个以上cpu时,当一个cpu执行一个线程时,另一个cpu可以执行另一个进程,两个进程互不抢占cpu资源,可以同时进行,称之为并行;决定并行的因素不是cpu的数量,而是cpu的核数,一个多核cpu也可以并行。适合科学计算,后台处理等弱交互场景。特点:同一个时间点做了多个事情;任务与任务之间互不抢占资源。

  1. from multiprocessing import Process, Lock
  2. import time
  3. def fun(l, i):
  4. l.acquire() # 获取锁
  5. print("正在运行进程: ", i)
  6. time.sleep(2)
  7. l.release() # 使用完后释放锁
  8. if __name__ == '__main__':
  9. lock = Lock() # 生成锁的实例
  10. for i in range(5):
  11. p = Process(target=fun, args=(lock, i)) # 创建进程
  12. p.start() # 启动,这里没有join,进程可以并发
  13. # 结果是:因为加了进程锁,每一个进程运行完后再到下一个
  14. 正在运行进程: 1
  15. 正在运行进程: 0
  16. 正在运行进程: 2
  17. 正在运行进程: 3
  18. 正在运行进程: 4

        在每个函数获取锁到释放锁之间的程序会禁止其他进程修改或获取数据,如果没有加锁,进程会并发同时执行,如果同时操作同一个数据,可能会出错。

6.进程池(Pool)

       进程池(Pool)的作用, 当进程数过多时,用于限制进程数。Pool可以提供指定数量的进程,供用户调用,当有新的请求提交到pool中时,如果池还没有满,那么就会创建一个新的进程用来执行该请求;但如果池中的进程数已经达到规定最大值,那么该请求就会等待,直到池中有进程结束,才会创建新的进程来它。

        进程池内部维护一个进程序列,当使用时,则去进程池中获取一个进程,如果进程池序列中没有可供使用的进进程,那么程序就会等待,直到进程池中有可用进程为止。

        语法:Pool([numprocess [,initializer [, initargs]]])       # 创建进程池 

         参数介绍:

        1 numprocess:要创建的进程数,如果省略,将默认使用cpu_count()的值
        2 initializer:是每个工作进程启动时要执行的可调用对象,默认为None
        3 initargs:是要传给initializer的参数组

        假如指定numprocess为3,则进程池会从无到有创建三个进程,然后自始至终使用这三个进程去执行所有任务,不会开启其他进程。

        主要方法:

        1 、p.apply(func [, args [, kwargs]]) (非阻塞):启动进程为串行执行,在一个池工作进程中执行func(*args,**kwargs),然后返回结果。 需要强调的是:此操作并不会在所有池工作进程中并发执行func函数。如果要通过不同参数并发地执行func函数,必须从不同线程调用p.apply()函数或者使用p.apply_async()。

        2 、p.apply_async(func [, args [, kwargs]])(阻塞的): 启动进程为并行执行,在一个池工作进程中执行func(*args,**kwargs)然后返回结果。 此方法的结果是AsyncResult类的实例,callback 是可调用对象,接收输入参数。当func的结果变为可用时, 将理解传递给callback。callback禁止执行任何阻塞操作,否则将接收其他异步操作中的结果。

        3、 p.close():关闭进程池,使其不在接受新的任务。如果所有操作持续挂起,它们将在工作进程终止前完成 。

        4 、p.jion():主进程阻塞,等待子进程的退出, join方法要在close()或 terminate()之后使用。

        5、p.terminate():结束工作进程,不在处理未完成的任务。

  1. from multiprocessing import Pool
  2. import os
  3. import time
  4. def fun(i):
  5. print('正在运行进程: ', i)
  6. time.sleep(2)
  7. def end_fun(i=None):
  8. print('完成!')
  9. print("回调进程::", os.getpid())
  10. if __name__ == '__main__':
  11. print("主进程:", os.getpid())
  12. pool = Pool(3) # 限制同时进行的进程数为 3
  13. for i in range(5):
  14. pool.apply_async(func=fun, args=(i, ), callback=end_fun) # 并行执行,callback回调由主程序回调
  15. # pool.apply(func=fun, args=(i,)) # 串行执行
  16. # 注意:join必须放在close()后面,否则将不会等待子进程打印结束,而直接结束
  17. pool.close() # 关闭进程池
  18. pool.join() # 进程池中进程执行完毕后再关闭,如果注释,那么程序直接关闭
  19. # 并行结果是:
  20. 主进程: 12352
  21. 正在运行进程: 0
  22. 正在运行进程: 1
  23. 正在运行进程: 2
  24. 正在运行进程: 3
  25. 完成!
  26. 回调进程:: 12352
  27. 完成!正在运行进程: 4
  28. 回调进程:: 12352
  29. 完成!
  30. 回调进程:: 12352
  31. 完成!
  32. 回调进程:: 12352
  33. 完成!
  34. 回调进程:: 12352
  35. # 串行结果是:
  36. 主进程: 17748
  37. 正在运行进程: 0
  38. 正在运行进程: 1
  39. 正在运行进程: 2
  40. 正在运行进程: 3
  41. 正在运行进程: 4

        执行说明:创建一个进程池pool,并设定进程的数量为3,xrange(5)会相继产生四个对象[0, 1, 2, 3,4],5个对象被提交到pool中,因pool指定进程数为3,所以0、1、2会直接送到进程中执行,当其中一个执行完事后才空出一个进程处理对象3和4。主程序在pool.join()处等待各个进程的结束。