Water benefits all things without contending, and dwells in the places people disdain; thus it comes close to the Tao.

Table of contents

1. Kafka_Sink
2. Kafka_Sink - custom serializer
3. Redis_Sink_String
4. Redis_Sink_list
5. Redis_Sink_set
6. Redis_Sink_hash
7. Writing a bounded stream to ES
8. Writing an unbounded stream to ES
9. Custom sink - mysql_Sink
10. Jdbc_Sink

Official documentation - Flink 1.13

1. Kafka_Sink
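Every example below operates on a WaterSensor POJO whose definition is not shown in this post. A minimal sketch consistent with the fields the code uses (id, ts, vc) might look like the following; only the field names and types are taken from the examples, the rest is an assumption:

```java
// Hypothetical minimal WaterSensor POJO, reconstructed from the fields the examples use.
// For Flink's POJO serializer the real class should be public, with a public no-arg
// constructor and public getters/setters.
class WaterSensor {
    private String id;  // sensor id
    private Long ts;    // event timestamp (ms)
    private Integer vc; // water level

    public WaterSensor() { }

    public WaterSensor(String id, Long ts, Integer vc) {
        this.id = id;
        this.ts = ts;
        this.vc = vc;
    }

    public String getId() { return id; }
    public void setId(String id) { this.id = id; }
    public Long getTs() { return ts; }
    public void setTs(Long ts) { this.ts = ts; }
    public Integer getVc() { return vc; }
    public void setVc(Integer vc) { this.vc = vc; }
}
```

The getters matter beyond style: `keyBy(WaterSensor::getId)` and `sum("vc")` both resolve fields through this POJO convention.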
addSink(new FlinkKafkaProducer<String>(kafkaAddress, topic, serializer))
First add the dependency:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.12</artifactId>
    <version>1.13.6</version>
</dependency>

public static void main(String[] args) {
    Configuration conf = new Configuration();
    conf.setInteger("rest.port", 1000);
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
    env.setParallelism(1);

    ArrayList<WaterSensor> waterSensors = new ArrayList<>();
    waterSensors.add(new WaterSensor("sensor_1", 1607527992000L, 20));
    waterSensors.add(new WaterSensor("sensor_1", 1607527994000L, 50));
    waterSensors.add(new WaterSensor("sensor_1", 1607527996000L, 50));
    waterSensors.add(new WaterSensor("sensor_2", 1607527993000L, 10));
    waterSensors.add(new WaterSensor("sensor_2", 1607527995000L, 30));
    DataStreamSource<WaterSensor> stream = env.fromCollection(waterSensors);

    stream.keyBy(WaterSensor::getId)
            .sum("vc")
            .map(JSON::toJSONString)
            .addSink(new FlinkKafkaProducer<String>(
                    "hadoop101:9092",        // Kafka address
                    "flink_sink_kafka",      // target Kafka topic
                    new SimpleStringSchema() // serializer
            ));

    try {
        env.execute();
    } catch (Exception e) {
        e.printStackTrace();
    }
}

Run result:
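The SimpleStringSchema passed to the producer simply encodes each record as its UTF-8 bytes. A rough stand-in (not the actual Flink class) showing the behavior:

```java
import java.nio.charset.StandardCharsets;

// Rough stand-in for Flink's SimpleStringSchema:
// UTF-8 bytes on write, UTF-8 decode on read.
class StringSchemaSketch {
    static byte[] serialize(String element) {
        return element.getBytes(StandardCharsets.UTF_8);
    }

    static String deserialize(byte[] message) {
        return new String(message, StandardCharsets.UTF_8);
    }
}
```

This is why the `map(JSON::toJSONString)` step comes first: the schema only knows how to turn a String into bytes, so the WaterSensor must already be a string by the time it reaches the sink.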
2. Kafka_Sink - custom serializer

For a custom serializer, use the four-argument constructor of new FlinkKafkaProducer() and pass in a new KafkaSerializationSchema, overriding its serialize method:
public static void main(String[] args) {
    Configuration conf = new Configuration();
    conf.setInteger("rest.port", 1000);
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
    env.setParallelism(1);

    ArrayList<WaterSensor> waterSensors = new ArrayList<>();
    waterSensors.add(new WaterSensor("sensor_1", 1607527992000L, 20));
    waterSensors.add(new WaterSensor("sensor_1", 1607527994000L, 50));
    waterSensors.add(new WaterSensor("sensor_1", 1607527996000L, 50));
    waterSensors.add(new WaterSensor("sensor_2", 1607527993000L, 10));
    waterSensors.add(new WaterSensor("sensor_2", 1607527995000L, 30));
    DataStreamSource<WaterSensor> stream = env.fromCollection(waterSensors);

    Properties sinkConfig = new Properties();
    sinkConfig.setProperty("bootstrap.servers", "hadoop101:9092");

    stream.keyBy(WaterSensor::getId)
            .sum("vc")
            .addSink(new FlinkKafkaProducer<WaterSensor>(
                    "defaultTopic", // default target topic; usually unused
                    new KafkaSerializationSchema<WaterSensor>() { // custom serializer
                        @Override
                        public ProducerRecord<byte[], byte[]> serialize(WaterSensor waterSensor,
                                                                        @Nullable Long aLong) {
                            String s = JSON.toJSONString(waterSensor);
                            return new ProducerRecord<>("flink_sink_kafka",
                                    s.getBytes(StandardCharsets.UTF_8));
                        }
                    },
                    sinkConfig, // Kafka configuration
                    FlinkKafkaProducer.Semantic.AT_LEAST_ONCE // delivery guarantee; only at-least-once may be passed here
            ));

    try {
        env.execute();
    } catch (Exception e) {
        e.printStackTrace();
    }
}

Run result:
3. Redis_Sink_String
addSink(new RedisSink<>(config, new RedisMapper<WaterSensor>() { ... }))
Writes to the string structure.
Add the dependencies:
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.83</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-redis_2.11</artifactId>
    <version>1.1.5</version>
</dependency>

public static void main(String[] args) {
    Configuration conf = new Configuration();
    conf.setInteger("rest.port", 1000);
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
    env.setParallelism(1);

    ArrayList<WaterSensor> waterSensors = new ArrayList<>();
    waterSensors.add(new WaterSensor("sensor_1", 1607527992000L, 20));
    waterSensors.add(new WaterSensor("sensor_1", 1607527994000L, 50));
    waterSensors.add(new WaterSensor("sensor_1", 1607527996000L, 50));
    waterSensors.add(new WaterSensor("sensor_2", 1607527993000L, 10));
    waterSensors.add(new WaterSensor("sensor_2", 1607527995000L, 30));
    DataStreamSource<WaterSensor> stream = env.fromCollection(waterSensors);
    SingleOutputStreamOperator<WaterSensor> result = stream.keyBy(WaterSensor::getId).sum("vc");

    /*
     * Write strings to Redis (the SET command in redis-cli).
     * Suppose the key is the id and the value is the whole record as a JSON string:
     *   key        value
     *   sensor_1   JSON string
     */
    // Build a standalone-Redis configuration
    FlinkJedisPoolConfig config = new FlinkJedisPoolConfig.Builder()
            .setHost("hadoop101")
            .setPort(6379)
            .setMaxTotal(100)      // max number of connections
            .setMaxIdle(10)        // max idle connections in the pool
            .setMinIdle(2)         // min idle connections in the pool
            .setTimeout(10 * 1000) // timeout
            .build();

    // Write out to Redis
    result.addSink(new RedisSink<>(config, new RedisMapper<WaterSensor>() {
        // Return the command descriptor; each target structure uses a different command
        @Override
        public RedisCommandDescription getCommandDescription() {
            // SET writes to the string structure
            return new RedisCommandDescription(RedisCommand.SET);
        }

        @Override
        public String getKeyFromData(WaterSensor waterSensor) {
            return waterSensor.getId();
        }

        @Override
        public String getValueFromData(WaterSensor waterSensor) {
            return JSON.toJSONString(waterSensor);
        }
    }));

    try {
        env.execute();
    } catch (Exception e) {
        e.printStackTrace();
    }
}

Run result:
4. Redis_Sink_list
addSink(new RedisSink<>(config, new RedisMapper<WaterSensor>() { ... }))
Writes to the list structure.
public static void main(String[] args) {
    Configuration conf = new Configuration();
    conf.setInteger("rest.port", 1000);
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
    env.setParallelism(1);

    ArrayList<WaterSensor> waterSensors = new ArrayList<>();
    waterSensors.add(new WaterSensor("sensor_1", 1607527992000L, 20));
    waterSensors.add(new WaterSensor("sensor_1", 1607527994000L, 50));
    waterSensors.add(new WaterSensor("sensor_1", 1607527996000L, 50));
    waterSensors.add(new WaterSensor("sensor_2", 1607527993000L, 10));
    waterSensors.add(new WaterSensor("sensor_2", 1607527995000L, 30));
    DataStreamSource<WaterSensor> stream = env.fromCollection(waterSensors);
    SingleOutputStreamOperator<WaterSensor> result = stream.keyBy(WaterSensor::getId).sum("vc");

    // key is the id, value is the processed JSON string
    FlinkJedisPoolConfig config = new FlinkJedisPoolConfig.Builder()
            .setHost("hadoop101")
            .setPort(6379)
            .setMaxTotal(100)      // max number of connections
            .setMaxIdle(10)        // max idle connections in the pool
            .setMinIdle(2)         // min idle connections in the pool
            .setTimeout(10 * 1000) // timeout
            .build();

    result.addSink(new RedisSink<>(config, new RedisMapper<WaterSensor>() {
        @Override
        public RedisCommandDescription getCommandDescription() {
            // RPUSH writes to the list structure
            return new RedisCommandDescription(RedisCommand.RPUSH);
        }

        @Override
        public String getKeyFromData(WaterSensor waterSensor) {
            return waterSensor.getId();
        }

        @Override
        public String getValueFromData(WaterSensor waterSensor) {
            return JSON.toJSONString(waterSensor);
        }
    }));

    try {
        env.execute();
    } catch (Exception e) {
        e.printStackTrace();
    }
}

Run result:
5. Redis_Sink_set
addSink(new RedisSink<>(config, new RedisMapper<WaterSensor>() { ... }))
Writes to the set structure.
public static void main(String[] args) {
    Configuration conf = new Configuration();
    conf.setInteger("rest.port", 1000);
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
    env.setParallelism(1);

    ArrayList<WaterSensor> waterSensors = new ArrayList<>();
    waterSensors.add(new WaterSensor("sensor_1", 1607527992000L, 20));
    waterSensors.add(new WaterSensor("sensor_1", 1607527994000L, 50));
    waterSensors.add(new WaterSensor("sensor_1", 1607527996000L, 50));
    waterSensors.add(new WaterSensor("sensor_2", 1607527993000L, 10));
    waterSensors.add(new WaterSensor("sensor_2", 1607527995000L, 30));
    DataStreamSource<WaterSensor> stream = env.fromCollection(waterSensors);
    SingleOutputStreamOperator<WaterSensor> result = stream.keyBy(WaterSensor::getId).sum("vc");

    FlinkJedisPoolConfig config = new FlinkJedisPoolConfig.Builder()
            .setHost("hadoop101")
            .setPort(6379)
            .setMaxTotal(100)
            .setMaxIdle(10)
            .setMinIdle(2)
            .setTimeout(10 * 1000)
            .build();

    result.addSink(new RedisSink<>(config, new RedisMapper<WaterSensor>() {
        @Override
        public RedisCommandDescription getCommandDescription() {
            // SADD writes to the set structure
            return new RedisCommandDescription(RedisCommand.SADD);
        }

        @Override
        public String getKeyFromData(WaterSensor waterSensor) {
            return waterSensor.getId();
        }

        @Override
        public String getValueFromData(WaterSensor waterSensor) {
            return JSON.toJSONString(waterSensor);
        }
    }));

    try {
        env.execute();
    } catch (Exception e) {
        e.printStackTrace();
    }
}

Run result:
6. Redis_Sink_hash
addSink(new RedisSink<>(config, new RedisMapper<WaterSensor>() { ... }))
Writes to the hash structure.
public static void main(String[] args) {
    Configuration conf = new Configuration();
    conf.setInteger("rest.port", 1000);
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
    env.setParallelism(1);

    ArrayList<WaterSensor> waterSensors = new ArrayList<>();
    waterSensors.add(new WaterSensor("sensor_1", 1607527992000L, 20));
    waterSensors.add(new WaterSensor("sensor_1", 1607527994000L, 50));
    waterSensors.add(new WaterSensor("sensor_1", 1607527996000L, 50));
    waterSensors.add(new WaterSensor("sensor_2", 1607527993000L, 10));
    waterSensors.add(new WaterSensor("sensor_2", 1607527995000L, 30));
    DataStreamSource<WaterSensor> stream = env.fromCollection(waterSensors);
    SingleOutputStreamOperator<WaterSensor> result = stream.keyBy(WaterSensor::getId).sum("vc");

    FlinkJedisPoolConfig config = new FlinkJedisPoolConfig.Builder()
            .setHost("hadoop101")
            .setPort(6379)
            .setMaxTotal(100)
            .setMaxIdle(10)
            .setMinIdle(2)
            .setTimeout(10 * 1000)
            .build();

    result.addSink(new RedisSink<>(config, new RedisMapper<WaterSensor>() {
        @Override
        public RedisCommandDescription getCommandDescription() {
            // HSET writes to the hash structure; "a" is the additional (hash) key
            return new RedisCommandDescription(RedisCommand.HSET, "a");
        }

        @Override
        public String getKeyFromData(WaterSensor waterSensor) {
            return waterSensor.getId();
        }

        @Override
        public String getValueFromData(WaterSensor waterSensor) {
            return JSON.toJSONString(waterSensor);
        }
    }));

    try {
        env.execute();
    } catch (Exception e) {
        e.printStackTrace();
    }
}

Run result:
7. Writing a bounded stream to ES
new ElasticsearchSink.Builder()
public static void main(String[] args) {
    Configuration conf = new Configuration();
    conf.setInteger("rest.port", 1000);
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
    env.setParallelism(1);

    ArrayList<WaterSensor> waterSensors = new ArrayList<>();
    waterSensors.add(new WaterSensor("sensor_1", 1607527992000L, 20));
    waterSensors.add(new WaterSensor("sensor_1", 1607527994000L, 50));
    waterSensors.add(new WaterSensor("sensor_1", 1607527996000L, 50));
    waterSensors.add(new WaterSensor("sensor_2", 1607527993000L, 10));
    waterSensors.add(new WaterSensor("sensor_2", 1607527995000L, 30));
    DataStreamSource<WaterSensor> stream = env.fromCollection(waterSensors);
    SingleOutputStreamOperator<WaterSensor> result = stream.keyBy(WaterSensor::getId).sum("vc");

    List<HttpHost> hosts = Arrays.asList(
            new HttpHost("hadoop101", 9200),
            new HttpHost("hadoop102", 9200),
            new HttpHost("hadoop103", 9200));

    ElasticsearchSink.Builder<WaterSensor> builder = new ElasticsearchSink.Builder<WaterSensor>(
            hosts,
            new ElasticsearchSinkFunction<WaterSensor>() {
                @Override
                public void process(WaterSensor element,            // the element to write out
                                    RuntimeContext runtimeContext,  // runtime context, not the Context object
                                    RequestIndexer requestIndexer) { // wrap the outgoing data in the RequestIndexer
                    String msg = JSON.toJSONString(element);
                    IndexRequest ir = Requests.indexRequest("sensor")
                            .type("_doc")        // a type must not start with an underscore; _doc is the one special case
                            .id(element.getId()) // per-record id; if omitted, a random id is assigned; a duplicate id updates the document
                            .source(msg, XContentType.JSON);
                    requestIndexer.add(ir); // adding ir to the indexer writes it to ES automatically
                }
            });
    result.addSink(builder.build());

    try {
        env.execute();
    } catch (Exception e) {
        e.printStackTrace();
    }
}

8. Writing an unbounded stream to ES

Much the same as the bounded case, except the source is a socket. Because the unbounded ES sink buffers records for efficiency rather than flushing each one as it arrives, you must configure the flush interval, batch size, and record count to see any results.
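The buffering just described (flush when the time interval, batch size, or record count threshold is reached, whichever comes first) can be modeled with a toy buffer. This is only an illustration of the OR'd trigger logic, not the ES sink's actual implementation:

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of bulk buffering: a flush fires when ANY of the three thresholds is hit.
class BulkFlushSketch {
    private final List<String> buffer = new ArrayList<>();
    private long bytes = 0;
    private long lastFlush = System.currentTimeMillis();
    private final long intervalMs;
    private final long maxBytes;
    private final int maxActions;
    private int flushes = 0;

    BulkFlushSketch(long intervalMs, long maxBytes, int maxActions) {
        this.intervalMs = intervalMs;
        this.maxBytes = maxBytes;
        this.maxActions = maxActions;
    }

    void add(String record) {
        buffer.add(record);
        bytes += record.length();
        // The three conditions are OR'd, mirroring interval / max-size / max-actions.
        if (buffer.size() >= maxActions
                || bytes >= maxBytes
                || System.currentTimeMillis() - lastFlush >= intervalMs) {
            flush();
        }
    }

    void flush() { // stand-in for sending the bulk request to ES
        flushes++;
        buffer.clear();
        bytes = 0;
        lastFlush = System.currentTimeMillis();
    }

    int flushCount() { return flushes; }
}
```

With a record-count threshold of 2, every second record triggers a flush, which is why results show up in ES two socket lines at a time in the example below.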
public static void main(String[] args) {
    Configuration conf = new Configuration();
    conf.setInteger("rest.port", 1000);
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
    env.setParallelism(1);

    SingleOutputStreamOperator<WaterSensor> result = env.socketTextStream("hadoop101", 9999)
            .map(line -> {
                String[] data = line.split(",");
                return new WaterSensor(data[0], Long.valueOf(data[1]), Integer.valueOf(data[2]));
            })
            .keyBy(WaterSensor::getId)
            .sum("vc");

    List<HttpHost> hosts = Arrays.asList(
            new HttpHost("hadoop101", 9200),
            new HttpHost("hadoop102", 9200),
            new HttpHost("hadoop103", 9200));

    ElasticsearchSink.Builder<WaterSensor> builder = new ElasticsearchSink.Builder<WaterSensor>(
            hosts,
            new ElasticsearchSinkFunction<WaterSensor>() {
                @Override
                public void process(WaterSensor element,            // the element to write out
                                    RuntimeContext runtimeContext,  // runtime context, not the Context object
                                    RequestIndexer requestIndexer) { // wrap the outgoing data in the RequestIndexer
                    String msg = JSON.toJSONString(element);
                    IndexRequest ir = Requests.indexRequest("sensor")
                            .type("_doc")        // a type must not start with an underscore; _doc is the one special case
                            .id(element.getId()) // per-record id; if omitted, a random id is assigned; a duplicate id updates the document
                            .source(msg, XContentType.JSON);
                    requestIndexer.add(ir); // adding ir to the indexer writes it to ES automatically
                }
            });

    // Automatic flushing
    builder.setBulkFlushInterval(2000);  // by default there is no time-based flush
    builder.setBulkFlushMaxSizeMb(1024); // flush when the batch reaches this size
    builder.setBulkFlushMaxActions(2);   // flush after this many records
    // The three conditions are OR'd: any one of them triggers a flush

    result.addSink(builder.build());

    try {
        env.execute();
    } catch (Exception e) {
        e.printStackTrace();
    }
}

9. Custom sink - mysql_Sink

Write a class that extends RichSinkFunction<WaterSensor> and implement its invoke method. Because writing to MySQL requires establishing a connection, the Rich variant is the right choice. Remember to add the MySQL driver dependency.
public static void main(String[] args) {
    Configuration conf = new Configuration();
    conf.setInteger("rest.port", 1000);
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
    env.setParallelism(1);

    ArrayList<WaterSensor> waterSensors = new ArrayList<>();
    waterSensors.add(new WaterSensor("sensor_1", 1607527992000L, 20));
    waterSensors.add(new WaterSensor("sensor_1", 1607527994000L, 50));
    waterSensors.add(new WaterSensor("sensor_1", 1607527996000L, 50));
    waterSensors.add(new WaterSensor("sensor_2", 1607527993000L, 10));
    waterSensors.add(new WaterSensor("sensor_2", 1607527995000L, 30));
    DataStreamSource<WaterSensor> stream = env.fromCollection(waterSensors);
    SingleOutputStreamOperator<WaterSensor> result = stream.keyBy(WaterSensor::getId).sum("vc");

    result.addSink(new MySqlSink());

    try {
        env.execute();
    } catch (Exception e) {
        e.printStackTrace();
    }
}

public static class MySqlSink extends RichSinkFunction<WaterSensor> {
    private Connection connection;

    @Override
    public void open(Configuration parameters) throws Exception {
        Class.forName("com.mysql.cj.jdbc.Driver");
        connection = DriverManager.getConnection(
                "jdbc:mysql://hadoop101:3306/test?useSSL=false", "root", "123456");
    }

    @Override
    public void close() throws Exception {
        if (connection != null) {
            connection.close();
        }
    }

    // invoke is called once for every incoming element
    @Override
    public void invoke(WaterSensor value, Context context) throws Exception {
        // Write to MySQL via JDBC
        // String sql = "insert into sensor(id,ts,vc)values(?,?,?)";
        // Insert when the primary key is new, update when it already exists:
        // String sql = "insert into sensor(id,ts,vc)values(?,?,?) on duplicate key update vc=?";
        String sql = "replace into sensor(id,ts,vc)values(?,?,?)";
        // 1. Create the prepared statement
        PreparedStatement ps = connection.prepareStatement(sql);
        // 2. Bind the placeholders
        ps.setString(1, value.getId());
        ps.setLong(2, value.getTs());
        ps.setInt(3, value.getVc());
        // ps.setInt(4, value.getVc());
        // 3. Execute
        ps.execute();
        // 4. Commit
        // connection.commit(); // MySQL autocommits by default, so no explicit commit is needed
        // 5. Close the prepared statement
        ps.close();
    }
}

Run result:
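The reason for `replace into`: the keyed sum emits an updated record for every input element, so the table must end up with exactly one row per sensor id, each holding the latest values. A toy in-memory model of the delete-then-insert semantics (no real JDBC involved; class and method names are illustrative only):

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of MySQL's `replace into`: one row per primary key; on a key conflict
// the old row is deleted and the new row inserted, so every column takes the new value.
class ReplaceIntoSketch {
    private final Map<String, long[]> table = new HashMap<>(); // id -> {ts, vc}

    void replaceInto(String id, long ts, long vc) {
        table.put(id, new long[]{ts, vc}); // last write wins for the whole row
    }

    int rowCount() { return table.size(); }

    long[] row(String id) { return table.get(id); }
}
```

By contrast, `insert ... on duplicate key update` only overwrites the columns listed in its update clause; here either works because every column is rewritten anyway.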
10. Jdbc_Sink
addSink(JdbcSink.sink(sql, JdbcStatementBuilder, executionOptions, connectionOptions))

For JDBC databases there is really no need to write a custom sink, because an official one is provided - official JDBC Sink link
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-jdbc_2.11</artifactId>
    <version>1.13.6</version>
</dependency>

public static void main(String[] args) {
    Configuration conf = new Configuration();
    conf.setInteger("rest.port", 1000);
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
    env.setParallelism(1);

    ArrayList<WaterSensor> waterSensors = new ArrayList<>();
    waterSensors.add(new WaterSensor("sensor_1", 1607527992000L, 20));
    waterSensors.add(new WaterSensor("sensor_1", 1607527994000L, 50));
    waterSensors.add(new WaterSensor("sensor_1", 1607527996000L, 50));
    waterSensors.add(new WaterSensor("sensor_2", 1607527993000L, 10));
    waterSensors.add(new WaterSensor("sensor_2", 1607527995000L, 30));
    DataStreamSource<WaterSensor> stream = env.fromCollection(waterSensors);
    SingleOutputStreamOperator<WaterSensor> result = stream.keyBy(WaterSensor::getId).sum("vc");

    result.addSink(JdbcSink.sink(
            "replace into sensor(id,ts,vc)values(?,?,?)",
            new JdbcStatementBuilder<WaterSensor>() {
                @Override
                public void accept(PreparedStatement ps, WaterSensor waterSensor) throws SQLException {
                    // Does exactly one thing: binds the placeholders
                    ps.setString(1, waterSensor.getId());
                    ps.setLong(2, waterSensor.getTs());
                    ps.setInt(3, waterSensor.getVc());
                }
            },
            new JdbcExecutionOptions.Builder()     // execution options
                    .withBatchSize(1024)           // max batch size before a flush
                    .withBatchIntervalMs(2000)     // flush interval
                    .withMaxRetries(3)             // retry count
                    .build(),
            new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                    .withDriverName("com.mysql.cj.jdbc.Driver")
                    .withUrl("jdbc:mysql://hadoop101:3306/test?useSSL=false")
                    .withUsername("root")
                    .withPassword("123456")
                    .build()
    ));

    try {
        env.execute();
    } catch (Exception e) {
        e.printStackTrace();
    }
}

Run result: