当前位置: 首页 > news >正文

永济市住房保障和城乡建设管理局网站合肥专业网站建设公司哪家好

永济市住房保障和城乡建设管理局网站,合肥专业网站建设公司哪家好,成都知名建筑公司排名,淮南网上房地产RabbitMQ消息确认 SpringBoot与RabbitMQ整合后#xff0c;对RabbitClient的“确认”进行了封装、使用方式与RabbitMQ官网不一致#xff1b; 消息发布确认 生产者给交换机发送消息后、若是不管了#xff0c;则会出现消息丢失#xff1b; 解决方案1#xff1a; 交换机接受…RabbitMQ消息确认 SpringBoot与RabbitMQ整合后对RabbitClient的“确认”进行了封装、使用方式与RabbitMQ官网不一致 消息发布确认 生产者给交换机发送消息后、若是不管了则会出现消息丢失 解决方案1 交换机接受到消息、给生产者一个答复ack, 若生产者没有收到ack, 可能出现消息丢失因此重新发送消息 解决方案1隐藏问题若是交换机发送了ack, 出现网络延迟则生产者没有收到ack, 就会出现消息重复发送问题 进而衍生幂等性问题 隐藏问题解决方案1在数据库中增加一张去重表设置唯一索引 生产者在消息内容中翻入唯一ID消费者消费时、先从数据库查询是否存在存在则不处理该消息 适用于并发低、业务严谨的场景 隐藏问题解决方案2利用Redis的String的setnx,若key存在则不处理、若key存在则执行业务 适用于短时间处理大量消息且 key不会重复 -这就是大名鼎鼎的幂等性问题贼讨厌这些专有名词 业务开发中的幂等性 前端保存数据时、点击多次保存按钮插入多条数据 解决方案 前端限制按钮点击、 数据库设置业务唯一索引 消息推送中可能出现多条内容一样的消息又不可以重复处理 需要幂等性处理 上家公司中后台给app客户端推送系统消息时、配置给所有用户推送消息 其他服务给我的应用消息服务推送 RabbitMQ消息 正常来说 每次推送的消息 设备ID和用户ID合起来唯一的结果其他服务业务数据存在问题有些旧数据没有清除 导致相通的设备ID用户ID 一次给设备用户推送了十几条安卓客户端当当当的响 直接惊动了产品经理 经过排查上是上游数据有问题代码又很老其他服务负责人排查了好几天 把问题数据清楚了 结果后面又产生了问题数据 解决方案由于会一次性处理几万条推送消息因此对业务要求速度高因此利用Redis的String的setNx, 以taskId mobileDevId userId tenantId 组成了唯一key,若是存在则不处理 key有限时间为60分钟 就成功处理了该问题吐槽上游的业务问题让下游服务做业务保证属实离谱 { “taskId” “xxxx”; “mobileDevId” : “xxxx”; “userId”:“xxx”; “tenantId” : “xxx”; “其他字段”: “…” } RabbitMQ发布确认与返回 SpringBoot发布确认与返回 配置 第二个参数因为过时所以要配置第三个参数为correlated表示用来确认消息 #生产者 spring.rabbitmq.publisher-returnstrue spring.rabbitmq.publisher-confirmstrue spring.rabbitmq.publisher-confirm-typecorrelated生产者 通过RabbitTempldate#setConfirmCallback设置确认回调 即交换器发送给ack给生产者生产者调用ConfirmCallback回调 若出现异常cause则可重新推送 通过RabbitTempldate#setReturnCallback设置返回回调通过template#waitForConfirms(xxx)表示等待xxx毫秒后确认超时返回false; 若返回false, 则进行业务补救处理 public class ConfirmProducer implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback{Autowiredprivate RabbitTemplate template;Autowiredprivate DirectExchange confirmExchange;AtomicInteger index new AtomicInteger(0);AtomicInteger count new AtomicInteger(0);private final String[] keys {sms, mail};Scheduled(fixedDelay 1000, initialDelay 500)public void send() throws IOException {//短信String sms {userName: xxx; phone:xxx};HashMapString, Object map new HashMap();map.put(userName, hanxin);map.put(phone, index.getAndIncrement());template.setMandatory(true);template.setConfirmCallback(this);template.setReturnCallback(this);template.convertAndSend(confirmExchange.getName(), confirm, map);System.out.println(send sms confirm);}Scheduled(fixedDelay 1000, initialDelay 500)public void send2() throws IOException {template.invoke((operations) - {//短信String sms {userName: xxx; phone:xxx};HashMapString, Object map new HashMap();map.put(userName, hanxin);map.put(phone, index.getAndIncrement());//必须设置template.setMandatory(true);template.convertAndSend(confirmExchange.getName(), confirm, map);System.out.println(send sms confirm);return template.waitForConfirms(1000);});}Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {System.out.println(receive confirm callback, ack ack);}Overridepublic void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {System.out.println(receive return call : message: message.getBody());System.out.println(receive return call : replyCode: replyCode);System.out.println(receive return call : replyText: replyText);System.out.println(receive return call : exchange: exchange);System.out.println(receive return call : routingKey: routingKey);} }业务开发中非严谨追求性能高业务建议使用send,这个过程是异步确认的 严谨业务建议使用send2, 同步等待相应出现问题好确认 交换机: Configuration public class ConfirmConfig {public final static String CONFIRM_QUEUE_NAME confirmQueue;public final static String CONFIRM_EXCHANGE_NAME confirmExchange;public final static String CONFIRM_ROUTING_NAME confirm;Beanpublic DirectExchange confirmExchange() {return new DirectExchange(CONFIRM_EXCHANGE_NAME);}Beanpublic ConfirmProducer confirmProducer() {return new ConfirmProducer();} }运行结果 receive status : true RabbitMQ发布确认 若是使用原生Rabbit MQ客户端API则有三种方式 声明channel是需要交换机确认的 channel.confirmSelect();发布单条消息 for (int i 0; i MESSAGE_COUNT; i) {String body String.valueOf(i);channel.basicPublish(, queue, null, body.getBytes());channel.waitForConfirmsOrDie(5_000); }channel.waitForConfirmsOrDie(5_000);这个方法就会在channel端等待RabbitMQ给出一个响应用来表明这个消息已经正确发送到了RabbitMQ服务端。但是要注意这个方法会同步阻塞channel在等待确认期间channel将不能再继续发送消息也就是说会明显降低集群的发送速度即吞吐量。 官方说明了其实channel底层是异步工作的会将channel阻塞住然后异步等待服务端发送一个确认消息才解除阻塞。但是我们在使用时可以把他当作一个同步工具来看待。利用一个异步转同步功能可利用JUC实现 然后如果到了超时时间还没有收到服务端的确认机制那就会抛出异常。然后通常处理这个异常的方式是记录错误日志或者尝试重发消息但是尝试重发时一定要注意不要使程序陷入死循环。 发送批量消息 条确认的机制会对系统的吞吐量造成很大的影响所以稍微中和一点的方式就是发送一批消息后再一起确认 int batchSize 100;int outstandingMessageCount 0;long start System.nanoTime();for (int i 0; i MESSAGE_COUNT; i) {String body String.valueOf(i);ch.basicPublish(, queue, null, body.getBytes());outstandingMessageCount;if (outstandingMessageCount batchSize) {ch.waitForConfirmsOrDie(5_000);outstandingMessageCount 0;}}if (outstandingMessageCount 0) {ch.waitForConfirmsOrDie(5_000);}存在隐藏问题若是500条消息处理太久超时了则响应失败消息重新入队、出现重新消费问题 异步确认消息 实现的方式也比较简单Producer在channel中注册监听器来对消息进行确认。核心代码就是一个 channel.addConfirmListener(ConfirmCallback var1, ConfirmCallback var2);这三种确认机制都能够提升Producer发送消息的安全性。通常情况下第三种异步确认机制的性能是最好的。第一种安全性最高。 消费者确认 当交换机接受消息后就要转发给消费者如何保证消息不丢失重复消费 SpringBoot消费确认 自动确认 若是业务中一些消息发送给消费者若是消息出现异常消费者返回通知交换机消息出现了异常交换机会将消息重新入队 若是没有确认消息交换机没有收到消息会将消息会重新放入队列中每次消费者启动都会把以前消费的消息重新消费 SpringBoot整合RabbitMQ后 设置参数max-attempts为最大重试次数、retry.enabled为开启重试机制 spring.rabbitmq.listener.direct.retry.max-attempts5 spring.rabbitmq.listener.direct.retry.enabledtrue 为什么要开启重试 不开启重试、消费者处理消息发生异常后 RabbitMQ会丢弃该消息 通常业务开发中是不允许的。 为什么设置重试次数 不开启重试次数则消息会一直重新入队占用内存若是错误消息过多RabbitMQ内存爆了 手动确认 在手动确认的模式下不管是消费成功还是消费失败一定要记得确认消息不然消息会一直处于unack状态直到消费者进程重启或者停止。 设置参数 spring.rabbitmq.listener.direct.acknowledge-modemanual spring.rabbitmq.listener.simple.acknowledge-modemanual消费者 通过使用channel#basicAck, basicNack, basicReject完成 RabbitListener(queues ConfirmConfig.CONFIRM_QUEUE_NAME) public class ConfirmConsumer {// RabbitHandler : 标记的方法只能有一个参数类型为String ,若是传Map参数、则需要传入map参数// RabbitListener:标记的方法可以传入Channel Message参数RabbitListener(queues ConfirmConfig.CONFIRM_QUEUE_NAME)public void listenObjectQueue(Channel channel, Message message, MapString, Object msg) throws IOException {System.out.println(接收到object.queue的消息 msg);System.out.println(消息ID message.getMessageProperties().getDeliveryTag());try {channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);}catch (IOException exception) {//拒绝确认消息channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);//拒绝消息 // channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);}}RabbitHandlerpublic void listenObjectQueue2(MapString, Object msg) throws IOException {System.out.println(接收到object.queue的消息 msg);}} 确认收到一个或多个消息 void basicAck(long deliveryTag, boolean multiple) throws IOException;deliveryTag 消息传递标识multiple是否批量确认为true则确认后其他消息deliveryTag小于当前消息的deliveryTag的消息全部变为确认(慎重) 拒绝一个或多个消息 void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException;deliveryTag消息的传递标识。multiple 如果为true则拒绝所有consumer获得的小于deliveryTag的消息。(慎重)requeue 设置为true 会把消费失败的消息从新添加到队列的尾端设置为false不会重新回到队列。 拒绝一个消息 void basicReject(long deliveryTag, boolean requeue) throws IOException;deliveryTag消息的传递标识。requeue 设置为false 表示不再重新入队如果配置了死信队列则进入死信队列。 channel.basicNack 与 channel.basicReject 的区别在于basicNack可以批量拒绝多条消息而basicReject一次只能拒绝一条消息。 个人还是推荐手动确认可控性更高 SpringBoot消费者手动确认调用的API与RabbitMQClient原生API一致都是通过这三个方法完成确认操作 业务开发中RabbitHandler注解用的少因为注解标记的方法只能传入 消息内容参数 无法传Channel, Message, 获取到的消息有限, 而RabbitListener则相反
http://www.yutouwan.com/news/475095/

相关文章:

  • 一流的网站建设流程图企业网站建设费用账务处理
  • 网站推广引流广告创意图片
  • iapp如何用网站做软件家谱网站怎么做
  • 做网站公司需要什么淘宝店铺首页设计模板
  • 北京网站推广seo优化如何自己做网址
  • 影响网站收录的因数学包装设计网站
  • 哪个网站可以做优惠券网站开发需要多少钱新闻
  • 在试用网站做推广辽宁省阜蒙县建设局网站
  • 网站建设维护是做什么的制作视频的软件app免费
  • 哈尔滨网站建设培训班天津建设工程
  • 电商运营培训课程网站北京优化网站推广
  • 建站平台企业排名空间设计公司网站
  • 塘厦企业网站推广公司博客wordpress怎么编辑
  • 佛山禅城网站建设安卓优化大师清理
  • 企业网络推广网站设计公司企业标语
  • 稻壳网站建设哪些网站是python做的
  • 网站开发估价免费企业建站系统排名
  • 公司网站建设价格贵吗网页设计html代码大全超链接
  • h5免费制作平台火蚁seo优化培训班
  • 多个网站备案盗版做的最好的网站
  • 上海网站建设润滋网站建设课程毕设
  • 网站开发工作方向厦门做企业网站比较好的公司
  • html中文网站作业刷赞网站推广qq免费
  • 健康管理 网站建设九江市seo
  • 甘肃建投建设有限公司网站网站建设立项申请报告
  • 汕头食品骏域网站建设网站开发作业图片
  • 网站建设熊猫建站您的域名因未取得工信部网站备案号
  • 网站建设项目实践企业网站的建立必要性
  • 建网站张掖哪家强?陕西省建设厅执业资格注册中心网站报名系统
  • 北京网站建设公司报价wordpress 不显示媒体