代码判断网站,泰安网络推广公司平台,什么软件可以建设网站,wordpress登陆后台总是跳转首页RabbitMQ是什么#xff1f; RabbitMQ是一个在AMQP基础上完整的#xff0c;可复用的企业消息系统。他遵循Mozilla Public License开源协议。 MQ全称为Message Queue, 消息队列#xff08;MQ#xff09;是一种应用程序对应用程序的通信方法。应用程序通过读写出入队列的消息 RabbitMQ是一个在AMQP基础上完整的可复用的企业消息系统。他遵循Mozilla Public License开源协议。 MQ全称为Message Queue, 消息队列MQ是一种应用程序对应用程序的通信方法。应用程序通过读写出入队列的消息针对应用程序的数据来通信而无需专用连接来链接它们。消 息传递指的是程序之间通过在消息中发送数据进行通信而不是通过直接调用彼此来通信直接调用通常是用于诸如远程过程调用的技术。排队指的是应用程序通过 队列来通信。队列的使用除去了接收和发送应用程序同时执行的要求。 RabbitMQ的安装 首先说明RabbitMQ在win上安装是一件颇为麻烦的事情。试了很长时间都没有成功后来就转战linux了。在linux的安装中也可能会出现一点问题下面会贴出一个网址有安装中出现问题的解决办法。 linux上都是直接install rabbitmq-server 当然可能会在安装中和后来的使用上出现这样或者是那样的问题解决办法参见这篇博客http://www.cnblogs.com/kaituorensheng/p/4985767.html RabbitMQ的语法以及实例 1.基本实例 基于Queue实现生产者消费者模型 1 import Queue2 import threading3 4 5 message Queue.Queue(10)6 7 8 def producer(i):9 while True:
10 message.put(i)
11
12
13 def consumer(i):
14 while True:
15 msg message.get()
16
17
18 for i in range(12):
19 t threading.Thread(targetproducer, args(i,))
20 t.start()
21
22 for i in range(10):
23 t threading.Thread(targetconsumer, args(i,))
24 t.start() View Code 对于RabbitMQ来说生产和消费不再针对内存里的一个Queue对象而是某台服务器上的RabbitMQ Server实现的消息队列。 import pikaconnection pika.BlockingConnection(pika.ConnectionParameters(hostlocalhost))
channel connection.channel()#开通一个管道#声明queue
channel.queue_declare(queuehello)channel.basic_publish(exchange,routing_keyhello,#queue名字bodyHello World!)#消息内容
print( [x] Sent Hello World!)
connection.close() import pika
#建立连接
connection pika.BlockingConnection(pika.ConnectionParameters(hostlocalhost))
channel connection.channel()channel.queue_declare(queuehello)def callback(ch, method, properties, body):print( [x] Received %r % body)channel.basic_consume(#消费消息callback,#如果收到消息就调用callback函数处理消息queuehello,no_ackTrue)print( [*] Waiting for messages. To exit press CTRLC)
channel.start_consuming() 2.消息发布轮询 上面的只是一个生产者、一个消费者能不能一个生产者多个消费者呢 可以上面的例子多启动几个消费者consumer看一下消息的接收情况。 采用轮询机制把消息依次分发 假如消费者处理消息需要15秒如果当机了那这个消息处理明显还没处理完怎么处理 可以模拟消费端断了分别注释和不注释 no_ackTrue 看一下 你没给我回复确认就代表消息没处理完。 上面的效果消费端断了就转到另外一个消费端去了但是生产者怎么知道消费端断了呢 因为生产者和消费者是通过socket连接的socket断了就说明消费端断开了。 上面的模式只是依次分发实际情况是机器配置不一样。怎么设置类似权重的操作RabbitMQ怎么办呢RabbitMQ做了简单的处理就能实现公平的分发。 就是RabbitMQ给消费者发消息的时候检测下消费者里的消息数量如果超过指定值比如1条就不给你发了。 只需要在消费者端channel.basic_consume前加上就可以了。 channel.basic_qos(prefetch_count1) # 类似权重按能力分发如果有一个消息就不在给你发 3. acknowledgment 消息持久化 no-ack False 如果消费者遇到情况(its channel is closed, connection is closed, or TCP connection is lost)挂掉了那么RabbitMQ会重新将该任务添加到队列中。 import pikaconnection pika.BlockingConnection(pika.ConnectionParameters(hostlocalhost))
channel connection.channel()channel.queue_declare(queuehello)def callback(ch, method, properties, body):print( [x] Received %r % body)import timetime.sleep(10)print (ok)ch.basic_ack(delivery_tag method.delivery_tag)channel.basic_consume(callback,queuehello,no_ackFalse)print( [*] Waiting for messages. To exit press CTRLC)
channel.start_consuming() 生产者 durable 如果队列里还有消息RabbitMQ 服务端宕机了呢消息还在不在 把RabbitMQ服务重启看一下消息在不在。 上面的情况下宕机了消息就久了下面看看如何把消息持久化。 每次声明队列的时候都加上durable注意每个队列都得写客户端、服务端声明的时候都得写。 # 在管道里声明queue
channel.queue_declare(queuehello2, durableTrue) durable的作用只是把队列持久化。离消息持久话还差一步 发送端发送消息时加上properties propertiespika.BasicProperties(delivery_mode2, # 消息持久化) #!/usr/bin/env python
# -*- coding:utf-8 -*-
import pikaconnection pika.BlockingConnection(pika.ConnectionParameters(host10.211.55.4))
channel connection.channel()# make message persistent
channel.queue_declare(queuehello, durableTrue)def callback(ch, method, properties, body):print( [x] Received %r % body)import timetime.sleep(10)print okch.basic_ack(delivery_tag method.delivery_tag)channel.basic_consume(callback,queuehello,no_ackFalse)print( [*] Waiting for messages. To exit press CTRLC)
channel.start_consuming()消费者 生产者 import pikaconnection pika.BlockingConnection(pika.ConnectionParameters(host10.211.55.4))
channel connection.channel()# make message persistent
channel.queue_declare(queuehello, durableTrue)def callback(ch, method, properties, body):print( [x] Received %r % body)import timetime.sleep(10)print( ok)ch.basic_ack(delivery_tag method.delivery_tag)channel.basic_consume(callback,queuehello,no_ackFalse)print( [*] Waiting for messages. To exit press CTRLC)
channel.start_consuming() 消费者 4.消息获取顺序 默认消息队列里的数据是按照顺序被消费者拿走例如消费者1 去队列中获取 奇数 序列的任务消费者2去队列中获取 偶数 序列的任务。 channel.basic_qos(prefetch_count1) 表示谁来谁取不再按照奇偶数排列 1 #Auther: Xiaoliuer Li2 3 import pika4 5 connection pika.BlockingConnection(pika.ConnectionParameters(hostlocalhost))6 channel connection.channel()7 8 # make message persistent9 channel.queue_declare(queuehello)
10
11
12 def callback(ch, method, properties, body):
13 print( [x] Received %r % body)
14 import time
15 time.sleep(10)
16 print (ok)
17 ch.basic_ack(delivery_tag method.delivery_tag)
18
19 channel.basic_qos(prefetch_count1)
20
21 channel.basic_consume(callback,
22 queuehello,
23 no_ackFalse)
24
25 print( [*] Waiting for messages. To exit press CTRLC)
26 channel.start_consuming() 消费者 5.发布订阅广播模式 前面的效果都是一对一发如果做一个广播效果可不可以这时候就要用到exchange了 exchange必须精确的知道收到的消息要发给谁。exchange的类型决定了怎么处理 类型有以下几种 fanout: 所有绑定到此exchange的queue都可以接收消息direct: 通过routingKey和exchange决定的那个唯一的queue可以接收消息topic 所有符合routingKey(此时可以是一个表达式)的routingKey所bind的queue可以接收消息fanout 纯广播、all 发布订阅和简单的消息队列区别在于发布订阅会将消息发送给所有的订阅者而消息队列中的数据被消费一次便消失。所以RabbitMQ实现发布和订阅时会为每一个订阅者创建一个队列而发布者发布消息时会将消息放置在所有相关队列中。 import pika
import sysconnection pika.BlockingConnection(pika.ConnectionParameters(hostlocalhost))
channel connection.channel()channel.exchange_declare(exchangelogs,typefanout)message .join(sys.argv[1:]) or info: Hello World!
channel.basic_publish(exchangelogs,routing_key,bodymessage)
print( [x] Sent %r % message)
connection.close() 发布者 import pikaconnection pika.BlockingConnection(pika.ConnectionParameters(hostlocalhost))
channel connection.channel()channel.exchange_declare(exchangelogs,typefanout)result channel.queue_declare(exclusiveTrue)
queue_name result.method.queuechannel.queue_bind(exchangelogs,queuequeue_name)print( [*] Waiting for logs. To exit press CTRLC)def callback(ch, method, properties, body):print( [x] %r % body)channel.basic_consume(callback,queuequeue_name,no_ackTrue)channel.start_consuming() 订阅者 注意广播是实时的收不到就没了消息不会存下来类似收音机。 direct 有选择的接收消息 之前事例发送消息时明确指定某个队列并向其中发送消息RabbitMQ还支持根据关键字发送即队列绑定关键字发送者将数据根据关键字发送到消息exchangeexchange根据 关键字 判定应该将数据发送至指定队列。 import pika
import sysconnection pika.BlockingConnection(pika.ConnectionParameters(hostlocalhost))
channel connection.channel()channel.exchange_declare(exchangedirect_logs,typedirect)severity sys.argv[1] if len(sys.argv) 1 else info
message .join(sys.argv[2:]) or Hello World!
channel.basic_publish(exchangedirect_logs,routing_keyseverity,bodymessage)
print( [x] Sent %r:%r % (severity, message))
connection.close() 发送者 import pika
import sysconnection pika.BlockingConnection(pika.ConnectionParameters(hostlocalhost))
channel connection.channel()channel.exchange_declare(exchangedirect_logs,typedirect)result channel.queue_declare(exclusiveTrue)
queue_name result.method.queue
# 获取运行脚本所有的参数
severities sys.argv[1:]
if not severities:sys.stderr.write(Usage: %s [info] [warning] [error]\n % sys.argv[0])sys.exit(1)
# 循环列表去绑定
for severity in severities:channel.queue_bind(exchangedirect_logs,queuequeue_name,routing_keyseverity)print( [*] Waiting for logs. To exit press CTRLC)def callback(ch, method, properties, body):print( [x] %r:%r % (method.routing_key, body))channel.basic_consume(callback,queuequeue_name,no_ackTrue)channel.start_consuming() 接收者 运行接收端指定接收级别的参数例 python direct_sonsumer.py info warning
python direct_sonsumer.py warning error topic 更细致的过滤 在topic类型下可以让队列绑定几个模糊的关键字之后发送者将数据发送到exchangeexchange将传入”路由值“和 ”关键字“进行匹配匹配成功则将数据发送到指定队列。 # 表示可以匹配 0 个 或 多个 单词* 表示只能匹配 一个 单词 发送者路由值 队列中
old.boy.python old.* -- 不匹配
old.boy.python old.# -- 匹配 import pika
import sysconnection pika.BlockingConnection(pika.ConnectionParameters(hostlocalhost))
channel connection.channel()channel.exchange_declare(exchangetopic_logs,typetopic)routing_key sys.argv[1] if len(sys.argv) 1 else anonymous.info
message .join(sys.argv[2:]) or Hello World!
channel.basic_publish(exchangetopic_logs,routing_keyrouting_key,bodymessage)
print( [x] Sent %r:%r % (routing_key, message))
connection.close() 生产者 import pika
import sysconnection pika.BlockingConnection(pika.ConnectionParameters(hostlocalhost))
channel connection.channel()channel.exchange_declare(exchangetopic_logs,typetopic)routing_key sys.argv[1] if len(sys.argv) 1 else anonymous.info
message .join(sys.argv[2:]) or Hello World!
channel.basic_publish(exchangetopic_logs,routing_keyrouting_key,bodymessage)
print( [x] Sent %r:%r % (routing_key, message))
connection.close() 消费者 注意 sudo rabbitmqctl add_user alex 123
# 设置用户为administrator角色
sudo rabbitmqctl set_user_tags alex administrator
# 设置权限
sudo rabbitmqctl set_permissions -p / alex ...# 然后重启rabbiMQ服务
sudo /etc/init.d/rabbitmq-server restart# 然后可以使用刚才的用户远程连接rabbitmq server了。------------------------------
credentials pika.PlainCredentials(alex,123)connection pika.BlockingConnection(pika.ConnectionParameters(192.168.14.47,credentialscredentials)) View Code 6.RabbitMQ RPC 实现Remote procedure call 不知道你有没有发现上面的流都是单向的如果远程的机器执行完返回结果就实现不了了。 如果返回这种模式叫什么呢RPC远程过程调用snmp就是典型的RPC RabbitMQ能不能返回呢怎么返回呢既是发送端又是接收端。 但是接收端返回消息怎么返回可以发送到发过来的queue里么不可以。 返回时再建立一个queue把结果发送新的queue里 为了服务端返回的queue不写死在客户端给服务端发指令的的时候同时带一条消息说你结果返回给哪个queue import pika
import uuid
import timeclass FibonacciRpcClient(object):def __init__(self):self.connection pika.BlockingConnection(pika.ConnectionParameters(hostlocalhost))self.channel self.connection.channel()result self.channel.queue_declare(exclusiveTrue)self.callback_queue result.method.queueself.channel.basic_consume(self.on_response, # 只要一收到消息就调用on_responseno_ackTrue,queueself.callback_queue) # 收这个queue的消息def on_response(self, ch, method, props, body): # 必须四个参数# 如果收到的ID和本机生成的相同则返回的结果就是我想要的指令返回的结果if self.corr_id props.correlation_id:self.response bodydef call(self, n):self.response None # 初始self.response为Noneself.corr_id str(uuid.uuid4()) # 随机唯一字符串self.channel.basic_publish(exchange,routing_keyrpc_queue, # 发消息到rpc_queuepropertiespika.BasicProperties( # 消息持久化reply_to self.callback_queue, # 让服务端命令结果返回到callback_queuecorrelation_id self.corr_id, # 把随机uuid同时发给服务器),bodystr(n))while self.response is None: # 当没有数据就一直循环# 启动后on_response函数接到消息self.response 值就不为空了self.connection.process_data_events() # 非阻塞版的start_consuming()# print(no msg……)# time.sleep(0.5)# 收到消息就调用on_responsereturn int(self.response)if __name__ __main__:fibonacci_rpc FibonacciRpcClient()print( [x] Requesting fib(7))response fibonacci_rpc.call(7)print( [.] Got %r % response) RPC client import pika
import timedef fib(n):if n 0:return 0elif n 1:return 1else:return fib(n-1) fib(n-2)def on_request(ch, method, props, body):n int(body)print( [.] fib(%s) % n)response fib(n)ch.basic_publish(exchange, # 把执行结果发回给客户端routing_keyprops.reply_to, # 客户端要求返回想用的queue# 返回客户端发过来的correction_id 为了让客户端验证消息一致性propertiespika.BasicProperties(correlation_id props.correlation_id),bodystr(response))ch.basic_ack(delivery_tag method.delivery_tag) # 任务完成告诉客户端if __name__ __main__:connection pika.BlockingConnection(pika.ConnectionParameters(hostlocalhost))channel connection.channel()channel.queue_declare(queuerpc_queue) # 声明一个rpc_queue ,
channel.basic_qos(prefetch_count1)# 在rpc_queue里收消息,收到消息就调用on_requestchannel.basic_consume(on_request, queuerpc_queue)print( [x] Awaiting RPC requests)channel.start_consuming() RPC server 转载于:https://www.cnblogs.com/lixiaoliuer/p/6846063.html