<span id="mktg5"></span>

<i id="mktg5"><meter id="mktg5"></meter></i>

        <label id="mktg5"><meter id="mktg5"></meter></label>
        最新文章專題視頻專題問答1問答10問答100問答1000問答2000關鍵字專題1關鍵字專題50關鍵字專題500關鍵字專題1500TAG最新視頻文章推薦1 推薦3 推薦5 推薦7 推薦9 推薦11 推薦13 推薦15 推薦17 推薦19 推薦21 推薦23 推薦25 推薦27 推薦29 推薦31 推薦33 推薦35 推薦37視頻文章20視頻文章30視頻文章40視頻文章50視頻文章60 視頻文章70視頻文章80視頻文章90視頻文章100視頻文章120視頻文章140 視頻2關鍵字專題關鍵字專題tag2tag3文章專題文章專題2文章索引1文章索引2文章索引3文章索引4文章索引5123456789101112131415文章專題3
        問答文章1 問答文章501 問答文章1001 問答文章1501 問答文章2001 問答文章2501 問答文章3001 問答文章3501 問答文章4001 問答文章4501 問答文章5001 問答文章5501 問答文章6001 問答文章6501 問答文章7001 問答文章7501 問答文章8001 問答文章8501 問答文章9001 問答文章9501
        當前位置: 首頁 - 科技 - 知識百科 - 正文

        Queue模塊及源碼分析

        來源:懂視網 責編:小采 時間:2020-11-27 14:28:03
        文檔

        Queue模塊及源碼分析

        Queue模塊及源碼分析:Queue模塊是提供隊列操作的模塊,隊列是線程間最常用的交換數據的形式。該模塊提供了三種隊列:Queue.Queue(maxsize):先進先出,maxsize是隊列的大小,其值為非正數時為無線循環隊列Queue.LifoQueue(maxsize):后進先出,相當于棧Queue.Prior
        推薦度:
        導讀Queue模塊及源碼分析:Queue模塊是提供隊列操作的模塊,隊列是線程間最常用的交換數據的形式。該模塊提供了三種隊列:Queue.Queue(maxsize):先進先出,maxsize是隊列的大小,其值為非正數時為無線循環隊列Queue.LifoQueue(maxsize):后進先出,相當于棧Queue.Prior

        Queue模塊是提供隊列操作的模塊,隊列是線程間最常用的交換數據的形式。該模塊提供了三種隊列:

        Queue.Queue(maxsize):先進先出,maxsize是隊列的大小,其值為非正數時為無線循環隊列

        Queue.LifoQueue(maxsize):后進先出,相當于棧

        Queue.PriorityQueue(maxsize):優先級隊列。

        其中LifoQueue,PriorityQueue是Queue的子類。三者擁有以下共同的方法:

        qsize():返回近似的隊列大小。為什么要加“近似”二字呢?因為當該值大于0的時候并不保證并發執行的時候get()方法不被阻塞,同樣,對于put()方法有效。

        empty():返回布爾值,隊列為空時,返回True,反之返回False。

        full():當設定了隊列大小的時候,如果隊列滿了,則返回True,否則返回False。

        put(item[,block[,timeout]]):向隊列里添加元素item,block設置為False的時候,如果隊列滿了則拋出Full異常。如果block設置為True,timeout設置為None時,則會一種等到有空位的時候再添加進隊列;否則會根據timeout設定的超時值拋出Full異常。

        put_nowwait(item):等價與put(item,False)。block設置為False的時候,如果隊列為空,則拋出Empty異常。如果block設置為True,timeout設置為None時,則會一種等到有空位的時候再添加進隊列;否則會根據timeout設定的超時值拋出Empty異常。

        get([block[,timeout]]):從隊列中刪除元素并返回該元素的值,如果timeout是一個正數,它會阻塞最多超時秒數,并且如果在該時間內沒有可用的項目,則引發Empty異常。

        get_nowwait():等價于get(False)

        task_done():發送信號表明入列任務已完成,經常在消費者線程中用到。

        join():阻塞直至隊列所有元素處理完畢,然后再處理其它操作。

        (一)源碼分析

        Queue模塊用起來很簡單很簡單,但我覺得有必要把該模塊的相關源代碼貼出來分析下,會學到不少東西,看看大神們寫的代碼多么美觀,多么結構化模塊化,再想想自己寫的代碼,都是淚呀,來學習學習。為了縮減篇幅,源碼的注釋部分被刪減掉。

        from time import time as _time
        try:
         import threading as _threading
        except ImportError:
         import dummy_threading as _threading
        from collections import deque
        import heapq
         
        __all__ = ['Empty', 'Full', 'Queue', 'PriorityQueue', 'LifoQueue']
         
        class Empty(Exception):
         "Exception raised by Queue.get(block=0)/get_nowait()."
         pass
         
        class Full(Exception):
         "Exception raised by Queue.put(block=0)/put_nowait()."
         pass
         
        class Queue:
         def __init__(self, maxsize=0):
         self.maxsize = maxsize
         self._init(maxsize)
         self.mutex = _threading.Lock()
         self.not_empty = _threading.Condition(self.mutex)
         self.not_full = _threading.Condition(self.mutex)
         self.all_tasks_done = _threading.Condition(self.mutex)
         self.unfinished_tasks = 
         
         def get_nowait(self):
         return self.get(False)
         def _init(self, maxsize):
         self.queue = deque()
         def _qsize(self, len=len):
         return len(self.queue)
         def _put(self, item):
         self.queue.append(item)
         def _get(self):
         return self.queue.popleft()

        通過后面的幾個函數分析知道,Queue對象是在collections模塊的queue基礎上(關于collections模塊參考 Python:使用Counter進行計數統計及collections模塊),加上threading模塊互斥鎖和條件變量封裝的。

        deque是一個雙端隊列,很適用于隊列和棧。上面的Queue對象就是一個先進先出的隊列,所以首先_init()函數定義了一個雙端隊列,然后它的定義了_put()和_get()函數,它們分別是從雙端隊列右邊添加元素、左邊刪除元素,這就構成了一個先進先出隊列,同理很容易想到LifoQueue(后進先出隊列)的實現了,保證隊列右邊添加右邊刪除就可以。可以貼出源代碼看看。

        class LifoQueue(Queue):
         '''Variant of Queue that retrieves most recently added entries first.'''
         
         def _init(self, maxsize):
         self.queue = []
         
         def _qsize(self, len=len):
         return len(self.queue)
         
         def _put(self, item):
         self.queue.append(item)
         
         def _get(self):
         return self.queue.pop()

        雖然它的"queue"沒有用queue(),用列表也是一樣的,因為列表append()和pop()操作是在最右邊添加元素和刪除最右邊元素。

        再來看看PriorityQueue,他是個優先級隊列,這里用到了heapq模塊的heappush()和heappop()兩個函數。heapq模塊對堆這種數據結構進行了模塊化,可以建立這種數據結構,同時heapq模塊也提供了相應的方法來對堆做操作。其中_init()函數里self.queue=[]可以看作是建立了一個空堆。heappush() 往堆中插入一條新的值 ,heappop() 從堆中彈出最小值 ,這就可以實現優先級(關于heapq模塊這里這是簡單的介紹)。源代碼如下:

        class PriorityQueue(Queue):
         '''Variant of Queue that retrieves open entries in priority order (lowest first).
         
         Entries are typically tuples of the form: (priority number, data).
         '''
         
         def _init(self, maxsize):
         self.queue = []
         
         def _qsize(self, len=len):
         return len(self.queue)
         
         def _put(self, item, heappush=heapq.heappush):
         heappush(self.queue, item)
         
         def _get(self, heappop=heapq.heappop):
         return heappop(self.queue)

        基本的數據結構分析完了,接著分析其它的部分。

        mutex 是個threading.Lock()對象,是互斥鎖;not_empty、 not_full 、all_tasks_done這三個都是threading.Condition()對象,條件變量,而且維護的是同一把鎖對象mutex(關于threading模塊中Lock對象和Condition對象可參考上篇博文Python:線程、進程與協程(2)——threading模塊)。

        其中:

        self.mutex互斥鎖:任何獲取隊列的狀態(empty(),qsize()等),或者修改隊列的內容的操作(get,put等)都必須持有該互斥鎖。acquire()獲取鎖,release()釋放鎖。同時該互斥鎖被三個條件變量共同維護。

        self.not_empty條件變量:線程添加數據到隊列中后,會調用self.not_empty.notify()通知其它線程,然后喚醒一個移除元素的線程。

        self.not_full條件變量:當一個元素被移除出隊列時,會喚醒一個添加元素的線程。

        self.all_tasks_done條件變量 :在未完成任務的數量被刪除至0時,通知所有任務完成

        self.unfinished_tasks : 定義未完成任務數量

        再來看看主要方法:

        (1)put()

        源代碼如下:

        def put(self, item, block=True, timeout=None):
         self.not_full.acquire() #not_full獲得鎖
         try:
         if self.maxsize > 0: #如果隊列長度有限制
         if not block: #如果沒阻塞
         if self._qsize() == self.maxsize: #如果隊列滿了拋異常
         raise Full
         elif timeout is None: #有阻塞且超時為空,等待
         while self._qsize() == self.maxsize:
         self.not_full.wait()
         elif timeout < 0:
         raise ValueError("'timeout' must be a non-negative number")
         else: #如果有阻塞,且超時非負時,結束時間=當前時間+超時時間
         endtime = _time() + timeout
         while self._qsize() == self.maxsize:
         remaining = endtime - _time()
         if remaining <= 0.0: #到時后,拋異常
         raise Full
         #如果沒到時,隊列是滿的就會一直被掛起,直到有“位置”騰出
         self.not_full.wait(remaining)
         self._put(item) #調用_put方法,添加元素
         self.unfinished_tasks += 1 #未完成任務+1
         self.not_empty.notify() #通知非空,喚醒非空掛起的任務
         finally:
         self.not_full.release() #not_full釋放鎖

        默認情況下block為True,timeout為None。如果隊列滿則會等待,未滿則會調用_put方法將進程加入deque中(后面介紹),并且未完成任務加1還會通知隊列非空。

        如果設置block參數為Flase,隊列滿時則會拋異常。如果設置了超時那么在時間到之前進行阻塞,時間一到拋異常。這個方法使用not_full對象進行操作。

        (2)get()

        源碼如下:

        def get(self, block=True, timeout=None):
         
         self.not_empty.acquire() #not_empty獲得鎖
         try:
         if not block: #不阻塞時
         if not self._qsize(): #隊列為空時拋異常
         raise Empty
         elif timeout is None: #不限時時,隊列為空則會等待
         while not self._qsize():
         self.not_empty.wait()
         elif timeout < 0:
         raise ValueError("'timeout' must be a non-negative number")
         else:
         endtime = _time() + timeout
         while not self._qsize():
         remaining = endtime - _time()
         if remaining <= 0.0:
         raise Empty
         self.not_empty.wait(remaining)
         item = self._get() #調用_get方法,移除并獲得項目
         self.not_full.notify() #通知非滿
         return item #返回項目
         finally:
         self.not_empty.release() #釋放鎖

        邏輯跟put()函數一樣,參數默認情況下隊列空了則會等待,否則將會調用_get方法(往下看)移除并獲得一個項,最后返回這個項。這個方法使用not_empty對象進行操作。

        不過我覺得put()與get()兩個函數結合起來理解比較好。not_full與not_empty代表的是兩種不同操作類型的線程,not_full可以理解成is-not-full,即隊列是否滿了,默認是沒有滿,沒有滿時not_full這個條件變量才能獲取鎖,并做一些條件判斷,只有符合條件才能向隊列里加元素,添加成功后就會通知not_empty條件變量隊列里不是空的,“我”剛剛添加進了一個元素,滿足可以執行刪除動作的基本條件了(隊列不是空的,想想如果是空的執行刪除動作就沒有意義了),同時喚醒一些被掛起的執行移除動作的線程,讓這些線程重新判斷條件,如果條件準許就會執行刪除動作,然后又通知not_full條件變量,告訴“它”隊列不是滿的,因為“我”剛才刪除了一個元素(想想如果隊列滿了添加元素就添加不進呀,就沒意義了),滿足了添加元素的基本條件(隊列不是滿的),同時喚醒一些被掛起的執行添加動作的線程,這些線程又會進行條件判斷,符合條件就會添加元素,否則繼續掛起,依次類推,同時這樣也保證了線程的安全。正與前面所說,當一個元素被移除出隊列時,會喚醒一個添加元素的線程;當添加一個元素時會喚醒一個刪除元素的線程。

        (3)task_done()

        源碼如下:

        def task_done(self):
         
         self.all_tasks_done.acquire() #獲得鎖
         try:
         unfinished = self.unfinished_tasks - 1 #判斷隊列中一個線程的任務是否全部完成
         if unfinished <= 0: #是則進行通知,或在過量調用時報異常
         if unfinished < 0:
         raise ValueError('task_done() called too many times')
         self.all_tasks_done.notify_all()
         self.unfinished_tasks = unfinished #否則未完成任務數量-1
         finally:
         self.all_tasks_done.release() #最后釋放鎖

        這個方法判斷隊列中一個線程的任務是否全部完成,首先會通過all_tasks_done對象獲得鎖,如果是則進行通知,最后釋放鎖。

        (4)join()

        源碼如下:

        def join(self):
         
         self.all_tasks_done.acquire()
         try:
         while self.unfinished_tasks: #如果有未完成的任務,將調用wait()方法等待
         self.all_tasks_done.wait()
         finally:
         self.all_tasks_done.release()

        阻塞方法,當隊列中有未完成進程時,調用join方法來阻塞,直到他們都完成。

        其它的方法都比較簡單,也比較好理解,有興趣可以去看看Queue.py里的源碼,要注意的是任何獲取隊列的狀態(empty(),qsize()等),或者修改隊列的內容的操作(get,put等)都必須持有互斥鎖mutex。

        (二)簡單例子

        實現一個線程不斷生成一個隨機數到一個隊列中

        實現一個線程從上面的隊列里面不斷的取出奇數

        實現另外一個線程從上面的隊列里面不斷取出偶數

        import random,threading,time
        from Queue import Queue
        is_product = True
        class Producer(threading.Thread):
         """生產數據"""
         def __init__(self, t_name, queue):
         threading.Thread.__init__(self,name=t_name)
         self.data=queue
         def run(self):
         while 1:
         
         if self.data.full():
         global is_product
         is_product = False
         else:
         if self.data.qsize() <= 7:#隊列長度小于等于7時添加元素
         is_product = True
         for i in range(2): #每次向隊列里添加兩個元素
         
         randomnum=random.randint(1,99)
         print "%s: %s is producing %d to the queue!" % (time.ctime(), self.getName(), randomnum)
         self.data.put(randomnum,False) #將數據依次存入隊列
         time.sleep(1)
         print "deque length is %s"%self.data.qsize()
         else:
         if is_product:
         for i in range(2): #
         
         randomnum = random.randint(1, 99)
         print "%s: %s is producing %d to the queue!" % (time.ctime(), self.getName(), randomnum)
         self.data.put(randomnum,False) # 將數據依次存入隊列
         time.sleep(1)
         print "deque length is %s" % self.data.qsize()
         else:
         pass
         
         print "%s: %s finished!" %(time.ctime(), self.getName())
         
        #Consumer thread
        class Consumer_even(threading.Thread):
         def __init__(self,t_name,queue):
         threading.Thread.__init__(self,name=t_name)
         self.data=queue
         def run(self):
         while 1:
         if self.data.qsize() > 7:#隊列長度大于7時開始取元素
         val_even = self.data.get(False)
         if val_even%2==0:
         print "%s: %s is consuming. %d in the queue is consumed!" % (time.ctime(),self.getName(),val_even)
         time.sleep(2)
         else:
         self.data.put(val_even)
         time.sleep(2)
         print "deque length is %s" % self.data.qsize()
         else:
         pass
         
         
        class Consumer_odd(threading.Thread):
         def __init__(self,t_name,queue):
         threading.Thread.__init__(self, name=t_name)
         self.data=queue
         def run(self):
         while 1:
         if self.data.qsize() > 7:
         val_odd = self.data.get(False)
         if val_odd%2!=0:
         print "%s: %s is consuming. %d in the queue is consumed!" % (time.ctime(), self.getName(), val_odd)
         time.sleep(2)
         else:
         self.data.put(val_odd)
         time.sleep(2)
         print "deque length is %s" % self.data.qsize()
         else:
         pass
         
        #Main thread
        def main():
         queue = Queue(20)
         producer = Producer('Pro.', queue)
         consumer_even = Consumer_even('Con_even.', queue)
         consumer_odd = Consumer_odd('Con_odd.',queue)
         producer.start()
         consumer_even.start()
         consumer_odd.start()
         producer.join()
         consumer_even.join()
         consumer_odd.join()
         
        if __name__ == '__main__':
         main()

        聲明:本網頁內容旨在傳播知識,若有侵權等問題請及時與本網聯系,我們將在第一時間刪除處理。TEL:177 7030 7066 E-MAIL:11247931@qq.com

        文檔

        Queue模塊及源碼分析

        Queue模塊及源碼分析:Queue模塊是提供隊列操作的模塊,隊列是線程間最常用的交換數據的形式。該模塊提供了三種隊列:Queue.Queue(maxsize):先進先出,maxsize是隊列的大小,其值為非正數時為無線循環隊列Queue.LifoQueue(maxsize):后進先出,相當于棧Queue.Prior
        推薦度:
        標簽: 代碼 模塊 源代碼
        • 熱門焦點

        最新推薦

        猜你喜歡

        熱門推薦

        專題
        Top
        主站蜘蛛池模板: 国产成人精品曰本亚洲79ren| 国内外成人免费视频| 亚洲精品无码Av人在线观看国产| 国产成人综合久久精品亚洲| 全免费a级毛片免费**视频| 精品国产福利尤物免费| 一本色道久久88亚洲综合| 国产成人久久精品亚洲小说| 吃奶摸下高潮60分钟免费视频| 风间由美在线亚洲一区| 日本免费人成黄页在线观看视频 | 国产亚洲婷婷香蕉久久精品 | 麻豆91免费视频| 亚洲精品无码专区2| 一进一出60分钟免费视频| 国产亚洲精品成人AA片新蒲金| 国产免费爽爽视频在线观看| 亚洲国产高清视频| 免费阿v网站在线观看g| 亚洲国产成人久久一区二区三区| 国产免费一区二区三区VR| 久久久久久噜噜精品免费直播| 亚洲春色在线视频| 在线看免费观看AV深夜影院| 亚洲精品av无码喷奶水糖心| 久久久久久A亚洲欧洲AV冫| 99热在线免费播放| 亚洲精品精华液一区二区| 免费一级成人毛片| 男女午夜24式免费视频| 亚洲人成电影青青在线播放| 国产成人精品男人免费| 一区二区三区无码视频免费福利| 亚洲国产精品网站久久| 国产片免费在线观看| 四虎国产精品免费永久在线| 亚洲国产精品一区二区三区在线观看 | 色屁屁www影院免费观看视频| 在线精品亚洲一区二区小说| 亚洲一级免费毛片| 全部在线播放免费毛片|