黄色网页视频 I 影音先锋日日狠狠久久 I 秋霞午夜毛片 I 秋霞一二三区 I 国产成人片无码视频 I 国产 精品 自在自线 I av免费观看网站 I 日本精品久久久久中文字幕5 I 91看视频 I 看全色黄大色黄女片18 I 精品不卡一区 I 亚洲最新精品 I 欧美 激情 在线 I 人妻少妇精品久久 I 国产99视频精品免费专区 I 欧美影院 I 欧美精品在欧美一区二区少妇 I av大片网站 I 国产精品黄色片 I 888久久 I 狠狠干最新 I 看看黄色一级片 I 黄色精品久久 I 三级av在线 I 69色综合 I 国产日韩欧美91 I 亚洲精品偷拍 I 激情小说亚洲图片 I 久久国产视频精品 I 国产综合精品一区二区三区 I 色婷婷国产 I 最新成人av在线 I 国产私拍精品 I 日韩成人影音 I 日日夜夜天天综合

Python 操作 Rabbit MQ 發(fā)布/訂閱 (五)

系統(tǒng) 2370 0

Python 操作 Rabbit MQ 發(fā)布/訂閱 (五)

一、發(fā)布、訂閱:

我們將一個(gè)消息分發(fā)給 多個(gè)消費(fèi)者 ,這種模式被稱為 發(fā)布/訂閱

為了更好的理解這個(gè)模式,我們將構(gòu)建一個(gè)日志系統(tǒng),它包括兩個(gè)程序:

  • 第一個(gè)程序,負(fù)責(zé)發(fā)送日志消息;
  • 第二個(gè)程序,負(fù)責(zé)獲取消息并輸出內(nèi)容;

在日志系統(tǒng)中,所有正在運(yùn)行的接收方程序都會(huì)接收消息;

  • 一個(gè)接受者,把日志寫入硬盤中;
  • 另一個(gè)接受者,把日志輸出到屏幕上;

最終,日志消息被廣播給所有的接受者。

二、交換機(jī)(Exchanges):

概念 :應(yīng)用程序發(fā)送消息時(shí),先把消息給交換機(jī),由交換機(jī)投遞給隊(duì)列,而不是直接給隊(duì)列。交換機(jī)可以由多個(gè) 消息通道(Channel) ,用于投遞消息。

簡(jiǎn)單概括下之前的知識(shí)

  • 發(fā)布者(Producer):是發(fā)布消息的應(yīng)用程序。
  • 隊(duì)列(Queue):用于消息存儲(chǔ)的緩沖。
  • 消費(fèi)者(Consumer):是接收消息的應(yīng)用程序。

圖解大體流程
Python 操作 Rabbit MQ 發(fā)布/訂閱 (五)_第1張圖片

  • P:代表是發(fā)布者;
  • X:是交換機(jī);

詳解圖意 :發(fā)布者(P )→交換機(jī)(X)→隊(duì)列(Q)→消費(fèi)者(C );

  • 交換機(jī)一邊從發(fā)布者方接收消息,一邊把消息推送到隊(duì)列(Q)。 交換機(jī)必須知道如何處理它接收到的消息,是推送到指定的隊(duì)列、還是多個(gè)隊(duì)列,或者是忽略消息 。這些都是通過(guò) 交換機(jī)類型(Exchange Type) 來(lái)定義的。

交換機(jī)類型

1.直連交換機(jī)(Direct);

2.主題交換機(jī)(Topic);

3.頭交換機(jī)(Headers);

4.扇形交換機(jī)(Fanout);

  • 主要說(shuō)明—扇形交換,它把消息發(fā)送給它所知道的所有隊(duì)列。

                    
                      channel
                      
                        .
                      
                      
                        exchange_declare
                      
                      
                        (
                      
                      exchange
                      
                        =
                      
                      
                        'fanout_logs'
                      
                      
                        ,
                      
                      
                             exchange_type
                      
                        =
                      
                      
                        'fanout'
                      
                      
                        )
                      
                    
                  

參數(shù)講解

  • exchange:就是交換機(jī)的名稱, 空字符串代表默認(rèn)或者匿名交換機(jī);

                    
                      channel
                      
                        .
                      
                      
                        basic_publish
                      
                      
                        (
                      
                      exchange
                      
                        =
                      
                      
                        ''
                      
                      
                        )
                      
                    
                  
  • exchange_type:就是交換機(jī)的類型;

  • routing_key:分發(fā)到指定的隊(duì)列;

  • body:發(fā)送的內(nèi)容;

  • properties:使消息持久化;

查看交換器列表

命令: rabbitmqctl list_exchanges

            
              Listing exchanges 
              
                ...
              
              
amq
              
                .
              
              rabbitmq
              
                .
              
              log	topic
amq
              
                .
              
              direct	direct
amq
              
                .
              
              topic	topic
amq
              
                .
              
              headers	headers
	direct
amq
              
                .
              
              fanout	fanout
amq
              
                .
              
              rabbitmq
              
                .
              
              trace	topic
amq
              
                .
              
              match	headers

            
          

列表中以amq.*的開頭的交換器,都是默認(rèn)創(chuàng)建的,目前不需要管它們。

三、臨時(shí)隊(duì)列:

我們連接上Rabbit MQ的時(shí)候,需要一個(gè) 全新的、空的隊(duì)列 (也就是說(shuō)不使用之前提到的,routing_key參數(shù)指定的隊(duì)列名),我們可以 手動(dòng)創(chuàng)建一個(gè)隨機(jī)的隊(duì)列名 ,或者讓 服務(wù)器為我們選擇一個(gè)隨機(jī)的隊(duì)列名(推薦) 。我們僅需要在 調(diào)用queue_declare方法時(shí),不提供queue參數(shù) 即可:

            
              # 在管道里
              
                ,
              
               不聲明隊(duì)列名稱
result 
              
                =
              
               channel
              
                .
              
              
                queue_declare
              
              
                (
              
              
                )
              
            
          

可通過(guò) result.method.queue 獲取已經(jīng)生成的隨機(jī)隊(duì)列名,大概的樣子如下所示:

            
              amq
              
                .
              
              gen
              
                -
              
              DIAODS2sDSAKJKS
              
                ==
              
            
          

與消費(fèi)者斷開連接時(shí),這個(gè)隊(duì)列應(yīng)被立即刪除:

            
              # 需要一個(gè)空的隊(duì)列  exclusive
              
                =
              
              True 表示與消費(fèi)者斷開時(shí)
              
                ,
              
               隊(duì)列立即刪除
result 
              
                =
              
               channel
              
                .
              
              
                queue_declare
              
              
                (
              
              exclusive
              
                =
              
              True
              
                )
              
            
          

四、綁定:

img

目前已經(jīng)創(chuàng)建一個(gè)扇形交換機(jī)和一個(gè)隊(duì)列。現(xiàn)在需要告訴交換機(jī)如果發(fā)送消息給隊(duì)列。

交換機(jī)和隊(duì)列之間的聯(lián)系我們稱為綁定(binding)

            
              # 將fanount_logs交換機(jī)將會(huì)把消息添加到我們的隊(duì)列中
              
                ,
              
               隊(duì)列名服務(wù)器隨機(jī)生成
channel
              
                .
              
              
                queue_bind
              
              
                (
              
              exchange
              
                =
              
              
                'fanout_logs'
              
              
                ,
              
               queue
              
                =
              
              result
              
                .
              
              method
              
                .
              
              queue
              
                )
              
            
          

查看綁定列表

列出所有現(xiàn)存的綁定命令: rabbitmqctl list_bindings

五、整理本節(jié)最終代碼:

圖解最終流程

Python 操作 Rabbit MQ 發(fā)布/訂閱 (五)_第2張圖片

發(fā)布日志與之前的區(qū)別

1.我們把消息發(fā)送給fanout_logs交換機(jī)而不是匿名的交換機(jī);

2.發(fā)送的時(shí)候需要提供routing_key參數(shù),但它的值會(huì)被扇形交換機(jī)忽略;

以下是 send.py

            
              #
              
                !
              
              
                /
              
              usr
              
                /
              
              bin
              
                /
              
              python
# 
              
                -
              
              
                *
              
              
                -
              
               coding
              
                :
              
               utf
              
                -
              
              
                8
              
              
                -
              
              
                *
              
              
                -
              
              
                import
              
               pika

              
                import
              
               sys

message 
              
                =
              
              
                ' '
              
              
                .
              
              
                join
              
              
                (
              
              sys
              
                .
              
              argv
              
                [
              
              
                1
              
              
                :
              
              
                ]
              
              
                )
              
               or 
              
                "Hello World!"
              
              

# 創(chuàng)建一個(gè)實(shí)例  本地訪問(wèn)
              
                IP
              
              地址可以為 localhost 
              
                后面5672是端口地址
              
              
                (
              
              可以不用指
# 定
              
                ,
              
               因?yàn)槟J(rèn)就是
              
                5672
              
              
                )
              
              
connection 
              
                =
              
               pika
              
                .
              
              
                BlockingConnection
              
              
                (
              
              pika
              
                .
              
              
                ConnectionParameters
              
              
                (
              
              
                'localhost'
              
              
                ,
              
              
                5672
              
              
                )
              
              
                )
              
              

# 聲明一個(gè)管道
              
                ,
              
               在管道里發(fā)送消息
channel 
              
                =
              
               connection
              
                .
              
              
                channel
              
              
                (
              
              
                )
              
              

# 指定交換機(jī)的類型為fanout
              
                ,
              
               執(zhí)行交換機(jī)名
              
                :
              
              fanout_logs
channel
              
                .
              
              
                exchange_declare
              
              
                (
              
              exchange
              
                =
              
              
                'fanout_logs'
              
              
                ,
              
               exchange_type
              
                =
              
              
                'fanout'
              
              
                )
              
              

# 投遞消息 exchange
              
                =
              
              
                'fancout_logs'
              
              交換機(jī)的名命
              
                ;
              
               type
              
                =
              
              
                'fanout'
              
              
                :
              
              扇形交換機(jī)
channel
              
                .
              
              
                basic_publish
              
              
                (
              
              exchange
              
                =
              
              
                'fanout_logs'
              
              
                ,
              
              
                      routing_key
              
                =
              
              
                ''
              
              
                ,
              
              
                      body
              
                =
              
              message
                      
              
                )
              
              

print 
              
                "[x] sent {}"
              
              
                .
              
              
                format
              
              
                (
              
              message
              
                ,
              
              
                )
              
              
# 隊(duì)列關(guān)閉
connection
              
                .
              
              
                close
              
              
                (
              
              
                )
              
            
          

若沒(méi)有綁定隊(duì)列的交換器,消息將會(huì)丟失。以下是 receive.py

            
              #
              
                !
              
              
                /
              
              usr
              
                /
              
              bin
              
                /
              
              python
# 
              
                -
              
              
                *
              
              
                -
              
               coding
              
                :
              
               utf
              
                -
              
              
                8
              
              
                -
              
              
                *
              
              
                -
              
              
                import
              
               pika

# 創(chuàng)建實(shí)例
connection 
              
                =
              
               pika
              
                .
              
              
                BlockingConnection
              
              
                (
              
              pika
              
                .
              
              
                ConnectionParameters
              
              
                (
              
              
                'localhost'
              
              
                )
              
              
                )
              
              

# 聲明管道
channel 
              
                =
              
               connection
              
                .
              
              
                channel
              
              
                (
              
              
                )
              
              

# 指定交換機(jī)名為 fanout_logs 類型為扇形
channel
              
                .
              
              
                exchange_declare
              
              
                (
              
              exchange
              
                =
              
              
                'fanout_logs'
              
              
                ,
              
               exchange_type
              
                =
              
              
                'fanout'
              
              
                )
              
              

# 表示與消費(fèi)者斷開連接
              
                ,
              
               隊(duì)列立即刪除
result 
              
                =
              
               channel
              
                .
              
              
                queue_declare
              
              
                (
              
              queue
              
                =
              
              
                ''
              
              
                ,
              
               exclusive
              
                =
              
              True
              
                )
              
              

# 生成隊(duì)列的名字
queue_name 
              
                =
              
               result
              
                .
              
              method
              
                .
              
              queue

# 綁定交換機(jī)和隊(duì)列
channel
              
                .
              
              
                queue_bind
              
              
                (
              
              exchange
              
                =
              
              
                'fanout_logs'
              
              
                ,
              
               queue
              
                =
              
              queue_name
              
                )
              
              


def 
              
                callback
              
              
                (
              
              ch
              
                ,
              
               method
              
                ,
              
               properties
              
                ,
              
               body
              
                )
              
              
                :
              
              
    print 
              
                '[X] Received{}'
              
              
                .
              
              
                format
              
              
                (
              
              body
              
                ,
              
              
                )
              
              


# 消費(fèi)消息
channel
              
                .
              
              
                basic_consume
              
              
                (
              
              queue
              
                =
              
              queue_name
              
                ,
              
                # 從指定的消息隊(duì)列中接收消息
                      on_message_callback
              
                =
              
              callback
              
                ,
              
                # 如果收到消息
              
                ,
              
               就調(diào)用callback函數(shù)來(lái)處理
                      
              
                )
              
              
                print
              
              
                (
              
              
                '=======正在等待消息========'
              
              
                )
              
              
channel
              
                .
              
              
                start_consuming
              
              
                (
              
              
                )
              
                # 開始消費(fèi)消息

            
          

3.如果想把日志保存到文件中,打開控制臺(tái)輸入:

            
              python receive
              
                .
              
              py 
              
                >
              
               logs_from_rabbit
              
                .
              
              log 

            
          

4.在屏幕中查看日志,在打開一個(gè)新的終端運(yùn)行:

            
              python receive
              
                .
              
              py 

              
                ===
              
              
                ===
              
              
                =
              
              正在等待消息
              
                ===
              
              
                ===
              
              
                ==
              
            
          

5.發(fā)送消息:

            
              python send
              
                .
              
              py 發(fā)送第一條消息

            
          

6.可以看到消費(fèi)者接收到了消息,并且日志中也記錄了這條消息。

            
              cat logs_from_rabbit
              
                .
              
              log 

              
                ===
              
              
                ===
              
              
                =
              
              正在等待消息
              
                ===
              
              
                ===
              
              
                ==
              
              
                [
              
              
                X
              
              
                ]
              
               Received發(fā)送第一條消息

            
          

7.確認(rèn)已經(jīng)創(chuàng)建的隊(duì)列綁定:

            
              rabbitmqctl list_bindings
Listing bindings 
              
                ...
              
              
	exchange	amq
              
                .
              
              gen
              
                -
              
              Di2rIkS1kQWcMODPxF5KuA	queue	amq
              
                .
              
              gen
              
                -
              
              Di2rIkS1kQWcMODPxF5KuA	
              
                [
              
              
                ]
              
              
	exchange	hello	queue	hello	
              
                [
              
              
                ]
              
              
	exchange	task_queue	queue	task_queue	
              
                [
              
              
                ]
              
              
fanout_logs	exchange	amq
              
                .
              
              gen
              
                -
              
              Di2rIkS1kQWcMODPxF5KuA	queue	amq
              
                .
              
              gen
              
                -
              
              Di2rIkS1kQWcMODPxF5KuA	
              
                [
              
              
                ]
              
            
          

交換器fanout_logs把數(shù)據(jù)發(fā)送給兩個(gè)系統(tǒng)名命的隊(duì)列


更多文章、技術(shù)交流、商務(wù)合作、聯(lián)系博主

微信掃碼或搜索:z360901061

微信掃一掃加我為好友

QQ號(hào)聯(lián)系: 360901061

您的支持是博主寫作最大的動(dòng)力,如果您喜歡我的文章,感覺(jué)我的文章對(duì)您有幫助,請(qǐng)用微信掃描下面二維碼支持博主2元、5元、10元、20元等您想捐的金額吧,狠狠點(diǎn)擊下面給點(diǎn)支持吧,站長(zhǎng)非常感激您!手機(jī)微信長(zhǎng)按不能支付解決辦法:請(qǐng)將微信支付二維碼保存到相冊(cè),切換到微信,然后點(diǎn)擊微信右上角掃一掃功能,選擇支付二維碼完成支付。

【本文對(duì)您有幫助就好】

您的支持是博主寫作最大的動(dòng)力,如果您喜歡我的文章,感覺(jué)我的文章對(duì)您有幫助,請(qǐng)用微信掃描上面二維碼支持博主2元、5元、10元、自定義金額等您想捐的金額吧,站長(zhǎng)會(huì)非常 感謝您的哦!!!

發(fā)表我的評(píng)論
最新評(píng)論 總共0條評(píng)論