睢宁网站建设xzqjwl,网站主要功能,学校网站建设的难点,淘宝网站建设的特点0、题记实际业务场景中#xff0c;会遇到基础数据存在 Mysql 中#xff0c;实时写入数据量比较大的情景。迁移至kafka是一种比较好的业务选型方案。而mysql写入kafka的选型方案有#xff1a;方案一#xff1a;logstash_output_kafka 插件。方案二#xff1a;kafka_connect…0、题记实际业务场景中会遇到基础数据存在 Mysql 中实时写入数据量比较大的情景。迁移至kafka是一种比较好的业务选型方案。 而mysql写入kafka的选型方案有方案一logstash_output_kafka 插件。方案二kafka_connector。方案三debezium 插件。方案四flume。方案五其他类似方案。其中debezium和flume是基于 mysql binlog 实现的。如果需要同步历史全量数据实时更新数据建议使用logstash。1、logstash同步原理常用的logstash的插件是logstash_input_jdbc实现关系型 数据库 到Elasticsearch等的同步。实际上 核心logstash的同步原理的掌握 有助于大家理解类似的各种库之间的同步。logstash 核心原理 输入生成事件过滤器修改它们输出将它们发送到其他地方。logstash核心三部分组成input、filter、output。 input { }filter { }output { }1.1 input输入包含但远不限于jdbc关系型数据库mysql、 oracle 等。file从文件系统上的文件读取。syslog在已知端口514上侦听syslog消息。redisredis消息。beats处理 Beats发送的事件。kafkakafka实时数据流。1.2 filter过滤器过滤器是Logstash管道中的中间处理设备。您可以将过滤器与条件组合以便在事件满足特定条件时对其执行操作。可以把它比作数据处理的 ETL 环节。一些有用的过滤包括grok解析并构造任意文本。 Grok是目前Logstash中将非结构化日志数据解析为结构化和可查询内容的最佳方式 。有了内置于Logstash的120种模式您很可能会找到满足您需求的模式mutate对事件字段执行常规转换。您可以重命名删除替换和修改事件中的字段。drop完全删除事件例如调试事件。clone制作事件的副本可能添加或删除字段。geoip添加有关IP地址的地理位置的信息。1.3 output输出输出是Logstash管道的最后阶段。一些常用的输出包括elasticsearch将事件数据发送到Elasticsearch。file将事件数据写入磁盘上的文件。kafka将事件写入Kafka。详细的filter demo参考http://t.cn/EaAt4zP2、同步Mysql到kafka配置参考input {jdbc {jdbc_connection_string jdbc:mysql://192.168.1.12:3306/news_basejdbc_user rootjdbc_password xxxxxxxjdbc_driver_library /home/logstash-6.4.0/lib/mysql-connector-java-5.1.47.jarjdbc_driver_class com.mysql.jdbc.Driver#schedule * * * * *statement SELECT * from news_info WHERE id :sql_last_value order by iduse_column_value truetracking_column id tracking_column_type numericrecord_last_run truelast_run_metadata_path /home/logstash-6.4.0/sync_data/news_last_run }}filter {ruby{code event.set(gather_time_unix,event.get(gather_time).to_i*1000)}ruby{code event.set(publish_time_unix,event.get(publish_time).to_i*1000)}mutate {remove_field [ version ]remove_field [ timestamp ]remove_field [ gather_time ]remove_field [ publish_time ]}}output {kafka {bootstrap_servers 192.168.1.13:9092codec json_linestopic_id mytopic}file {codec json_linespath /tmp/output_a.log}}以上内容不复杂不做细讲。注意Mysql借助logstash同步后日期类型格式“2019-04-20 13:55:53”已经被识别为日期格式。code event.set(gather_time_unix,event.get(gather_time).to_i*1000)是将Mysql中的时间格式转化为时间戳格式。3、坑总结3.1 坑1字段大小写问题from星友使用logstash同步mysql数据的因为在jdbc.conf里面没有添加 lowercase_column_names false 这个属性所以logstash默认把查询结果的列明改为了小写同步进了es所以就导致es里面看到的字段名称全是小写。最后总结es是支持大写字段名称的问题出在logstash没用好需要在同步配置中加上 lowercase_column_names false 。记录下来希望可以帮到更多人。3.2 同步到ES中的数据会不会重复想将关系数据库的数据同步至ES中如果在集群的多台 服务器 上同时启动logstash。解读实际项目中就是没用随机id 使用指定id作为es的_id 指定id可以是url的md5.这样相同数据就会走更新覆盖以前数据3.3 相同配置logstash,升级6.3之后不能同步数据。解读高版本基于时间增量有优化。tracking_column_type timestamp 应该是需要指定标识为时间类型默认为数字类型numeric3.4 ETL字段统一在哪处理?解读可以logstash同步mysql的时候sql查询阶段处理如 select a_value as avalue*** 。或者filter阶段处理, mutate rename 处理。mutate {rename [shortHostname, hostname ]}或者kafka阶段借助kafka stream处理。4、小结相关配置和同步都不复杂复杂点往往在于filter阶段的解析还有logstash性能问题。需要结合实际业务场景做深入的研究和性能分析。有问题欢迎留言讨论。推荐阅读3、 一张图理清楚关系型数据库与Elasticsearch同步 http://t.cn/EaAceD34、新的实现http://t.cn/EaAt60O5、mysql2mysql: http://t.cn/EaAtK7r 6、推荐开源实现 http://t.cn/EaAtjqN 加入星球更短时间更快习得更多干货