上一篇文章介紹了線程的使用。然而 Python 中由于
Global Interpreter Lock
(全局解釋鎖 GIL )的存在,每個(gè)線程在在執(zhí)行時(shí)需要獲取到這個(gè) GIL ,在同一時(shí)刻中只有一個(gè)線程得到解釋鎖的執(zhí)行, Python 中的線程并沒有真正意義上的并發(fā)執(zhí)行,多線程的執(zhí)行效率也不一定比單線程的效率更高。 如果要充分利用現(xiàn)代多核 CPU 的并發(fā)能力,就要使用 multipleprocessing 模塊了。
0x01 multipleprocessing
與使用線程的 threading 模塊類似,
multipleprocessing
模塊提供許多高級(jí) API 。最常見的是 Pool 對(duì)象了,使用它的接口能很方便地寫出并發(fā)執(zhí)行的代碼。
from multiprocessing import Pool
def f(x):
return x * x
if __name__ == '__main__':
with Pool(5) as p:
# map方法的作用是將f()方法并發(fā)地映射到列表中的每個(gè)元素
print(p.map(f, [1, 2, 3]))
# 執(zhí)行結(jié)果
# [1, 4, 9]
關(guān)于 Pool 下文中還會(huì)提到,這里我們先來看 Process 。
Process
要?jiǎng)?chuàng)建一個(gè)進(jìn)程可以使用 Process 類,使用 start() 方法啟動(dòng)進(jìn)程。
from multiprocessing import Process
import os
def echo(text):
# 父進(jìn)程ID
print("Process Parent ID : ", os.getppid())
# 進(jìn)程ID
print("Process PID : ", os.getpid())
print('echo : ', text)
if __name__ == '__main__':
p = Process(target=echo, args=('hello process',))
p.start()
p.join()
# 執(zhí)行結(jié)果
# Process Parent ID : 27382
# Process PID : 27383
# echo : hello process
進(jìn)程池
正如開篇提到的
multiprocessing
模塊提供了 Pool 類可以很方便地實(shí)現(xiàn)一些簡(jiǎn)單多進(jìn)程場(chǎng)景。 它主要有以下接口
- apply(func[, args[, kwds]])
- 執(zhí)行 func(args,kwds) 方法,在方法結(jié)束返回前會(huì)阻塞。
- apply_async(func[, args[, kwds[, callback[, error_callback]]]])
- 異步執(zhí)行 func(args,kwds) ,會(huì)立即返回一個(gè) result 對(duì)象,如果指定了 callback 參數(shù),結(jié)果會(huì)通過回調(diào)方法返回,還可以指定執(zhí)行出錯(cuò)的回調(diào)方法 error_callback()
- map(func, iterable[, chunksize])
- 類似內(nèi)置函數(shù) map() ,可以并發(fā)執(zhí)行 func ,是同步方法
- map_async(func, iterable[, chunksize[, callback[, error_callback]]])
- 異步版本的 map
- close()
- 關(guān)閉進(jìn)程池。當(dāng)池中的所有工作進(jìn)程都執(zhí)行完畢時(shí),進(jìn)程會(huì)退出。
- terminate()
- 終止進(jìn)程池
- join()
-
等待工作進(jìn)程執(zhí)行完,必需先調(diào)用 close() 或者 terminate()
from multiprocessing import Pool
def f(x):
return x * x
if __name__ == '__main__':
with Pool(5) as p:
# map方法的作用是將f()方法并發(fā)地映射到列表中的每個(gè)元素
a = p.map(f, [1, 2, 3])
print(a)
# 異步執(zhí)行map
b = p.map_async(f, [3, 5, 7, 11])
# b 是一個(gè)result對(duì)象,代表方法的執(zhí)行結(jié)果
print(b)
# 為了拿到結(jié)果,使用join方法等待池中工作進(jìn)程退出
p.close()
# 調(diào)用join方法前,需先執(zhí)行close或terminate方法
p.join()
# 獲取執(zhí)行結(jié)果
print(b.get())
# 執(zhí)行結(jié)果
# [1, 4, 9]
#
# [9, 25, 49, 121]
map_async() 和 apply_async() 執(zhí)行后會(huì)返回一個(gè) class multiprocessing.pool.AsyncResult 對(duì)象,通過它的 get() 可以獲取到執(zhí)行結(jié)果, ready() 可以判斷 AsyncResult 的結(jié)果是否準(zhǔn)備好。
進(jìn)程間數(shù)據(jù)的傳輸
multiprocessing 模塊提供了兩種方式用于進(jìn)程間的數(shù)據(jù)共享:隊(duì)列( Queue )和管道( Pipe )
Queue 是線程安全,也是進(jìn)程安全的。使用 Queue 可以實(shí)現(xiàn)進(jìn)程間的數(shù)據(jù)共享,例如下面的 demo 中子進(jìn)程 put 一個(gè)對(duì)象,在主進(jìn)程中就能 get 到這個(gè)對(duì)象。 任何可以序列化的對(duì)象都可以通過 Queue 來傳輸。
from multiprocessing import Process, Queue
def f(q):
q.put([42, None, 'hello'])
if __name__ == '__main__':
# 使用Queue進(jìn)行數(shù)據(jù)通信
q = Queue()
p = Process(target=f, args=(q,))
p.start()
# 主進(jìn)程取得子進(jìn)程中的數(shù)據(jù)
print(q.get()) # prints "[42, None, 'hello']"
p.join()
# 執(zhí)行結(jié)果
# [42, None, 'hello']
Pipe() 返回一對(duì)通過管道連接的 Connection 對(duì)象。這兩個(gè)對(duì)象可以理解為管道的兩端,它們通過 send() 和 recv() 發(fā)送和接收數(shù)據(jù)。
from multiprocessing import Process, Pipe
def write(conn):
# 子進(jìn)程中發(fā)送一個(gè)對(duì)象
conn.send([42, None, 'hello'])
conn.close()
def read(conn):
# 在讀的進(jìn)程中通過recv接收對(duì)象
data = conn.recv()
print(data)
if __name__ == '__main__':
# Pipe()方法返回一對(duì)連接對(duì)象
w_conn, r_conn = Pipe()
wp = Process(target=write, args=(w_conn,))
rp = Process(target=read, args=(r_conn,))
wp.start()
rp.start()
# 執(zhí)行結(jié)果
# [42, None, 'hello']
需要注意的是,兩個(gè)進(jìn)程不能同時(shí)對(duì)一個(gè)連接對(duì)象進(jìn)行 send 或 recv 操作。
同步
我們知道線程間的同步是通過鎖機(jī)制來實(shí)現(xiàn)的,進(jìn)程也一樣。
from multiprocessing import Process, Lock
import time
def print_with_lock(l, i):
l.acquire()
try:
time.sleep(1)
print('hello world', i)
finally:
l.release()
def print_without_lock(i):
time.sleep(1)
print('hello world', i)
if __name__ == '__main__':
lock = Lock()
# 先執(zhí)行有鎖的
for num in range(5):
Process(target=print_with_lock, args=(lock, num)).start()
# 再執(zhí)行無鎖的
# for num in range(5):
# Process(target=print_without_lock, args=(num,)).start()
有鎖的代碼將每秒依次打印
hello world 0
hello world 1
hello world 2
hello world 3
hello world 4
如果執(zhí)行無鎖的代碼,則在我的電腦上執(zhí)行結(jié)果是這樣的
hello worldhello world? 0
1
hello world 2
hello world 3
hello world 4
除了 Lock ,還包括 RLock 、 Condition 、 Semaphore 和 Event 等進(jìn)程間的同步原語。其用法也與線程間的同步原語很類似。 API 使用可以參考文末中引用的文檔鏈接。
在工程中實(shí)現(xiàn)進(jìn)程間的數(shù)據(jù)共享應(yīng)當(dāng)優(yōu)先使用 隊(duì)列或管道。
0x02 總結(jié)
本文對(duì) multiprocessing 模塊中常見的 API 作了簡(jiǎn)單的介紹。講述了 Process 和 Pool 的常見用法,同時(shí)介紹了進(jìn)程間的數(shù)據(jù)方式:隊(duì)列和管道。最后簡(jiǎn)單了解了進(jìn)程間的同步原語。
更多文章、技術(shù)交流、商務(wù)合作、聯(lián)系博主
微信掃碼或搜索:z360901061
微信掃一掃加我為好友
QQ號(hào)聯(lián)系: 360901061
您的支持是博主寫作最大的動(dòng)力,如果您喜歡我的文章,感覺我的文章對(duì)您有幫助,請(qǐng)用微信掃描下面二維碼支持博主2元、5元、10元、20元等您想捐的金額吧,狠狠點(diǎn)擊下面給點(diǎn)支持吧,站長(zhǎng)非常感激您!手機(jī)微信長(zhǎng)按不能支付解決辦法:請(qǐng)將微信支付二維碼保存到相冊(cè),切換到微信,然后點(diǎn)擊微信右上角掃一掃功能,選擇支付二維碼完成支付。
【本文對(duì)您有幫助就好】元

