Table of Contents
At least once
At most once
Exactly once
Idempotence
Scope of idempotence
How to enable
Code
Transactions
Scope of transactions
How to enable
Code

We know that Kafka's message delivery reliability guarantees come in three flavors: at most once, at least once, and exactly once.

At least once
When does the Producer send data twice?

For example: the Producer sends a record and the broker persists it, but for some reason the success acknowledgment never makes it back to the Producer. If retries are enabled (set via props.put(ProducerConfig.RETRIES_CONFIG, 5);), the Producer sends the record again, and the broker now holds a duplicate.
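For reference, a minimal at-least-once producer might be configured as follows; the class name, the broker address "localhost:9092", and the topic "test" are illustrative placeholders, not values from the article:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class AtLeastOnceProducer {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker address
        props.put(ProducerConfig.ACKS_CONFIG, "all"); // acknowledge only after the write is fully committed
        props.put(ProducerConfig.RETRIES_CONFIG, 5);  // resend on a lost ack -> duplicates are possible
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        Producer<String, String> producer = new KafkaProducer<>(props);
        producer.send(new ProducerRecord<>("test", "hello")).get(); // blocks until acked (or retries are exhausted)
        producer.close();
    }
}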
At most once

The opposite of at least once: disable retries and you get at most once. Be aware that with retries disabled, a failed send is never reattempted, so data can be lost.
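An at-most-once setup is the same configuration as the sketch above with retries turned off:

props.put(ProducerConfig.RETRIES_CONFIG, 0); // never resend: no duplicates, but a record whose ack is lost is gone for good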
Exactly once

So how do we achieve exactly once?

The Producer offers two mechanisms: idempotence and transactions.
Idempotence

Scope of idempotence

Idempotence only guarantees that a single Producer, within one session, does not produce duplicates. If the Producer restarts, or if multiple Producers send the same data, duplicates are no longer prevented. (The broker deduplicates by tracking a producer id plus per-partition sequence numbers, and the producer id is assigned per session, which is why the guarantee ends at a restart.)

How to enable

Just set the following configuration:

props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
Code

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import wiki.hadoop.kafka.config.Constant;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

/**
 * Idempotent producer.
 *
 * It only guarantees idempotence within a single partition: an idempotent Producer ensures
 * that no duplicate messages appear in one partition of a topic, but it cannot provide
 * idempotence across multiple partitions.
 * It only guarantees idempotence within a single session, not across sessions. A "session"
 * here is one run of the Producer process: once you restart the Producer process,
 * the idempotence guarantee is lost.
 *
 * @author jast
 * @date 2020/4/19 22:38
 */
public class IdempotenceProducer {
    private static Producer<String, String> producer;

    public IdempotenceProducer() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, Constant.KAFKA_BROKERS);
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.RETRIES_CONFIG, 5);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 1024 * 1024 * 1024);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        // Enable Producer idempotence; nothing else needs to change
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        producer = new KafkaProducer<String, String>(props);
    }

    public Producer<String, String> getProducer() {
        return producer;
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        IdempotenceProducer idempotenceProducer = new IdempotenceProducer();
        Producer<String, String> producer = idempotenceProducer.getProducer();
        producer.send(new ProducerRecord<String, String>("test", "1234")).get();
    }
}
Transactions

Scope of transactions

Everything: a transaction is all-or-nothing across multiple partitions, and because the transactional id is fixed in configuration, the guarantee also carries across Producer restarts (sessions). The send pattern is sketched below.
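In skeleton form (record1 and record2 stand for any ProducerRecord values; the full runnable example follows in the Code section), records sent between beginTransaction() and commitTransaction() become visible to read_committed consumers together, or not at all:

producer.initTransactions();      // register the transactional id with the broker once
try {
    producer.beginTransaction();
    producer.send(record1);       // may land in one partition...
    producer.send(record2);       // ...and this in another; they still commit or abort together
    producer.commitTransaction(); // both records become visible to read_committed consumers
} catch (Exception e) {
    producer.abortTransaction();  // neither record is ever visible to read_committed consumers
}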
How to enable

Producer settings:

// Enable Producer idempotence; nothing else needs to change
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
// Transactions also require idempotence, plus a custom transactional id
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "jast-acid");

Consumer settings:

// Read only data from committed transactions
props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_COMMITTED.toString().toLowerCase());
Code
Producer
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import wiki.hadoop.kafka.config.Constant;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

/**
 * Kafka transactional producer: exactly once via transaction commits.
 * Either all sends in a transaction succeed, or none of them do.
 *
 * @author jast
 * @date 2020/4/21 22:38
 */
public class TransactionProducer {
    private static Producer<String, String> producer;

    public TransactionProducer() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, Constant.KAFKA_BROKERS);
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.RETRIES_CONFIG, 5);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 1024 * 1024 * 1024);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        // Enable Producer idempotence; nothing else needs to change
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        // Transactions also require idempotence, plus a custom transactional id
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "jast-acid");
        producer = new KafkaProducer<String, String>(props);
    }

    public Producer<String, String> getProducer() {
        return producer;
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        TransactionProducer transactionProducer = new TransactionProducer();
        Producer<String, String> producer = transactionProducer.getProducer();
        // Initialize transactions
        producer.initTransactions();
        boolean flag = true;
        // Loop four times; only the last iteration commits its transaction.
        // Expected result: the first three transactions fail,
        //   so the transactional consumer never sees their 1,2; on the fourth run it sees 1,2,3,4.
        //   The plain consumer sees the 1,2 from the first three runs as well as the fourth run's 1,2,3,4.
        // Observe by running TransactionConsumer.
        /*
         * Output after the successful commit:
         * Plain consumer consumed: 1 partition:2 offset:3080713
         * Transactional consumer consumed: 3 partition:2 offset:3080717
         * Plain consumer consumed: 2 partition:1 offset:3081410
         * Plain consumer consumed: 1 partition:3 offset:3081465
         * Plain consumer consumed: 1 partition:2 offset:3080715
         * Plain consumer consumed: 3 partition:2 offset:3080717
         * Transactional consumer consumed: 4 partition:1 offset:3081414
         * Transactional consumer consumed: 2 partition:0 offset:3081470
         * Transactional consumer consumed: 1 partition:3 offset:3081467
         * Plain consumer consumed: 2 partition:1 offset:3081412
         * Plain consumer consumed: 4 partition:1 offset:3081414
         * Plain consumer consumed: 2 partition:0 offset:3081468
         * Plain consumer consumed: 2 partition:0 offset:3081470
         * Plain consumer consumed: 1 partition:3 offset:3081467
         */
        for (int i = 0; i <= 3; i++) {
            if (i == 3) {
                flag = false;
            }
            try {
                // Begin the transaction
                producer.beginTransaction();
                producer.send(new ProducerRecord<String, String>("test", "1")).get();
                producer.send(new ProducerRecord<String, String>("test", "2")).get();
                // Manufacture an exception on the first three iterations
                if (flag) {
                    throw new RuntimeException("program exception");
                }
                producer.send(new ProducerRecord<String, String>("test", "3")).get();
                producer.send(new ProducerRecord<String, String>("test", "4")).get();
                // Commit the transaction
                producer.commitTransaction();
            } catch (Exception e) {
                // Abort the transaction
                producer.abortTransaction();
                e.printStackTrace();
            }
        }
    }
}
Consumer

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.requests.IsolationLevel;
import org.apache.kafka.common.serialization.StringDeserializer;
import wiki.hadoop.kafka.config.Constant;

import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

/**
 * Consuming from Kafka with transactional guarantees.
 *
 * @author jast
 * @date 2020/4/21 22:54
 */
public class TransactionConsumer {

    /**
     * Transactional Kafka consumer (read_committed).
     *
     * @param topic
     * @param group
     * @param max_poll_records
     * @param isLatest
     * @return KafkaConsumer<String, String>
     */
    public KafkaConsumer<String, String> transactionConsumer(String topic, String group, int max_poll_records, boolean isLatest) {
        Properties props = new Properties();
        //-----------------------------------------------------------------------------------
        // Read only data from committed transactions
        props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_COMMITTED.toString().toLowerCase());
        //-----------------------------------------------------------------------------------
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, Constant.KAFKA_BROKERS);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, group);
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, max_poll_records); // cap the number of records per poll
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true); // auto commit offsets
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 3000);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, isLatest ? "latest" : "earliest");
        props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 5 * 1024 * 1024);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        consumer.subscribe(Arrays.asList(topic));
        return consumer;
    }

    /**
     * Plain Kafka consumer (default isolation level, read_uncommitted).
     */
    public KafkaConsumer<String, String> consumer(String topic, String group, int max_poll_records, boolean isLatest) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, Constant.KAFKA_BROKERS);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, group);
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, max_poll_records); // cap the number of records per poll
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true); // auto commit offsets
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 3000);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, isLatest ? "latest" : "earliest");
        props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 5 * 1024 * 1024);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        consumer.subscribe(Arrays.asList(topic));
        return consumer;
    }

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        TransactionConsumer transactionConsumer = new TransactionConsumer();
        TransactionConsumer transactionConsumer2 = new TransactionConsumer();
        KafkaConsumer<String, String> consumer = transactionConsumer.consumer("test", "test", 10, false);
        KafkaConsumer<String, String> consumer2 = transactionConsumer2.transactionConsumer("test", "test2", 10, false);
        CompletableFuture.runAsync(() -> {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(1000);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println("Plain consumer consumed: " + record.value() + " partition:" + record.partition() + " offset:" + record.offset());
                }
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
        CompletableFuture.runAsync(() -> {
            while (true) {
                ConsumerRecords<String, String> records2 = consumer2.poll(1000);
                for (ConsumerRecord<String, String> record : records2) {
                    System.out.println("Transactional consumer consumed: " + record.value() + " partition:" + record.partition() + " offset:" + record.offset());
                }
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).get();
    }
}