python多线程是否真并行?
Python有一个名为Global Interpreter Lock(GIL)的结构,全局解释器锁。
Python的代码执行由Python虚拟机(也叫解释器主循环,CPython版本)来控制,python在设计之初,考虑在解释器的主循环中,同时只有一个线程在运行。即在任意时刻只有一个线程在解释器中运行。对Python虚拟机访问的控制由全局解释锁GIL控制,正是这个锁来控制同一时刻只有一个线程能够运行。也就是说,GIL确保每次只能执行一个“线程”。一个线程获取GIL,然后执行相关操作,然后将GIL传递到下一个线程。
python虚拟机执行过程:
1、设置GIL。
2、切换到一个线程去运行。
3、运行直至完成指定的字节码指令,或者线程主动让出控制。
4、将该线程设置为睡眠状态。
5、解锁GIL。
6、重复以上所有步骤,运行下一个线程。
所以,由于GIL的限制,python多线程实际只能运行在单核CPU。如要实现多核CPU并行,只能通过多进程的方式实现。大部分并行模块中,多进程相当于开启多个python解释器,每个解释器对应一个进程。也有一些并行模块通过修改pyhton的GIL机制突破这个限制。
python实现多进程的模块最常用的是multiprocessing,此外还有multiprocess、pathos、concurrent.futures、pp、parallel等。本文对主要的模块进行介绍。
mutilprocessing
mutilprocess的核心是像线程一样管理进程,他与threading很是相像,但是是多进程,所以对多核CPU的利用率会比threading好的多。
1、创建子进程
(1)最基本的方法是通过函数:
multiprocessing.Process( target=None, name=None, args=(), kwargs={}, *, daemon=None)
target为可调用对象(函数对象),为子进程对应的活动;
相当于multiprocessing.Process子类化中重写的run()方法。
name为线程的名称,默认(None)为"Process-N"。
args、kwargs为进程活动(target)的非关键字参数、关键字参数。
deamon为bool值,表示是否为守护进程。
- 1
- 2
- 3
- 4
- 5
实例.start() #启动进程活动(run())
实例.join(timeout = None) #使主调进程阻塞,直至被调用进程运行结束或超时(如指定timeout)
import multiprocessing
import time
def worker():
name = multiprocessing.current_process().name
print 'Starting worker'
time.sleep(0.1)
print 'Finished worker'
if __name__ == '__main__':
p = multiprocessing.Process(target=worker)
print 'BEFORE:', p, p.is_alive() // False
p.start()
print 'DURING:', p, p.is_alive() // True
#p.terminate()
#print 'TERMINATED:', p, p.is_alive() // True
p.join() //terminate之后要join,使其可以更新状态
print 'JOINED:', p, p.is_alive() // False
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
输出结果如下:
BEFORE: <Process(Process-1, initial)> False
DURING: <Process(Process-1, started)> True
Process-1:Starting worker
Process-1:Finished worker
JOINED: <Process(Process-1, stopped)> False
- 1
- 2
- 3
- 4
- 5
(2)采用进程池创建多个子进程:multiprocessing.Pool(processes=None, initializer=None, initargs=(), maxtasksperchild=None)
processes :工作进程的数量,如果为None,使用 os.cpu_count()返回的数量。
initializer: 如果为None,每一个工作进程在开始的时候会调用initializer(*initargs)。
maxtasksperchild:工作进程退出之前可以完成的任务数,完成后用一个新的工作进程来替代原进程,来让闲置的资源被释放。默认是None,也就是只要Pool存在,工作进程就会一直存活。
- 1
- 2
- 3
在进程池中实际创建子进程的方法如下:
(a)直接申请:
xx.apply(func, args=(), kwds={}, callback=None, error_callback=None)
apply对应的子进程是排队执行的,实际非并行(阻塞的,即上一个子进程完成了才能进行下一个子进程;注意是单个子进程执行的,而不是按批执行的)。xx为进程池实例。
xx.apply_async(func, args=(), kwds={})
apply_async对应的每个子进程是异步执行的(即并行)。异步执行指的是一批子进程并行执行,且子进程完成一个,就新开始一个,而不必等待同一批其他进程完成。xx为进程池实例。
func(*args,**kwds)为子进程对应的活动。
- 1
def func(dt, funname, hour):
hour = "%02d" % hour
cmd = 'sh -x ./%s "%s" "%s"' % (funname, dt, hour)
os.system(cmd)
return "done" + dt
if __name__ == "__main__":
datestart = sys.argv[1]
dateend = sys.argv[2]
funname = sys.argv[3]
processnum = sys.argv[4]
result = []
pool = multiprocessing.Pool(processes=int(processnum))
for dt in getdate(datestart, dateend):
for hour in range(0,24):
//并行执行任务
result.append(pool.apply_async(func, (dt, funname, hour)))
pool.close()
pool.join()
for res in result:
print ":::", res.get()
print "Sub-process(es) done."
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
其中
XX.close()
#关闭进程池,关闭后不能往pool中增加新的子进程,然后可以调用join()函数等待已有子进程执行完毕。
XX.join()
#等待进程池中的子进程执行完毕。需在close()函数后调用。