廊坊大城网站建设,朝阳公园网站建设,用php 如何做网站,易网网站来源#xff1a;http://www.cnblogs.com/phennry/p/5713274.html 本篇博客主要介绍如何通过Python来操作管理RabbitMQ消息队列#xff0c;大家在工作中可能遇到很多类似RabbitMQ这种消息队列的中间件#xff0c;如#xff1a;ZeroMQ、ActiveMQ、MetaMQ等#xff0c;我们学…来源http://www.cnblogs.com/phennry/p/5713274.html 本篇博客主要介绍如何通过Python来操作管理RabbitMQ消息队列大家在工作中可能遇到很多类似RabbitMQ这种消息队列的中间件如ZeroMQ、ActiveMQ、MetaMQ等我们学会了如何操作RabbitMQ的话基本上操作其他的队列都是一通百通。 一、RabbitMQ安装 RabbitMQ是一个在AMQP基础上完整的可复用的企业消息系统它遵循Mozilla Pulic License开源协议。 MQ全称为Message Queue消息队列(MQ)是一种应用程序对应用程序的通信方法。应用程序通过读写出入队列的消息(针对应用程序的数据)来通信而无需专用链接来链接它们。消息传递指的是程序之间通过在消息中发送数据进行通信而不是通过直接调用彼此来通信直接调用通常是用于诸如远程过程调用的技术。排队指的是应用程序通过队列来通信。队列的使用除去了接收和发送应用程序同时执行的要求。 1yum安装rabbitmq
#安装配置epel源rpm -ivh http://dl.fedoraproject.org/pub/epel/6/i386/epel-release-6-8.noarch.rpm#安装Erlangyum -y insatll erlang#安装RabbitMQyum -y install rabbitmq-server#注意service rabbitmq-server start/stop 2安装API
#pip安装pip install pika#源码安装https://pypi.python.org/pypi/pika #官网地址
之前我们在介绍线程进程的时候介绍过python中自带的队列用法下面我们通过一段代码复习一下#生产者消费者模型解耦的意思就是两个程序之间互相没有关联了互不影响。
import Queue
import threading
import time
q Queue.Queue(20) #队列里最多存放20个元素def productor(arg): #生成者创建30个线程来请求吃包子往队列里添加请求元素q.put(str(arg) - 包子) for i in range(30):t threading.Thread(targetproductor,args(i,))t.start()def consumer(arg): #消费者接收到队列请求以后开始生产包子来消费队列里的请求while True:print(arg,q.get())time.sleep(2)for j in range(3):t threading.Thread(targetconsumer,args(j,))t.start() 二、通过Python来操作RabbitMQ队列 上面我们已经将环境装备好下面我们通过Pika模块来对Rabbitmq队列来进行操作对于RabbitMQ来说生产和消费不再针对内存里的一个Queue对象而是某台服务器上的RabbitMQ Server实现的消息队列。 1基本用法
####################################生产者#####################################import pikaconnectionpika.BlockingConnection(pika.ConnectionParameters(host192.168.10.131))
#创建一个链接对象对象中绑定rabbitmq的IP地址channelconnection.channel() #创建一个频道channel.queue_declare(queuename1) #通过这个频道来创建队列如果MQ中队列存在忽略没有则创建channel.basic_publish(exchange,routing_keyname1, #指定队列名称bodyHello World!) #往该队列中发送一个消息
print( [x] Sent Hello World!)
connection.close() #发送完关闭链接
#####################################消费者######################################import pikaconnection pika.BlockingConnection(pika.ConnectionParameters(host192.168.10.131))
#创建一个链接对象对象中绑定rabbitmq的IP地址channel connection.channel() #创建一个频道channel.queue_declare(queuename1) #通过这个频道来创建队列如果MQ中队列存在忽略没有则创建def callback(ch, method, properties, body): #callback函数负责接收队列里的消息print( [x] Received %r % body)channel.basic_consume(callback, #从队列里去消息queuename1, #指定队列名no_ackTrue)print( [*] Waiting for messages. To exit press CTRLC)
channel.start_consuming() acknowledgment 消息不丢失 上面的例子中如果我们将no-ackFalse ,那么当消费者遇到情况(its channel is closed, connection is closed, or TCP connection is lost)挂掉了那么RabbitMQ会重新将该任务添加到队列中。
import pikaconnection pika.BlockingConnection(pika.ConnectionParameters(host192.168.10.131))
channel connection.channel()channel.queue_declare(queuename1)def callback(ch, method, properties, body):print( [x] Received %r % body)import timetime.sleep(10)print(ok)ch.basic_ack(delivery_tag method.delivery_tag) #向生成者发送消费完毕的确认信息然后生产者将此条消息同队列里剔除channel.basic_consume(callback,queuename1, no_ackFalse) #如果no_ackFalse,当消费者down掉了RabbitMQ会重新将该任务添加到队列中print( [*] Waiting for messages. To exit press CTRLC)
channel.start_consuming() 上例如果消费者中断后如果不超过10秒重新链接的时候数据还在。当超过10秒之后消费者往生产者发送了ack重新链接的时候数据将消失。 durable消息不丢失 消费者down掉后我们知道怎么处理了如果我的RabbitMQ服务down掉了该怎么办呢 消息队列是可以做持久化如果我们在生产消息的时候就指定某条消息需要做持久化那么RabbitMQ发现有问题时就会将消息保存到硬盘持久化下来。
####################################生产者#####################################
#!/usr/bin/env pythonimport pikaconnection pika.BlockingConnection(pika.ConnectionParameters(host192.168.10.131))channel connection.channel()channel.queue_declare(queuename2, durableTrue) #指定队列持久化channel.basic_publish(exchange,routing_keyname2,bodyHello World!,propertiespika.BasicProperties(delivery_mode2, #指定消息持久化))
print( [x] Sent Hello World!)
connection.close()
#####################################消费者######################################
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import pikaconnection pika.BlockingConnection(pika.ConnectionParameters(host192.168.10.131))channel connection.channel()channel.queue_declare(queuename2, durableTrue)def callback(ch, method, properties, body):print( [x] Received %r % body)import timetime.sleep(10)print(ok)ch.basic_ack(delivery_tag method.delivery_tag)channel.basic_consume(callback,queuename2,no_ackFalse)print( [*] Waiting for messages. To exit press CTRLC)
channel.start_consuming() 消息获取顺序 默认消息队列里的数据是按照顺序被消费者拿走的例如消费者1去队列中获取奇数序列任务消费者2去队列中获取偶数序列的任务消费者1处理的比较快而消费者2处理的比较慢那么消费者1就会一直处于繁忙的状态为了解决这个问题在需要加入下面代码: channel.basic_qos(prefetch_count1) :表示谁来获取不再按照奇偶数 排列
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import pikaconnection pika.BlockingConnection(pika.ConnectionParameters(hostlocalhost))channel connection.channel()channel.queue_declare(queuename1)def callback(ch, method, properties, body):print( [x] Received %r % body)import timetime.sleep(10)print okch.basic_ack(delivery_tag method.delivery_tag)channel.basic_qos(prefetch_count1)channel.basic_consume(callback,queuename1,no_ackFalse)print( [*] Waiting for messages. To exit press CTRLC)
channel.start_consuming() 2发布订阅 发布订阅和简单的消息队列区别在于发布订阅会将消息发送给所有的订阅者而消息队列中的数据被消费一次便消失。所以RabbitMQ实现发布和订阅时会为每一个订阅者创建一个队列二发布者发布消息时会将消息放置在所有相关队列中。 在RabbitMQ中所有生产者提交的消息都有Exchange来接收然后Exchange按照特定的策略转发到Queue进行存储RabbitMQ提供了四种Exchangefanout、direct、topic、header。由于header模式在实际工作中用的比较少下面主要对前三种进行比较。 exchange type fanout 任何发送到Fanout Exchange的消息都会被转发到与该Exchange绑定(Binding)的所有Queue上 为了方便理解应用了上面这张图可以清晰的看到相互之间的关系当我们设置成fanout模式时如何操作请看下面代码
####################################发布者#####################################
import pikaconnection pika.BlockingConnection(pika.ConnectionParameters(hostlocalhost))
channel connection.channel()channel.exchange_declare(exchangetest_fanout,typefanout)message 4456
channel.basic_publish(exchangetest_fanout,routing_key,bodymessage)
print( [x] Sent %r % message)
connection.close()
####################################订阅者#####################################import pikaconnection pika.BlockingConnection(pika.ConnectionParameters(hostlocalhost))
channel connection.channel()channel.exchange_declare(exchangetest_fanout, #创建一个exchangetypefanout) #任何发送到Fanout Exchange的消息都会被转发到与该Exchange绑定(Binding)的所有Queue上#随机创建队列
result channel.queue_declare(exclusiveTrue)
queue_name result.method.queue#绑定
channel.queue_bind(exchangetest_fanout,queuequeue_name) #exchange绑定后端队列print(-------------)def callback(ch,method,properties,body):print( [x] %r % body)channel.basic_consume(callback,queuequeue_name,no_ackTrue)
channel.start_consuming() exchange type direct任何发送到Direct Exchange的消息都会被转发到RouteKey中指定的Queue上(关键字发送) 之前事例发送消息时明确指定了某个队列并向其中发送消息RabbitMQ还支持根据关键字发送即队列绑定关键字发送者将数据关键字发送到消息ExchangeExchange根据关键字判定应该将数据发送至指定队列。 发布者
#!/usr/bin/env python
import pika
import sysconnection pika.BlockingConnection(pika.ConnectionParameters(hostlocalhost))channel connection.channel()channel.exchange_declare(exchangedirect_test,typedirect)severity info #设置一个key,
message 99999
channel.basic_publish(exchangedirect_test,routing_keyseverity,bodymessage)
print( [x] Sent %r:%r % (severity, message))
connection.close() 订阅者1
#!/usr/bin/env python
import pika
import sysconnection pika.BlockingConnection(pika.ConnectionParameters(hostlocalhost))channel connection.channel()channel.exchange_declare(exchangedirect_test,typedirect)result channel.queue_declare(exclusiveTrue)
queue_name result.method.queueseverities [error,info,] #绑定队列并发送关键字errorinfo
for severity in severities:channel.queue_bind(exchangedirect_test,queuequeue_name,routing_keyseverity)print( [*] Waiting for logs. To exit press CTRLC)def callback(ch, method, properties, body):print( [x] %r:%r % (method.routing_key, body))channel.basic_consume(callback,queuequeue_name,no_ackTrue)channel.start_consuming()订阅者2 #!/usr/bin/env python
import pika
import sysconnection pika.BlockingConnection(pika.ConnectionParameters(hostlocalhost))channel connection.channel()channel.exchange_declare(exchangedirect_test,typedirect)result channel.queue_declare(exclusiveTrue)
queue_name result.method.queueseverities [error,]
for severity in severities:channel.queue_bind(exchangedirect_test,queuequeue_name,routing_keyseverity)print( [*] Waiting for logs. To exit press CTRLC)def callback(ch, method, properties, body):print( [x] %r:%r % (method.routing_key, body))channel.basic_consume(callback,queuequeue_name,no_ackTrue)channel.start_consuming() 结论当我们将发布者的key设置成Error的时候两个队列对可以收到Exchange的消息当我们将key设置成info后只有订阅者1可以收到Exchange的消息。 exchange type topic任何发送到Topic Exchange的消息都会被转发到所有关心RouteKey中指定话题的Queue上(模糊匹配) 在topic类型下可以让队列绑定几个模糊的关键字之后发送者将数据发送到exchangeexchange将传入路由值和关键字进行匹配匹配成功则将数据发送到指定队列。 # 表示可以匹配0个或多个单词 * 表示只能匹配一个单词。 #发送路由值 队列中
www.cnblogs.com www.* ---#无法匹配
www.cnblogs.com www.# ---#匹配成功发布者 #!/usr/bin/env python
import pika
import sysconnection pika.BlockingConnection(pika.ConnectionParameters(hostlocalhost))channel connection.channel()channel.exchange_declare(exchangetopic_logs,typetopic)routing_key sys.argv[1] if len(sys.argv) 1 else anonymous.infomessage .join(sys.argv[2:]) or Hello World!channel.basic_publish(exchangetopic_logs,routing_keyrouting_key,bodymessage)
print( [x] Sent %r:%r % (routing_key, message))connection.close()#执行方式
python xxx.py name1 #name1为routing_key订阅者 #!/usr/bin/env python
import pika
import sysconnection pika.BlockingConnection(pika.ConnectionParameters(hostlocalhost))channel connection.channel()channel.exchange_declare(exchangetopic_logs,typetopic)result channel.queue_declare(exclusiveTrue)
queue_name result.method.queuebinding_keys sys.argv[1:]
if not binding_keys:sys.stderr.write(Usage: %s [binding_key]...\n % sys.argv[0])sys.exit(1)for binding_key in binding_keys:channel.queue_bind(exchangetopic_logs,queuequeue_name,routing_keybinding_key)print( [*] Waiting for logs. To exit press CTRLC)def callback(ch, method, properties, body):print( [x] %r:%r % (method.routing_key, body))channel.basic_consume(callback,queuequeue_name,no_ackTrue)channel.start_consuming()#执行方式
python xxx,py name1 更多相关内容请参考以下连接 http://www.rabbitmq.com/documentation.html http://blog.csdn.net/songfreeman/article/details/50945025 python采用pika库使用rabbitmq总结多篇笔记和示例 http://blog.csdn.net/chenjiebin/article/details/8253433 这一段时间学习了下rabbitmq在学习的过程中发现国内关于Python采用pika库使用rabbitmq的资料很少官网有这方面的资料不过是都英文的。于是笔者结合自己的理解就这方面内容写了一些示例总共有七篇笔记分享出来。 笔记依次是循序渐进的笔记内贴出的代码笔者都实际运行过运行系统ubuntu 12.04rabbitmq版本是2.7.1python版本是2.7.3。 因为笔记里提到一些名词虽然叫法不一样不过都是表达同样的事物所以有必要先说明下以免产生疑惑。主要是两个名词 producer 直译为生成者就是产生消息的东东笔记里提到的发送者、发送端都是一个意思。如果把消息比喻成任务也可以理解为任务分配者。 consumer 直译为消费者就是接收消息的东东 笔记里提到的接收者、接收端都是一个意思。如果把消息比喻成任务也可以理解为工作者。 1、ubuntu安装rabbitmq和python的使用实现 这篇主要记录了在ubuntu下安装rabbitmq服务的过程和安装python pika库的过程并演示了单向发送消息的工作方式。 2、python使用rabbitmq实例二工作队列 继上一篇演示了多个接收端情况下消息发送的工作方式。 3、python使用rabbitmq实例三交换机 前面两篇的示例都只使用了一个队列消息是依次发送给绑定到该队列的接收端。如果要广播出去就要使用交换机本篇演示了交换机的工作方式。 4、python使用rabbitmq实例四路由键 第三篇的消息是广播出去的所有接收端都会接收到如果要精确指明消息的接收端就要使用路由键本篇主要演示了路由键的工作方式。 5、python使用rabbitmq实例五路由键模糊匹配 第四篇的路由键是精确匹配的有时用需要模糊匹配本篇主要演示了路由键模糊匹配的工作方式。 6、python使用rabbitmq实例六远程结果返回 前面五篇的消息都是发送出去就完事了接收端并没有将结果返回给发送端。有些情况下需要接收端将接收到的消息处理后再返回给发送端本篇演示了这种情况的处理方式。 7、python使用rabbitmq实例七相互关联编号correlation id 上一篇只是发送单条消息返回的结果自然是对应该条消息但是如果同时发出多条消息就会返回多个结果如何将发送的消息和返回的结果一一对应起来呢本篇演示了correlation id的工作方式就是用来解决这个问题的。 来源http://yidao620c.iteye.com/blog/1947338使用python开发RabbitMQ应用 使用python开发RabbitMQ应用 参考了RabbitMQ网站上提供的英文版本入门指南: http://www.rabbitmq.com/getstarted.html 测试环境CentOS 6.2 1测试环境准备 安装python一般系统都自带了python 安装RabbitMQ server可以参考前面的文章。 安装pika 使用pip安装的时候可能会报错 importerror no module named pkg_resources 请用下面命令解决这个问题 $ curl https://bitbucket.org/pypa/setuptools/raw/bootstrap/ez_setup.py | python 然后还可能出现 pkg_resources.distributionnotfound pip1.4.1 这时候先把pip卸载掉 sudo yum remove python-pip 然后去下载最新的get-pip.py文件python get-pip.py安装 在/etc/profile里面将/usr/local/python27/bin加入PATH最前面 把rabbitmq server启动一下和准备好测试目录rabbitmq_app $ /usr/local/rabbitmq/sbin/rabbitmq-server -detached $ cd ~ $ mkdir -p test /rabbitmq_app $ cd test /rabbitmq_app $ mkdir tut1 tut2 tut3 tut4 tut5 tut6 2实例一来个hello world程序 $ cd tut1 $ vim send.py (代码如下) $ vim receive.py (代码如下) 首先是消息发送程序: send.py Python代码 #!/usr/bin/env python # -*- coding: utf-8 -*- import sys import pika connection pika.BlockingConnection(pika.ConnectionParameters(localhost)) channel connection.channel() channel.queue_declare(queue hello) if len (sys.argv) 2 : print message is empty! sys.exit(0) message sys.argv[1] channel.basic_publish(exchange , routing_keyhello, body message) print [x] sent: message \n connection.close() 跑一下send.py发送一个消息 $ python send.py Hello World! $ python send.py 你好刀哥 $ /usr/local/rabbitmq/sbin/rabbitmqctl list_queues Listing queues ... hello 2 ... done . 如果你也看到hello队列里面有一个消息的话就证明可以发消息了。 然后写一个接收消息脚本receive.py Python代码 #!/usr/bin/env python # -*- coding: utf-8 -*- import pika connection pika.BlockingConnection(pika.ConnectionParameters( localhost )) channel connection.channel() channel.queue_declare(queue hello ) print [*] Waiting for messages. To exit press CTRLC def callback(ch, method, properties, body): print body channel.basic_consume(callback, queue hello , no_ack True ) channel.start_consuming() 其中第12行的 no_ackTrue 表示消费完了这个消息以后不主动把完成状态通知rabbitmq。 然后开另外一个shell执行一下receive.py $ python receive.py [*] Waiting for messages. To exit press CTRLC Hello World! 你好刀哥 3实例二工作队列work queue / task queue 一般应用于把比较耗时的任务从主线任务分离出来。比如一个http页面请求里面需要发送带大附件的邮件、或者是要处理一张头像图片等。这类型工作队列的 处理端一般有多个worker进程分担队列里面的任务。这就有点负载均衡的策略在里面了。尽量做到每个进程的工作量比较平均而且是完成了一个任务才接 第二个任务。看看我们的实现吧。 $ cd tut2 $ vim manager.py (代码如下) $ vim worker.py (代码如下) 首先是消息发送程序: manager.py Python代码 #!/usr/bin/env python # -*- coding: utf-8 -*- import pika import sys parameters pika.ConnectionParameters(host localhost ) connection pika.BlockingConnection(parameters) channel connection.channel() channel.queue_declare(queue task_queue , durable True ) message .join(sys.argv[ 1 :]) or Hello World! channel.basic_publish(exchange , routing_key task_queue , body message, properties pika.BasicProperties( delivery_mode 2 , # make message persistent )) print [x] Sent %r % (message,) connection.close() 其中第8行的 durableTrue 声明了队列需要持久化第14行的 delivery_mode 2 声明了队列的消息需要持久化。 然后写一个接收消息脚本worker.py Python代码 #!/usr/bin/env python # -*- coding: utf-8 -*- import pika import time connection pika.BlockingConnection(pika.ConnectionParameters( host localhost )) channel connection.channel() channel.queue_declare(queue task_queue , durable True ) print [*] Waiting for messages. To exit press CTRLC def callback(ch, method, properties, body): print [x] Received %r % (body,) time.sleep( body.count( . ) ) print [x] Done ch.basic_ack(delivery_tag method.delivery_tag) channel.basic_qos(prefetch_count 1 ) channel.basic_consume(callback, queue task_queue ) channel.start_consuming() 其中第15行的 basic_ack 是执行完任务通知rabbitmq第17行的basic_qos是告诉rabbitmq只有当worker完成了任务以后才分派1条新的消息实现公平分派。 测试方法开3个bash2个跑worker1个跑manager $ python manager.py task1. $ python manager.py task2.. $ python manager.py task3... $ python manager.py task4.... 点号数量决定worker工作的时间( 其实是睡觉时间呵呵 time.sleep(body.count(.)) )。 而在worker那边可以看到每个worker都处理了两个任务。 这种分配机制就是所谓的循环调度Round-robin dispatching 4实例三发布和订阅 发布订阅模式简单来说就像是广播一个消息发布出来以后所有订阅者都能听到至于接收到这个信息以后大家做什么就看具体个人了。 啊怎么忽然冒出个X是什么玩意这个X就是所谓的exchange简单来说就是消息的管家由他决定接收到的信息是放特定的队列还是所有队列还是直接丢弃。 其实在前两个实例里面已经用到了exchange channel.basic_publish(exchange,...这个exchange的名字为空外号无名人若无名便可专心练剑~。他会把你的消息都转达给routing_key指明的队列。 当我们声明了exchange以后我们需要为queue和exchange建立联系这时候就要用到绑定binding了。 $ cd tut3 $ vim emitlog.py (代码如下) $ vim recelog.py (代码如下) emitlog.py Python代码 #!/usr/bin/env python import pika import sys connection pika.BlockingConnection(pika.ConnectionParameters( host localhost )) channel connection.channel() channel.exchange_declare(exchange logs , type fanout ) message .join(sys.argv[ 1 :]) or info: Hello World! channel.basic_publish(exchange logs , routing_key , body message) print [x] Sent %r % (message,) connection.close() recelog.py Python代码 #!/usr/bin/env python import pika connection pika.BlockingConnection(pika.ConnectionParameters( host localhost )) channel connection.channel() channel.exchange_declare(exchange logs , type fanout ) result channel.queue_declare(exclusive True ) queue_name result.method.queue channel.queue_bind(exchange logs , queue queue_name) print [*] Waiting for logs. To exit press CTRLC def callback(ch, method, properties, body): print [x] %r % (body,) channel.basic_consume(callback, queue queue_name, no_ack True ) channel.start_consuming() 测试 和前一个实例差不多。开3个bash2个跑recelog1个跑emitlog。查看recelog是否都收到emitlog发送的消息。代码里面用 了一个fanout(意思是成扇形展开)类型的exchange只要和exchange绑定的queue都能收到一份消息的 copyrouting_key会被忽略掉。 5路由模式 选择接收信息 $ cd tut4 $ vim emitlog.py (代码如下) $ vim recelog.py (代码如下) emitlog.py Python代码 #!/usr/bin/env python import pika import sys connection pika.BlockingConnection(pika.ConnectionParameters( host localhost )) channel connection.channel() channel.exchange_declare(exchange direct_logs , type direct ) severity sys.argv[ 1 ] if len (sys.argv) 1 else info message .join(sys.argv[ 2 :]) or Hello World! channel.basic_publish(exchange direct_logs , routing_key severity, body message) print [x] Sent %r:%r % (severity, message) connection.close() 这里声明exchange时类型定义为direct直接匹配就是说只有当一个信息的routing_key和队列的binding_key一 致时信息才会被放入到这个队列。消息发布给exchange时必须带上routing_key。其实在消息生产端队列这个概念是透明的。 recelog.py Python代码 #!/usr/bin/env python import pika import sys connection pika.BlockingConnection(pika.ConnectionParameters( host localhost )) channel connection.channel() channel.exchange_declare(exchange direct_logs , type direct ) result channel.queue_declare(exclusive True ) queue_name result.method.queue severities sys.argv[ 1 :] if not severities: print sys.stderr, Usage: %s [info] [warning] [error] % \ (sys.argv[ 0 ],) sys.exit( 1 ) for severity in severities: channel.queue_bind(exchange direct_logs , queue queue_name, routing_key severity) print [*] Waiting for logs. To exit press CTRLC def callback(ch, method, properties, body): print [x] %r:%r % (method.routing_key, body,) channel.basic_consume(callback, queue queue_name, no_ack True ) channel.start_consuming() 这里首先定义exchange和消息发送端是一样的。然后定义队列队列是自动命名并且只要进程终止队列就会终止。然后把队列和 exchange绑定绑定时的routing_key是用户输入的如果输入多个key就做多次的绑定。注意这里的队列还是一个。如果你需要建立两个 队列就得跑两次这个python脚本。 6topic和rpc 官方tutorial还有两个高级一点的实例topic和rpc这里就不作说明了留着大家学学英文吧 :) RabbitMQ提供了很多消息队列客户端代码比如pythonjavac等等大家可以根据产品或项目的实际情况选择。关键是原理必须搞懂。 其他资源 中文入门篇 http://adamlu.net/dev/2011/09/rabbitmq-get-started/ 利用Python学习RabbitMQ消息队列 RabbitMQ可以当做一个消息代理它的核心原理非常简单即接收和发送消息可以把它想象成一个邮局我们把信件放入邮箱邮递员就会把信件投递到你的收件人处RabbitMQ就是一个邮箱、邮局、投递员功能综合体整个过程就是邮箱接收信件邮局转发信件投递员投递信件到达收件人处。 RabbitMQ和邮局的主要区别就是RabbitMQ接收、存储和发送的是二进制数据----消息。 rabbitmq基本管理命令 一步启动Erlang node和Rabbit应用sudo rabbitmq-server 在后台启动Rabbit nodesudo rabbitmq-server -detached 关闭整个节点包括应用sudo rabbitmqctl stop add_user UserName Password
delete_user UserName
change_password UserName NewPassword
list_users
add_vhost VHostPath
delete_vhost VHostPath
list_vhosts
set_permissions [-p VHostPath] UserName Regexp Regexp Regexp
clear_permissions [-p VHostPath] UserName
list_permissions [-p VHostPath]
list_user_permissions UserName
list_queues [-p VHostPath] [QueueInfoItem ...]
list_exchanges [-p VHostPath] [ExchangeInfoItem ...]
list_bindings [-p VHostPath]
list_connections [ConnectionInfoItem ...] Demo: producer.py #!/usr/bin/env python
# -*- coding: utf_ -*-
# Date: 年月日
# Author:蔚蓝行
# 博客 http://www.cnblogs.com/duanv/
import pika
import sys
#创建连接connection到localhost
con pika.BlockingConnection(pika.ConnectionParameters(localhost))
#创建虚拟连接channel
cha con.channel()
#创建队列anheng,durable参数为真时队列将持久化exclusive为真时建立临时队列
resultcha.queue_declare(queueanheng,durableTrue,exclusiveFalse)
#创建名为yanfa,类型为fanout的exchange其他类型还有direct和topic如果指定durable为真exchange将持久化
cha.exchange_declare(durableFalse,exchangeyanfa,typedirect,)
#绑定exchange和queue,result.method.queue获取的是队列名称
cha.queue_bind(exchangeyanfa, queueresult.method.queue,routing_key,)
#公平分发使每个consumer在同一时间最多处理一个message收到ack前不会分配新的message
cha.basic_qos(prefetch_count)
#发送信息到队列‘anheng
message .join(sys.argv[:])
#消息持久化指定delivery_mode
cha.basic_publish(exchange,routing_keyanheng,bodymessage,propertiespika.BasicProperties(delivery_mode ,))
print [x] Sent %r % (message,)
#关闭连接
con.close() consumer.py #!/usr/bin/env python
# -*- coding: utf_ -*-
# Date: 年月日
# Author:蔚蓝行
# 博客 http://www.cnblogs.com/duanv/
import pika
#建立连接connection到localhost
con pika.BlockingConnection(pika.ConnectionParameters(localhost))
#创建虚拟连接channel
cha con.channel()
#创建队列anheng
resultcha.queue_declare(queueanheng,durableTrue)
#创建名为yanfa,类型为fanout的交换机其他类型还有direct和topic
cha.exchange_declare(durableFalse,exchangeyanfa, typedirect,)
#绑定exchange和queue,result.method.queue获取的是队列名称
cha.queue_bind(exchangeyanfa,queueresult.method.queue,routing_key,)
#公平分发使每个consumer在同一时间最多处理一个message收到ack前不会分配新的message
cha.basic_qos(prefetch_count)
print [*] Waiting for messages. To exit press CTRLC
#定义回调函数
def callback(ch, method, properties, body):print [x] Received %r % (body,)ch.basic_ack(delivery_tag method.delivery_tag)
cha.basic_consume(callback,queueanheng,no_ackFalse,)
cha.start_consuming() 一、概念 Connection 一个TCP的连接。Producer和Consumer都是通过TCP连接到RabbitMQ Server的。程序的起始处就是建立这个TCP连接。 Channels 虚拟连接。建立在上述的TCP连接中。数据流动都是在Channel中进行的。一般情况是程序起始建立TCP连接第二步就是建立这个Channel。 二、队列 首先建立一个Connection然后建立Channels在channel上建立队列 建立时指定durable参数为真队列将持久化指定exclusive为真队列为临时队列关闭consumer后该队列将不再存在一般情况下建立临时队列并不指定队列名称rabbitmq将随机起名通过result.method.queue来获取队列名 result channel.queue_declare(exclusiveTrue) result.method.queue 区别durable是队列持久化与否如果为真队列将在rabbitmq服务重启后仍存在如果为假rabbitmq服务重启前不会消失与consumer关闭与否无关 而exclusive是建立临时队列当consumer关闭后该队列就会被删除 三、exchange和bind Exchange中durable参数指定exchange是否持久化exchange参数指定exchange名称type指定exchange类型。Exchange类型有directfanout和topic。 Bind是将exchange与queue进行关联exchange参数和queue参数分别指定要进行bind的exchange和queuerouting_key为可选参数。 Exchange的三种模式 Direct 任何发送到Direct Exchange的消息都会被转发到routing_key中指定的Queue 1.一般情况可以使用rabbitMQ自带的Exchange””(该Exchange的名字为空字符串) 2.这种模式下不需要将Exchange进行任何绑定(bind)操作 3.消息传递时需要一个“routing_key”可以简单的理解为要发送到的队列名字 4.如果vhost中不存在routing_key中指定的队列名则该消息会被抛弃。 Demo中虽然声明了一个exchangeyanfa和queueanheng的bind但是在后面发送消息时并没有使用该exchange和bind而是采用了direct的模式没有指定exchange而是指定了routing_key的名称为队列名消息将发送到指定队列。 如果一个exchange 声明为direct并且bind中指定了routing_key,那么发送消息时需要同时指明该exchange和routing_key. Fanout: 任何发送到Fanout Exchange的消息都会被转发到与该Exchange绑定(Binding)的所有Queue上 1.可以理解为路由表的模式 2.这种模式不需要routing_key 3.这种模式需要提前将Exchange与Queue进行绑定一个Exchange可以绑定多个Queue一个Queue可以同多个Exchange进行绑定。 4.如果接受到消息的Exchange没有与任何Queue绑定则消息会被抛弃。 Demo中创建了一个将一个exchange和一个queue进行fanout类型的bind.但是发送信息时没有用到它如果要用到它只要在发送消息时指定该exchange的名称即可该exchange就会将消息发送到所有和它bind的队列中。在fanout模式下指定的routing_key是无效的 。 Topic 任何发送到Topic Exchange的消息都会被转发到所有关心routing_key中指定话题的Queue上 1.这种模式较为复杂简单来说就是每个队列都有其关心的主题所有的消息都带有一个“标题”(routing_key)Exchange会将消息转发到所有关注主题能与routing_key模糊匹配的队列。 2.这种模式需要routing_key也许要提前绑定Exchange与Queue。 3.在进行绑定时要提供一个该队列关心的主题如“#.log.#”表示该队列关心所有涉及log的消息(一个routing_key为”MQ.log.error”的消息会被转发到该队列)。 4.“#”表示0个或若干个关键字“*”表示一个关键字。如“log.*”能与“log.warn”匹配无法与“log.warn.timeout”匹配但是“log.#”能与上述两者匹配。 5.同样如果Exchange没有发现能够与routing_key匹配的Queue则会抛弃此消息。 四、任务分发 1.Rabbitmq的任务是循环分发的如果开启两个consumerproducer发送的信息是轮流发送到两个consume的。 2.在producer端使用cha.basic_publish()来发送消息其中body参数就是要发送的消息propertiespika.BasicProperties(delivery_mode 2,)启用消息持久化可以防止RabbitMQ Server 重启或者crash引起的数据丢失。 3.在接收端使用cha.basic_consume()无限循环监听如果设置no-ack参数为真每次Consumer接到数据后而不管是否处理完成RabbitMQ Server会立即把这个Message标记为完成然后从queue中删除了。为了保证数据不被丢失RabbitMQ支持消息确认机制即acknowledgments。为了保证数据能被正确处理而不仅仅是被Consumer收到那么我们不能采用no-ack。而应该是在处理完数据后发送ack。 在处理数据后发送的ack就是告诉RabbitMQ数据已经被接收处理完成RabbitMQ可以去安全的删除它了。如果Consumer退出了但是没有发送ack那么RabbitMQ就会把这个Message发送到下一个Consumer。这样就保证了在Consumer异常退出的情况下数据也不会丢失。 这里并没有用到超时机制。RabbitMQ仅仅通过Consumer的连接中断来确认该Message并没有被正确处理。也就是说RabbitMQ给了Consumer足够长的时间来做数据处理。 Demo的callback方法中ch.basic_ack(delivery_tag method.delivery_tag)告诉rabbitmq消息已经正确处理。如果没有这条代码Consumer退出时Message会重新分发。然后RabbitMQ会占用越来越多的内存由于RabbitMQ会长时间运行因此这个“内存泄漏”是致命的。去调试这种错误可以通过一下命令打印un-acked Messages sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged 4.公平分发设置cha.basic_qos(prefetch_count1)这样RabbitMQ就会使得每个Consumer在同一个时间点最多处理一个Message。换句话说在接收到该Consumer的ack前他它不会将新的Message分发给它。 五、注意 生产者和消费者都应该声明建立队列网上教程上说第二次创建如果参数和第一次不一样那么该操作虽然成功但是queue的属性并不会被修改。 可能因为版本问题在我的测试中如果第二次声明建立的队列属性和第一次不完全相同将报类似这种错406, PRECONDITION_FAILED - parameters for queue anheng in vhost / not equivalent 如果是exchange第二次创建属性不同将报这种错406, PRECONDITION_FAILED - cannot redeclare exchange yanfa in vhost / with different type, durable, internal or autodelete value 如果第一次声明建立队列也出现这个错误说明之前存在名字相同的队列且本次声明的某些属性和之前声明不同可通过命令sudo rabbitmqctl list_queues查看当前有哪些队列。解决方法是声明建立另一名称的队列或删除原有队列如果原有队列是非持久化的可通过重启rabbitmq服务删除原有队列如果原有队列是持久化的只能删除它所在的vhost,然后再重建vhost,再设置vhost的权限先确认该vhost中没有其他有用队列。 sudo rabbitmqctl delete_vhost /
sudo rabbitmqctl add_vhost /
sudo rabbitmqctl set_permissions -p / username .* .* .* 以上内容是小编给大家介绍的利用Python学习RabbitMQ消息队列希望大家喜欢。