当前位置: 首页 > news >正文

邯郸建设企业网站十大装饰公司排行榜

邯郸建设企业网站,十大装饰公司排行榜,软文写作技巧,十大战略咨询公司水善利万物而不争#xff0c;处众人之所恶#xff0c;故几于道#x1f4a6; 文章目录 1. Kafka_Sink 2. Kafka_Sink - 自定义序列化器 3. Redis_Sink_String 4. Redis_Sink_list 5. Redis_Sink_set 6. Redis_Sink_hash 7. 有界流数据写入到ES 8. 无界流数据写入到ES 9. 自定… 水善利万物而不争处众人之所恶故几于道 文章目录 1. Kafka_Sink 2. Kafka_Sink - 自定义序列化器 3. Redis_Sink_String 4. Redis_Sink_list 5. Redis_Sink_set 6. Redis_Sink_hash 7. 有界流数据写入到ES 8. 无界流数据写入到ES 9. 自定义sink - mysql_Sink 10. Jdbc_Sink 官方文档 - Flink1.13 1. Kafka_Sink addSink(new FlinkKafkaProducer String(kafka_address,topic,序列化器) 要先添加依赖 dependencygroupIdorg.apache.flink/groupIdartifactIdflink-connector-kafka_2.12/artifactIdversion1.13.6/version /dependencypublic static void main(String[] args) {Configuration conf new Configuration();conf.setInteger(rest.port,1000);StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment(conf);env.setParallelism(1);ArrayListWaterSensor waterSensors new ArrayList();waterSensors.add(new WaterSensor(sensor_1, 1607527992000L, 20));waterSensors.add(new WaterSensor(sensor_1, 1607527994000L, 50));waterSensors.add(new WaterSensor(sensor_1, 1607527996000L, 50));waterSensors.add(new WaterSensor(sensor_2, 1607527993000L, 10));waterSensors.add(new WaterSensor(sensor_2, 1607527995000L, 30));DataStreamSourceWaterSensor stream env.fromCollection(waterSensors);stream.keyBy(WaterSensor::getId).sum(vc).map(JSON::toJSONString).addSink(new FlinkKafkaProducerString(hadoop101:9092, // kafaka地址flink_sink_kafka, //要写入的Kafkatopicnew SimpleStringSchema() // 序列化器));try {env.execute();} catch (Exception e) {e.printStackTrace();} }运行结果 2. Kafka_Sink - 自定义序列化器 自定义序列化器new FlinkKafkaProducer()的时候选择四个参数的构造方法然后使用new KafkaSerializationSchema序列化器。然后重写serialize方法 public static void main(String[] args) {Configuration conf new Configuration();conf.setInteger(rest.port,1000);StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment(conf);env.setParallelism(1);ArrayListWaterSensor waterSensors new ArrayList();waterSensors.add(new WaterSensor(sensor_1, 1607527992000L, 20));waterSensors.add(new WaterSensor(sensor_1, 1607527994000L, 50));waterSensors.add(new WaterSensor(sensor_1, 1607527996000L, 50));waterSensors.add(new WaterSensor(sensor_2, 1607527993000L, 10));waterSensors.add(new WaterSensor(sensor_2, 1607527995000L, 30));DataStreamSourceWaterSensor stream env.fromCollection(waterSensors);Properties sinkConfig new Properties();sinkConfig.setProperty(bootstrap.servers,hadoop101:9092);stream.keyBy(WaterSensor::getId).sum(vc).addSink(new FlinkKafkaProducerWaterSensor(defaultTopic, // 默认发往的topic ,一般用不上new KafkaSerializationSchemaWaterSensor() { // 自定义的序列化器Overridepublic ProducerRecordbyte[], byte[] serialize(WaterSensor waterSensor,Nullable Long aLong) {String s JSON.toJSONString(waterSensor);return new ProducerRecord(flink_sink_kafka,s.getBytes(StandardCharsets.UTF_8));}},sinkConfig, // Kafka的配置FlinkKafkaProducer.Semantic.AT_LEAST_ONCE // 一致性语义现在只能传入至少一次));try {env.execute();} catch (Exception e) {e.printStackTrace();} }运行结果 3. Redis_Sink_String addSink(new RedisSink(config, new RedisMapper WaterSensor() {} 写到String结构里面 添加依赖 dependencygroupIdcom.alibaba/groupIdartifactIdfastjson/artifactIdversion1.2.83/version /dependencydependencygroupIdorg.apache.flink/groupIdartifactIdflink-connector-redis_2.11/artifactIdversion1.1.5/version /dependencypublic static void main(String[] args) {Configuration conf new Configuration();conf.setInteger(rest.port,1000);StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment(conf);env.setParallelism(1);ArrayListWaterSensor waterSensors new ArrayList();waterSensors.add(new WaterSensor(sensor_1, 1607527992000L, 20));waterSensors.add(new WaterSensor(sensor_1, 1607527994000L, 50));waterSensors.add(new WaterSensor(sensor_1, 1607527996000L, 50));waterSensors.add(new WaterSensor(sensor_2, 1607527993000L, 10));waterSensors.add(new WaterSensor(sensor_2, 1607527995000L, 30));DataStreamSourceWaterSensor stream env.fromCollection(waterSensors);SingleOutputStreamOperatorWaterSensor result stream.keyBy(WaterSensor::getId).sum(vc);/* 往redis里面写字符串string 命令提示符用set 假设写的key是id,value是整个json格式的字符串 key value sensor_1 json格式字符串*/// new一个单机版的配置FlinkJedisPoolConfig config new FlinkJedisPoolConfig.Builder().setHost(hadoop101).setPort(6379).setMaxTotal(100) //最大连接数量.setMaxIdle(10) // 连接池里面的最大空闲.setMinIdle(2) // 连接池里面的最小空闲.setTimeout(10*1000) // 超时时间.build();// 写出到redis中result.addSink(new RedisSink(config, new RedisMapperWaterSensor() {// 返回命令描述符往不同的数据结构写数据用的方法不一样Overridepublic RedisCommandDescription getCommandDescription() {// 写入到字符串用setreturn new RedisCommandDescription(RedisCommand.SET);}Overridepublic String getKeyFromData(WaterSensor waterSensor) {return waterSensor.getId();}Overridepublic String getValueFromData(WaterSensor waterSensor) {return JSON.toJSONString(waterSensor);}}));try {env.execute();} catch (Exception e) {e.printStackTrace();} }运行结果 4. Redis_Sink_list addSink(new RedisSink(config, new RedisMapper WaterSensor() {} 写到 list 结构里面 public static void main(String[] args) {Configuration conf new Configuration();conf.setInteger(rest.port,1000);StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment(conf);env.setParallelism(1);ArrayListWaterSensor waterSensors new ArrayList();waterSensors.add(new WaterSensor(sensor_1, 1607527992000L, 20));waterSensors.add(new WaterSensor(sensor_1, 1607527994000L, 50));waterSensors.add(new WaterSensor(sensor_1, 1607527996000L, 50));waterSensors.add(new WaterSensor(sensor_2, 1607527993000L, 10));waterSensors.add(new WaterSensor(sensor_2, 1607527995000L, 30));DataStreamSourceWaterSensor stream env.fromCollection(waterSensors);SingleOutputStreamOperatorWaterSensor result stream.keyBy(WaterSensor::getId).sum(vc);// key是idvalue是处理后的json格式字符串FlinkJedisPoolConfig config new FlinkJedisPoolConfig.Builder().setHost(hadoop101).setPort(6379).setMaxTotal(100) //最大连接数量.setMaxIdle(10) // 连接池里面的最大空闲.setMinIdle(2) // 连接池里面的最小空闲.setTimeout(10*1000) // 超时时间.build();result.addSink(new RedisSink(config, new RedisMapperWaterSensor() {Overridepublic RedisCommandDescription getCommandDescription() {// 写入listreturn new RedisCommandDescription(RedisCommand.RPUSH);}Overridepublic String getKeyFromData(WaterSensor waterSensor) {return waterSensor.getId();}Overridepublic String getValueFromData(WaterSensor waterSensor) {return JSON.toJSONString(waterSensor);}}));try {env.execute();} catch (Exception e) {e.printStackTrace();} }运行结果 5. Redis_Sink_set addSink(new RedisSink(config, new RedisMapper WaterSensor() {} 写到 set 结构里面 public static void main(String[] args) {Configuration conf new Configuration();conf.setInteger(rest.port,1000);StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment(conf);env.setParallelism(1);ArrayListWaterSensor waterSensors new ArrayList();waterSensors.add(new WaterSensor(sensor_1, 1607527992000L, 20));waterSensors.add(new WaterSensor(sensor_1, 1607527994000L, 50));waterSensors.add(new WaterSensor(sensor_1, 1607527996000L, 50));waterSensors.add(new WaterSensor(sensor_2, 1607527993000L, 10));waterSensors.add(new WaterSensor(sensor_2, 1607527995000L, 30));DataStreamSourceWaterSensor stream env.fromCollection(waterSensors);SingleOutputStreamOperatorWaterSensor result stream.keyBy(WaterSensor::getId).sum(vc);FlinkJedisPoolConfig config new FlinkJedisPoolConfig.Builder().setHost(hadoop101).setPort(6379).setMaxTotal(100).setMaxIdle(10).setMinIdle(2).setTimeout(10*1000).build();result.addSink(new RedisSink(config, new RedisMapperWaterSensor() {Overridepublic RedisCommandDescription getCommandDescription() {// 数据写入set集合return new RedisCommandDescription(RedisCommand.SADD);}Overridepublic String getKeyFromData(WaterSensor waterSensor) {return waterSensor.getId();}Overridepublic String getValueFromData(WaterSensor waterSensor) {return JSON.toJSONString(waterSensor);}}));try {env.execute();} catch (Exception e) {e.printStackTrace();} }运行结果 6. Redis_Sink_hash addSink(new RedisSink(config, new RedisMapper WaterSensor() {} 写到 hash结构里面 public static void main(String[] args) {Configuration conf new Configuration();conf.setInteger(rest.port,1000);StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment(conf);env.setParallelism(1);ArrayListWaterSensor waterSensors new ArrayList();waterSensors.add(new WaterSensor(sensor_1, 1607527992000L, 20));waterSensors.add(new WaterSensor(sensor_1, 1607527994000L, 50));waterSensors.add(new WaterSensor(sensor_1, 1607527996000L, 50));waterSensors.add(new WaterSensor(sensor_2, 1607527993000L, 10));waterSensors.add(new WaterSensor(sensor_2, 1607527995000L, 30));DataStreamSourceWaterSensor stream env.fromCollection(waterSensors);SingleOutputStreamOperatorWaterSensor result stream.keyBy(WaterSensor::getId).sum(vc);FlinkJedisPoolConfig config new FlinkJedisPoolConfig.Builder().setHost(hadoop101).setPort(6379).setMaxTotal(100).setMaxIdle(10).setMinIdle(2).setTimeout(10*1000).build();result.addSink(new RedisSink(config, new RedisMapperWaterSensor() {Overridepublic RedisCommandDescription getCommandDescription() {// 数据写入hashreturn new RedisCommandDescription(RedisCommand.HSET,a);}Overridepublic String getKeyFromData(WaterSensor waterSensor) {return waterSensor.getId();}Overridepublic String getValueFromData(WaterSensor waterSensor) {return JSON.toJSONString(waterSensor);}}));try {env.execute();} catch (Exception e) {e.printStackTrace();} }运行结果 7. 有界流数据写入到ES中 new ElasticsearchSink.Builder() public static void main(String[] args) {Configuration conf new Configuration();conf.setInteger(rest.port,1000);StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment(conf);env.setParallelism(1);ArrayListWaterSensor waterSensors new ArrayList();waterSensors.add(new WaterSensor(sensor_1, 1607527992000L, 20));waterSensors.add(new WaterSensor(sensor_1, 1607527994000L, 50));waterSensors.add(new WaterSensor(sensor_1, 1607527996000L, 50));waterSensors.add(new WaterSensor(sensor_2, 1607527993000L, 10));waterSensors.add(new WaterSensor(sensor_2, 1607527995000L, 30));DataStreamSourceWaterSensor stream env.fromCollection(waterSensors);SingleOutputStreamOperatorWaterSensor result stream.keyBy(WaterSensor::getId).sum(vc);ListHttpHost hosts Arrays.asList(new HttpHost(hadoop101, 9200),new HttpHost(hadoop102, 9200),new HttpHost(hadoop103, 9200));ElasticsearchSink.BuilderWaterSensor builder new ElasticsearchSink.BuilderWaterSensor(hosts,new ElasticsearchSinkFunctionWaterSensor() {Overridepublic void process(WaterSensor element, // 需要写出的元素RuntimeContext runtimeContext, // 运行时上下文 不是context上下文对象RequestIndexer requestIndexer) { // 把要写出的数据封装到RequestIndexer里面String msg JSON.toJSONString(element);IndexRequest ir Requests.indexRequest(sensor).type(_doc) // 定义type的时候, 不能下划线开头. _doc是唯一的特殊情况.id(element.getId()) // 定义每条数据的id. 如果不指定id, 会随机分配一个id. id重复的时候会更新数据.source(msg, XContentType.JSON);requestIndexer.add(ir); // 把ir存入到indexer, 就会自动的写入到es中}});result.addSink(builder.build());try {env.execute();} catch (Exception e) {e.printStackTrace();} }8. 无界流数据写入到ES   和有界差不多 只不过把数据源换成socket然后因为无界流它高效不是你来一条就刷出去所以设置刷新时间、大小、条数才能看到结果。 public static void main(String[] args) {Configuration conf new Configuration();conf.setInteger(rest.port,1000);StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment(conf);env.setParallelism(1);SingleOutputStreamOperatorWaterSensor result env.socketTextStream(hadoop101,9999).map(line-{String[] data line.split(,);return new WaterSensor(data[0],Long.valueOf(data[1]),Integer.valueOf(data[2]));}).keyBy(WaterSensor::getId).sum(vc);ListHttpHost hosts Arrays.asList(new HttpHost(hadoop101, 9200),new HttpHost(hadoop102, 9200),new HttpHost(hadoop103, 9200));ElasticsearchSink.BuilderWaterSensor builder new ElasticsearchSink.BuilderWaterSensor(hosts,new ElasticsearchSinkFunctionWaterSensor() {Overridepublic void process(WaterSensor element, // 需要写出的元素RuntimeContext runtimeContext, // 运行时上下文 不是context上下文对象RequestIndexer requestIndexer) { // 把要写出的数据封装到RequestIndexer里面String msg JSON.toJSONString(element);IndexRequest ir Requests.indexRequest(sensor).type(_doc) // 定义type的时候, 不能下划线开头. _doc是唯一的特殊情况.id(element.getId()) // 定义每条数据的id. 如果不指定id, 会随机分配一个id. id重复的时候会更新数据.source(msg, XContentType.JSON);requestIndexer.add(ir); // 把ir存入到indexer, 就会自动的写入到es中}});// 自动刷新时间builder.setBulkFlushInterval(2000); // 默认不会根据时间自动刷新builder.setBulkFlushMaxSizeMb(1024); // 当批次中的数据大于等于这个值刷新builder.setBulkFlushMaxActions(2); // 每来多少条数据刷新一次// 这三个是或的关系只要有一个满足就会刷新result.addSink(builder.build());try {env.execute();} catch (Exception e) {e.printStackTrace();} }9. 自定义sink - mysql_Sink 需要写一个类实现RichSinkFunction然后实现invoke方法。这里因为是写MySQL所以需要建立连接那就用Rich版本。 记得导入MySQL依赖 public static void main(String[] args) {Configuration conf new Configuration();conf.setInteger(rest.port, 1000);StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment(conf);env.setParallelism(1);ArrayListWaterSensor waterSensors new ArrayList();waterSensors.add(new WaterSensor(sensor_1, 1607527992000L, 20));waterSensors.add(new WaterSensor(sensor_1, 1607527994000L, 50));waterSensors.add(new WaterSensor(sensor_1, 1607527996000L, 50));waterSensors.add(new WaterSensor(sensor_2, 1607527993000L, 10));waterSensors.add(new WaterSensor(sensor_2, 1607527995000L, 30));DataStreamSourceWaterSensor stream env.fromCollection(waterSensors);SingleOutputStreamOperatorWaterSensor result stream.keyBy(WaterSensor::getId).sum(vc);result.addSink(new MySqlSink());try {env.execute();} catch (Exception e) {e.printStackTrace();}}public static class MySqlSink extends RichSinkFunctionWaterSensor {private Connection connection;Overridepublic void open(Configuration parameters) throws Exception {Class.forName(com.mysql.cj.jdbc.Driver);connection DriverManager.getConnection(jdbc:mysql://hadoop101:3306/test?useSSLfalse, root, 123456);}Overridepublic void close() throws Exception {if (connection!null){connection.close();}}// 调用每来一条元素这个方法执行一次Overridepublic void invoke(WaterSensor value, Context context) throws Exception {// jdbc的方式想MySQL写数据 // String sql insert into sensor(id,ts,vc)values(?,?,?);//如果主键不重复就新增主键重复就更新 // String sql insert into sensor(id,ts,vc)values(?,?,?) duplicate key update vc?;String sql replace into sensor(id,ts,vc)values(?,?,?);// 1. 得到预处理语句PreparedStatement ps connection.prepareStatement(sql);// 2. 给sql中的占位符进行赋值ps.setString(1,value.getId());ps.setLong(2,value.getTs());ps.setInt(3,value.getVc()); // ps.setInt(4,value.getVc());// 3. 执行ps.execute();// 4. 提交 // connection.commit(); MySQL默认自动提交所以这个地方不用调用// 5. 关闭预处理ps.close();} }运行结果 10. Jdbc_Sink addSink(JdbcSink.sink(sql,JdbcStatementBuilder,执行参数,连接参数) 对于jdbc数据库我们其实没必要自定义因为官方给我们了一个JDBC Sink - 官方JDBC Sink 传送门 dependencygroupIdorg.apache.flink/groupIdartifactIdflink-connector-jdbc_2.11/artifactIdversion1.13.6/version /dependencypublic static void main(String[] args) {Configuration conf new Configuration();conf.setInteger(rest.port,1000);StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment(conf);env.setParallelism(1);ArrayListWaterSensor waterSensors new ArrayList();waterSensors.add(new WaterSensor(sensor_1, 1607527992000L, 20));waterSensors.add(new WaterSensor(sensor_1, 1607527994000L, 50));waterSensors.add(new WaterSensor(sensor_1, 1607527996000L, 50));waterSensors.add(new WaterSensor(sensor_2, 1607527993000L, 10));waterSensors.add(new WaterSensor(sensor_2, 1607527995000L, 30));DataStreamSourceWaterSensor stream env.fromCollection(waterSensors);SingleOutputStreamOperatorWaterSensor result stream.keyBy(WaterSensor::getId).sum(vc);result.addSink(JdbcSink.sink(replace into sensor(id,ts,vc)values(?,?,?),new JdbcStatementBuilderWaterSensor() {Overridepublic void accept(PreparedStatement ps,WaterSensor waterSensor) throws SQLException {// 只做一件事给占位符赋值ps.setString(1,waterSensor.getId());ps.setLong(2,waterSensor.getTs());ps.setInt(3,waterSensor.getVc());}},new JdbcExecutionOptions.Builder() //设置执行参数.withBatchSize(1024) // 刷新大小上限.withBatchIntervalMs(2000) //刷新间隔.withMaxRetries(3) // 重试次数.build(),new JdbcConnectionOptions.JdbcConnectionOptionsBuilder().withDriverName(com.mysql.cj.jdbc.Driver).withUrl(jdbc:mysql://hadoop101:3306/test?useSSLfalse).withUsername(root).withPassword(123456).build()));try {env.execute();} catch (Exception e) {e.printStackTrace();} }运行结果
http://wiki.neutronadmin.com/news/253614/

相关文章:

  • 怎么做饲料电商网站资格证网站怎么做
  • 网站安全制度体系的建设情况如何免费建造网站
  • 湘潭自助建站系统申请免费空间
  • 基础网站建设熵网站
  • 在线图片编辑器官网网站建设 网站优化
  • 徐州泉山区建设局网站做外贸主要在那些网站找单
  • 信息管理的基本原理分析网站建设北京电力交易中心主任
  • 经典企业网站天津建筑网站建设
  • 网站对应的ip中国摄影网站
  • 新网站排名优化携程网站用js怎么做
  • 创新型的网站建设j网站开发的相关知识
  • 湖北长欣建设有限公司网站学校网站建设的必要性
  • 惠阳网站建设公司wordpress博客站点
  • 凌云县 城市建设 网站广告设计与制作用什么软件
  • 网站建设图片排版外管局网站做延期收款报告
  • 呼和浩特商城网站建设番禺学校网站建设建议
  • 可以做系统同步时间的网站淘宝客网站搜索怎么做
  • apple 官网网站模板酒店seo是什么意思
  • 台州网站建设优化内蒙建设厅网站
  • 网站免费正能量不用下载在原备案号下增加新网站
  • 可以做网站的公司有哪些项目建设目标怎么写
  • 户外网站 整站下载网站开发目录结构
  • 深圳北网站建设众筹网站开发成本
  • 潜江网站开发公众号小程序免费开通
  • 怀柔富阳网站建设企业联系电话
  • 网站域名被注销托管经营
  • 网站开发脚本解析器百度公司好进吗
  • 北京网站建设的服务用手机域名做网站有多少
  • 如何推广网站网站推广常用方法上海建交人才网官网
  • 乐清网站优化推广广州市南沙住房和建设局网站