目前開發中有遇到進程間需要共享數據的情況. 所以研究了下multiprocessing.Manager, 主要會以dict為例子, 說明下進程間共享(同一個父進程).
dict使用說明
import multiprocessing # 1. 創建一個Manger對象 manager = multiprocessing.Manager() # 2. 創建一個dict temp_dict = manager.dict() # 3. 創建一個測試程序 def test(idx, test_dict): test_dict[idx] = idx # 4. 創建進程池進行測試 pool = multiprocessing.Pool(4) for i in range(100): pool.apply_async(test, args=(i, temp_dict)) pool.close() pool.join() print(temp_dict)
too simple.
這時我們再看一個例子
import multiprocessing # 1. 創建一個Manger對象 manager = multiprocessing.Manager() # 2. 創建一個dict temp_dict = manager.dict() temp_dict['test'] = {} # 3. 創建一個測試程序 def test(idx, test_dict): test_dict['test'][idx] = idx # 4. 創建進程池進行測試 pool = multiprocessing.Pool(4) for i in range(100): pool.apply_async(test, args=(i, temp_dict)) pool.close() pool.join() print(temp_dict)
可以看到輸出結果是奇怪的{'test': {}}
如果我們簡單修改一下代碼
import multiprocessing # 1. 創建一個Manger對象 manager = multiprocessing.Manager() # 2. 創建一個dict temp_dict = manager.dict() temp_dict['test'] = {} # 3. 創建一個測試程序 def test(idx, test_dict): row = test_dict['test'] row[idx] = idx test_dict['test'] = row # 4. 創建進程池進行測試 pool = multiprocessing.Pool(4) for i in range(100): pool.apply_async(test, args=(i, temp_dict)) pool.close() pool.join() print(temp_dict)
這時輸出結果就符合預期了.
為了了解這個現象背后的原因, 我簡單去讀了一下源碼, 主要有以下幾段代碼很關鍵.
def Manager(): ''' Returns a manager associated with a running server process The managers methods such as `Lock()`, `Condition()` and `Queue()` can be used to create shared objects. ''' from multiprocessing.managers import SyncManager m = SyncManager() m.start() return m ... def start(self, initializer=None, initargs=()): ''' Spawn a server process for this manager object ''' assert self._state.value == State.INITIAL if initializer is not None and not hasattr(initializer, '__call__'): raise TypeError('initializer must be a callable') # pipe over which we will retrieve address of server reader, writer = connection.Pipe(duplex=False) # spawn process which runs a server self._process = Process( target=type(self)._run_server, args=(self._registry, self._address, self._authkey, self._serializer, writer, initializer, initargs), ) ident = ':'.join(str(i) for i in self._process._identity) self._process.name = type(self).__name__ + '-' + ident self._process.start() ...
上面代碼可以看出, 當我們聲明了一個Manager對象的時候, 程序實際在其他進程啟動了一個server服務, 這個server是阻塞的, 以此來實現進程間數據安全.
我的理解就是不同進程之間操作都是互斥的, 一個進程向server請求到這部分數據, 再把這部分數據修改, 返回給server, 之后server再去處理其他進程的請求.
回到上面的奇怪現象上, 這個操作test_dict['test'][idx] = idx
實際上在拉取到server上的數據后進行了修改, 但并沒有返回給server, 所以temp_dict的數據根本沒有變化. 在第二段正常代碼, 就相當于先向服務器請求數據, 再向服務器傳送修改后的數據. 這樣就可以解釋這個現象了.
這個時候如果出現一種情況, 兩個進程同時請求了一份相同的數據, 分別進行修改, 再提交到server上會怎么樣呢? 那當然是數據產生異常. 基于此, 我們需要Manager的另一個對象, Lock(). 這個對象也不難理解, Manager本身就是一個server, dict跟lock都來自于這個server, 所以當你lock住的時候, 其他進程是不能取到數據, 自然也不會出現上面那種異常情況.
代碼示例:
import multiprocessing # 1. 創建一個Manger對象 manager = multiprocessing.Manager() # 2. 創建一個dict temp_dict = manager.dict() lock = manager.Lock() temp_dict['test'] = {} # 3. 創建一個測試程序 def test(idx, test_dict, lock): lock.acquire() row = test_dict['test'] row[idx] = idx test_dict['test'] = row lock.release() # 4. 創建進程池進行測試 pool = multiprocessing.Pool(4) for i in range(100): pool.apply_async(test, args=(i, temp_dict, lock)) pool.close() pool.join() print(temp_dict)
切忌不要進程里自己新建lock對象, 要使用統一的lock對象.
本篇文章到這里就已經全部結束了,更多其他精彩內容可以關注PHP中文網的python視頻教程欄目!
聲明:本網頁內容旨在傳播知識,若有侵權等問題請及時與本網聯系,我們將在第一時間刪除處理。TEL:177 7030 7066 E-MAIL:11247931@qq.com