怎么封锁网站,江门住房城乡建设厅网站,互联网网站商标,做网站宁波在消息队列中#xff0c;当消费者去消费消息的时候#xff0c;无论是通过 pull 的方式还是 push 的方式#xff0c;都可能会出现大批量的消息突刺。如果此时要处理所有消息#xff0c;很可能会导致系统负载过高#xff0c;影响稳定性。但其实可能后面几秒之内都没有消息投…在消息队列中当消费者去消费消息的时候无论是通过 pull 的方式还是 push 的方式都可能会出现大批量的消息突刺。如果此时要处理所有消息很可能会导致系统负载过高影响稳定性。但其实可能后面几秒之内都没有消息投递若直接把多余的消息丢掉则没有充分利用系统处理消息的能力。我们希望可以把消息突刺均摊到一段时间内让系统负载保持在消息处理水位之下的同时尽可能地处理更多消息从而起到“削峰填谷”的效果 上图中红色的部分代表超出消息处理能力的部分。
我们可以看到消息突刺往往都是瞬时的、不规律的其后一段时间系统往往都会有空闲资源。我们希望把红色的那部分消息平摊到后面空闲时去处理这样既可以保证系统负载处在一个稳定的水位又可以尽可能地处理更多消息这时候我们就需要一个能够控制消费端消息匀速处理的利器 — AHAS 流控降级来为消息队列削峰填谷保驾护航。
AHAS 是如何削峰填谷的
AHAS 的流控降级是面向分布式服务架构的专业流量控制组件主要以流量为切入点从流量控制、熔断降级、系统保护等多个维度来帮助您保障服务的稳定性同时提供强大的聚合监控和历史监控查询功能。
AHAS 专门为这种场景提供了匀速排队的控制特性可以把突然到来的大量请求以匀速的形式均摊以固定的间隔时间让请求通过以稳定的速度逐步处理这些请求起到“削峰填谷”的效果从而避免流量突刺造成系统负载过高。同时堆积的请求将会排队逐步进行处理当请求排队预计超过最大超时时长的时候则直接拒绝而不是拒绝全部请求。
比如在 RocketMQ 的场景下配置了匀速模式下请求 QPS 为 5则会每 200 ms 处理一条消息多余的处理任务将排队同时设置了超时时间预计排队时长超过超时时间的处理任务将会直接被拒绝。示意图如下图所示 RocketMQ Consumer 接入示例
本部分将引导您快速在 RocketMQ 消费端接入 AHAS 流控降级 Sentinel。
1. 开通 AHAS
首先您需要到AHAS 控制台开通 AHAS 功能免费。可以根据 开通 AHAS 文档 里面的指引进行开通。
2. 代码改造
在结合阿里云 RocketMQ Client 使用 Sentinel 时用户需要引入 AHAS Sentinel 的依赖 ahas-sentinel-client 以 Maven 为例
dependencygroupIdcom.alibaba.csp/groupIdartifactIdahas-sentinel-client/artifactIdversion1.1.0/version
/dependency
由于 RocketMQ Client 未提供相应拦截机制而且每次收到都可能是批量的消息因此用户在处理消息时需要手动进行资源定义埋点。我们可以在处理消息的逻辑处手动进行埋点资源名可以根据需要来确定如 groupId topic 的组合 private static Action handleMessage(Message message, String groupId, String topic) {Entry entry null;try {// 资源名称为 groupId 和 topic 的组合便于标识同时可以针对不同的 groupId 和 topic 配置不同的规则entry SphU.entry(handleMqMessage: groupId : topic);// 在此处编写真实的处理逻辑System.out.println(System.currentTimeMillis() | handling message: message);return Action.CommitMessage;} catch (BlockException ex) {// 在编写处理被流控的逻辑// 示例可以在此处记录错误或进行重试System.err.println(Blocked, will retry later: message);return Action.ReconsumeLater; // 会触发消息重新投递} finally {if (entry ! null) {entry.exit();}}}
消费者订阅消息的逻辑示例
Consumer consumer ONSFactory.createConsumer(properties);
consumer.subscribe(topic, *, (message, context) - {return handleMessage(message);
});
consumer.start();
更多关于 RocketMQ SDK 的信息可以参考 消息队列 RocketMQ 入门文档。
3. 获取 AHAS 启动参数
注意若在本地运行接入 AHAS Sentinel 控制台需要在页面左上角选择 公网 环境若在阿里云 ECS 环境则在页面左上角选择对应的 Region 环境。
我们可以进入 AHAS 控制台点击左侧侧边栏的 流控降级进入 AHAS 流控降级控制台应用总览页面。在页面右上角单击添加应用选择 SDK 接入页签到 配置启动参数 页签拿到需要的启动参数详情请参考 SDK 接入文档类似于
-Dproject.nameAppName -Dahas.licenseLicense
其中 project.name 配置项代表应用名会显示在控制台比如 MqConsumerDemoahas.license配置项代表自己的授权 licenseECS 环境不需要此项。
4. 启动 Consumer配置规则
接下来我们添加获取到的启动参数启动修改好的 Consumer 应用。由于 AHAS 流控降级需要进行资源调用才能触发初始化因此首先需要向对应 group/topic 发送一条消息触发初始化。消费端接收到消息后我们就可以在 AHAS Sentinel 控制台上看到我们的应用了。点击应用卡片进入详情页面后点击左侧侧边栏的“机器列表”。我们可以在机器列表页面看到刚刚接入的机器代表接入成功 点击“请求链路”页面我们可以看到之前定义的资源。点击右边的“流控”按钮添加新的流控规则 我们在“流控方式”中选择“排队等待”设置 QPS 为 10代表每 100ms 匀速通过一个请求并且设置最大超时时长为 2000ms超出此超时时间的请求将不会排队立即拒绝。配置完成后点击新建按钮。
5. 发送消息查看效果
下面我们可以在 Producer 端批量发送消息然后在 Consumer 端的控制台输出处观察效果。可以看到消息消费的速率是匀速的大约每 100 ms 消费一条消息
1550732955137 | handling message: Hello MQ 2453
1550732955236 | handling message: Hello MQ 9162
1550732955338 | handling message: Hello MQ 4944
1550732955438 | handling message: Hello MQ 5582
1550732955538 | handling message: Hello MQ 4493
1550732955637 | handling message: Hello MQ 3036
1550732955738 | handling message: Hello MQ 1381
1550732955834 | handling message: Hello MQ 1450
1550732955937 | handling message: Hello MQ 5871
同时不断有排队的处理任务完成超出等待时长的处理请求直接被拒绝。注意在处理请求被拒绝的时候需要根据需求决定是否需要重新消费消息。
我们也可以点击左侧侧边栏的“监控详情”进入监控详情页面查看处理消息的监控曲线 对比普通限流模式的监控曲线最右面的部分 如果不开启匀速模式只是普通的限流模式则只会同时处理 10 条消息其余的全部被拒绝即使后面的时间系统资源充足多余的请求也无法被处理因而浪费了许多空闲资源。两种模式对比说明匀速模式下消息处理能力得到了更好的利用。
Kafka 接入代码示例
Kafka 消费端接入 AHAS 流控降级的思路与上面的 RocketMQ 类似这里给出一个简单的代码示例
private static void handleMessage(ConsumerRecordString, String record, String groupId, String topic) {pool.submit(() - {Entry entry null;try {// 资源名称为 groupId 和 topic 的组合便于标识同时可以针对不同的 groupId 和 topic 配置不同的规则entry SphU.entry(handleKafkaMessage: groupId : topic);// 在此处理消息.System.out.printf([%d] Receive new messages: %s%n, System.currentTimeMillis(), record.toString());} catch (BlockException ex) {// Blocked.// NOTE: 在处理请求被拒绝的时候需要根据需求决定是否需要重新消费消息System.err.println(Blocked: record.toString());} finally {if (entry ! null) {entry.exit();}}});
}
消费消息的逻辑
while (true) {try {ConsumerRecordsString, String records consumer.poll(1000);// 必须在下次 poll 之前消费完这些数据, 且总耗时不得超过 SESSION_TIMEOUT_MS_CONFIG// 建议开一个单独的线程池来消费消息然后异步返回结果for (ConsumerRecordString, String record : records) {handleMessage(record, groupId, topic);}} catch (Exception e) {try {Thread.sleep(1000);} catch (Throwable ignore) {}e.printStackTrace();}
}
其它
以上介绍的只是 AHAS 流控降级的其中一个场景 —— 请求匀速它还可以处理更复杂的各种情况比如
流量控制可以针对不同的调用关系以不同的运行指标如 QPS、线程数、系统负载等为基准对资源调用进行流量控制将随机的请求调整成合适的形状请求匀速、Warm Up 等。熔断降级当调用链路中某个资源出现不稳定的情况如平均 RT 增高、异常比例升高的时候会使对此资源的调用请求快速失败避免影响其它的资源导致级联失败。系统负载保护对系统的维度提供保护。当系统负载较高的时候提供了对应的保护机制让系统的入口流量和系统的负载达到一个平衡保证系统在能力范围之内处理最多的请求。
原文链接 本文为云栖社区原创内容未经允许不得转载。