2022年 11月 3日

python并行计算

python多线程是否真并行?

Python有一个名为Global Interpreter Lock(GIL)的结构,全局解释器锁。

Python的代码执行由Python虚拟机(也叫解释器主循环,CPython版本)来控制,python在设计之初,考虑在解释器的主循环中,同时只有一个线程在运行。即在任意时刻只有一个线程在解释器中运行。对Python虚拟机访问的控制由全局解释锁GIL控制,正是这个锁来控制同一时刻只有一个线程能够运行。也就是说,GIL确保每次只能执行一个“线程”。一个线程获取GIL,然后执行相关操作,然后将GIL传递到下一个线程。

python虚拟机执行过程:

1、设置GIL。

2、切换到一个线程去运行。

3、运行直至完成指定的字节码指令,或者线程主动让出控制。

4、将该线程设置为睡眠状态。

5、解锁GIL。

6、重复以上所有步骤,运行下一个线程。

所以,由于GIL的限制,python多线程实际只能运行在单核CPU。如要实现多核CPU并行,只能通过多进程的方式实现。大部分并行模块中,多进程相当于开启多个python解释器,每个解释器对应一个进程。也有一些并行模块通过修改pyhton的GIL机制突破这个限制。

python实现多进程的模块最常用的是multiprocessing,此外还有multiprocess、pathos、concurrent.futures、pp、parallel等。本文对主要的模块进行介绍。

mutilprocessing

mutilprocess的核心是像线程一样管理进程,他与threading很是相像,但是是多进程,所以对多核CPU的利用率会比threading好的多。

1、创建子进程

(1)最基本的方法是通过函数:
multiprocessing.Process( target=None, name=None, args=(), kwargs={}, *, daemon=None)

target为可调用对象(函数对象),为子进程对应的活动;
相当于multiprocessing.Process子类化中重写的run()方法。
name为线程的名称,默认(None)为"Process-N"。
args、kwargs为进程活动(target)的非关键字参数、关键字参数。
deamon为bool值,表示是否为守护进程。
  • 1
  • 2
  • 3
  • 4
  • 5

实例.start() #启动进程活动(run())
实例.join(timeout = None) #使主调进程阻塞,直至被调用进程运行结束或超时(如指定timeout)

import multiprocessing
import time
 
def worker():
    name = multiprocessing.current_process().name
    print 'Starting worker'
    time.sleep(0.1)
    print 'Finished worker'
 
if __name__ == '__main__':
    p = multiprocessing.Process(target=worker)
    print 'BEFORE:', p, p.is_alive()      // False
    p.start()
    print 'DURING:', p, p.is_alive()      // True
    #p.terminate()
    #print 'TERMINATED:', p, p.is_alive()  // True
    p.join() //terminate之后要join,使其可以更新状态
    print 'JOINED:', p, p.is_alive()      // False 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18

输出结果如下:

BEFORE: <Process(Process-1, initial)> False
DURING: <Process(Process-1, started)> True
Process-1:Starting worker
Process-1:Finished worker
JOINED: <Process(Process-1, stopped)> False
  • 1
  • 2
  • 3
  • 4
  • 5

(2)采用进程池创建多个子进程:multiprocessing.Pool(processes=None, initializer=None, initargs=(), maxtasksperchild=None)

processes :工作进程的数量,如果为None,使用 os.cpu_count()返回的数量。
initializer: 如果为None,每一个工作进程在开始的时候会调用initializer(*initargs)。
maxtasksperchild:工作进程退出之前可以完成的任务数,完成后用一个新的工作进程来替代原进程,来让闲置的资源被释放。默认是None,也就是只要Pool存在,工作进程就会一直存活。
  • 1
  • 2
  • 3

在进程池中实际创建子进程的方法如下:

(a)直接申请:

xx.apply(func, args=(), kwds={}, callback=None, error_callback=None)

apply对应的子进程是排队执行的,实际非并行(阻塞的,即上一个子进程完成了才能进行下一个子进程;注意是单个子进程执行的,而不是按批执行的)。xx为进程池实例。

xx.apply_async(func, args=(), kwds={})

apply_async对应的每个子进程是异步执行的(即并行)。异步执行指的是一批子进程并行执行,且子进程完成一个,就新开始一个,而不必等待同一批其他进程完成。xx为进程池实例。

func(*args,**kwds)为子进程对应的活动。
  • 1
def func(dt, funname, hour):
    hour = "%02d" % hour
    cmd = 'sh -x ./%s "%s" "%s"' % (funname, dt, hour)
    os.system(cmd)
    return "done" + dt

if __name__ == "__main__":
    datestart = sys.argv[1]
    dateend = sys.argv[2]
    funname = sys.argv[3]
    processnum = sys.argv[4]
    result = []
    pool = multiprocessing.Pool(processes=int(processnum))
    for dt in getdate(datestart, dateend):
        for hour in range(0,24):
            //并行执行任务
            result.append(pool.apply_async(func, (dt, funname, hour)))
    pool.close()
    pool.join()
    for res in result:
        print ":::", res.get()
    print "Sub-process(es) done."

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23

其中
XX.close()
#关闭进程池,关闭后不能往pool中增加新的子进程,然后可以调用join()函数等待已有子进程执行完毕。

XX.join()
#等待进程池中的子进程执行完毕。需在close()函数后调用。