<span id="mktg5"></span>

<i id="mktg5"><meter id="mktg5"></meter></i>

        <label id="mktg5"><meter id="mktg5"></meter></label>
        最新文章專題視頻專題問答1問答10問答100問答1000問答2000關鍵字專題1關鍵字專題50關鍵字專題500關鍵字專題1500TAG最新視頻文章推薦1 推薦3 推薦5 推薦7 推薦9 推薦11 推薦13 推薦15 推薦17 推薦19 推薦21 推薦23 推薦25 推薦27 推薦29 推薦31 推薦33 推薦35 推薦37視頻文章20視頻文章30視頻文章40視頻文章50視頻文章60 視頻文章70視頻文章80視頻文章90視頻文章100視頻文章120視頻文章140 視頻2關鍵字專題關鍵字專題tag2tag3文章專題文章專題2文章索引1文章索引2文章索引3文章索引4文章索引5123456789101112131415文章專題3
        問答文章1 問答文章501 問答文章1001 問答文章1501 問答文章2001 問答文章2501 問答文章3001 問答文章3501 問答文章4001 問答文章4501 問答文章5001 問答文章5501 問答文章6001 問答文章6501 問答文章7001 問答文章7501 問答文章8001 問答文章8501 問答文章9001 問答文章9501
        當前位置: 首頁 - 科技 - 知識百科 - 正文

        RabbitMQ快速入門python教程

        來源:懂視網 責編:小采 時間:2020-11-27 14:25:50
        文檔

        RabbitMQ快速入門python教程

        RabbitMQ快速入門python教程:HelloWorld簡介RabbitMQ:接受消息再傳遞消息,可以視為一個郵局。發送者和接受者通過隊列來進行交互,隊列的大小可以視為無限的,多個發送者可以發生給一個隊列,多個接收者也可以從一個隊列中接受消息。coderabbitmq使用的協議是amqp,用于pyth
        推薦度:
        導讀RabbitMQ快速入門python教程:HelloWorld簡介RabbitMQ:接受消息再傳遞消息,可以視為一個郵局。發送者和接受者通過隊列來進行交互,隊列的大小可以視為無限的,多個發送者可以發生給一個隊列,多個接收者也可以從一個隊列中接受消息。coderabbitmq使用的協議是amqp,用于pyth

        HelloWorld

        簡介

        RabbitMQ:接受消息再傳遞消息,可以視為一個“郵局”。發送者和接受者通過隊列來進行交互,隊列的大小可以視為無限的,多個發送者可以發生給一個隊列,多個接收者也可以從一個隊列中接受消息。

        code

        rabbitmq使用的協議是amqp,用于python的推薦客戶端是pika

        pip install pika -i https://pypi.douban.com/simple/

        send.py

        # coding: utf8
        import pika
        
        # 建立一個連接
        connection = pika.BlockingConnection(pika.ConnectionParameters(
         'localhost')) # 連接本地的RabbitMQ服務器
        channel = connection.channel() # 獲得channel

        這里鏈接的是本機的,如果想要連接其他機器上的服務器,只要填入地址或主機名即可。

        接下來我們開始發送消息了,注意要確保接受消息的隊列是存在的,否則rabbitmq就丟棄掉該消息

        channel.queue_declare(queue='hello') # 在RabbitMQ中創建hello這個隊列
        channel.basic_publish(exchange='', # 使用默認的exchange來發送消息到隊列
         routing_key='hello', # 發送到該隊列 hello 中
         body='Hello World!') # 消息內容
        
        connection.close() # 關閉 同時flush

        RabbitMQ默認需要1GB的空閑磁盤空間,否則發送會失敗。

        這時已在本地隊列hello中存放了一個消息,如果使用 rabbitmqctl list_queues 可看到

        hello 1

        說明有一個hello隊列 里面存放了一個消息

        receive.py

        # coding: utf8
        import pika
        connection = pika.BlockingConnection(pika.ConnectionParameters(
         'localhost'))
        channel = connection.channel()

        還是先鏈接到服務器,和之前發送時相同

        channel.queue_declare(queue='hello') # 此處就是聲明了 來確保該隊列 hello 存在 可以多次聲明 這里主要是為了防止接受程序先運行時出錯
        
        def callback(ch, method, properties, body): # 用于接收到消息后的回調
         print(" [x] Received %r" % body)
        
        channel.basic_consume(callback,
         queue='hello', # 收指定隊列hello的消息
         no_ack=True) #在處理完消息后不發送ack給服務器
        channel.start_consuming() # 啟動消息接受 這會進入一個死循環

        工作隊列(任務隊列)

        工作隊列是用于分發耗時任務給多個工作進程的。不立即做那些耗費資源的任務(需要等待這些任務完成),而是安排這些任務之后執行。例如我們把task作為message發送到隊列里,啟動工作進程來接受并最終執行,且可啟動多個工作進程來工作。這適用于web應用,即不應在一個http請求的處理窗口內完成復雜任務。

        channel.basic_publish(exchange='',
         routing_key='task_queue',
         body=message,
         properties=pika.BasicProperties(
         delivery_mode = 2, # 使得消息持久化
         ))

        分配消息的方式為 輪詢 即每個工作進程獲得相同的消息數。

        消息ack

        如果消息分配給某個工作進程,但是該工作進程未處理完成就崩潰了,可能該消息就丟失了,因為rabbitmq一旦把一個消息分發給工作進程,它就把該消息刪掉了。

        為了預防消息丟失,rabbitmq提供了ack,即工作進程在收到消息并處理后,發送ack給rabbitmq,告知rabbitmq這時候可以把該消息從隊列中刪除了。如果工作進程掛掉 了,rabbitmq沒有收到ack,那么會把該消息 重新分發給其他工作進程。不需要設置timeout,即使該任務需要很長時間也可以處理。

        ack默認是開啟的,之前我們的工作進程顯示指定了no_ack=True

        channel.basic_consume(callback, queue='hello') # 會啟用ack

        帶ack的callback:

        def callback(ch, method, properties, body):
         print " [x] Received %r" % (body,)
         time.sleep( body.count('.') )
         print " [x] Done"
         ch.basic_ack(delivery_tag = method.delivery_tag) # 發送ack

        消息持久化

        但是,有時RabbitMQ重啟了,消息也會丟失。可在創建隊列時設置持久化:
        (隊列的性質一旦確定無法改變)

        channel.queue_declare(queue='task_queue', durable=True)

        同時在發送消息時也得設置該消息的持久化屬性:

        channel.basic_publish(exchange='',

         routing_key="task_queue",
         body=message,
         properties=pika.BasicProperties(
         delivery_mode = 2, # make message persistent
         ))

        但是,如果在RabbitMQ剛接收到消息還沒來得及存儲,消息還是會丟失。同時,RabbitMQ也不是在接受到每個消息都進行存盤操作。如果還需要更完善的保證,需要使用publisher confirm。

        公平的消息分發

        輪詢模式的消息分發可能并不公平,例如奇數的消息都是繁重任務的話,某些進程則會一直運行繁 重任務。即使某工作進程上有積壓的消息未處理,如很多都沒發ack,但是RabbitMQ還是會按照順序發消息給它。可以在接受進程中加設置:

        channel.basic_qos(prefetch_count=1)

        告知RabbitMQ,這樣在一個工作進程沒回發ack情況下是不會再分配消息給它。

        群發

        一般情況下,一條消息是發送給一個工作進程,然后完成,有時想把一條消息同時發送給多個進程:

        exchange

        發送者是不是直接發送消息到隊列中的,事實上發生者根本不知道消息會發送到那個隊列,發送者只能把消息發送到exchange里。exchange一方面收生產者的消息,另一方面把他們推送到隊列中。所以作為exchange,它需要知道當收到消息時它需要做什么,是應該把它加到一個特殊的隊列中還是放到很多的隊列中,或者丟棄。exchange有direct、topic、headers、fanout等種類,而群發使用的即fanout。之前在發布消息時,exchange的值為 '' 即使用default exchange。

        channel.exchange_declare(exchange='logs', type='fanout') # 該exchange會把消息發送給所有它知道的隊列中

        臨時隊列

        result = channel.queue_declare() # 創建一個隨機隊列
        result = channel.queue_declare(exclusive=True) # 創建一個隨機隊列,同時在沒有接收者連接該隊列后則銷毀它
        queue_name = result.method.queue

        這樣result.method.queue即是隊列名稱,在發送或接受時即可使用。

        綁定exchange 和 隊列

        channel.queue_bind(exchange='logs',
         queue='hello')

        logs在發送消息時給hello也發一份。

        在發送消息是使用剛剛創建的 logs exchange

         channel.basic_publish(exchange='logs',
         routing_key='',
         body=message)

        路由

        之前已經使用過bind,即建立exchange和queue的關系(該隊列對來自該exchange的消息有興趣),bind時可另外指定routing_key選項。

        使用direct exchange

        將對應routing key的消息發送到綁定相同routing key的隊列中

        channel.exchange_declare(exchange='direct_logs',
         type='direct')

        發送函數,發布不同severity的消息:

        channel.basic_publish(exchange='direct_logs',
         routing_key=severity,
         body=message)

        接受函數中綁定對應severity的:

        channel.queue_bind(exchange='direct_logs',
         queue=queue_name,
         routing_key=severity)

        使用topic exchange

        之前使用的direct exchange 只能綁定一個routing key,可以使用這種可以拿.隔開routing key的topic exchange,例如:

        "stock.usd.nyse" "nyse.vmw"

        和direct exchange一樣,在接受者那邊綁定的key與發送時指定的routing key相同即可,另外有些特殊的值:

        * 代表1個單詞
        # 代表0個或多個單詞

        如果發送者發出的routing key都是3個部分的,如:celerity.colour.species。

        Q1:
        *.orange.* 對應的是中間的colour都為orange的
        
        Q2:
        *.*.rabbit 對應的是最后部分的species為rabbit的
        lazy.# 對應的是第一部分是lazy的

        qucik.orange.rabbit Q1 Q2都可接收到,quick.orange.fox 只有Q1能接受到,對于lazy.pink.rabbit雖然匹配到了Q2兩次,但是只會發送一次。如果綁定時直接綁定#,則會收到所有的。

         RPC

        在遠程機器上運行一個函數然后獲得結果。

        1、客戶端啟動 同時設置一個臨時隊列用于接受回調,綁定該隊列

         self.connection = pika.BlockingConnection(pika.ConnectionParameters(
         host='localhost'))
         self.channel = self.connection.channel()
         result = self.channel.queue_declare(exclusive=True)
         self.callback_queue = result.method.queue
         self.channel.basic_consume(self.on_response, no_ack=True,
         queue=self.callback_queue)

        2、客戶端發送rpc請求,同時附帶reply_to對應回調隊列,correlation_id設置為每個請求的唯一id(雖然說可以為每一次RPC請求都創建一個回調隊列,但是這樣效率不高,如果一個客戶端只使用一個隊列,則需要使用correlation_id來匹配是哪個請求),之后阻塞在回調隊列直到收到回復

        注意:如果收到了非法的correlation_id直接丟棄即可,因為有這種情況--服務器已經發了響應但是還沒發ack就掛了,等一會服務器重啟了又會重新處理該任務,又發了一遍相應,但是這時那個請求已經被處理掉了

        channel.basic_publish(exchange='',
         routing_key='rpc_queue',
         properties=pika.BasicProperties(
         reply_to = self.callback_queue,
         correlation_id = self.corr_id,
         ),
         body=str(n)) # 發出調用
        
        while self.response is None: # 這邊就相當于阻塞了
         self.connection.process_data_events() # 查看回調隊列
        return int(self.response)

        3、請求會發送到rpc_queue隊列
        4、RPC服務器從rpc_queue中取出,執行,發送回復

        channel.basic_consume(on_request, queue='rpc_queue') # 綁定 等待請求
        
        # 處理之后:
        ch.basic_publish(exchange='',
         routing_key=props.reply_to,
         properties=pika.BasicProperties(correlation_id = 
         props.correlation_id),
         body=str(response)) # 發送回復到回調隊列
        ch.basic_ack(delivery_tag = method.delivery_tag) # 發送ack

        5、客戶端從回調隊列中取出數據,檢查correlation_id,執行相應操作

        if self.corr_id == props.correlation_id:
         self.response = body

        聲明:本網頁內容旨在傳播知識,若有侵權等問題請及時與本網聯系,我們將在第一時間刪除處理。TEL:177 7030 7066 E-MAIL:11247931@qq.com

        文檔

        RabbitMQ快速入門python教程

        RabbitMQ快速入門python教程:HelloWorld簡介RabbitMQ:接受消息再傳遞消息,可以視為一個郵局。發送者和接受者通過隊列來進行交互,隊列的大小可以視為無限的,多個發送者可以發生給一個隊列,多個接收者也可以從一個隊列中接受消息。coderabbitmq使用的協議是amqp,用于pyth
        推薦度:
        • 熱門焦點

        最新推薦

        猜你喜歡

        熱門推薦

        專題
        Top
        主站蜘蛛池模板: 亚洲色婷婷一区二区三区| 国产精品色午夜视频免费看| 中文亚洲成a人片在线观看| 亚洲av无码片vr一区二区三区| 免费看国产精品3a黄的视频| 亚洲精品456人成在线| 成年人性生活免费视频| 亚洲精品国产综合久久久久紧| 在线v片免费观看视频| 狠狠色香婷婷久久亚洲精品| 国产h视频在线观看免费| 性xxxx黑人与亚洲| 无码少妇一区二区浪潮免费| 亚洲色欲色欱wwW在线| 天天摸天天碰成人免费视频| 亚洲精品色在线网站| 亚洲国产成人精品91久久久| 在线观看免费视频一区| 亚洲AV日韩AV永久无码下载| 一级女人18毛片免费| 亚洲第一第二第三第四第五第六| 国产免费小视频在线观看| 色多多A级毛片免费看| 久久久亚洲精品国产| 免费看片在线观看| 日韩精品亚洲专区在线影视 | 成年人在线免费观看| 亚洲а∨精品天堂在线| 亚洲精品高清在线| 久久久久国产精品免费看| 精品久久亚洲中文无码| www国产亚洲精品久久久| 暖暖日本免费中文字幕| 亚洲伦理中文字幕| 亚洲无线一二三四区手机| 91视频免费网址| 国产亚洲男人的天堂在线观看| 亚洲欧洲自拍拍偷午夜色无码| 最近中文字幕2019高清免费| 亚洲国产成人AV网站| 亚洲成在人天堂一区二区|