php微信微网站怎么做,松岗建设网站,wordpress_zh,公司展示网站制作redis实现
查看redis版本
redis需要5.0 Stream 是 Redis 5.0 引入的一种专门为消息队列设计的数据类型#xff0c;Stream 是一个包含 0 个或者多个元素的有序队列#xff0c;这些元素根据 ID 的大小进行有序排列。
它实现了大部分消息队列的功能#xff1a;
消息 ID…redis实现
查看redis版本
redis需要5.0 Stream 是 Redis 5.0 引入的一种专门为消息队列设计的数据类型Stream 是一个包含 0 个或者多个元素的有序队列这些元素根据 ID 的大小进行有序排列。
它实现了大部分消息队列的功能
消息 ID 系列化生成;消息遍历;消息的阻塞和非阻塞读;Consumer Groups 消费组;ACK 确认机制。支持多播。
本次主要实现基本的消息发送接受确认消费组有需要的可以看参考的文章
info插入消息
XADD streamName id field value [field value ...]
# 消息队列名称后面的 「*」 表示让 Redis 为插入的消息自动生成唯一 ID当然也可以自己定义。
# 消息 ID 由两部分组成当前毫秒内的时间戳; 顺序编号。从 0 为起始值用于区分同一时间内产生的多个命令
XADD queue01 * name wjl age 25 gender male读取消息
XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]
XREAD COUNT 1 BLOCK 0 STREAMS queue01 0-0
# 指定消费组在 Stream 中的起始 ID它决定了消费者组从哪个 ID 之后开始读取消息0-0 从第一条开始读取 $ 表示从最后一条向后开始读取只接收新消息。
# 如果想使用 XREAD 进行顺序消费每次读取后要记住返回的消息 ID下次调用 XREAD 就将上一次返回的消息 ID 作为参数传递到下一次调用就可以继续消费后续的消息了。这里只是开胃菜通过 XREAD 读取的数据其实并没有被删除当重新执行 XREAD COUNT 1 BLOCK 0 STREAMS queue01 0-0指令的时候又会重新读取到。
创建消费组
# Stream 通过 XGROUP CREATE 指令创建消费组 (Consumer Group)需要传递起始消息 ID 参数用来初始化 last_delivered_id 变量。
# 随便再插入一些数据
XADD queue01 * name zhangsan age 52 gender male
XADD queue01 * name lisi age 34 gender male
XADD queue01 * name xiaomei age 24 gender famale
# 创建消费组的指令
# 格式
XGROUP CREATE stream group start_id
# stream指定队列的名字;
# group指定消费组名字;
# start_id指定消费组在 Stream 中的起始 ID它决定了消费者组从哪个 ID 之后开始读取消息0-0 从第一条开始读取 $ 表示从最后一条向后开始读取只接收新消息。
# MKSTREAM默认情况下XGROUP CREATE命令在目标流不存在时返回错误。可以使用可选MKSTREAM子命令作为 之后的最后一个参数来自动创建流。# 新建group01消费组
XGROUP CREATE queue01 group01 0-0 MKSTREAM读取群组消息
XREADGROUP GROUP groupName consumerName [COUNT n] [BLOCK ms] STREAMS streamName [stream ...] id [id ...]XREADGROUP GROUP group01 consumer01 COUNT 1 BLOCK 0 STREAMS queue01
# 命令的最后参数 表示从尚未被消费的消息开始读取;
# BLOCK 0表示阻塞读取要是大于0就是等待多少毫秒如果消息队列中的消息被消费组的一个消费者消费了这条消息就不会再被这个消费组的其他消费者读取到。 查看已读未确认消息
XREADGROUP GROUP groupName consumerName
XPENDING queue01 group01 1 # 未读消息条数
1696822787364-0 #所有消费者读取的消息最小和最大 ID;这是最小
1696822787364-0 #所有消费者读取的消息最小和最大 ID;这是最大
consumer01
1查看消费者读取了哪些数据
XPENDING queue01 group01 - 10 consumer01确认消息
XACK key group-key ID [ID ...]XACK queue01 group01 1696822787364-0再次查询未读消息
XPENDING queue01 group01
XREADGROUP GROUP group01 consumer01 COUNT 1 BLOCK 0 STREAMS queue01 C#操作redis实现
使用FreeRedis类库熟悉了上面的流程直接上代码
using FreeRedis;namespace RedisMQStu01
{internal class Program{async static Task Main(string[] args){var cli new RedisClient(127.0.0.1:6379,password,defaultDatabase99);var queueName queue01;//队列的名字var groupName group01;//读取队列的群组的名字var consumerName consumer01;//消费者的名字//添加数据await cli.XAddAsync(queueName, name, wjl, age, 25, gender, male);await cli.XAddAsync(queueName, name, zhangsan, age, 52, gender, male);await cli.XAddAsync(queueName, name, lisi, age, 34, gender, male);await cli.XAddAsync(queueName, name, xiaomei, age, 24, gender, famale);//创建群组,如果数据存在则不需要执行了第一次需要执行await cli.XGroupCreateAsync(queueName, groupName, id: 0-0, MkStream: true);//读取群组消息var ids new Dictionarystring, string();ids.Add(queue01, );var result await cli.XReadGroupAsync(groupName, consumerName,1, 0, noack: false, ids);//查看已读未确认的消息var unReadResults await cli.XPendingAsync(queueName, groupName);await Console.Out.WriteLineAsync($未读消息条数为:{unReadResults.count});foreach (var item in result){await Console.Out.WriteLineAsync(item.key);//群组名字foreach (var entry in item.entries){await Console.Out.WriteLineAsync($\t{entry.id});//消息队列idawait Console.Out.WriteAsync($\t);foreach (var field in entry.fieldValues){await Console.Out.WriteAsync($\t{field.ToString()});}await Console.Out.WriteLineAsync();//确认消息await cli.XAckAsync(queueName,groupName, entry.id);}}await Console.Out.WriteLineAsync(完成);}}
}上面的代码是生产者和消费者在一块不满足生产环境要求因为生产环境大多需要分开生产者只负责生产消费者只负责消费
生产者
using FreeRedis;namespace RedisMQProductor01
{internal class Program{/// summary/// redis消息队列的生产者/// /summary/// param nameargs/param/// returns/returnsasync static Task Main(string[] args){var cli new RedisClient(127.0.0.1:6379,password,defaultDatabase99);var queueName queue01;//队列的名字//添加数据await cli.XAddAsync(queueName, name, wjl, age, 25, gender, male);await cli.XAddAsync(queueName, name, zhangsan, age, 52, gender, male);await cli.XAddAsync(queueName, name, lisi, age, 34, gender, male);await cli.XAddAsync(queueName, name, xiaomei, age, 24, gender, famale);await Console.Out.WriteLineAsync(生产者添加数据完成);}}
}消费者
using FreeRedis;namespace RedisMQConsumer01
{/// summary/// redis消息队列的消费者/// /summaryinternal class Program{async static Task Main(string[] args){var cli new RedisClient(127.0.0.1:6379,password,defaultDatabase99);var queueName queue01;//队列的名字var groupName group01;//读取队列的群组的名字var consumerName consumer01;//消费者的名字//如果数据存在则不需要执行了第一次需要执行var info await cli.XInfoGroupsAsync(queueName);if (info null || info.Length 1){//创建群组await cli.XGroupCreateAsync(queueName, groupName, id: 0-0, MkStream: true);}//读取群组消息var ids new Dictionarystring, string();ids.Add(queue01, );//block的值是0表示无限等待var result await cli.XReadGroupAsync(groupName, consumerName, 1, 0, noack: false, ids);while (true){if (result ! null result.Length 0){foreach (var item in result){await Console.Out.WriteLineAsync(item.key);//群组名字foreach (var entry in item.entries){await Console.Out.WriteLineAsync($\t{entry.id});//消息队列idawait Console.Out.WriteAsync($\t);foreach (var field in entry.fieldValues){await Console.Out.WriteAsync($\t{field.ToString()});}await Console.Out.WriteLineAsync();//确认消息await cli.XAckAsync(queueName, groupName, entry.id);}}await Console.Out.WriteLineAsync(本次处理完毕);}//继续等待result await cli.XReadGroupAsync(groupName, consumerName, 1, 0, noack: false, ids);}}}
}先启动生产者在启动消费者查看效果
方法改善
改善之后可以先启动消费者然后等待生产者投递数据即可
消费者
using FreeRedis;
using Newtonsoft.Json;
using SqlSugar;namespace CelueStu02
{/// summary/// 备份策略消费者/// /summaryinternal class Program{async static Task Main(string[] args){var cli new RedisClient(127.0.0.1:6379,password,defaultDatabase99);var queueName queue01;//队列的名字var groupName group01;//读取队列的群组的名字var consumerName consumer01;//消费者的名字try{var streamInfo cli.XInfoStream(queueName);}catch{await cli.XAddAsync(queueName, student, );}//如果数据存在则不需要执行了第一次需要执行var info await cli.XInfoGroupsAsync(queueName);if (info null || info.Length 1){//创建群组await cli.XGroupCreateAsync(queueName, groupName, id: 0-0, MkStream: true);}//读取群组消息var ids new Dictionarystring, string();ids.Add(queue01, );//block的值是0表示无限等待var result await cli.XReadGroupAsync(groupName, consumerName, 1, 0, noack: false, ids);ConnectionConfig connectionConfig new ConnectionConfig(){ConnectionString ,//自己写数据库链接字符串IsAutoCloseConnection true,DbType DbType.SqlServer};using SqlSugarClient db new SqlSugarClient(connectionConfig);//初始化表格db.CodeFirst.InitTables(typeof(Student));while (true){if (result ! null result.Length 0){foreach (var item in result){await Console.Out.WriteLineAsync(item.key);//群组名字foreach (var entry in item.entries){await Console.Out.WriteLineAsync($\t{entry.id});//消息队列idfor (int i 0; i entry.fieldValues.Length; i){var field entry.fieldValues[i];if (field.ToString() student){var studentListJson entry.fieldValues[i 1]?.ToString() ?? ;if (string.IsNullOrWhiteSpace(studentListJson)){continue;}var students JsonConvert.DeserializeObjectListStudent(studentListJson);await db.Storageable(students).ExecuteCommandAsync();}}//确认消息await cli.XAckAsync(queueName, groupName, entry.id);}}await Console.Out.WriteLineAsync(本次处理完毕);}//继续等待result await cli.XReadGroupAsync(groupName, consumerName, 1, 0, noack: false, ids);}}}
}生产者
using FreeRedis;
using Newtonsoft.Json;
using SqlSugar;namespace CelueStu01
{/// summary/// 备份策略生产者/// /summaryinternal class Program{async static Task Main(string[] args){var cli new RedisClient(127.0.0.1:6379,password,defaultDatabase99);var queueName queue01;//队列的名字var perProcessNumber 1000;//每次处理的数据条数int totalPage 0;//总页码数ConnectionConfig connectionConfig new ConnectionConfig(){ConnectionString ,IsAutoCloseConnection true,DbType DbType.SqlServer};using (SqlSugarClient db new SqlSugarClient(connectionConfig)){//初始化表格db.CodeFirst.InitTables(typeof(Student));do{int count await db.QueryableStudent().CountAsync();totalPage count % perProcessNumber 0 ? count / perProcessNumber : (count / perProcessNumber) 1;var students await db.QueryableStudent().ToPageListAsync(totalPage, perProcessNumber);//批量发送redis频繁写入会报rdb错误限制一下写入频率await cli.XAddAsync(queueName, student, JsonConvert.SerializeObject(students));Listint deleteStudents students.Select(p p.Id).ToList();if (deleteStudents.Any()){//批量删除await db.DeleteableStudent().Where(p deleteStudents.Contains(p.Id)).ExecuteCommandAsync();}totalPage - 1;//Thread.Sleep(2000);} while (totalPage 0);}await Console.Out.WriteLineAsync(生产者添加数据完成);}}
}参考