风景网站模板,wordpress 登录失败,淘客网站备案,全球建筑设计网站一、消息队列消息队列作为分布式系统中的重要组件#xff0c;常用的有MSMQ#xff0c;RabbitMq#xff0c;Kafa#xff0c;ActiveMQ#xff0c;RocketMQ。至于各种消息队列的优缺点比较#xff0c;在这里就不做扩展了#xff0c;网上资源很多。更多内容可参考 消息队列及… 一、消息队列 消息队列作为分布式系统中的重要组件常用的有MSMQRabbitMqKafaActiveMQRocketMQ。至于各种消息队列的优缺点比较在这里就不做扩展了网上资源很多。 更多内容可参考 消息队列及常见消息队列介绍。我在这里选用的是RabbitMq。官网地址http://www.rabbitmq.com安装和配置Windows下RabbitMq安装及配置二、RabbitMq简单介绍 RabbitMQ是一款基于AMQP高级消息队列协议由Erlang开发的开源消息队列组件。是一款优秀的消息队列组件他由两部分组成服务端和客户端客户端支持多种语言的驱动如.Net、JAVA、 Erlang等。在RabbitMq中首先要弄清楚的概念是 交换机、队列、绑定。基本的消息通讯步骤就是首先定义ExChange然后定义队列然后绑定交换机和队列。 需要明确的一点儿是发布者在发送消息是并不是把消息直接发送到队列中而是发送到Exchang然后由交互机根据定义的消息匹配规则在将消息发送到队列中。 Exchange有四种消息消息分发规则directtopicfanoutheader。headers 匹配 AMQP 消息的 header 而不是路由键此外 headers 交换器和 direct 交换器完全一致但性能差很多目前几乎用不到了。 详细的概念介绍推荐查看消息队列之RabbitMq三、EasyNetQ使用 Easynetq是一个简单易用的Rabbitmq Net客户端。同时支持 NetFramework和NetCore。GitHub地址。它是针对RabbitMq Net客户端的进一步封装。关于EasyNetQ的简单使用推荐教程EasyNetQ的介绍。 本文主要介绍基于EasyNeq的高级API的使用。EasyNetQ的作者在核心的IBus接口中尽量避免暴露AMQP中的交换机、队列、绑定这些概念使用者即使不去了解这些概念也能完成消息的发送接收。这相当简洁但某些情况下基于应用场景的需要我们需要自定义交换机、队列、绑定这些信息EasyNetQ允许你这么做这些都是通过IAdvanceBus接口实现。3.1 项目装备 这里为了演示首先新建一个项目包括一个发布者两个接收者一个公共的类库安装EasyNetQ NuGetInstall-Package EasyNetQ3.2 简单封装在Common项目里面是针对Easynetq的使用封装主要目录如下 在RabbitMq文件夹下是针对消息发送接收的简单封装。 首先来看下RabbitMqManage主要的发送和订阅操作都在这个类中。其中ISend接口定义了发送消息的规范SendMessageManage是ISend的实现。IMessageConsume接口定义订阅规范。 MesArg 和PushMsg分别是订阅和发送需用到的参数类。RabbitMQManage是暴露在外的操作类。 首先看发送的代码在EasyNetQ中对于异步发送消息的时候消息是否送达Broker只需要查看异步发送方法最终执行成功还是失败成功就表示消息送达如果失败可以将失败后的消息存入数据库中然后用后台线程轮询数据库表将失败后的消息进行重新 发送。这种方式还可以进一步变成消息表就是先将要发送的消息存入消息表中然后后台线程轮询消息表来进行消息发送。一般这种方式被广泛用于分布式事务中将本地数据库操作和消息表写入放入同一个本地事务中来保证消息发送和本地数据操作的同步成功因为我的系统中分布式事务的涉及很少所以就没这样去做只是简单的在异步发送的时候监控下是否发送失败然后针对失败的消息做一个重新发送的机制。这里推荐大佬的NetCore分布式事务解决方案 CAP GitHub地址。 接着看一下消息订阅接收涉及的代码在订阅中我定义了一个接口最终业务代码中所有的消息订阅类都需要继续此接口最后我们来看下对外使用的操作类这里面主要封装了消息的发送和订阅以及IBus单例的创建。在后续的消息发送和订阅主要就通过此处来实现。我们看到一开始的类目结构中还有一个RaExMessageHandleJob类这个类就是一个后台循环任务用来监测数据库中是否保存了发送失败的消息如果有则将消息取出尝试重新发送。在此就不做多的介绍大家可以根据自己的实际需求来实现。3.3 发布者 现在来看一下消息发布者的代码 主要的发送代码都在Send类中其中appsettings.json里面配置了Rabbitmq的连接地址TestDto只是一个为了方便演示的参数类。 下面看一下Program里面的代码 很简单的一个发送消息调用。 然后来看一下Send类中的代码3.4 消费者 首先来看下消费者端的目录结构 其中appsettings.json中配置Rabbitmq的连接信息Program中只是简单调用消息订阅主要的消息订阅代码都在MessageManage文件夹下MessageManService用于定义消息订阅类型Consume文件夹下主要定义了消息的业务处理可以看到所有的类都集成自我们定义的接口IMessageConsume。四、总结在EasyNetQ中如果需要消费者确认功能则需要在Rabbitmq的连接配置中设置publisherConfirmstrue这将会开启自动确认。在使用高级api定义交换机和队列时可以自己定义多种参数比如消息是否持久化消息最大长度等等具体大家可以去看官方文档上面有详细介绍。Easynetq会自动去捕获消费异常的消息并将其放入到错误队列中而且官方提供了重新发送错误队列中消息的方法当然你也可以自己去监视错误列队对异常消息进行处理。EasyNetQ里面作者针对消息的发布确认和消费确认都做了封装。在EasyNetQ中发布消息的时候如果选用的同步发送只要没有抛出异常我们就可以认为任务消息已经正确到达Broker而异步发送的话需要我们自己去监视Task是否成功 。如果开启了自动确认并不需要我们在消息处理的方法体中手动返回ack信息只要消息被 正确处理就会自动ack。虽然RabbitMq中也有事务消息但由于性能比较差并不推荐使用。其实只要我们能明确消息是否发布成功和消费成功就将会很容易在这个基础上扩展出分布式事务的处理。原文地址:https://www.cnblogs.com/dandan123/p/10097711.html.NET社区新闻深度好文欢迎访问公众号文章汇总 http://www.csharpkit.com