广州陈村网站建设,品牌网站制作简创网络,温州推广团队,影视动漫专业【RAEDME】
本文中#xff0c; java客户端作为生产者#xff0c; centos中consumer线程作为消费者#xff1b; 【1】拦截器简述
1#xff09;拦截器是什么#xff1f; 很明显#xff0c;为了实现面向切面编码#xff0c;即在 具体逻辑的上下文 添加一些逻辑#xff1…【RAEDME】
本文中 java客户端作为生产者 centos中consumer线程作为消费者 【1】拦截器简述
1拦截器是什么 很明显为了实现面向切面编码即在 具体逻辑的上下文 添加一些逻辑如
逻辑1
具体逻辑
逻辑2
2什么时候调用拦截器这就要从 kafka生产者发送数据说起了
kafka生产者使用了2个线程来发送数据
step1生产者中的main线程把数据经过 拦截器-》序列化器-》分区器 处理然后再把数据写到 RecordAccumulator
step2send 线程从 RecordAccumulator 中取出数据写入到broker list
【2】拦截器实现
/* 添加拦截器 */
props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, Arrays.asList(TimeInterceptor.class.getName(), CounterInterceptor.class.getName()));
1需求
第1个拦截器 在消息发送前将时间戳加到消息value的 最前面
第2个拦截器在消息发送后更新成功发送消息数或失败发送消息数
2代码实现
-- 添加拦截器
/* 添加拦截器 */
props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, Arrays.asList(TimeInterceptor.class.getName(), CounterInterceptor.class.getName()));
-- 带有拦截器的生产者
/*** 带有拦截器的生产者*/
public class InterceptorProducer {public static void main(String[] args) {/* 1.创建kafka生产者的配置信息 */Properties props new Properties();/*2.指定连接的kafka集群, broker-list */props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, centos201:9092); /*3.ack应答级别*/props.put(ProducerConfig.ACKS_CONFIG, all);/*4.重试次数*/ props.put(ProducerConfig.RETRIES_CONFIG, 3); /*5.批次大小一次发送多少数据当数据大于16k生产者会发送数据到 kafka集群 */props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16 * KfkNumConst._1K); /*6.等待时间 等待时间超过1毫秒即便数据没有大于16k 也会写数据到kafka集群 */props.put(ProducerConfig.LINGER_MS_CONFIG, 1); /*7. RecordAccumulator 缓冲区大小*/ props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 32 * KfkNumConst._1M); /*8. key, value 的序列化类 */ props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());/* 添加拦截器 */props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, Arrays.asList(TimeInterceptor.class.getName(), CounterInterceptor.class.getName())); System.out.println(props); /* 9.创建生产者对象 */KafkaProducerString, String producer new KafkaProducer(props); /* 10.发送数据 */ for (int i 0; i 10; i) { producer.send(new ProducerRecord(first, keyi, value-first-20210101--J i));}/* 11.关闭资源 */ producer.close(); // 间接调用了拦截器的close 方法 System.out.println(kafka生产者写入数据完成); }
}
/*** 时间拦截器-在消息前添加时间戳 */
public class TimeInterceptor implements ProducerInterceptorString, String{Overridepublic void configure(MapString, ? configs) {}/*生产者确保在 消息被序列化以及计算分区前调用该方法。*/Overridepublic ProducerRecordString, String onSend(ProducerRecordString, String record) {// 获取消息值 String value record.value();return new ProducerRecord(record.topic(), record.partition(), record.key(), System.currentTimeMillis() , record.value()); }/*该方法会在消息从 RecordAccumulator 成功发送到Kafka Broker之后或者在发送过程中失败时调用 且通常在 生产者回调逻辑触发之前调用*/Overridepublic void onAcknowledgement(RecordMetadata metadata, Exception exception) {}Overridepublic void close() {}
}/*** 计数拦截器*/
public class CounterInterceptor implements ProducerInterceptorString, String{int sucCounter 0;int errCounter 0;Overridepublic void configure(MapString, ? configs) {}/*生产者确保在 消息被序列化以及计算分区前调用该方法。*/Overridepublic ProducerRecordString, String onSend(ProducerRecordString, String record) {return record; }/*该方法会在消息从 RecordAccumulator 成功发送到Kafka Broker之后或者在发送过程中失败时调用 且通常在 生产者回调逻辑触发之前调用*/Overridepublic void onAcknowledgement(RecordMetadata metadata, Exception exception) {if(metadata !null ) {sucCounter;} else {errCounter; }}Overridepublic void close() {System.out.println(sucCounter sucCounter , errCounter errCounter); }
}
-- java生产者打印日志
sucCounter 10, errCounter0
kafka生产者写入数据完成
3centos消费者消费结果
[rootcentos201 ~]# kafka-console-consumer.sh --topic first --zookeeper centos201:2181
Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].
1609599630884,value-first-20210102--A0
1609599631203,value-first-20210102--A1
1609599631204,value-first-20210102--A6
1609599631203,value-first-20210102--A2
1609599631203,value-first-20210102--A3
1609599631204,value-first-20210102--A4
1609599631204,value-first-20210102--A5
1609599631204,value-first-20210102--A7
1609599631204,value-first-20210102--A8
1609599631204,value-first-20210102--A9