Python 操作 Rabbit MQ 發(fā)布/訂閱 (五)
一、發(fā)布、訂閱:
我們將一個(gè)消息分發(fā)給
多個(gè)消費(fèi)者
,這種模式被稱為
發(fā)布/訂閱
。
為了更好的理解這個(gè)模式,我們將構(gòu)建一個(gè)日志系統(tǒng),它包括兩個(gè)程序:
- 第一個(gè)程序,負(fù)責(zé)發(fā)送日志消息;
- 第二個(gè)程序,負(fù)責(zé)獲取消息并輸出內(nèi)容;
在日志系統(tǒng)中,所有正在運(yùn)行的接收方程序都會(huì)接收消息;
- 一個(gè)接受者,把日志寫入硬盤中;
- 另一個(gè)接受者,把日志輸出到屏幕上;
最終,日志消息被廣播給所有的接受者。
二、交換機(jī)(Exchanges):
概念
:應(yīng)用程序發(fā)送消息時(shí),先把消息給交換機(jī),由交換機(jī)投遞給隊(duì)列,而不是直接給隊(duì)列。交換機(jī)可以由多個(gè)
消息通道(Channel)
,用于投遞消息。
簡(jiǎn)單概括下之前的知識(shí) :
- 發(fā)布者(Producer):是發(fā)布消息的應(yīng)用程序。
- 隊(duì)列(Queue):用于消息存儲(chǔ)的緩沖。
- 消費(fèi)者(Consumer):是接收消息的應(yīng)用程序。
- P:代表是發(fā)布者;
- X:是交換機(jī);
詳解圖意 :發(fā)布者(P )→交換機(jī)(X)→隊(duì)列(Q)→消費(fèi)者(C );
-
交換機(jī)一邊從發(fā)布者方接收消息,一邊把消息推送到隊(duì)列(Q)。
交換機(jī)必須知道如何處理它接收到的消息,是推送到指定的隊(duì)列、還是多個(gè)隊(duì)列,或者是忽略消息。這些都是通過(guò)交換機(jī)類型(Exchange Type)來(lái)定義的。
交換機(jī)類型 :
1.直連交換機(jī)(Direct);
2.主題交換機(jī)(Topic);
3.頭交換機(jī)(Headers);
4.扇形交換機(jī)(Fanout);
-
主要說(shuō)明—扇形交換,它把消息發(fā)送給它所知道的所有隊(duì)列。
channel . exchange_declare ( exchange = 'fanout_logs' , exchange_type = 'fanout' )
參數(shù)講解 :
-
exchange:就是交換機(jī)的名稱,
空字符串代表默認(rèn)或者匿名交換機(jī);channel . basic_publish ( exchange = '' ) -
exchange_type:就是交換機(jī)的類型;
-
routing_key:分發(fā)到指定的隊(duì)列;
-
body:發(fā)送的內(nèi)容;
-
properties:使消息持久化;
查看交換器列表 :
命令:
rabbitmqctl list_exchanges
Listing exchanges
...
amq
.
rabbitmq
.
log topic
amq
.
direct direct
amq
.
topic topic
amq
.
headers headers
direct
amq
.
fanout fanout
amq
.
rabbitmq
.
trace topic
amq
.
match headers
列表中以amq.*的開頭的交換器,都是默認(rèn)創(chuàng)建的,目前不需要管它們。
三、臨時(shí)隊(duì)列:
我們連接上Rabbit MQ的時(shí)候,需要一個(gè)
全新的、空的隊(duì)列
(也就是說(shuō)不使用之前提到的,routing_key參數(shù)指定的隊(duì)列名),我們可以
手動(dòng)創(chuàng)建一個(gè)隨機(jī)的隊(duì)列名
,或者讓
服務(wù)器為我們選擇一個(gè)隨機(jī)的隊(duì)列名(推薦)
。我們僅需要在
調(diào)用queue_declare方法時(shí),不提供queue參數(shù)
即可:
# 在管道里
,
不聲明隊(duì)列名稱
result
=
channel
.
queue_declare
(
)
可通過(guò)
result.method.queue
獲取已經(jīng)生成的隨機(jī)隊(duì)列名,大概的樣子如下所示:
amq
.
gen
-
DIAODS2sDSAKJKS
==
與消費(fèi)者斷開連接時(shí),這個(gè)隊(duì)列應(yīng)被立即刪除:
# 需要一個(gè)空的隊(duì)列 exclusive
=
True 表示與消費(fèi)者斷開時(shí)
,
隊(duì)列立即刪除
result
=
channel
.
queue_declare
(
exclusive
=
True
)
四、綁定:
目前已經(jīng)創(chuàng)建一個(gè)扇形交換機(jī)和一個(gè)隊(duì)列。現(xiàn)在需要告訴交換機(jī)如果發(fā)送消息給隊(duì)列。
交換機(jī)和隊(duì)列之間的聯(lián)系我們稱為綁定(binding)
# 將fanount_logs交換機(jī)將會(huì)把消息添加到我們的隊(duì)列中
,
隊(duì)列名服務(wù)器隨機(jī)生成
channel
.
queue_bind
(
exchange
=
'fanout_logs'
,
queue
=
result
.
method
.
queue
)
查看綁定列表 :
列出所有現(xiàn)存的綁定命令:
rabbitmqctl list_bindings
五、整理本節(jié)最終代碼:
圖解最終流程 :
發(fā)布日志與之前的區(qū)別 :
1.我們把消息發(fā)送給fanout_logs交換機(jī)而不是匿名的交換機(jī);
2.發(fā)送的時(shí)候需要提供routing_key參數(shù),但它的值會(huì)被扇形交換機(jī)忽略;
以下是
send.py
:
#
!
/
usr
/
bin
/
python
#
-
*
-
coding
:
utf
-
8
-
*
-
import
pika
import
sys
message
=
' '
.
join
(
sys
.
argv
[
1
:
]
)
or
"Hello World!"
# 創(chuàng)建一個(gè)實(shí)例 本地訪問(wèn)
IP
地址可以為 localhost
后面5672是端口地址
(
可以不用指
# 定
,
因?yàn)槟J(rèn)就是
5672
)
connection
=
pika
.
BlockingConnection
(
pika
.
ConnectionParameters
(
'localhost'
,
5672
)
)
# 聲明一個(gè)管道
,
在管道里發(fā)送消息
channel
=
connection
.
channel
(
)
# 指定交換機(jī)的類型為fanout
,
執(zhí)行交換機(jī)名
:
fanout_logs
channel
.
exchange_declare
(
exchange
=
'fanout_logs'
,
exchange_type
=
'fanout'
)
# 投遞消息 exchange
=
'fancout_logs'
交換機(jī)的名命
;
type
=
'fanout'
:
扇形交換機(jī)
channel
.
basic_publish
(
exchange
=
'fanout_logs'
,
routing_key
=
''
,
body
=
message
)
print
"[x] sent {}"
.
format
(
message
,
)
# 隊(duì)列關(guān)閉
connection
.
close
(
)
若沒(méi)有綁定隊(duì)列的交換器,消息將會(huì)丟失。以下是
receive.py
:
#
!
/
usr
/
bin
/
python
#
-
*
-
coding
:
utf
-
8
-
*
-
import
pika
# 創(chuàng)建實(shí)例
connection
=
pika
.
BlockingConnection
(
pika
.
ConnectionParameters
(
'localhost'
)
)
# 聲明管道
channel
=
connection
.
channel
(
)
# 指定交換機(jī)名為 fanout_logs 類型為扇形
channel
.
exchange_declare
(
exchange
=
'fanout_logs'
,
exchange_type
=
'fanout'
)
# 表示與消費(fèi)者斷開連接
,
隊(duì)列立即刪除
result
=
channel
.
queue_declare
(
queue
=
''
,
exclusive
=
True
)
# 生成隊(duì)列的名字
queue_name
=
result
.
method
.
queue
# 綁定交換機(jī)和隊(duì)列
channel
.
queue_bind
(
exchange
=
'fanout_logs'
,
queue
=
queue_name
)
def
callback
(
ch
,
method
,
properties
,
body
)
:
print
'[X] Received{}'
.
format
(
body
,
)
# 消費(fèi)消息
channel
.
basic_consume
(
queue
=
queue_name
,
# 從指定的消息隊(duì)列中接收消息
on_message_callback
=
callback
,
# 如果收到消息
,
就調(diào)用callback函數(shù)來(lái)處理
)
print
(
'=======正在等待消息========'
)
channel
.
start_consuming
(
)
# 開始消費(fèi)消息
3.如果想把日志保存到文件中,打開控制臺(tái)輸入:
python receive
.
py
>
logs_from_rabbit
.
log
4.在屏幕中查看日志,在打開一個(gè)新的終端運(yùn)行:
python receive
.
py
===
===
=
正在等待消息
===
===
==
5.發(fā)送消息:
python send
.
py 發(fā)送第一條消息
6.可以看到消費(fèi)者接收到了消息,并且日志中也記錄了這條消息。
cat logs_from_rabbit
.
log
===
===
=
正在等待消息
===
===
==
[
X
]
Received發(fā)送第一條消息
7.確認(rèn)已經(jīng)創(chuàng)建的隊(duì)列綁定:
rabbitmqctl list_bindings
Listing bindings
...
exchange amq
.
gen
-
Di2rIkS1kQWcMODPxF5KuA queue amq
.
gen
-
Di2rIkS1kQWcMODPxF5KuA
[
]
exchange hello queue hello
[
]
exchange task_queue queue task_queue
[
]
fanout_logs exchange amq
.
gen
-
Di2rIkS1kQWcMODPxF5KuA queue amq
.
gen
-
Di2rIkS1kQWcMODPxF5KuA
[
]
交換器fanout_logs把數(shù)據(jù)發(fā)送給兩個(gè)系統(tǒng)名命的隊(duì)列
更多文章、技術(shù)交流、商務(wù)合作、聯(lián)系博主
微信掃碼或搜索:z360901061
微信掃一掃加我為好友
QQ號(hào)聯(lián)系: 360901061
您的支持是博主寫作最大的動(dòng)力,如果您喜歡我的文章,感覺(jué)我的文章對(duì)您有幫助,請(qǐng)用微信掃描下面二維碼支持博主2元、5元、10元、20元等您想捐的金額吧,狠狠點(diǎn)擊下面給點(diǎn)支持吧,站長(zhǎng)非常感激您!手機(jī)微信長(zhǎng)按不能支付解決辦法:請(qǐng)將微信支付二維碼保存到相冊(cè),切換到微信,然后點(diǎn)擊微信右上角掃一掃功能,選擇支付二維碼完成支付。
【本文對(duì)您有幫助就好】元

