当前位置: 首页 > news >正文

辽河油田建设有限公司网站怎么样做网站推广

辽河油田建设有限公司网站,怎么样做网站推广,河南专业网站建设公司首选,如何自已建网站前言 今天是我写博客的第 200 篇#xff0c;恍惚间两年过去了#xff0c;现在已经是大三的学长了。仍然记得两年前第一次写博客的时候#xff0c;当时学的应该是 Java 语言#xff0c;菜的一批#xff0c;写了就删#xff0c;怕被人看到丢脸。当时就想着自己一年之后恍惚间两年过去了现在已经是大三的学长了。仍然记得两年前第一次写博客的时候当时学的应该是 Java 语言菜的一批写了就删怕被人看到丢脸。当时就想着自己一年之后两年之后能学到什么水平什么是 JDBC、什么是 MVC、SSM在当时都是特别好奇的东西不过都在后来的学习中慢慢接触到并且好多已经烂熟于心了。 那今天我在畅想一下一年后的今天我又学到了什么水平能否达到三花聚顶、草木山石皆可为码的超凡入圣的境界拿没拿到心仪的 offer和那个心动过的女孩相处怎么样了哈哈哈哈哈 输出算子Sink 学完了 Flink 在不同执行环境本地测试环境和集群环境下的多种读取多种数据源和转换操作多种转换算子最后就是输出操作了。 1、连接到外部系统 Flink 1.12 之前Sink 算子是通过调用 DataStream 的 addSink 方法来实现的 stream.addSink(new SinkFunction(...)); 从 Flink 1.12 开始Flink 重构了 Sink 架构 stream.sinkTo(...) 查看 Flink 支持的连接器 需要我们自己导入依赖比如上面的 Kfaka 和 DataGen 我们之前使用的时候都导入过相关依赖需要知道有的是只支持source有的只支持sink有的全都支持。 dependencygroupIdorg.apache.flink/groupIdartifactIdflink-connector-kafka/artifactIdversion${flink.version}/version/dependencydependencygroupIdorg.apache.flink/groupIdartifactIdflink-connector-datagen/artifactIdversion${flink.version}/version/dependency 2、输出到文件 Flink 专门提供了一个流式文件系统的连接器FileSink为批处理和流处理提供了一个统一的 Sink它可以将分区文件写入 Flink支持的文件系统。         它的主要操作是将数据写入桶buckets每个桶中的数据都可以分割成一个个大小有限的分区文件这样一来就实现真正意义上的分布式文件存储。我们可以通过各种配置来控制“分桶”的操作默认的分桶方式是基于时间的我们每小时写入一个新的桶。换句话说每个桶内保存的文件记录的都是 1 小时的输出数据。         FileSink 支持行编码Row-encoded和批量编码Bulk-encoded比如 Parquet格式。这两种不同的方式都有各自的构建器builder调用方法也非常简单可以直接调用 FileSink 的静态方法 行编码FileSink.forRowFormatbasePathrowEncoder。批量编码FileSink.forBulkFormatbasePathbulkWriterFactory。 在创建行或批量编码 Sink 时我们需要传入两个参数用来指定存储桶的基本路径basePath和数据的编码逻辑rowEncoder 或 bulkWriterFactory。 package com.lyh.sink;import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.serialization.SimpleStringEncoder; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy; import org.apache.flink.configuration.MemorySize; import org.apache.flink.connector.datagen.source.DataGeneratorSource; import org.apache.flink.connector.datagen.source.GeneratorFunction; import org.apache.flink.connector.file.sink.FileSink; import org.apache.flink.core.fs.Path; import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig; import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner; import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;import java.time.Duration; import java.time.ZoneId;/*** author 刘xx* version 1.0* date 2023-11-18 9:51*/ public class SinkFile {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(2);// 必须开启 检查点 不然一直都是 .inprogressenv.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);DataGeneratorSourceString dataGeneratorSource new DataGeneratorSourceString(new GeneratorFunctionLong, String() {Overridepublic String map(Long value) throws Exception {return Number:value;}},Long.MAX_VALUE,RateLimiterStrategy.perSecond(10), // 每s 10条Types.STRING);DataStreamSourceString dataGen env.fromSource(dataGeneratorSource, WatermarkStrategy.noWatermarks(), data-generate);// todo 输出到文件系统FileSinkString fileSink FileSink.// 泛型方法 需要和输出结果的泛型保持一致StringforRowFormat(new Path(D:/Desktop), // 指定输出路径 可以是 hdfs:// 路径new SimpleStringEncoder(UTF-8)) // 指定编码.withOutputFileConfig(OutputFileConfig.builder().withPartPrefix(lyh).withPartSuffix(.log).build())// 按照目录分桶 一个小时一个目录(这里的时间格式别改为分钟 会报错: flink Relative path in absolute URI:).withBucketAssigner(new DateTimeBucketAssigner(yyyy-MM-dd HH, ZoneId.systemDefault()))// 设置文件滚动策略-时间或者大小 10s 或 1KB 或 5min内没有新数据写入 滚动一次// 滚动的时候 文件就会更名为我们设定的格式(前缀)不再写入.withRollingPolicy(DefaultRollingPolicy.builder().withRolloverInterval(Duration.ofSeconds(10L)) // 10s.withMaxPartSize(new MemorySize(1024)) // 1KB.withInactivityInterval(Duration.ofMinutes(5)) // 5min.build()).build();dataGen.sinkTo(fileSink);env.execute();} }这里我们创建了一个简单的文件 Sink通过.withRollingPolicy()方法指定了一个“滚动策略”。“滚动”的概念在日志文件的写入中经常遇到因为文件会有内容持续不断地写入所以我们应该给一个标准到什么时候就开启新的文件将之前的内容归档保存。也就是说上面的代码设置了在以下 3 种情况下我们就会滚动分区文件 ⚫ 至少包含 10 秒的数据 ⚫ 最近 5 分钟没有收到新的数据 ⚫ 文件大小已达到 1 KB 通过 withOutputFileConfig()方法指定了输出的文件名前缀和后缀。 需要特别注意的就是一定要开启检查点否则我们的数据一直都是正在写入的状态具体原因后面学习到检查点的时候会详细说。 运行结果 3、输出到 Kafka 需要添加 Kafka 依赖之前导入过了启动 Kafka编写示例代码 package com.lyh.sink;import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.connector.base.DeliveryGuarantee; import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema; import org.apache.flink.connector.kafka.sink.KafkaSink; import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.kafka.clients.producer.ProducerConfig;/*** author 刘xx* version 1.0* date 2023-11-18 11:20*/ public class SinkKafka {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 如果是 精准一次 必须开启 checkpointenv.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);SingleOutputStreamOperatorString sensorDS env.socketTextStream(localhost, 9999);KafkaSinkString kafkaSink KafkaSink.Stringbuilder()// 指定 kafka 的地址和端口.setBootstrapServers(hadoop102:9092,hadoop103:9092,hadoop104:9092)// 指定序列化器 我们是发送方 所以我们是生产者.setRecordSerializer(KafkaRecordSerializationSchema.Stringbuilder().setTopic(like).setValueSerializationSchema(new SimpleStringSchema()).build())// 写到 kafka 的一致性级别: 精准一次 / 至少一次// 如果是精准一次// 1.必须开启检查点 env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE)// 2.必须设置事务的前缀// 3.必须设置事务的超时时间: 大于 checkpoint间隔 小于 max 15分钟.setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE).setTransactionalIdPrefix(lyh-).setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG,10*60*1000).build();sensorDS.sinkTo(kafkaSink);env.execute();} }启动 kafka 并开启一个消费者 kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic like运行结果 需要特别注意的三点 如果是精准一次1.必须开启检查点 env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE)2.必须设置事务的前缀3.必须设置事务的超时时间: 大于 checkpoint间隔 小于 max 15分钟 自定义序列化器 我们上面用的自带的序列化器但是如果我们有 key 的话就需要自定义序列化器了替换上面的代码 .setRecordSerializer(/*** 如果要指定写入 kafka 的key 就需要自定义序列化器* 实现一个接口 重写序列化方法* 指定key 转为 bytes[]* 指定value 转为 bytes[]* 返回一个 ProducerRecord(topic名,key,value)对象*/new KafkaRecordSerializationSchemaString() {NullableOverride// ProducerRecordbyte[], byte[] 返回一个生产者消息,key,value 分别对应两个字节数组public ProducerRecordbyte[], byte[] serialize(String element, KafkaSinkContext context, Long timestamp) {String[] datas element.split(,);byte[] key datas[0].getBytes(StandardCharsets.UTF_8);byte[] value element.getBytes(StandardCharsets.UTF_8);return new ProducerRecord(like,key,value);}} ) 运行结果  4、输出到 MySQL 添加依赖1.17版本的依赖需要指定仓库才能找到因为阿里云和默认的maven仓库是没有的 dependencygroupIdorg.apache.flink/groupIdartifactIdflink-connector-jdbc/artifactIdversion1.17-SNAPSHOT/version/dependency dependencygroupIdmysql/groupIdartifactIdmysql-connector-java/artifactIdversion8.0.31/version/dependency....repositoriesrepositoryidapache-snapshots/idnameapache snapshots/nameurlhttps://repository.apache.org/content/repositories/snapshots//url/repository/repositories 创建表格  编写代码将输入的数据行分隔为对象参数每行数据生成一个对象进行处理。  package com.lyh.sink;import com.lyh.bean.WaterSensor; import function.WaterSensorFunction; import org.apache.flink.connector.jdbc.JdbcConnectionOptions; import org.apache.flink.connector.jdbc.JdbcExecutionOptions; import org.apache.flink.connector.jdbc.JdbcSink; import org.apache.flink.connector.jdbc.JdbcStatementBuilder; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.SinkFunction;import java.sql.PreparedStatement; import java.sql.SQLException;/*** author 刘xx* version 1.0* date 2023-11-18 12:32*/ public class SinkMySQL {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperatorWaterSensor sensorDS env.socketTextStream(localhost, 9999).map(new WaterSensorFunction()); //输入进来的数据自动转为 WaterSensor类型/*** todo 写入 mysql* 1.这里需要用旧的sink写法addSink* 2.JDBC的4个参数* (1) 执行的sql语句* (2) 对占位符进行填充* (3) 执行选项 - 攒批,重试* (4) 连接选项 - driver,username,password,url*/SinkFunctionWaterSensor jdbcSink JdbcSink.sink(insert into flink.ws values(?,?,?),// 指定 sql 中占位符的值new JdbcStatementBuilderWaterSensor() {Overridepublic void accept(PreparedStatement stmt, WaterSensor sensor) throws SQLException {// 占位符从 1 开始stmt.setString(1, sensor.getId());stmt.setLong(2, sensor.getTs());stmt.setInt(3, sensor.getVc());}}, JdbcExecutionOptions.builder().withMaxRetries(3) //最多重试3次(不包括第一次,共4次).withBatchSize(100) //每收集100条记录进行一次写入.withBatchIntervalMs(3000) // 批次3s(即使没有达到100条记录,只要过了3s JDBCSink也会进行记录的写入),这有助于确保数据及时写入而不是无限期地等待批处理大小达到。.build(), new JdbcConnectionOptions.JdbcConnectionOptionsBuilder().withUrl(jdbc:mysql://localhost:3306/flink?serverTimezoneAsia/ShanghaiuseUnicodetruecharacterEncodingUTF-8).withDriverName(com.mysql.cj.jdbc.Driver).withUsername(root).withPassword(Yan1029.)// mysql 默认8小时不使用连接就主动断开连接.withConnectionCheckTimeoutSeconds(60) // 重试连接直接的间隔,上面我们设置最多重试3次,每次间隔60s.build());sensorDS.addSink(jdbcSink);env.execute();} }查询结果 5、自定义 Sink 输出 与 Source 类似Flink 为我们提供了通用的 SinkFunction 接口和对应的 RichSinkDunction抽象类只要实现它通过简单地调用 DataStream 的.addSink()方法就可以自定义写入任何外部存储。 这里我们自定义实现一个向 HBase 中插入数据的 Sink。 注意这里只是做一个简单的 Demo下面的代码不难发现我们只是对 nosq:student 表下的 info:name 进行了两次的覆盖。如果要实现复杂的处理功能需要对数据类型进行定义因为 HBase 的数据是按列存储的所以对于复杂的 Hbase 表我们难以通过 Java bean 来插入数据。而且一般经常用的连接器Flink 大部分已经提供了开发中我们一般也很少自定义 Sink 输出。 package com.lyh.sink;import com.lyh.utils.HBaseConnection; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Table;import java.nio.charset.StandardCharsets;/*** author 刘xx* version 1.0* date 2023-11-18 15:59*/ public class SinkCustomHBase {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);env.fromElements(tom,bob).addSink(new RichSinkFunctionString() {public Connection con;Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);con HBaseConnection.getConnection(hadoop102:2181);}Overridepublic void invoke(String value, Context context) throws Exception {super.invoke(value, context);Table table con.getTable(TableName.valueOf(nosql,student));Put put new Put(1001.getBytes(StandardCharsets.UTF_8));put.addColumn(info.getBytes(StandardCharsets.UTF_8),name.getBytes(StandardCharsets.UTF_8),value.getBytes(StandardCharsets.UTF_8));table.put(put);table.close();}Overridepublic void close() throws Exception {super.close();HBaseConnection.close();}});env.execute();} }这里用到一个简单的连接 HBase 的工具类   package com.lyh.utils;import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory;import java.io.IOException;/*** author 刘xx* version 1.0* date 2023-11-18 16:04*/ public class HBaseConnection {private static Connection connection;public static Connection getConnection(String hosts) throws IOException {Configuration conf new Configuration();conf.set(hbase.zookeeper.quorum, hosts);conf.setInt(hbase.rpc.timeout, 10000); // 设置最大超时 10 sconnection ConnectionFactory.createConnection(conf);return connection;}public static void close() throws IOException {if (connection!null)connection.close();} }
http://wiki.neutronadmin.com/news/52505/

相关文章:

  • oss可以做视频网站吗c 网站开发 书
  • 甘肃永靖建设住建局网站网站建设 采集
  • 网站开发文档是什么概念品牌宣传型网站建设方案
  • 镇江做网站seo做国际贸易用什么网站
  • 免费看电影的网站是什么什么是seo
  • 外贸网站的推广技巧有哪些wordpress分类图标
  • 网站建设工作室发展化妆品网站栏目策划
  • 天津响应式网站建设制作济南手机网站建设专业定制
  • 手机端网站建设方案html网页制作大作业范例
  • 技术支持 东莞网站建设网上注册公司在哪个平台注册
  • 红叶网站建设方案wordpress在线监测
  • 做网站公司怎么做百度官网首页
  • 网站的验证码是怎么做的温州开发网站公司
  • 网站直播用php怎么做钢丝高频退火设备网站建设
  • 南昌网站建设方案优化我想给网站网站做代理
  • 报名网站建设定做工程信息建程网
  • 成都网站建设低价wordpress网站关键词设置
  • 网站建设未来高密 网站建设
  • 同时在线上万人的网站需要什么配置云服务器wordpress照片墙
  • 建设一个网站需要多长时间郑州家居网站建设服务公司
  • 建设网站好公司简介wordpress 高端主题
  • 海口市公司网站建设seo快速排名外包
  • 旅游网站建设总结报告网站建设驻地开发合同
  • js代码下载网站wordpress 抱歉您不能访问此页面
  • 品牌红酒的网站建设南通模板建站定制
  • 网站建设与网页设计从入门到精通 pdf上海网站备案信息注销
  • 浅谈企业网站建设的目标关键词搜索站长工具
  • 广州 seo的网站人人秀h5制作软件下载
  • 做网站有没有前景移动互联网公司有哪些
  • 站长查询域名海港开发区人才网