当前位置: 首页 > news >正文

网站个人备案做企业网站东莞东坑网站建设

网站个人备案做企业网站,东莞东坑网站建设,吉安做网站的,绿色风格网站一、实时流式计算 1. 概念 一般流式计算会与批量计算相比较。在流式计算模型中#xff0c;输入是持续的#xff0c;可以认为在时间上是无界的#xff0c;也就意味着#xff0c;永远拿不到全量数据去做计算。同时#xff0c;计算结果是持续输出的#xff0c;也即计算结果…一、实时流式计算 1. 概念 一般流式计算会与批量计算相比较。在流式计算模型中输入是持续的可以认为在时间上是无界的也就意味着永远拿不到全量数据去做计算。同时计算结果是持续输出的也即计算结果在时间上也是无界的。流式计算一般对实时性要求较高同时一般是先定义目标计算然后数据到来之后将计算逻辑应用于数据。同时为了提高计算效率往往尽可能采用增量计算代替全量计算 流式计算就相当于上图的右侧扶梯是可以源源不断的产生数据源源不断的接收数据没有边界。 2. 应用场景 日志分析: 网站的用户访问日志进行实时的分析计算访问量用户画像留存率等等实时的进行数据分析帮助企业进行决策大屏看板统计: 可以实时的查看网站注册数量订单数量购买数量金额等。公交实时数据: 可以随时更新公交车方位计算多久到达站牌等实时文章分值计算 比如应用较广的 头条类文章的分值计算通过用户的行为实时文章的分值分值越高就越被推荐。 3. Kafka Stream 近些年来开源流处理领域涌现出了很多优秀框架。光是在 Apache 基金会孵化的项目关于流处理的大数据框架就有十几个之多比如早期的 Apache Samza、Apache Storm以及这些年火爆的 Spark 以及 Flink 等。 3.1 Kafka Streams的特点 Kafka Stream提供了一个非常简单而轻量的Library它可以非常方便地嵌入任意Java应用中也可以任意方式打包和部署除了Kafka外无任何外部依赖充分利用Kafka分区机制实现水平扩展和顺序性保证通过可容错的state store实现高效的状态操作如windowed join和aggregation支持正好一次处理语义提供记录级的处理能力从而实现毫秒级的低延迟支持基于事件时间的窗口操作并且可处理晚到的数据late arrival of records同时提供底层的处理原语Processor类似于Storm的spout和bolt以及高层抽象的DSL类似于Spark的map/group/reduce 3.2 关键概念 一个最简单的Streaming的结构如下图所示 从一个Topic中读取到数据经过一些处理操作之后写入到另一个Topic中这就是一个最简单的Streaming流式计算。其中Source Topic中的数据会源源不断的产生新数据。 那么我们再在上面的结构之上扩展一下假设定义了多个Source Topic及Destination Topic那就构成如下图所示的较为复杂的拓扑结构 源处理器Source Processor源处理器是一个没有任何上游处理器的特殊类型的流处理器。它从一个或多个kafka主题生成输入流。通过消费这些主题的消息并将它们转发到下游处理器。Sink处理器sink处理器是一个没有下游流处理器的特殊类型的流处理器。它接收上游流处理器的消息发送到一个指定的Kafka主题。 Kafka Streams被认为是开发实时应用程序的最简单方法。它是一个Kafka的客户端API库编写简单的java就可以实现流式处理。 3.3 KStream KStream:数据结构类似于map如下图key-value键值对 KStream数据流data stream是一段顺序的可以无限长不断更新的数据集。 数据流中比较常记录的是事件这些事件可以是一次鼠标点击click一次交易或是传感器记录的位置数据。 KStream负责抽象的就是数据流。与Kafka自身topic中的数据一样类似日志每一次操作都是向其中插入insert新数据。 二、测试kafkaStream 先看下简单的kafkaStream的KStream测试 需求分析求单词个数word count 1. pom.xml引入依赖 !-- kafka --dependencygroupIdorg.springframework.kafka/groupIdartifactIdspring-kafka/artifactIdexclusionsexclusiongroupIdorg.apache.kafka/groupIdartifactIdkafka-clients/artifactId/exclusion/exclusions/dependencydependencygroupIdorg.apache.kafka/groupIdartifactIdkafka-clients/artifactId/dependencydependencygroupIdcom.alibaba/groupIdartifactIdfastjson/artifactId/dependencydependencygroupIdorg.apache.kafka/groupIdartifactIdkafka-streams/artifactIdexclusionsexclusionartifactIdconnect-json/artifactIdgroupIdorg.apache.kafka/groupId/exclusionexclusiongroupIdorg.apache.kafka/groupIdartifactIdkafka-clients/artifactId/exclusion/exclusions/dependency2. 配置文件 server:port: 9991 spring:application:name: kafka-demokafka:bootstrap-servers: 192.168.200.130:9092producer:retries: 10key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializercompression-type: lz4consumer:group-id: ${spring.application.name}-testkey-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializer3. 编写生产者 ProducerQuickStart.java package com.kafka.sample;import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.producer.*;import java.util.Properties;Slf4j public class ProducerQuickStart {public static void main(String[] args) {//1. kafka的配置信息Properties prop new Properties();//kafka的链接信息prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 192.168.200.130:9092);//配置重试次数prop.put(ProducerConfig.RETRIES_CONFIG, 5);//数据压缩prop.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,lz4);//ack配置 消息确认机制 默认ack1,即只要集群首领节点收到消息生产者就会收到一个来自服务器的成功响应 // prop.put(ProducerConfig.ACKS_CONFIG,all);消息key的序列化器prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringSerializer);//消息value的序列化器prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringSerializer);//2. 生产者对象KafkaProducerString, String producer new KafkaProducerString, String(prop);//封装发送的消息ProducerRecordString, String producerRecord new ProducerRecordString, String(itcast-topic-input, key_001, hello kafka);//3. 发送消息for (int i 0; i 5; i) {producer.send(producerRecord);}//4. 关闭消息通道 必须关闭否则消息发不出去producer.close();} }4 编写kafkaStream流式处理 KafkaStreamQuickStart.java package com.kafka.sample;import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.TimeWindows; import org.apache.kafka.streams.kstream.ValueMapper;import java.time.Duration; import java.util.Arrays; import java.util.Properties;/*** 流式处理*/ public class KafkaStreamQuickStart {public static void main(String[] args) {//kafka的配置信心Properties prop new Properties();prop.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,192.168.200.130:9092);prop.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());prop.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());prop.put(StreamsConfig.APPLICATION_ID_CONFIG,streams-quickstart);//stream 构建器StreamsBuilder streamsBuilder new StreamsBuilder();//流式计算streamProcessor(streamsBuilder);//创建kafkaStream对象KafkaStreams kafkaStreams new KafkaStreams(streamsBuilder.build(),prop);//开启流式计算kafkaStreams.start();}/*** 流式计算* 消息的内容hello kafka hello itcast* param streamsBuilder*/private static void streamProcessor(StreamsBuilder streamsBuilder) {//创建kstream对象同时指定从那个topic中接收消息KStreamString, String stream streamsBuilder.stream(itcast-topic-input);/*** 处理消息的value*/stream.flatMapValues(new ValueMapperString, IterableString() {Overridepublic IterableString apply(String value) {return Arrays.asList(value.split( ));}})//按照value进行聚合处理.groupBy((key,value)-value)//时间窗口.windowedBy(TimeWindows.of(Duration.ofSeconds(10)))//统计单词的个数.count()//转换为kStream.toStream().map((key,value)-{System.out.println(key:key,vlaue:value);return new KeyValue(key.key().toString(),value.toString());})//发送消息.to(itcast-topic-out);} }5. 编写消费者 ConsumerQuickStart.java package com.kafka.sample;import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer;import java.time.Duration; import java.util.Collections; import java.util.Properties;public class ConsumerQuickStart {public static void main(String[] args) {//1. 添加kafka的配置信息Properties properties new Properties();// 配置链接信息properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 192.168.200.130:9092);//配置消费者组properties.put(ConsumerConfig.GROUP_ID_CONFIG, group-2);//配置消息的反序列化器properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringDeserializer);properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringDeserializer);//2. 消费者对象KafkaConsumerString, String consumer new KafkaConsumerString, String(properties);//3. 订阅主题consumer.subscribe(Collections.singletonList(itcast-topic-out));//当前线程一直监听消息while(true){//4. 消费者拉取消息 每秒拉取一次ConsumerRecordsString, String records consumer.poll(Duration.ofMillis(1000));for (ConsumerRecordString, String record : records) {System.out.println(record.key());System.out.println(record.value());}}} }启动项目 在远端192.168.200.130:9092启动docker中的kafka容器启动消费者ConsumerQuickStart的main函数启动kafkastream的mian函数启动生产者ProducerQuickStart的main函数 5. 控制台打印结果 整个过程 生产者向kafka中发送了5条“hello kafka”消息topic均为itcast-topic-input。kafkastream监听这个topic每10秒进行一次流式处理将“hello kakfa”字符串分割并统计每个单词出现的次数。然后转为kstream发送消息到kafka中的topicitcast-topic-out”。消费者监听“itcast-topic-out”的topic消费消息。 三、Springboot整合kafkaStream 1. 配置文件新增 application.yml server:port: 9991 spring:application:name: kafka-demokafka:bootstrap-servers: 192.168.200.130:9092producer:retries: 10key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializercompression-type: lz4consumer:group-id: ${spring.application.name}-testkey-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializer # kafkaStream新增以下配置 kafka:hosts: 192.168.200.130:9092group: ${spring.application.name}2. 在微服务中新增配置类 KafkaStreamConfig.java package com.kafka.config;import lombok.Getter; import lombok.Setter; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.StreamsConfig; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.annotation.EnableKafkaStreams; import org.springframework.kafka.annotation.KafkaStreamsDefaultConfiguration; import org.springframework.kafka.config.KafkaStreamsConfiguration;import java.util.HashMap; import java.util.Map;/*** 通过重新注册KafkaStreamsConfiguration对象设置自定配置参数*/Setter Getter Configuration EnableKafkaStreams ConfigurationProperties(prefixkafka) public class KafkaStreamConfig {private static final int MAX_MESSAGE_SIZE 16* 1024 * 1024;private String hosts;private String group;Bean(name KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)public KafkaStreamsConfiguration defaultKafkaStreamsConfig() {MapString, Object props new HashMap();props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, hosts);props.put(StreamsConfig.APPLICATION_ID_CONFIG, this.getGroup()_stream_aid);props.put(StreamsConfig.CLIENT_ID_CONFIG, this.getGroup()_stream_cid);props.put(StreamsConfig.RETRIES_CONFIG, 10);props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());return new KafkaStreamsConfiguration(props);} }3. 使用kafkaStream监听消息 KafkaStreamHelloListener.java package com.kafka.stream;import lombok.extern.slf4j.Slf4j; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.TimeWindows; import org.apache.kafka.streams.kstream.ValueMapper; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;import java.time.Duration; import java.util.Arrays;Configuration Slf4j public class KafkaStreamHelloListener {Beanpublic KStreamString,String kStream(StreamsBuilder streamsBuilder){//创建kstream对象同时指定从那个topic中接收消息KStreamString, String stream streamsBuilder.stream(itcast-topic-input);stream.flatMapValues(new ValueMapperString, IterableString() {Overridepublic IterableString apply(String value) {return Arrays.asList(value.split( ));}})//根据value进行聚合分组.groupBy((key,value)-value)//聚合计算时间间隔.windowedBy(TimeWindows.of(Duration.ofSeconds(10)))//求单词的个数.count().toStream()//处理后的结果转换为string字符串.map((key,value)-{System.out.println(key:key,value:value);return new KeyValue(key.key().toString(),value.toString());})//发送消息.to(itcast-topic-out);return stream;} }测试 启动springboot应用程序运行之前的ProducerQuickStart来生产消息约10秒后看到kafkaStream消息的处理结果 说明kafkaStream接收到消息并将多条消息进行了统一处理。 参考推荐阅读 https://cloud.tencent.com/developer/article/2100664https://www.cnblogs.com/tree1123/p/11457851.html
http://wiki.neutronadmin.com/news/182814/

相关文章:

  • 公众平台安全助手深圳网站优化计划
  • 提供做网站公司有哪些石家庄上门足疗
  • 网站选项卡大连建设工程信息网下载中心
  • 怎么自己做购物网站做网站ps切图
  • 大网站的二级域名网页升级中每天自动更新
  • 合川网站优化建行官方网站
  • 怎样做免费网站会员做网站需要套模板
  • 企业网站管理系统 源码互联网装饰公司
  • 大学院系网站建设最新seo黑帽技术工具软件
  • 免费信息发布网站大全设计类网站排名
  • 如何给自己的网站做优化查排名官网
  • 加盟餐饮的网站建设注册一个免费的网站
  • 印刷 网站模板在线证件照生成器
  • 泰国房产网站大全深圳大簇激光公司网站
  • 网站建设三亚南京ui培训
  • 和女人做的电影网站网站中页面链接怎么做的
  • 制定一个网站建设方案wordpress增加邀请功能
  • cc网站域名注册西安企业网站搭建
  • 漳州 网站设计培训网站建设方案模板下载
  • 潍坊设计网站万网域名官网
  • 怎么开通网站和进行网页设计上海南山做网站
  • 浙江建设厅 继续教育 网站首页wordpress支持大数据处理
  • 医院网站建设细节科技公司排名
  • 网站建设者属于广告经营者吗网站建设公司面临的问题
  • 电子商务网站建设规划的论文企业如何建公司网站
  • 网站建设规划论文电商网站开发资金预算
  • 网站建设设计公司类网站织梦模板 带手机端做网站公司做网站公司有哪些
  • 鞍山百度做网站wordpress 打开满
  • 门户网站开发设计方案建筑网站源码
  • 网站哪里备案网站一个一个关键词做