python Rabbitmq編程(一)
?
?
實現最簡單的隊列通信
?
send端
#
!/usr/bin/env python
import
pika
credentials
= pika.PlainCredentials(
"用戶名
"
,
"密碼
"
)
connection
=
pika.BlockingConnection(pika.ConnectionParameters(
'
localhost
'
,credentials=
credentials))
channel
= connection.channel()
#
建立了rabbit協議的通道
#
聲明queue
channel.queue_declare(queue=
'
hello
'
)
#
n RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange.
channel.basic_publish(exchange=
''
,
routing_key
=
'
hello
'
,
body
=
'
Hello World!
'
)
print
(
"
[x] Sent 'Hello World!'
"
)
connection.close()
?
receive端
#
_*_coding:utf-8_*_
__author__
=
'
Alex Li
'
import
pika
credentials
= pika.PlainCredentials(
"用戶名
"
,
"密碼
"
)
connection
=
pika.BlockingConnection(pika.ConnectionParameters(
'
localhost
'
,credentials=
credentials))
channel
= connection.channel()
#
建立了rabbit協議的通道
#
You may ask why we declare the queue again ? we have already declared it in our previous code.
#
We could avoid that if we were sure that the queue already exists. For example if send.py program
#
was run before. But we're not yet sure which program to run first. In such cases it's a good
#
practice to repeat declaring the queue in both programs.
channel.queue_declare(queue=
'
hello
'
)
def
callback(ch, method, properties, body):
print
(
"
[x] Received %r
"
%
body)
#
callback函數當拿到隊列里的值,則調用
channel.basic_consume(callback,
queue
=
'
hello
'
,
no_ack
=
True)
print
(
'
[*] Waiting for messages. To exit press CTRL+C
'
)
channel.start_consuming()
?
#注意:
遠程連接rabbitmq server的話,需要配置權限。
#1.設置用戶與密碼
#
> rabbitmqctl add_user name pass
#
> rabbitmqctl set_user_tags name administrator
#2.設置權限,允許從外面訪問
#
rabbitmqctl set_permissions -p /name ".*" ".*" ".*"
set_permissions [-
p vhost] {user} {conf} {write} {read}
vhost
The name of the virtual host to which to grant the user access, defaulting to
/
.
user
The name of the user to grant access to the specified virtual host.
conf
A regular expression matching resource names
for
which the user
is
granted configure permissions.
write
A regular expression matching resource names
for
which the user
is
granted write permissions.
read
A regular expression matching resource names
for
which the user
is
granted read permissions.
#3.生產者與消費者添加認證信息
credentials = pika.PlainCredentials(
"用戶名
"
,
"密碼
"
)
?
#為什么要聲明兩次queue,這里hello為隊列名
# channel.queue_declare(queue='hello')
# 解決發起者先啟動,而接收者還沒有啟動,發送者先創建queue,
# 如果發起者已經聲明了,接收者會檢測有沒有queue,如果有了,實際接收者是不會執行聲明的,沒有就會聲明這個queue。
?
消息公平分發(循環調度)
在這種模式下,RabbitMQ會默認把p發的消息依次分發給各個消費者(c)。
輪巡公平的發送給接收者,比如第一次發送給第一個接收者,第二次發送給第二格接受者,如此。
send端
import
pika
import
time
credentials
= pika.PlainCredentials(
"
用戶名
"
,
"
密碼
"
)
connection
=
pika.BlockingConnection(pika.ConnectionParameters(
'
localhost
'
,credentials=
credentials))
channel
=
connection.channel()
#
聲明queue
channel.queue_declare(queue=
'
task_queue
'
)
#
n RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange.
import
sys
message
=
'
'
.join(sys.argv[1:])
or
"
Hello World! %s
"
%
time.time()
channel.basic_publish(exchange
=
''
,
routing_key
=
'
task_queue
'
,
body
=
message,
properties
=
pika.BasicProperties(
delivery_mode
=2,
#
make message persistent
)
)
print
(
"
[x] Sent %r
"
%
message)
connection.close()
?
receive端
#
_*_coding:utf-8_*_
import
pika, time
credentials
= pika.PlainCredentials(
"
用戶名
"
,
"
密碼
"
)
connection
=
pika.BlockingConnection(pika.ConnectionParameters(
'
localhost
'
,credentials=
credentials))
channel
=
connection.channel()
def
callback(ch, method, properties, body):
print
(
"
[x] Received %r
"
%
body)
time.sleep(
20
)
print
(
"
[x] Done
"
)
print
(
"
method.delivery_tag
"
, method.delivery_tag)
ch.basic_ack(delivery_tag
=
method.delivery_tag)
channel.basic_consume(callback,
queue
=
'
task_queue
'
,
no_ack
=
True
)
print
(
'
[*] Waiting for messages. To exit press CTRL+C
'
)
channel.start_consuming()
?
消息確認
執行任務可能需要幾秒鐘。 你可能想知道如果其中一個消費者開始一項長期任務并且只是部分完成而死亡會發生什么。 使用我們當前的代碼,一旦RabbitMQ向消費者傳遞消息,它立即將其標記為刪除。 在這種情況下,如果你殺死一個工人,我們將丟失它剛剛處理的消息。 我們還將丟失分發給這個特定工作者但尚未處理的所有消息。
但我們不想失去任何任務。 如果工人死亡,我們希望將任務交付給另一名工人。
為了確保消息永不丟失,RabbitMQ支持? 消息 確認 。 消費者發回ack(nowledgement)告訴RabbitMQ已收到,處理了特定消息,RabbitMQ可以自由刪除它。
如果消費者死亡(其通道關閉,連接關閉或TCP連接丟失)而不發送確認,RabbitMQ將理解消息未完全處理并將重新排隊。 如果同時有其他在線消費者,則會迅速將其重新發送給其他消費者。 這樣你就可以確保沒有消息丟失,即使工人偶爾會死亡。
沒有任何消息超時;? 當消費者死亡時,RabbitMQ將重新發送消息。 即使處理消息需要非常長的時間,也沒關系。
默認情況下, 手動消息確認 已打開。 在前面的示例中,我們通過 auto_ack = True ?標志 明確地將它們關閉 。 在我們完成任務后,是時候刪除此標志并從工作人員發送適當的確認。
def
callback(ch, method, properties, body):
print
"
[x] Received %r
"
%
(body,)
time.sleep( body.count(
'
.
'
) )
print
"
[x] Done
"
ch.basic_ack(delivery_tag
=
method.delivery_tag)
channel.basic_consume(callback,
queue
=
'
hello
'
)
Using this code we can be sure that even if you kill a worker using CTRL+C while it was processing a message, nothing will be lost. Soon after the worker dies all unacknowledged messages will be redelivered
?
?
消息持久化
我們已經學會了如何確保即使消費者死亡,任務也不會丟失。 但是如果RabbitMQ服務器停止,我們的任務仍然會丟失。
當RabbitMQ退出或崩潰時,它將忘記隊列和消息,除非你告訴它不要。 確保消息不會丟失需要做兩件事:我們需要將隊列和消息都標記為持久。
首先,我們需要確保RabbitMQ永遠不會丟失我們的隊列。 為此,我們需要聲明它是 持久的 :
channel.queue_declare(queue=
'
hello
'
, durable=True)
雖然此命令本身是正確的,但它在我們的設置中不起作用。 那是因為我們已經定義了一個名為 hello 的隊列 ?, 這個隊列 不耐用。 RabbitMQ不允許您使用不同的參數重新定義現有隊列,并將向嘗試執行此操作的任何程序返回錯誤。 但是有一個快速的解決方法 - 讓我們聲明一個具有不同名稱的隊列,例如 task_queue :
channel.queue_declare(queue=
'
task_queue
'
, durable=True)
此 queue_declare 更改需要應用于生產者和消費者代碼。
此時我們確信 即使RabbitMQ重新啟動 , task_queue 隊列也不會丟失。 現在我們需要將消息標記為持久性 - 通過提供 值為 2 的 delivery_mode 屬性 。
channel.basic_publish(exchange=
''
,
routing_key
=
"
task_queue
"
,
body
=
message,
properties
=
pika.BasicProperties(
delivery_mode
= 2,
#
make message persistent
))
?
?
負載均衡
如果Rabbit只管按順序把消息發到各個消費者身上,不考慮消費者負載的話,很可能出現,一個機器配置不高的消費者那里堆積了很多消息處理不完,同時配置高的消費者卻一直很輕松。為解決此問題,可以在各個消費者端,
配置perfetch_count=1,意思就是告訴RabbitMQ在我這個消費者當前消息還沒處理完的時候就不要再給我發新消息了。
send端
#
!/usr/bin/env python
import
pika
import
sys
connection
=
pika.BlockingConnection(
pika.ConnectionParameters(host
=
'
localhost
'
))
channel
=
connection.channel()
channel.queue_declare(queue
=
'
task_queue
'
, durable=
True)
message
=
'
'
.join(sys.argv[1:])
or
"
Hello World!
"
channel.basic_publish(
exchange
=
''
,
routing_key
=
'
task_queue
'
,
body
=
message,
properties
=
pika.BasicProperties(
delivery_mode
=2,
#
make message persistent
))
print
(
"
[x] Sent %r
"
%
message)
connection.close()
?
receive端
#
!/usr/bin/env python
import
pika
import
time
connection
=
pika.BlockingConnection(
pika.ConnectionParameters(host
=
'
localhost
'
))
channel
=
connection.channel()
channel.queue_declare(queue
=
'
task_queue
'
, durable=
True)
print
(
'
[*] Waiting for messages. To exit press CTRL+C
'
)
def
callback(ch, method, properties, body):
print
(
"
[x] Received %r
"
%
body)
time.sleep(body.count(b
'
.
'
))
print
(
"
[x] Done
"
)
ch.basic_ack(delivery_tag
=
method.delivery_tag)
channel.basic_qos(prefetch_count
=1
)
channel.basic_consume(queue
=
'
task_queue
'
, on_message_callback=
callback)
channel.start_consuming()
?
更多文章、技術交流、商務合作、聯系博主
微信掃碼或搜索:z360901061
微信掃一掃加我為好友
QQ號聯系: 360901061
您的支持是博主寫作最大的動力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描下面二維碼支持博主2元、5元、10元、20元等您想捐的金額吧,狠狠點擊下面給點支持吧,站長非常感激您!手機微信長按不能支付解決辦法:請將微信支付二維碼保存到相冊,切換到微信,然后點擊微信右上角掃一掃功能,選擇支付二維碼完成支付。
【本文對您有幫助就好】元

