Python
更新時間 2025-07-03 13:57:42
最近更新時間: 2025-07-03 13:57:42
分享文章
本文介紹Python版本的RabbitMQ客戶端連接指導,包括RabbitMQ客戶端安裝,以及生產、消費消息。
使用前請參考收集連接信息收集RabbitMQ所需的連接信息。
準備環境
-
安裝Python3
-
安裝kombu
pip3 install kombu
生產消息
SSL認證方式
import?ssl
import?sys
from?kombu?import?Connection,?Exchange,?Producer
def?Main(argv):
????ca_certfile?=?"certs\\ca_certificate.pem"
????certfile?=?"certs\\client_rabbitmq_certificate.pem"
????private_key?=?"certs\\client_rabbitmq_key.pem"
????conn?=?Connection('amqp://XXX:xxx/',?login_method='EXTERNAL',?ssl={
????????'ca_certs':?ca_certfile,
????????'keyfile':?private_key,
????????'certfile':?certfile,
????????'cert_reqs':?ssl.CERT_REQUIRED,
????????'ssl_version':?ssl.PROTOCOL_TLSv1_2,
????})
????channel?=?conn.channel()
????exchange?=?Exchange("example-exchange",?type="direct")
????producer?=?Producer(exchange=exchange,?channel=channel,?routing_key="test-key")
????message?=?"Hello?Rabbimtq"
????producer.publish(message,?retry=True)
????print('send?message:?%s'?%?message)
if?__name__?==?'__main__':
????Main(sys.argv)
非SSL認證方式
import?sys
from?kombu?import?Connection,?Exchange,?Producer
def?Main(argv):
????rabbitmq_url?=?'amqp://USERNAME:PASSWORD@XXX:xxx/'
????conn?=?Connection(rabbitmq_url,?login_method='PLAIN')
????channel?=?conn.channel()
????exchange?=?Exchange("example-exchange",?type="direct")
????producer?=?Producer(exchange=exchange,?channel=channel,?routing_key="test-key")
????message?=?"Hello?Rabbimtq"
????producer.publish(message,?retry=True)
????print('send?message:?%s'?%?message)
if?__name__?==?'__main__':
????Main(sys.argv)
消費消息
SSL認證方式
import?sys
import?ssl
from?kombu?import?Connection,?Exchange,?Queue,?Consumer
def?Main(argv):
????ca_certfile?=?"certs\\ca_certificate.pem"
????certfile?="certs\\client_rabbitmq_certificate.pem"
????private_key?=?"certs\\client_rabbitmq_key.pem"
????conn?=?Connection('amqp://XXX:xxx/',?login_method='EXTERNAL',?ssl={
????????'ca_certs':?ca_certfile,
????????'keyfile':?private_key,
????????'certfile':?certfile,
????????'cert_reqs':?ssl.CERT_REQUIRED,
????????'ssl_version':?ssl.PROTOCOL_TLSv1_2,
????})
????exchange?=?Exchange("example-exchange",?type="direct")
????queue?=?Queue(name="example-queue",?exchange=exchange,?routing_key="test-key")
????def?process_message(body,?message):
????????print("receive?message:?%s"?%?format(body))
????????message.ack()
????with?Consumer(conn,?queues=queue,?callbacks=[process_message],?accept=["text/plain"]):
????????conn.drain_events(timeout=2)
if?__name__?==?'__main__':
????Main(sys.argv)
非SSL認證方式
import?sys
from?kombu?import?Connection,?Exchange,?Queue,?Consumer
def?Main(argv):
????rabbitmq_url?=?'amqp://USERNAME:PASSWORD@XXX:xxx/'
????conn?=?Connection(rabbitmq_url,?login_method='PLAIN')
????exchange?=?Exchange("example-exchange",?type="direct")
????queue?=?Queue(name="example-queue",?exchange=exchange,?routing_key="test-key")
????def?process_message(body,?message):
????????print("receive?message:?%s"?%?format(body))
????????message.ack()
????with?Consumer(conn,?queues=queue,?callbacks=[process_message],?accept=["text/plain"]):
????????conn.drain_events(timeout=2)
if?__name__?==?'__main__':
????Main(sys.argv)