网站怎么上传,网站搜索引擎优化方案,网站建设对公司有什么好处,网站首页结构怎么写RabbitMQ是一个由erlang开发的AMQP(Advanved Message Queue)的开源实现#xff0c;是实现消息队列应用的一个中间件#xff0c;消息队列中间件是分布式系统中重要的组件#xff0c;主要解决应用耦合#xff0c;异步消息#xff0c;流量削锋等问题。实现高性能#xff0c;…RabbitMQ是一个由erlang开发的AMQP(Advanved Message Queue)的开源实现是实现消息队列应用的一个中间件消息队列中间件是分布式系统中重要的组件主要解决应用耦合异步消息流量削锋等问题。实现高性能高可用可伸缩和最终一致性架构。是大型分布式系统不可缺少的中间件。EasyNetQ则是基于官方.NET组件RabbitMQ.Client 的又一层封装使用起来更加方便。本篇随笔主要大概介绍下RabbitMQ的基础知识和环境的准备以及使用EasyNetQ的相关开发调用。 1、RabbitMQ基础知识 AMQP即Advanced Message Queuing Protocol高级消息队列协议是应用层协议的一个开放标准为面向消息的中间件设计。消息中间件主要用于组件之间的解耦消息的发送者无需知道消息使用者的存在反之亦然。AMQP的主要特征是面向消息、队列、路由包括点对点和发布/订阅、可靠性、安全。RabbitMQ 是一个开源的AMQP实现服务器端用Erlang语言编写支持多种客户端如Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等支持AJAX。用于在分布式系统中存储转发消息在易用性、扩展性、高可用性等方面表现不俗。 RabbitMQ的特点强大的应用程序消息传递使用方便运行在所有主要操作系统上支持大量开发人员平台开源和商业支持。消息队列的模式有两种模式P2PPoint to PointP2P模式包含三个角色消息队列Queue发送者(Sender)接收者(Receiver)。每个消息都被发送到一个特定的队列接收者从队列中获取消息。队列保留着消息直到他们被消费或超时。Publish/Subscribe(Pub/Sub)包含三个角色主题Topic发布者Publisher订阅者Subscriber 。多个发布者将消息发送到Topic,系统将这些消息传递给多个订阅者。 EasyNetQ 的目标是提供一个使.NET中的RabbitMQ尽可能简单的库。在EasyNetQ中消息应由.NET类型表示消息应通过其.NET类型进行路由。EasyNetQ按消息类型进行路由。发布消息时EasyNetQ会检查其类型并根据类型名称命名空间和装配体给出一个路由密钥。在消费方面用户订阅类型。订阅类型后该类型的消息将路由到订户。默认情况下EasyNetQ使用Newtonsoft.Json库将.NET类型序列化为JSON。这具有消息是人类可读的优点因此您可以使用RabbitMQ管理应用程序等工具来调试消息问题。 EasyNetQ是在RabbitMQ.Client库之上提供服务的组件集合。这些操作可以像序列化错误处理线程编组连接管理等。它们由mini-IoC容器组成。您可以轻松地用自己的实现替换任何组件。因此如果您希望XML序列化而不是内置的JSON只需编写一个ISerializer的实现并将其注册到容器。以下是官方提供的一个结构图这个结构图可以很好的解析该组件的结构 2、RabbitMQ的环境准备 本处主要介绍在Windows系统中安装RabbitMQ。 1. 下载安装erlang 下载地址 http://www.erlang.org/downloads根据操作系统选择32还64位 2. 下载安装rabbitmq-server 下载地址 http://www.rabbitmq.com/install-windows.html 下载后获得两个安装文件按照顺序安装即可 安装erlang环境后一般会添加了一个ERLANG_HOME的系统变量指向erlang的安装目录路径如下所示一般都添加了确认下 安装RabbitMQ后在程序里面可以看到 我们使用它的命令行来启动RabbitMQ的服务 查看安装是否成功命令 rabbitmqctl status 安装成功在浏览器中输入 http://127.0.0.1:15672/可以看到如下界面使用默认的账号密码均为guest登陆进行管理 guest 账号是管理员账号可以添加ExchangesQueuesAdmin。但我们一般不使用guest账号可以选择用命令来添加账号和权限也可以使用管理界面进行添加相应的内容。 例如我添加相应的用户账号 一般我们还需要添加虚拟机默认的虚拟机为/我这里添加了一个虚拟机myvhost。 然后绑定账号到虚拟机上即可。 3、EasyNetQ组件的使用 EasyNetQ组件的使用方式比较简单跟很多组件都类似例如建立连接进行操作做等等对于EasyNetQ组件也是如此。 在.NET中使用EasyNetQ要求至少基于 .NET4.5的框架基础上进行开发可以直接在VS项目上使用NuGet的程序包进行添加EasyNetQ的引用。 一般添加引用后至少包含了下面图示的几个引用DLL。 1创建连接 使用EasyNetQ连接RabbitMQ是在应用程序启动时创建一个IBus对象并且在应用程序关闭时释放该对象。 RabbitMQ连接是基于IBus接口的当IBus中的方法被调用连接才会开启。创建一个IBus对象的方法如下 var bus RabbitHutch.CreateBus(“hostmyServer;virtualHostmyVirtualHost;usernameadmin;password123456”); 与RabbitMQ服务器的延迟连接由IBus接口表示创建连接的方式连接字符串由格式为key value的键/值对组成每一个用分号;分隔。 hosthostlocalhost 或者host 192.168.1.102或者hostmy.rabbitmq.com,如果用到集群配置的话那么可以用逗号将服务地址隔开例如hosta.com,b.com,c.comvirtualHost,虚拟主机默认为/username,用户登录名password用户登录密码requestedHeartbeat,心跳设置默认是10秒prefetchcount默认是50pubisherConfirms默认为falsepersistentMessages消息持久化默认为trueproduct,产品名platform,平台timeout默认为10秒一般我们在代码里面测试的话简化连接代码如下所示。 //初始化bus对象bus RabbitHutch.CreateBus(hostlocalhost); 2关闭连接 bus.Dispose(); 要关闭连接只需简单地处理总线这将关闭EasyNetQ使用的连接渠道消费者和所有其他资源。 如果我们在Winform窗体里面初始化一个IBus对象那么在窗体关闭的时候关闭这个接口即可。 private void FrmPublisher_FormClosed(object sender, FormClosedEventArgs e){//关闭IBus接口if(bus ! null){bus.Dispose();}} 3发布消息 EasyNetQ支持最简单的消息模式是发布和订阅。发布消息后任意消费者可以订阅该消息也可以多个消费者订阅。并且不需要额外配置。首先如上文中需要先创建一个IBus对象然后在创建一个可序列化的.NET对象。调用Publish方法即可。 var message new MyMessage { Text Hello Rabbit };
bus.Publish(message); 4订阅消息 EasyNetQ提供了消息订阅当调用Subscribe方法时候EasyNetQ会创建一个用于接收消息的队列不过与消息发布不同的是消息订阅增加了一个参数subscribe_id.代码如下 bus.SubscribeMyMessage(my_subscription_id, msg Console.WriteLine(msg.Text)); 第一个参数是订阅id另外一个是delegate参数用于处理接收到的消息。这里要注意的是subscribe_id参数很重要假如开发者用同一个subscribeid订阅了同一种消息类型两次或者多次RabbitMQ会以轮训的方式给每个订阅的队列发送消息。接收到之后其他队列就接收不到该消息。如果用不同的subscribeid订阅同一种消息类型那么生成的每一个队列都会收到该消息。 需要注意的是在收到消息处理消息时候不要占用太多的时间会影响消息的处理效率所以遇到占用长时间的处理方法最好用异步处理。 为了测试发布和订阅消息我们可以建立几个不同的项目来进行测试如发布放在一个Winform项目订阅放在一个Winform项目另外一个项目放置共享的消息对象定义如下所示。 定义消息对象类如下所示。 /// summary/// 定义的MQ消息类型/// /summarypublic class TextMessage{public string Text { get; set; }} 然后在发布消息的Winform项目上创建一个处理的窗体并添加如下代码。 namespace MyRabbitMQ.Publisher
{/// summary/// 测试RabbitMQ消息队列的发布/// /summarypublic partial class FrmPublisher : DevExpress.XtraEditors.XtraForm{//构建一个IBus公用接口对象private IBus bus null;public FrmPublisher(){InitializeComponent();//初始化bus对象bus RabbitHutch.CreateBus(hostlocalhost);//对指定消息类型进行回应bus.RespondMyRequest, MyResponse(request new MyResponse { Text Responding to: request.Text});//收到消息后输出到控制台上显示bus.Receive(my.queue, x x.AddMyMessage(message Console.WriteLine(message.ToJson())).AddMyOtherMessage(message Console.WriteLine(message.ToJson())));} 发布消息的处理代码如下代码所示。 private void btnSend_Click(object sender, EventArgs e){if (bus ! null){bus.Publish(new TextMessage{Text this.txtContent.Text});}} 然后在创建一个类似窗体用来订阅消息的处理窗体如下所示代码和窗体。 namespace MyRabbitMQ.Subcriber
{ /// summary/// 测试RabbitMQ消息队列的订阅/// /summarypublic partial class FrmSubcriber : DevExpress.XtraEditors.XtraForm{//构建一个IBus公用接口对象private IBus bus null;public FrmSubcriber(){InitializeComponent();//初始化bus对象bus RabbitHutch.CreateBus(hostlocalhost);if(bus ! null){//订阅一个消息并对接收到的消息进行处理展示在控件上bus.SubscribeTextMessage(test, (msg) {StringBuilder sb new StringBuilder();sb.AppendLine(msg.Text , DateTime.Now.ToString());sb.AppendLine(this.txtContent.Text);this.txtContent.Invoke(new MethodInvoker(delegate(){this.txtContent.Text sb.ToString();}));});}//使用消息发送接口发送消息bus.Send(my.queue, new MyMessage { Text Hello Widgets! });bus.Send(my.queue, new MyOtherMessage { Text Hello wuhuacong! });} 发送请求获取响应的代码如下所示。 private void btnRequest_Click(object sender, EventArgs e){//定义请求消息的对象var request new MyRequest(){Text string.Format(请求消息{0}, DateTime.Now)};//异步获取请求消息的结果并进行处理展示应答消息在窗体中的var task bus.RequestAsyncMyRequest, MyResponse(request);task.ContinueWith(response {StringBuilder sb new StringBuilder();sb.AppendLine(response.Result.Text);sb.AppendLine(this.txtContent.Text);this.txtContent.Invoke(new MethodInvoker(delegate(){this.txtContent.Text sb.ToString();}));});} 两个项目联合进行测试如下界面所示。 发布者多次发送消息的情况下订阅者中会进行消息的轮训处理也就是进行均匀分配。 5消息发送Send和接收Receive 与Publish/Subscribe略有不同的是Send/Receive 可以自己定义队列名称。 //发送端代码
bus.Send(my.queue, new MyMessage{ Text Hello Widgets! });//接收端代码
bus.ReceiveMyMessage(my.queue, message Console.WriteLine(MyMessage: {0}, message.Text)); 并且也可以在同一个队列上发送不同的消息类型Receive方法可以这么写 bus.Receive(my.queue, x x.AddMyMessage(message deliveredMyMessage message).AddMyOtherMessage(message deliveredMyOtherMessage message)); 如果消息到达队列但是没有发现相应消息类型的处理时EasyNetQ会发送一条消息到error队列并且带上一个异常信息No handler found for message type message type。与Subscribe类型如果在同一个队列同一个消息类型多次调用Receive方法时消息会通过轮询的形式发送给每个Receive端。 6远程过程调用 var request new TestRequestMessage {Text Hello from the client! };
bus.RequestTestRequestMessage, TestResponseMessage(request, response Console.WriteLine(Got response: {0}, response.Text)); 7RPC服务器 bus.RespondTestRequestMessage, TestResponseMessage(request new TestResponseMessage{ Text request.Text all done! }); 8记录器 var logger new MyLogger() ;
var bus RabbitHutch.CreateBus(“my connection string”, x x.RegisterIEasyNetQLogger(_ logger)); 9路由 Publish方法可以加一个topic参数。 bus.Publish(message, X.A); 消息订阅方可以通过路由来过滤相应的消息。 * 匹配一个字符 #匹配0个或者多个字符 所以 X.A.2 会匹配到 #, X.#, *.A.* 但不会匹配 X.B.* 或者 A. 当消息订阅需要用到topic时候需要调用Subscribe的重载方法 bus.Subscribe(my_id, handlerOfXDotStar, x x.WithTopic(X.*));
bus.Subscribe(my_id, handlerOfStarDotB, x x.WithTopic(*.B)); 上述这种方式会将消息轮询发送给两个订阅者如果只需要一个订阅者的话可以这么调用 bus.Subscribe(my_id, handler, x x.WithTopic(X.*).WithTopic(*.B)); RabbitMQ具有非常好的功能基于主题的路由允许订阅者基于多个标准过滤消息。*星号匹配一个字。哈希匹配为零个或多个单词。 RabbitMQ的应用场景一般在快速处理订单以及异步的多任务处理中可以得到很好的体现下面是几个应用场景。 邮件和短消息的处理 订单的解耦处理 RabbitMQ的服务器架构 3、RabbitMQ查询状态出现错误的处理 安装成功之后使用rabbitmqctl status命令之后出现如下错误。 Status of node rabbitWUHUACONG ...
Error: unable to perform an operation on node rabbitWUHUACONG. Please see diagnostics information and suggestions below.Most common reasons for this are:* Target node is unreachable (e.g. due to hostname resolution, TCP connection or firewall issues)* CLI tool fails to authenticate with the server (e.g. due to CLI tools Erlang cookie not matching that of the server)* Target node is not runningIn addition to the diagnostics info below:* See the CLI, clustering and networking guides on http://rabbitmq.com/documentation.html to learn more* Consult server logs on node rabbitWUHUACONGDIAGNOSTICS
attempted to contact: [rabbitWUHUACONG]rabbitWUHUACONG:* connected to epmd (port 4369) on WUHUACONG* epmd reports node rabbit uses port 25672 for inter-node and CLI tool traffic* TCP connection succeeded but Erlang distribution failed* Authentication failed (rejected by the remote node), please check the Erlang cookieCurrent node details:* node name: rabbitmqcli100WUHUACONG* effective users home directory: C:\Users\Administrator* Erlang cookie hash: RgaUM2cocrxIhJrfLS7Jw 这个问题出现比较常见主要原因是两个目录的.erlang.cookie文件内容不一样。 要确保.erlang.cookie文件的一致性不知道什么原因导致了C:\Users\{UserName}\.erlang.cookie和默认情况下C:\WINDOWS\System32\config\systemprofile\.erlang.cookie不一致了将Windows目录下的拷贝到用户目录下就可以了。 反正无论如何两个地址的Cookie内容一致就可以了然后重启下RabbitMQ服务器即可正常运行并可以正常获取它的状态信息。 转载于:https://www.cnblogs.com/wuhuacong/p/8927096.html