当前位置: 首页 > news >正文

河北路泰建设工程有限公司网站公司网站备案具体什么情况

河北路泰建设工程有限公司网站,公司网站备案具体什么情况,微信注册小程序步骤,网站建设营销话术目录 至少一次#xff08;at least once#xff09; 最多一次#xff08;at most once#xff09; 精确一次#xff08;exactly once#xff09; 幂等性 幂等性作用范围 实现方法 代码 事务 事务作用范围 实现方法 代码 我们知道Kafka的消息交付可靠性保障分为…目录 至少一次at least once 最多一次at most once 精确一次exactly once 幂等性 幂等性作用范围 实现方法 代码 事务 事务作用范围 实现方法 代码 我们知道Kafka的消息交付可靠性保障分为 最多一次at most once至少一次at least once精确一次exactly once 至少一次at least once 什么时候Producer数据会重复发送 呢 比如当Producer发送一条数据当数据发送过去了由于某种原因Broker没有反馈给Producer已经提交成功Producer此时设置了重试机制retries 设置方法props.put(ProducerConfig.RETRIES_CONFIG, 5); ,则会再次发送数据此时会导致数据重复发送 最多一次at most once 与at least once 相反我们把retries 禁止则就是最多一次如果禁止重试会导致数据丢失 精确一次exactly once 如何实现精确一次呢 Producer 有两种方法 幂等性与事务型 幂等性 幂等性作用范围 只能保证单个Producer不会产生重复数据如果Producer重启或者多个Producer无法保证数据不重复 实现方法 设置一下配置即可 props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG true) 代码 import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import wiki.hadoop.kafka.config.Constant;import java.util.Properties; import java.util.concurrent.ExecutionException;/*** 幂等性生产者** 它只能保证单分区上的幂等性即一个幂等性 Producer 能够保证某个主题的一个 分区上不出现重复消息它无法实现多个分区的幂等性* 它只能实现单会话上的幂等性不能实现跨会话的幂等性。这里的会话你可以理 解为 Producer 进程的一次运行。当你重启了 Producer 进程之后这种幂等性保 证就丧失了* author jast* date 2020/4/19 22:38*/ public class IdempotenceProducer {private static ProducerString, String producer ;public IdempotenceProducer() {Properties props new Properties();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, Constant.KAFKA_BROKERS);props.put(ProducerConfig.ACKS_CONFIG, all);props.put(ProducerConfig.RETRIES_CONFIG, 5);props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);props.put(ProducerConfig.LINGER_MS_CONFIG, 1);props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 1024 * 1024 * 1024);props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringSerializer);props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringSerializer);//设置Producer幂等性,其他不用变化props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,true);producer new KafkaProducerString, String(props);}public ProducerString,String getProducer(){return producer;}public static void main(String[] args) throws ExecutionException, InterruptedException {IdempotenceProducer idempotenceProducer new IdempotenceProducer();ProducerString, String producer idempotenceProducer.getProducer();producer.send(new ProducerRecordString,String(test,1234)).get();}}事务 事务作用范围 全部 实现方法 Producer设置//设置Producer幂等性,其他不用变化 props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,true); //设置事务同时也要指定幂等性自定义id名称 props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,jast-acid);-------------------------------------------------------------------Consumer设置//设置只读事务提交成功后的数据props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_COMMITTED.toString().toLowerCase()); 代码 Producer import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import wiki.hadoop.kafka.config.Constant;import java.util.Properties; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit;/*** Kafka事务提交保证exactly once producer* 要么全部成功要么全部失败* author jast* date 2020/4/21 22:38*/ public class TransactionProducer {private static ProducerString, String producer ;public TransactionProducer() {Properties props new Properties();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, Constant.KAFKA_BROKERS);props.put(ProducerConfig.ACKS_CONFIG, all);props.put(ProducerConfig.RETRIES_CONFIG, 5);props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);props.put(ProducerConfig.LINGER_MS_CONFIG, 1);props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 1024 * 1024 * 1024);props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringSerializer);props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringSerializer);//设置Producer幂等性,其他不用变化props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,true);//设置事务同时也要指定幂等性自定义id名称props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,jast-acid);producer new KafkaProducerString, String(props);}public ProducerString,String getProducer(){return producer;}public static void main(String[] args) throws ExecutionException, InterruptedException {TransactionProducer transactionProducer new TransactionProducer();ProducerString, String producer transactionProducer.getProducer();//初始化事务producer.initTransactions();boolean flag true;//循环四次最后一次我们把事务成功提交//理想结果前三次事务提交失败// 事务消费者消费不到数据1,2第四次可以消费到1,2,3,4// 普通消费者可以消费到前三次的1,2 ,也可以消费到第四次1,2,3,4// 运行方法 TransactionConsumer/*** 结果如下事务提交成功* 普通消费者消费数据-1 partition:2 offset:3080713* 事务消费者消费数据-3 partition:2 offset:3080717* 普通消费者消费数据-2 partition:1 offset:3081410* 普通消费者消费数据-1 partition:3 offset:3081465* 普通消费者消费数据-1 partition:2 offset:3080715* 普通消费者消费数据-3 partition:2 offset:3080717* 事务消费者消费数据-4 partition:1 offset:3081414* 事务消费者消费数据-2 partition:0 offset:3081470* 事务消费者消费数据-1 partition:3 offset:3081467* 普通消费者消费数据-2 partition:1 offset:3081412* 普通消费者消费数据-4 partition:1 offset:3081414* 普通消费者消费数据-2 partition:0 offset:3081468* 普通消费者消费数据-2 partition:0 offset:3081470* 普通消费者消费数据-1 partition:3 offset:3081467*/for(int i0;i3;i) {if(i3)flag false;try {//事务开始producer.beginTransaction();producer.send(new ProducerRecordString, String(test, 1)).get();producer.send(new ProducerRecordString, String(test, 2)).get();//手动制造异常if (flag)throw new RuntimeException(程序异常);producer.send(new ProducerRecordString, String(test, 3)).get();producer.send(new ProducerRecordString, String(test, 4)).get();//事务提交producer.commitTransaction();} catch (Exception e) {//中止事务producer.abortTransaction();e.printStackTrace();}}} }Consumer import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.requests.IsolationLevel; import org.apache.kafka.common.serialization.StringDeserializer; import wiki.hadoop.kafka.config.Constant; import wiki.hadoop.kafka.util.LogInit;import java.util.Arrays; import java.util.Properties; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit;/*** 消费Kafka保证事务性* author jast* date 2020/4/21 22:54*/ public class TransactionConsumer {/*** 事务性kafka消费* return KafkaConsumerString,String* param topic* param max_poll_records* param group* return*/public KafkaConsumerString, String transactionConsumer(String topic, String group , int max_poll_records , boolean isLatest) {Properties props new Properties();//-----------------------------------------------------------------------------------//设置只读事务提交成功后的数据props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_COMMITTED.toString().toLowerCase());//-----------------------------------------------------------------------------------props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, Constant.KAFKA_BROKERS);props.put(ConsumerConfig.GROUP_ID_CONFIG, group);props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, max_poll_records);//控制每次poll的数量props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);//自动提交 falseprops.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 3000);props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, isLatesttrue ? latest : earliest);props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 5 * 1024 * 1024);props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());KafkaConsumerString, String consumer new KafkaConsumerString, String(props);consumer.subscribe(Arrays.asList(topic));return consumer;}public KafkaConsumerString, String consumer(String topic, String group , int max_poll_records , boolean isLatest) {Properties props new Properties();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, Constant.KAFKA_BROKERS);props.put(ConsumerConfig.GROUP_ID_CONFIG, group);props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, max_poll_records);//控制每次poll的数量props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);//自动提交 falseprops.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 3000);props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, isLatesttrue ? latest : earliest);props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 5 * 1024 * 1024);props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());KafkaConsumerString, String consumer new KafkaConsumerString, String(props);consumer.subscribe(Arrays.asList(topic));return consumer;}public static void main(String[] args) throws InterruptedException, ExecutionException {TransactionConsumer transactionConsumer new TransactionConsumer();TransactionConsumer transactionConsumer2 new TransactionConsumer();KafkaConsumerString, String consumer transactionConsumer.consumer(test, test, 10, false);KafkaConsumerString, String consumer2 transactionConsumer2.transactionConsumer(test, test2, 10, false);CompletableFuture.runAsync(()-{while(true) {ConsumerRecordsString, String records consumer.poll(1000);for (ConsumerRecordString, String record : records) {System.out.println(普通消费者消费数据- record.value() partition:record.partition() offset:record.offset());} // System.out.println(普通消费者休眠1秒);try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}}});CompletableFuture.runAsync(()-{while(true) {ConsumerRecordsString, String records2 consumer2.poll(1000);for (ConsumerRecordString, String record : records2) {System.out.println(事务消费者消费数据- record.value() partition:record.partition() offset:record.offset());} // System.out.println(事务消费者休眠1秒);try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}}}).get();} }
http://wiki.neutronadmin.com/news/180593/

相关文章:

  • 计算机考试模拟网站怎么做儿童编程哪家培训机构好
  • 建设 网站工作汇报海城建设网站
  • 青岛信息推广网站王占山 同济大学
  • 北京网站设计技术云南网站建设公司哪家好
  • 牡丹江百姓信息网广告公司seo是什么职位
  • 重庆网站建设哪家有网站界面版式
  • 网站是空间备案新能源汽车价格补贴
  • 驻马店市住房和城乡建设局网站.net网站服务器
  • 电影网站如何建设佛山网站建设no.1
  • 苏宁网站优化与推广在线书店网站怎么做
  • 传媒公司可以做网站么做公司官网需要哪些数据
  • 东莞网站建设规范美食网页设计免费模板
  • 零遁nas做网站dz怎么做视频网站
  • 源码网站模板台州专业做网站
  • 资源下载网站建设网站开发维护前景
  • 网站和app设计区别wordpress如何设置外网访问
  • 企业网站报告册设计模板有域名了怎么建立网站
  • 网站建设基本情况境外企业网站推广
  • 自己做的网站很慢网站开发 知乎
  • wap网站不流行wordpress阿里矢量图使用方法
  • 鞍钢节能公司网站开发受欢迎的昆明网站建设
  • 海外网站cdn加速下载百度推广非企代理
  • 贵州专业网站建设公司哪家好网页升级访问最新区域每天自动更新
  • 如何使用ftp上传网站简洁大气企业网站
  • 宏润建设集团股份有限公司网站网站导航
  • 哪个网站做欧洲旅行比较好网站做子页面怎么做
  • 网站logo设计创意非标自动化东莞网站建设
  • 赢展网站建设郑州网站制作公司
  • 交互式网站是什么郑州网站制作网页
  • 网站上怎么做动图上海网站建设服务市价