哪里有网站建设加工,wordpress 吃cpu,西安关键词排名首页,江苏省两学一做网站1 RABBITMQ简介及安装 RabbitMQ是一个开源的AMQP实现#xff0c;服务器端用Erlang语言编写#xff0c;支持多种客户端#xff0c;如#xff1a;Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等#xff0c;支持AJAX。用于在分布式系统中存储转发消息… 1 RABBITMQ简介及安装 RabbitMQ是一个开源的AMQP实现服务器端用Erlang语言编写支持多种客户端如Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等支持AJAX。用于在分布式系统中存储转发消息在易用性、扩展性、高可用性等方面表现不俗。 AMQP即Advanced message Queuing Protocol高级消息队列协议是应用层协议的一个开放标准为面向消息的中间件设计。消息中间件主要用于组件之间的解耦消息的发送者无需知道消息使用者的存在反之亦然。 AMQP的主要特征是面向消息、队列、路由包括点对点和发布/订阅、可靠性、安全。 1 RABBITMQ系统架构 RabbitMQ Server 也叫broker server是一种传输服务负责维护一条从Producer到consumer的路线保证数据能够按照指定的方式进行传输。 Producer数据的发送方。 Consumer数据的接收方。 Exchanges 接收消息转发消息到绑定的队列。主要使用3种类型direct topic fanout。 Queue RabbitMQ内部存储消息的对象。相同属性的queue可以重复定义但只有第一次定义的有效。 Bindings 绑定Exchanges和Queue之间的路由。 Connection 就是一个TCP的连接。Producer和consumer都是通过TCP连接到RabbitMQ Server的。 Channel虚拟连接。它建立在上述的TCP连接中。数据流动都是在Channel中进行的。也就是说一般情况是程序起始建立TCP连接第二步就是建立这个Channel。 2 RABBITMQ安装和启动 Ubuntu 安装 编辑 /etc/apt/sources.list 文件添加如下行 deb http://www.rabbitmq.com/debian/ testing main 执行更新软件源命令 #apt-get update. 安装rabbitmq #apt-get install rabbitmq-server 启动rabbitmq # service rabbitmq-server start Centos 安装 从官网下载erlang和rabbitmq 的rpm包 安装erlang #rpm –i erlang-17.4-1.el6.x86_64.rpm 安装rabbitmq #rpm –i rabbitmq-server-3.5.3-1.noarch.rpm rabbitmq的rpm安装包不能指定安装路径 3 RABBITMQ配置文件 RABBITMQ配置文件位置 Generic UNIX- $RABBITMQ_HOME/etc/rabbitmq/Debian- /etc/rabbitmq/RPM- /etc/rabbitmq/ 4 RABBITMQ页面web管理 启用web插件 #rabbitmq-plugins enable rabbitmq_management #启用 关闭web插件 # rabbitmq-plugins disable rabbitmq_management 可以用默认账号guest,guest登陆http://主机IP:15672如果要远程登录需要先创建帐户可查看下一节。 5 RABBITMQ创建帐户 如果是集群的话只要在一台主机设置即可其它会自动同步。 #rabbitmqctl add_user iom 123456 –iom为新建的用户123456为密码 #rabbitmqctl set_user_tags iom administrator –将用户设置为管理员角色 #rabbitmqctl set_permissions -p / iom “.*” “.*” “.*” –在 / 虚拟主机里设置iom用户配置权限写权限读权限。.*是正则表达式里用法。rabbitmq的权限是根据不同的虚拟主机virtual hosts配置的同用户在不同的虚拟主机virtual hosts里可能不一样。 2 RABBITMQ安全特性 6 publish消息确认机制 如果采用标准的 AMQP 协议则唯一能够保证消息不会丢失的方式是利用事务机制 — 令 channel 处于 transactional 模式、向其 publish 消息、执行 commit 动作。在这种方式下事务机制会带来大量的多余开销并会导致吞吐量下降 250% 。为了补救事务带来的问题引入了 confirmation 机制即 Publisher Confirm。 confirm 机制是在channel上使用 confirm.select方法处于 transactional 模式的 channel 不能再被设置成 confirm 模式反之亦然。 在 channel 被设置成 confirm 模式之后所有被 publish 的后续消息都将被 confirm即 ack 或者被 nack 一次。但是没有对消息被 confirm 的快慢做任何保证并且同一条消息不会既被 confirm 又被 nack 。 RabbitMQ 将在下面的情况中对消息进行 confirm RabbitMQ发现当前消息无法被路由到指定的 queues 中 非持久属性的消息到达了其所应该到达的所有 queue 中和镜像 queue 中 持久消息到达了其所应该到达的所有 queue 中和镜像 queue 中并被持久化到了磁盘被 fsync 持久消息从其所在的所有 queue 中被 consume 了如果必要则会被 acknowledge。 7 consumer消息确认机制 为了保证数据不被丢失RabbitMQ支持消息确认机制即acknowledgments。 如果没启动消息确认机制RabbitMQ在consumer收到消息后就会把消息删除。 启用消息确认后consumer在处理数据后应通过回调函数显示发送ack RabbitMQ收到ack后才会删掉数据。如果consumer一段时间内不回馈RabbitMQ会将该消息重新分配给另外一个绑定在该队列上的consumer。另一种情况是consumer断开连接但是获取到的消息没有回馈则RabbitMQ同样重新分配。 注意如果consumer 没调用basic.qos 方法设置prefetch_count1那即使该consumer有未ack的messagesRabbitMQ仍会继续发messages给它。 8 消息持久化 消息确认机制确保了consumer退出时消息不会丢失但如果是RabbitMQ本身因故障退出消息还是会丢失。为了保证在RabbitMQ出现意外情况时数据仍没有丢失需要将queue和message都要持久化。 queue持久化channel.queue_declare(queue’hello’, durableTrue) message持久化channel.basic_publish(exchange”, routing_key”task_queue”, bodymessage, propertiespika.BasicProperties( delivery_mode 2,) #消息持久化 ) 即使有消息持久化数据也有可能丢失因为rabbitmq是先将数据缓存起来到一定条件才保存到硬盘上这期间rabbitmq出现意外数据有可能丢失。 网上有测试表明持久化会对RabbitMQ的性能造成比较大的影响可能会下降10倍不止。 3 RABBITMQ集群 1 RABBITMQ集群基本概念 一个RABBITMQ集 群中可以共享uservirtualhostsqueues开启Highly Available Queuesexchanges等。但message只会在创建的节点上传输。当message进入A节点的queue中后consumer从B节点拉取时RabbitMQ会临时在A、B间进行消息传输把A中的消息实体取出并经过B发送给consumer。所以consumer应尽量连接每一个节点从中取消息。 RABBITMQ的集群节点包括内存节点、磁盘节点。内存节点的元数据仅放在内存中性能比磁盘节点会有所提升。不过如果在投递message时打开了message的持久化那么内存节点的性能只能体现在资源管理上比如增加或删除队列queue虚拟主机vrtual hosts交换机exchange等发送和接受message速度同磁盘节点一样。一个集群至少要有一个磁盘节点。 2 RABBITMQ搭建集群 环境有三台主机主机名和IP如下rabbitmq的执行用户为rabbitmq所属组为rabbitmq。 主机名 IP rabbitmq1 192.168.10.2 rabbitmq2 192.168.10.3 rabbitmq3 192.168.10.4 同步erlang.cookie 杀掉rabbitmq2和rabbitmq3的rabbitmq进程 #ps –ef|grep rab|awk ‘{print $2}’|xargs kill -9。–用service rabbitmq-servier stop停会有遗留进程。 登陆rabbitmq1rabbitmq1上的rabbitmq服务不能关执行 #cd /var/lib/rabbitmq –进入erlang.cookie所在目录只有ls –al能看见此文件 #chmod 777 .erlang* –该文件默认为400权限为方便传输先修改权限非必须操作 #scp .erlang.cookie rabbitmq192.168.10.3:/var/lib/rabbitmq –将此文件传给另外两条主机 #scp .erlang.cookie rabbitmq192.168.10.4:/var/lib/rabbitmq #chmod 400 .er* –恢复文件权限 分别在rabbitmq2和rabbitmq3 上执行 #chown rabbitmq:rabbitmq .er* –修改文件所属用户和所属组 #chmod 400 .er* –修改文件权限 #service rabbitmq-server start 加入集群 查询rabbitmq1节点名称 #rabbitmqctl cluster_status Cluster status of node rabbitrabbitmq1 … [{nodes,[{disc,[rabbitrabbitmq1]}]},{running_nodes,[rabbit rabbitmq1]}] …done. rabbitmq2 加入rabbitmq1 节点. # rabbitmqctl stop_app –关掉rabbitmq2服务 # rabbitmqctl join_cluster rabbitrabbitmq1 — rabbitmq2加入rabbitmq1, rabbitmq2必须能通过rabbitmq1的主机名ping通rabbitmq1。 # rabbitmqctl start_app –启动rabbitmq2服务 查看集群信息 # rabbitmqctl cluster_status –此时里面就应该能看见两个节点。集群名字为rabbitrabbitmq。 用相同的方法把rabbitmq3也加入rabbitmq1。 更改节点属性 #rabbitmqctl stop_app –停止rabbitmq服务 #rabbitmqctl change_cluster_node_type disc/ram –更改节点为磁盘或内存节点 #rabbitmqctl start_app –开启rabbitmq服务 查看集群状态 #rabbitmqctl cluster_status [{nodes,[{disc,[rabbitrabbitmq1,rabbitrabbitmq2,rabbitrabbitmq3]}]}, {running_nodes,[rabbitrabbitmq1,rabbitrabbitmq2, rabbitrabbitmq3]}]…done. –第一行是集群中的节点成员disc表示这些都是磁盘节点。 –第二行是正在运行的节点成员 3 RABBITMQ退出集群 假设要把rabbitmq2退出集群 在rabbitmq2上执行 #rabbitmqctl stop_app #rabbitmqctl reset #rabbitmqctl start_app 在集群主节点上执行 # rabbitmqctl forget_cluster_node rabbitrabbitmq2 4 RABBITMQ集群重启 集群重启时最后一个挂掉的节点应该第一个重启如果因特殊原因比如同时断电而不知道哪个节点最后一个挂掉。可用以下方法重启 先在一个节点上执行 #rabbitmqctl force_boot #service rabbitmq-server start 在其他节点上执行 #service rabbitmq-server start 查看cluster状态是否正常要在所有节点上查询。 #rabbitmqctl cluster_status 如果有节点没加入集群可以先退出集群然后再重新加入集群。 上述方法不适合内存节点重启内存节点重启的时候是会去磁盘节点同步数据如果磁盘节点没起来内存节点一直失败。 5 注意事项 cookie在所有节点上必须完全一样同步时一定要注意。erlang是通过主机名来连接服务必须保证各个主机名之间可以ping通。可以通过编辑/etc/hosts来手工添加主机名和IP对应关系。如果主机名ping不通rabbitmq服务启动会失败。如果queue是非持久化queue则如果创建queue的那个节点失败发送方和接收方可以创建同样的queue继续运作。但如果是持久化queue则只能等创建queue的那个节点恢复后才能继续服务。在集群元数据有变动的时候需要有disk node在线但是在节点加入或退出的时候所有的disk node必须全部在线。如果没有正确退出disk node集群会认为这个节点当掉了在这个节点恢复之前不要加入其它节点。. 4 RABBITMQ HA 1 镜像队列概念 镜像队列可以同步queue和message当主queue挂掉从queue中会有一个变为主queue来接替工作。 镜像队列是基于普通的集群模式的,所以你还是得先配置普通集群,然后才能设置镜像队列。 镜像队列设置后会分一个主节点和多个从节点如果主节点宕机从节点会有一个选为主节点原先的主节点起来后会变为从节点。 queue和message虽然会存在所有镜像队列中但客户端读取时不论物理面连接的主节点还是从节点都是从主节点读取数据然后主节点再将queue和message的状态同步给从节点因此多个客户端连接不同的镜像队列不会产生同一message被多次接受的情况。 2 配置镜像队列 沿用3.2的环境现在我们把名为“hello”的队列设置为同步给所有节点 #rabbitmqctl set_policy ha-all “hello” ‘{“ha-mode”:”all”}’ ha-all 是同步模式指同步给所有节点还有另外两种模式ha-exactly表示在指定个数的节点上进行镜像节点的个数由ha-params指定ha-nodes表示在指定的节点上进行镜像节点名称通过ha-params指定 hello 是同步的队列名可以用正则表达式匹配 {“ha-mode”:”all”} 表示同步给所有同步模式的不同此参数也不同。 执行上面命令后可以在web管理界面查看queue 页面里面hello队列的node节点后会出现2标签表示有2个从节点而主节点则是当前显示的nodexf7021是测试用的名字按4-2应该为rabbitmq1-3。 5 keepalived和执行脚本 1 A-keepalived配置 红字为手工加的备注原文件里没有。 vi /etc/keepalived/keepalived.cnf文件 global_defs { router_id LVS_MASTER } vrrp_instance VI_1 { state MASTER interface eth0 virtual_router_id 51 priority 100 advert_int 1 authentication { auth_type PASS auth_pass 1111 } virtual_ipaddress { 192.168.10.251/24 #rabbitmq } } virtual_server 192.168.10.251 5672 { delay_loop 6 lb_algo rr lb_kind DR protocol TCP real_server 192.168.10.3 5672 { weight 3 TCP_CHECK { connect_timeout 3 nb_get_retry 3 delay_before_retry 3 connect_port 5672 } } real_server 192.168.10.4 5672 { weight 3 TCP_CHECK { connect_timeout 3 nb_get_retry 3 delay_before_retry 3 connect_port 5672 } } } 2 rabbitmq 服务器上执行的脚本 lvs_rabbitmq.sh脚本内容 #!/bin/bash VIP192.168.10.251 case “$1” in start) ifconfig lo:0 $VIP netmask 255.255.255.255 broadcast $VIP /sbin/route add -host $VIP dev lo:0 echo “1” /proc/sys/net/ipv4/conf/lo/arp_ignore echo “2” /proc/sys/net/ipv4/conf/lo/arp_announce echo “1” /proc/sys/net/ipv4/conf/all/arp_ignore echo “2” /proc/sys/net/ipv4/conf/all/arp_announce sysctl -p /dev/null 21 echo “lvs_vip server start ok!”;; stop) ifconfig lo:0 down /sbin/route del $VIP /dev/null 21 echo “0” /proc/sys/net/ipv4/conf/lo/arp_ignore echo “0” /proc/sys/net/ipv4/conf/lo/arp_announce echo “0” /proc/sys/net/ipv4/conf/all/arp_ignore echo “0” /proc/sys/net/ipv4/conf/all/arp_announce echo “lvs_vip server stoped.”;; *) echo “arg start|stop.” exit 1 esac exit 0 6 RABBITMQ监控 RabbitMQ监控项目很多可通过web管理界面监控。 OVERVIEW页面下有4个标签。主要关注totals和nodes两个。 1 totals ready为待处理消息量total为总消息量。 publish为每秒发送消息量deliver为每秒接受消息量。 下面5个灰色长方块分别代表对应的模块连接数。 2 nodes name为节点名称后面5个蓝色方块分别代表文件打开数socket连接数erlang processes暂时未知内存占用两磁盘空余量info里显示节点属性将鼠标放在内容上会显示对应的统计内容。 7 系统测试 3 测试环境 主机名 IP VIP 192.168.10.251 client 192.168.10.2 –本测试发送producer和接收consumer在同一台机器 rabbitmq1 192.168.10.3 rabbitmq2 192.168.10.4 4 准备工作 负载机启动keepalived # service keepalived start rabbitmq1和rabbitmq2执行5-2的脚本 #./lvs_rabbitmq.sh start 按第3和第4章的方法组建集群配置镜像队列节点类型最好都设置为磁盘节点。 按第1-5创建用户。 5 客户机测试脚本 测试用RabbitMQ的python语言客户端注意python是靠缩进量来区分语句块。红色部分为注释源码上没有。 发送源码 #vi send.py #!/usr/bin/env python import pika import time credentialspika.PlainCredentials(‘iom’,’123456′) –配置连接的用户名和密码 parameterspika.ConnectionParameters(‘192.168.10.251′,5672,’/’,credentials) connectionpika.BlockingConnection(parameters) channelconnection.channel() channel.queue_declare(queue’hello’) count0 while count9999: message’Hello World’str(count) countcount1 channel.basic_publish(exchange”,routing_key’hello’,bodymessage) print “Sent %s” %(message) time.sleep(1) connection.close() 接收源码 #vi receive.py #!/usr/bin/env python import pika connectionpika.BlockingConnection(pika.ConnectionParameters(host’xf7027′)) channelconnection.channel() channel.queue_declare(queue’hello’) print ‘[*] Waiting for message.To exit press CTRLC’ def callback(ch,method,properties,body): print “[x] Received %r” %(body,) channel.basic_consume(callback,queue’hello’,no_ackTrue) channel.start_consuming() 6 测试过程 测试keepalived分配 在客户机上执行 #python send.py 在负载机上执行 #watch ipvsadm –Ln 可以看到rabbitmq1或rabbitmq2的activeconn列数值为1。 客户机重新执行发送程序 #python send.py 在负载机上可以看到另一个rabbitmq服务的activeconn 列数值也变为1。 测试容灾性 在客户机上分别执行发送和接受程序。 #python send.py #python receive.py 然后关掉一个rabbitmq节点如果关掉的正好是客户机连的那个节点的话客户机发送和接收程序会报错退出程序本身如果有错误重发机制则不受任何影响。如果关掉的是另外的节点程序不受任何影响。 学习时的痛苦是暂时的 未学到的痛苦是终生的 加入集群 同步.erlang.cookie这个文件的内容 转载于:https://www.cnblogs.com/zhengchunyuan/p/10179927.html