移动app设计网站建设,网站建设从初级到精通,前端网页制作,wordpress 动画插件Redis 发布订阅机制 简介#xff1a; Redis 发布订阅#xff08;Pus/Sub#xff09;是一种消息通信模式#xff1a;发送者通过 PUBLISH发布消息#xff0c;订阅者通过 SUBSCRIBE 订阅接收消息或通过UNSUBSCRIBE 取消订阅。主要由「发布者」、「订阅者」、「Channel」三个部…Redis 发布订阅机制 简介 Redis 发布订阅Pus/Sub是一种消息通信模式发送者通过 PUBLISH发布消息订阅者通过 SUBSCRIBE 订阅接收消息或通过UNSUBSCRIBE 取消订阅。主要由「发布者」、「订阅者」、「Channel」三个部分组成。 发布者和订阅者属于客户端Channel 是 Redis 服务端发布者将消息发布到频道订阅这个频道的订阅者则收到消息。
1 基于频道的发布订阅 //在redisServer中有一个字典类型字段pubsub_channels 用来保存订阅信息其中key为频道value为订阅该频道的客户端 struct redisServer{ pid_t pid; //将频道映射到已订阅客户端的列表 dict *pubsub_channels }
2 基于模式的发布订阅 //在redisServer中有一个pubsub_patterns属性该属性表示一个链表链表中保存着所有和模式相关的信息 struct redisServer{ list *pubsub_patterns } typedef struct pubsubPattern{ client *client; – 订阅模式客户端 robj *pattern; --被订阅的模式 } pubsubPattern;
需要注意的是发布消息与监听消息要运行在不同的 JVM如果使用同一个 redissonClient 发布的话不会监听到自己的消息。
缺陷 发布者不知道订阅者是否收到发布的消息 订阅者不知道自己是否收到了发布者发出的所有消息 发送者不能获知订阅者的执行情况 没人知道订阅者何时开始收到消息
实现 生产者代码 * 发布消息到 Topic* param message 消息* return 接收消息的客户端数量
public long sendMessage(String message) {RTopic topic redissonClient.getTopic(CHANNEL);long publish topic.publish(message);log.info(生产者发送消息成功msg {}, message);return publish;
}消费者代码
public void onMessage() {// in other thread or JVMRTopic topic redissonClient.getTopic(CHANNEL);topic.addListener(String.class, (channel, msg) - {log.info(channel: {} 收到消息 {}., channel, msg);});
}Spring boot整合redis
消息监听配置
Configuration
public class RedisSubConfig {public static final String SUB_KEY chat;//频道channel* redis消息监听器容器* 可以添加多个监听不同话题的redis监听器只需要把消息监听器和相应的消息订阅处理器绑定该消息监听器* 通过反射技术调用消息订阅处理器的相关方法进行一些业务处理* param connectionFactory* param listenerAdapter* returnBeanRedisMessageListenerContainer container(RedisConnectionFactory connectionFactory, MessageListenerAdapter listenerAdapter) {RedisMessageListenerContainer container new RedisMessageListenerContainer();container.setConnectionFactory(connectionFactory);//订阅了一个频道container.addMessageListener(listenerAdapter, new PatternTopic(RedisSubConfig.SUB_KEY));return container;}* 消息监听器适配器绑定消息处理器利用反射技术调用消息处理器的业务方法BeanMessageListenerAdapter listenerAdapter(RedisReceiver receiver) {return new MessageListenerAdapter(receiver, receiveMessage);}/*** redis 读取内容的template* param connectionFactory* return*/BeanStringRedisTemplate template(RedisConnectionFactory connectionFactory) {return new StringRedisTemplate(connectionFactory);}
}接收消息
Service
public class RedisReceiver {public void receiveMessage(String message) {System.out.println(接收消息 message);}
}采用定时器发布消息
EnableScheduling //开启定时器功能
Component
public class MessageSender {Autowiredprivate StringRedisTemplate stringRedisTemplate;Scheduled(fixedRate 5000) //间隔5s 通过StringRedisTemplate对象向redis消息队列chat频道发布消息public void sendMessage(){stringRedisTemplate.convertAndSend(chat, hello new Date());}
}kafka相关 消息头格式 RecordHeaders(headers [RecordHeader(key messageType, value [0, 0, 0, 1]), RecordHeader(key operationCode, value [0, 0, 0, 1]), RecordHeader(key messageId, value [52, 52, 52, 53, 53, 53])], isReadOnly false) 使用java读取消息头
private MsgHeader parseMsgHeaders(Headers headers) {MsgHeader msgHeader new MsgHeader();Header xxxHeader headers.lastHeader(xxx);if (xxxHeader ! null) {msgHeader.setXXX(new String(xxxHeader.value()));}return msgHeader;}使用go发送消息头
headers : []sarma.RecordHeader{sarama.RecordHeader{Key: []byte(kkk),Value: []byte(vvv),
}}
msg : sarama.ProducerMessage{Topic: topic,Key: sarama.StringEncoder( ),Value: sarama.StringEncode( ),Headers: headers,
}