当前位置: 首页 > news >正文

广州陈村网站建设品牌网站制作简创网络

广州陈村网站建设,品牌网站制作简创网络,温州推广团队,影视动漫专业【RAEDME】 本文中#xff0c; java客户端作为生产者#xff0c; centos中consumer线程作为消费者#xff1b; 【1】拦截器简述 1#xff09;拦截器是什么#xff1f; 很明显#xff0c;为了实现面向切面编码#xff0c;即在 具体逻辑的上下文 添加一些逻辑#xff1…【RAEDME】 本文中 java客户端作为生产者 centos中consumer线程作为消费者 【1】拦截器简述 1拦截器是什么 很明显为了实现面向切面编码即在 具体逻辑的上下文 添加一些逻辑如 逻辑1 具体逻辑 逻辑2 2什么时候调用拦截器这就要从 kafka生产者发送数据说起了 kafka生产者使用了2个线程来发送数据 step1生产者中的main线程把数据经过 拦截器-》序列化器-》分区器 处理然后再把数据写到 RecordAccumulator step2send 线程从 RecordAccumulator 中取出数据写入到broker list 【2】拦截器实现 /* 添加拦截器 */ props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, Arrays.asList(TimeInterceptor.class.getName(), CounterInterceptor.class.getName())); 1需求 第1个拦截器 在消息发送前将时间戳加到消息value的 最前面 第2个拦截器在消息发送后更新成功发送消息数或失败发送消息数 2代码实现 -- 添加拦截器 /* 添加拦截器 */ props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, Arrays.asList(TimeInterceptor.class.getName(), CounterInterceptor.class.getName())); -- 带有拦截器的生产者 /*** 带有拦截器的生产者*/ public class InterceptorProducer {public static void main(String[] args) {/* 1.创建kafka生产者的配置信息 */Properties props new Properties();/*2.指定连接的kafka集群, broker-list */props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, centos201:9092); /*3.ack应答级别*/props.put(ProducerConfig.ACKS_CONFIG, all);/*4.重试次数*/ props.put(ProducerConfig.RETRIES_CONFIG, 3); /*5.批次大小一次发送多少数据当数据大于16k生产者会发送数据到 kafka集群 */props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16 * KfkNumConst._1K); /*6.等待时间 等待时间超过1毫秒即便数据没有大于16k 也会写数据到kafka集群 */props.put(ProducerConfig.LINGER_MS_CONFIG, 1); /*7. RecordAccumulator 缓冲区大小*/ props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 32 * KfkNumConst._1M); /*8. key, value 的序列化类 */ props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());/* 添加拦截器 */props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, Arrays.asList(TimeInterceptor.class.getName(), CounterInterceptor.class.getName())); System.out.println(props); /* 9.创建生产者对象 */KafkaProducerString, String producer new KafkaProducer(props); /* 10.发送数据 */ for (int i 0; i 10; i) { producer.send(new ProducerRecord(first, keyi, value-first-20210101--J i));}/* 11.关闭资源 */ producer.close(); // 间接调用了拦截器的close 方法 System.out.println(kafka生产者写入数据完成); } } /*** 时间拦截器-在消息前添加时间戳 */ public class TimeInterceptor implements ProducerInterceptorString, String{Overridepublic void configure(MapString, ? configs) {}/*生产者确保在 消息被序列化以及计算分区前调用该方法。*/Overridepublic ProducerRecordString, String onSend(ProducerRecordString, String record) {// 获取消息值 String value record.value();return new ProducerRecord(record.topic(), record.partition(), record.key(), System.currentTimeMillis() , record.value()); }/*该方法会在消息从 RecordAccumulator 成功发送到Kafka Broker之后或者在发送过程中失败时调用 且通常在 生产者回调逻辑触发之前调用*/Overridepublic void onAcknowledgement(RecordMetadata metadata, Exception exception) {}Overridepublic void close() {} }/*** 计数拦截器*/ public class CounterInterceptor implements ProducerInterceptorString, String{int sucCounter 0;int errCounter 0;Overridepublic void configure(MapString, ? configs) {}/*生产者确保在 消息被序列化以及计算分区前调用该方法。*/Overridepublic ProducerRecordString, String onSend(ProducerRecordString, String record) {return record; }/*该方法会在消息从 RecordAccumulator 成功发送到Kafka Broker之后或者在发送过程中失败时调用 且通常在 生产者回调逻辑触发之前调用*/Overridepublic void onAcknowledgement(RecordMetadata metadata, Exception exception) {if(metadata !null ) {sucCounter;} else {errCounter; }}Overridepublic void close() {System.out.println(sucCounter sucCounter , errCounter errCounter); } } -- java生产者打印日志  sucCounter 10, errCounter0 kafka生产者写入数据完成 3centos消费者消费结果 [rootcentos201 ~]# kafka-console-consumer.sh --topic first --zookeeper centos201:2181 Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper]. 1609599630884,value-first-20210102--A0 1609599631203,value-first-20210102--A1 1609599631204,value-first-20210102--A6 1609599631203,value-first-20210102--A2 1609599631203,value-first-20210102--A3 1609599631204,value-first-20210102--A4 1609599631204,value-first-20210102--A5 1609599631204,value-first-20210102--A7 1609599631204,value-first-20210102--A8 1609599631204,value-first-20210102--A9
http://www.yutouwan.com/news/9113/

相关文章:

  • 适合机械网站的wordpress主题模板网站宣传工作
  • 网站聊天工具代码泉州网站建设开发
  • 深圳网站建设公司收费政务网站建设工作计划
  • 建设局域网网站wordpress文章内模板
  • 贸易网站建设案例房产网站制作方案
  • 自助网站建设费用哪个网站名片做的号
  • 网站维护需要做什么应用下载app
  • 移动终端网站开发一个网站有几个域名
  • nas可做网站服务器吗怎么在百度上搜到自己的网站
  • 怎么免费做文学网站公司网站建app
  • 数据来源于网站需如何做脚注加强企业门户网站建设
  • 东西湖网站建设网站ftp根目录
  • 去哪儿网站做宣传多少钱wordpress 如何安装插件
  • 北京赛车网站开发公司网络营销运营推广方案下载
  • 护栏板销售网站怎么做wordpress增加导航栏
  • 已有域名 搭建网站新东方厨师学费价目表
  • 品牌网站首页怎么设计做网站运营有前景么
  • 写一篇软文1000字宁波seo外包公司
  • 广州商城建站网站开发任务清单
  • 做网站的骗术0点开服的网页游戏
  • 广州站是指哪个站自己做的网站上传到
  • 江苏网站建设企业网站建设咋打开自己网站主页网址
  • 哪个网站做二微码高端网站建站 北京
  • 手机记事本做网站怎样利用互联网进行网络推广
  • 投资建设集团网站给wordpress替换主题
  • 做奖状的网站发现了一个做字素的网站
  • 中铁建设工程项目公示网站宠物店网站建设策划书
  • 零食网站源码如何在网站添加代码
  • 讯美智能网站建设怎么制作自己公司网站
  • 做网站需要先搞目录么wordpress icon class