个人网站备案内容,浙江省嘉兴市建设局网站,河北商城网站搭建多少钱,如何做网站制作1.Debezium-Embedded 简介 Debezium连接器的操作通常是将它们部署到Kafka Connect服务#xff0c;并配置一个或多个连接器来监控上游数据库#xff0c;并为它们在上游数据库中看到的所有更改生成数据更改事件。这些数据更改事件被写入Kafka#xff0c;在那里它们可以被许多不…1.Debezium-Embedded 简介 Debezium连接器的操作通常是将它们部署到Kafka Connect服务并配置一个或多个连接器来监控上游数据库并为它们在上游数据库中看到的所有更改生成数据更改事件。这些数据更改事件被写入Kafka在那里它们可以被许多不同的应用程序独立使用。Kafka Connect提供了出色的容错性和可扩展性因为它作为分布式服务运行并确保所有注册和配置的连接器始终在运行。例如即使集群中的一个Kafka Connect端点出现故障其余的Kafka连接端点也会重新启动以前在现已终止的端点上运行的任何连接器从而最大限度地减少停机时间并消除管理活动。 并不是每个应用程序都需要这种级别的容错和可靠性他们可能不想依赖外部的Kafka代理和Kafka Connect服务集群。相反一些应用程序更喜欢将Debezium连接器直接嵌入到应用程序空间中。他们仍然想要相同的数据更改事件但更喜欢让连接器将它们直接发送到应用程序而不是将它们保存在Kafka中。 这个Debezium-Embedded模块定义了一个小型库允许应用程序轻松配置和运行debezium连接器。
2.MySQL端配置
2.1 开启日志 MySQL开启日志配置可参考MySQL 主从配置-CSDN博客实现。 show variables like log_%; 2.2 创建监控账号并授权 #创建账号 create user debezium% identified with mysql_native_password by wsx-123; #给账号授权 GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO debezium%; #刷新权限 FLUSH PRIVILEGES; 3.应用端开发
3.1 maven 引用debezium-embedded dependencygroupIdio.debezium/groupIdartifactIddebezium-embedded/artifactIdversion${debezium-embedded.version}/version
/dependency
dependencygroupIdio.debezium/groupIdartifactIddebezium-connector-mysql/artifactIdversion${debezium-embedded.version}/version
/dependency 3.2 代码开发
package com.dayesmart.dataplusjava.util;import io.debezium.config.Configuration;
import io.debezium.embedded.EmbeddedEngine;
import io.debezium.engine.DebeziumEngine;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.connect.source.SourceRecord;import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;Slf4j
public class DebeziumTest {public static void main(String[] args) {Executor executor Executors.newSingleThreadExecutor();Configuration config Configuration.create()/* begin engine properties */.with(connector.class,io.debezium.connector.mysql.MySqlConnector).with(offset.storage,org.apache.kafka.connect.storage.FileOffsetBackingStore).with(offset.storage.file.filename,E:/tmp/debezium/offset.dat).with(offset.flush.interval.ms, 60000)/* begin connector properties */.with(name, my-sql-connector).with(database.hostname, 127.0.0.1).with(database.port, 3307).with(database.user, debezium).with(database.password, wsx-123).with(database.connectionTimeZone, Asia/Shanghai).with(database.server.id, 85744).with(database.include.list,test).with(snapshot.mode,initial).with(database.server.name,weisx).with(database.history,io.debezium.relational.history.FileDatabaseHistory).with(database.history.file.filename,E:/tmp/debezium/schemahistory.dat).build();// Create the engine with this configuration ...EmbeddedEngine engine EmbeddedEngine.create().using(config).notifying(new EmbeddedEngine.ChangeConsumer(){Overridepublic void handleBatch(ListSourceRecord list, DebeziumEngine.RecordCommitterSourceRecord recordCommitter) throws InterruptedException {log.info({},list);}}).using((success,message,error) -{log.info(success:{},message:{},error:{},success,message,error);}).build();// Run the engine asynchronously ...executor.execute(engine);}}