当前位置: 首页 > news >正文

揭西网站建设华为云虚拟主机

揭西网站建设,华为云虚拟主机,东莞网站推广及优化,谷歌官网登录入口先从概念解释上搞清楚这个定义#xff0c;死信#xff0c;顾名思义就是无法被消费的消息#xff0c;字面意思可以这样理解#xff0c;一般来说#xff0c;producer将消息投递到broker或者直接到queue里了#xff0c;consumer从queue取出消息进行消费#xff0c;但某些时… 先从概念解释上搞清楚这个定义死信顾名思义就是无法被消费的消息字面意思可以这样理解一般来说producer将消息投递到broker或者直接到queue里了consumer从queue取出消息进行消费但某些时候由于特定的原因导致queue中的某些消息无法被消费这样的消息如果没有后续的处理就变成了死信有死信自然就有了死信队列以上是个人的通俗解释专业术语解释的比较正规点大家可以参考主要想搞清楚这个概念不同的消息中间件大概都有自身对于死信或者死信队列的处理方式下面重点要说说rabbitmq的死信队列对rabbitmq来说产生死信的来源大致有如下几种消息被拒绝basic.reject或basic.nack并且requeuefalse.消息TTL过期队列达到最大长度队列满了无法再添加数据到mq中死信的产生既然不可避免那么就需要从实际的业务角度和场景出发对这些死信进行后续的处理常见的处理方式大致有下面几种综合来看更常用的做法是第三种即通过死信队列将产生的死信通过程序的配置路由到指定的死信队列然后应用监听死信队列对接收到的死信做后续的处理关于这一点也是本篇要重点讲述的下面将用代码演示一下死信的产生及路由即上面提到的三种方式网上可供参考的资料比较多但大多不全面下面提供比较完整的demo将各种场景的产生和过程进行列举方式1消息超时进入死信队列这是一种在实际生产中应用场景比较多的一种方式比如我们熟知的订单业务场景当用户购买商品产生了一个订单的时候可以设置过期时间如果在这段时间内消息还没有被消费将会被路由到死信队列专业术语来讲即消息的TTLTTL过期了消息将进入死信队列下面是一段演示代码这里包括两部分生产者和消费者。producer代码此处模拟生产者产生订单推送到队列中消息有效时间是10S过了10S如果没有被消费将会被路由到死信队列public static void main(String[] args) throws Exception{ final Channel channel RabbitUtil.getChannel(); String orderExchangeName order_exchange; String orderQueueName order_queue; String orderRoutingKey order.#; MapString, Object arguments new HashMapString, Object(16); //死信队列配置 ---------------- String dlxExchangeName dlx.exchange; String dlxQueueName dlx.queue; String dlxRoutingKey #; // 为队列设置队列交换器 arguments.put(x-dead-letter-exchange,dlxExchangeName); // 设置队列中的消息 10s 钟后过期 arguments.put(x-message-ttl, 10000); //正常的队列绑定 channel.exchangeDeclare(orderExchangeName, topic, true, false, null); channel.queueDeclare(orderQueueName, true, false, false, arguments); channel.queueBind(orderQueueName, orderExchangeName, orderRoutingKey); String message new SimpleDateFormat(yyyy-MM-dd HH:mm:ss).format(new Date()) 创建订单.; // 创建死信交换器和队列 channel.exchangeDeclare(dlxExchangeName, topic, true, false, null); channel.queueDeclare(dlxQueueName, true, false, false, null); channel.queueBind(dlxQueueName, dlxExchangeName, orderRoutingKey); channel.basicPublish(orderExchangeName, order.save, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes()); System.err.println(消息发送完成......); }consumer代码消费端监听的是死信队列如果conusmer收到了消息表明死信队列里面有消息了public class Consumer { //消费端监听的是死信队列如果conusmer收到了消息表明死信队列里面有消息了 private static final String QUEUE_NAME dlx.queue; public static void main(String[] args) throws Exception{ // 创建信道 final Channel channel RabbitUtil.getChannel(); System.out.println(消费者启动 ..........); com.rabbitmq.client.Consumer consumer new DefaultConsumer(channel){ Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.err.println(死信队列接收到消息 new String(body)); System.err.println(deliveryTag: envelope.getDeliveryTag()); channel.basicAck(envelope.getDeliveryTag(), false); } }; channel.basicConsume(QUEUE_NAME, consumer); TimeUnit.SECONDS.sleep(10000000L); } }然后我们分别运行两端的代码这里提示一下我们并没有提前在控制台去创建queue 和 exchange这个在producer启动或者consumer启动的时候如果没有创建过会自动创建以及建立queue和exchange的绑定关系启动producer消息发送成功同时可以通过控制台看到exhange和相关的队列也帮我们创建了要注意的是在dlx.queue中有一个消息就绪很明显消息过了10S中没有任何消费者消费就被路由到了死信队列dlx.queue中启动consumer,通过控制台打印结果可以看到由于消费端监听的是死信队列已经从dlx.queue中成功获取到了这条信息2、消息被拒绝且requeuefalse没有细致研究过这个问题的可能会有点儿懵其实就是在consumer端当消费者要过滤某些消息的时候那部分被过滤掉的消息如果不设置退回即上一篇所讲的消息重回队列的话这些消息就变成了死信即在下面的代码中第三个参数设置成false即可下面来看具体的代码有这样一个场景一批消息中当消费端从header中收到了num0的消息将会被过滤掉并且设置如上requeuefalse下面看具体的代码peoducer端代码/** * 生产者 * 死信队列使用 */ public class Producer { public static void main(String[] args) throws Exception{ Channel channel RabbitUtil.getChannel(); String exchangeName test_ack_exchange; String routingKey ack.save; //通过在properties设置来标识消息的相关属性 for(int i0;i5;i){ MapString, Object headers new HashMapString, Object(); headers.put(num,i); AMQP.BasicProperties properties new AMQP.BasicProperties().builder() .deliveryMode(2) // 传送方式 2:持久化投递 .contentEncoding(UTF-8) // 编码方式 //.expiration(10000) // 过期时间 .headers(headers) //自定义属性 .build(); String message hello this is ack message ..... i; System.out.println(message); channel.basicPublish(exchangeName,routingKey,true,properties,message.getBytes()); } } }consumer端代码public class Consumer { public static void main(String[] args) throws Exception{ final Channel channel RabbitUtil.getChannel(); String exchangeName test_ack_exchange; String exchangeTypetopic; final String queueName test_ack_queue; String routingKey ack.#; //死信队列配置 ---------------- String deadExchangeName dead_exchange; String deadQueueName dead_queue; String deadRoutingKey #; //死信队列配置 ---------------- //如果需要将死信消息路由 MapString,Object arguments new HashMapString, Object(); arguments.put(x-dead-letter-exchange,deadExchangeName); channel.exchangeDeclare(exchangeName,exchangeType,true,false,false,null); channel.queueDeclare(queueName,false,false,false,arguments); channel.queueBind(queueName,exchangeName,routingKey); //死信队列绑定配置 ---------------- channel.exchangeDeclare(deadExchangeName,exchangeType,true,false,false,null); channel.queueDeclare(deadQueueName,true,false,false,null); channel.queueBind(deadQueueName,deadExchangeName,deadRoutingKey); //死信队列配置 ---------------- System.out.println(consumer启动 .....); com.rabbitmq.client.Consumer consumer new DefaultConsumer(channel){ Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { try{ Thread.sleep(2000); }catch (Exception e){ } Integer num (Integer)properties.getHeaders().get(num); if(num0){ //未被ack的消息并且requeuefalse。即nack的 消息不再被退回队列而成为死信队列 channel.basicNack(envelope.getDeliveryTag(),false,false); String message new String(body, UTF-8); System.out.println(consumer端的Nack消息是 message); }else { channel.basicAck(envelope.getDeliveryTag(),false); String message new String(body, UTF-8); System.out.println(consumer端的ack消息是 message); } } }; //消息要能重回队列需要设置autoAck的属性为false即在回调函数中进行手动签收 channel.basicConsume(queueName,false,consumer); } }要关注的即下面的这处代码和第三个参数启动生产者和消费者启动生产者生产者成功发送5条消息再看消费端的控制台这里num0的这条消息由于设置了死信队列而不会重回原来的队列在上一篇中当参数设置成了true的时候看到控制台一直会打印一条消息同时通过控制台也可以发现在dead_queue中有一条消息为就绪状态了即死信消息但这里并没有对这条消息做处理目前一直存在队列里面可以根据实际应用做后续的处理3、队列达到最大长度这个很好理解比如我们设置某个队列的最大可承载消息的数量是100个超出第100个的消息将会被路由到死信队列中设置消息队列的最大数量也是实际生产中作为队列限流的一种常规手段具有实际的业务意义下面是代码演示基本设置和上述的TTL类似只是在参数中将TTL更换为如下配置生产者代码这里我们设定order_queue这个队列的容量是5个但是我们在程序中设置的x-max-length3那么按照这个猜想将会有两个消息被路由到死信队列public class Producer { public static void main(String[] args) throws Exception{ final Channel channel RabbitUtil.getChannel(); String orderExchangeName order_exchange; String orderQueueName order_queue; String orderRoutingKey order.#; MapString, Object arguments new HashMapString, Object(16); //死信队列配置 ---------------- String dlxExchangeName dlx.exchange; String dlxQueueName dlx.queue; String dlxRoutingKey #; // 为队列设置队列交换器 arguments.put(x-dead-letter-exchange,dlxExchangeName); // 设置队列中的消息 10s 钟后过期 //arguments.put(x-message-ttl, 10000); arguments.put(x-max-length,3); //正常的队列绑定 channel.exchangeDeclare(orderExchangeName, topic, true, false, null); channel.queueDeclare(orderQueueName, true, false, false, arguments); channel.queueBind(orderQueueName, orderExchangeName, orderRoutingKey); String message new SimpleDateFormat(yyyy-MM-dd HH:mm:ss).format(new Date()) 创建订单.; // 创建死信交换器和队列 channel.exchangeDeclare(dlxExchangeName, topic, true, false, null); channel.queueDeclare(dlxQueueName, true, false, false, null); channel.queueBind(dlxQueueName, dlxExchangeName, orderRoutingKey); for(int i0;i5;i){ message message i ; System.out.println(发送的消息是: message); channel.basicPublish(orderExchangeName, order.save,null, message.getBytes()); } System.err.println(消息发送完成......); } }消费者代码public class Consumer { private static final String QUEUE_NAME order_queue; public static void main(String[] args) throws Exception{ // 创建信道 final Channel channel RabbitUtil.getChannel(); // 消费端消息限流。 // 设置客户端最多接收未被ack的消息个数, 只有消息 手动签收 此参数才会生效。 //channel.basicQos(1); System.out.println(消费者启动 ..........); com.rabbitmq.client.Consumer consumer new DefaultConsumer(channel){ Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.err.println(死信队列接收到消息 new String(body)); System.err.println(deliveryTag: envelope.getDeliveryTag()); channel.basicAck(envelope.getDeliveryTag(),false); } }; channel.basicConsume(QUEUE_NAME,false, consumer); //TimeUnit.SECONDS.sleep(10000000L); } }启动生产者5条消息发送完毕再启动消费端通过控制台可以看到消费端只从order_queue中消费了3条消息还剩2条消息去哪里了呢我们再回到控制台观察一下发现在dlx.queue这个死信队列中有两条就绪的消息即剩下的2条消息被路由到了死信队列了以上便是关于死信队列常见的3种方式的处理程序和逻辑
http://wiki.neutronadmin.com/news/170583/

相关文章:

  • 济南品牌网站建设价格低互联网运营自学课程
  • 做公司网站要多久制作网站公司地址
  • 手机如何建立网站网站seo优化推广
  • 建设户外腰包网站网站开发必备人员
  • 网站后台样式域名时间与网站优化
  • 简述网站开发流程电商设计专业
  • 免费做橙光封面的网站服装设计自学软件
  • 世界排名前十位seo网站推广简历
  • 安康公司网站制作厦门电商店铺设计公司麦
  • 网站空间大小 论坛西安seo顾问培训
  • 做网站模板赚钱phpcms旅游网站模板下载
  • 礼泉住房和城乡建设局网站织梦网站环境搭建
  • 微网站建设及微信公众号女生适合学前端还是后端
  • 网站建设人员的安排沧州网站建设益志科技
  • 湖南做网站磐石网络电子商务网站页面设计图片
  • 怎么样做美术招生信息网站那个网站做淘宝推广比较好
  • OA 公司网站 铁道建设报12366纳税服务平台
  • 凡科建设网站股权分配系统建设网站
  • 手机网站开发流程.咨询公司的经营范围有哪些
  • 网站建设比较好的智能手机app开发
  • 中国做国际期货最大的网站网站建设有关书籍
  • 德育工作网站建设方案江苏百度推广代理商
  • 网站为什么被降权杭州vi设计策划
  • 高端建设网站公司哪家好收到网站打入0.1元怎么做分录
  • 网站建设流程表wordpress搬家修改域名
  • 郑州网站建设动态松江品划网络做网站
  • 网站开发案例pdf微网站建设目的
  • 网站建设网络推广方案前端开发人员
  • 酷 网站模板做小程序的公司有哪些比较好?
  • 哪个网站可以做微商宜昌做网站的