做影视网站须要注意什么,广告资源网,广西金水建设开发有限公司网站,荣成网站制作公司Debezium日常分享系列之#xff1a;Debezium2.4版本之用于 MongoDB的Debezium 连接器 一、综述二、改变流三、阅读偏好四、MongoDB 连接器的工作原理五、支持的 MongoDB 拓扑六、所需的用户权限七、逻辑连接器名称八、执行快照九、临时快照十、增量快照1.增量快照流程2.Debezi… Debezium日常分享系列之Debezium2.4版本之用于 MongoDB的Debezium 连接器 一、综述二、改变流三、阅读偏好四、MongoDB 连接器的工作原理五、支持的 MongoDB 拓扑六、所需的用户权限七、逻辑连接器名称八、执行快照九、临时快照十、增量快照1.增量快照流程2.Debezium 如何解决具有相同主键的记录之间的冲突3.快照窗口4.分片集群的增量快照5.触发增量快照6.使用Kafka信令通道触发增量快照7.具有附加条件的临时增量快照8.停止增量快照9.使用Kafka信令通道停止增量快照10.阻止快照11.阻塞快照进程12.配置快照13.可能重复 十一、流变化十二、原像支持十三、主题名称十四、分区十五、交易元数据十六、变更数据事件丰富十七、数据变更事件十八、更改事件键十九、更改事件值二十、创建事件二十一、更新事件二十二、删除事件二十三、墓碑事件二十四、设置mongodb二十五、最佳 Oplog 配置二十六、部署二十七、MongoDB 连接器配置示例二十八、连接器属性二十九、Debezium 连接器 Kafka 信号配置属性三十、监控 Debezium打通Mongodb数据库数据采集系列文章
Debezium系列之打通Debezium对低版本MongoDB数据库4.0版本的数据采集技术Debezium系列之Debezium2.X版本Mysql数据库、Sqlserver数据库、MongoDB数据库debezium connector最新完整的参数配置并详细介绍参数含义Debezium系列之安装jmx导出器监控debezium指标Debezium系列之深入解读Debezium重要的jmx指标Debezium系列之prometheus采集debezium的jmx数据grafana通过dashboard展示debezium的jmx数据
更多debezium技术文章请阅读博主专栏
Debezium专栏
Debezium 的 MongoDB 连接器跟踪 MongoDB 副本集或 MongoDB 分片集群以获取数据库和集合中的文档更改并将这些更改记录为 Kafka 主题中的事件。连接器自动处理分片集群中分片的添加或删除、每个副本集成员资格的更改、每个副本集中的选举以及等待通信问题的解决。
一、综述
MongoDB 的复制机制提供了冗余和高可用性是在生产环境中运行 MongoDB 的首选方式。 MongoDB 连接器捕获副本集或分片集群中的更改。
MongoDB 副本集由一组服务器组成这些服务器都具有相同数据的副本并且复制可确保客户端对副本集主服务器上的文档所做的所有更改都正确应用于其他副本集的服务器称为辅助服务器。 MongoDB 复制的工作原理是让主数据库记录其 oplog或操作日志中的更改然后每个辅助数据库读取主数据库的 oplog 并按顺序将所有操作应用到自己的文档中。将新服务器添加到副本集时该服务器首先对主服务器上的所有数据库和集合执行快照然后读取主服务器的 oplog 以应用自开始快照以来可能进行的所有更改。当这个新服务器赶上主服务器 oplog 的尾部时它就成为辅助服务器并且能够处理查询。
二、改变流
尽管 Debezium MongoDB 连接器不会成为副本集的一部分但它使用类似的复制机制来获取 oplog 数据。主要区别在于连接器不直接读取 oplog。相反它将 oplog 数据的捕获和解码委托给 MongoDB 更改流功能。通过更改流MongoDB 服务器将集合中发生的更改公开为事件流。 Debezium 连接器监视流然后将更改传递给下游。连接器第一次检测到副本集时它会检查 oplog 以获取最后记录的事务然后执行主数据库和集合的快照。连接器完成数据复制后它会从之前读取的 oplog 位置开始创建更改流。
当 MongoDB 连接器处理更改时它会定期记录 oplog 流中事件起源的位置。当连接器停止时它会记录它处理的最后一个 oplog 流位置以便在重新启动后它可以从该位置恢复流式传输。换句话说连接器可以停止、升级或维护并在一段时间后重新启动并且始终准确地从中断处继续而不会丢失任何事件。当然MongoDB oplog 通常有最大大小上限因此如果连接器长时间停止oplog 中的操作可能会在连接器有机会读取它们之前被清除。在这种情况下重新启动后连接器会检测到丢失的 oplog 操作执行快照然后继续流式传输更改。
MongoDB 连接器还能够很好地容忍副本集的成员资格和领导权的变化、分片集群中分片的添加或删除以及可能导致通信失败的网络问题。连接器始终使用副本集的主节点来流式传输更改因此当副本集进行选举并且不同的节点成为主节点时连接器将立即停止流式传输更改连接到新的主节点并开始使用新的主节点流式传输更改节点。同样如果连接器无法与主副本集通信它会尝试重新连接使用指数退避以免淹没网络或副本集。重新建立连接后连接器将继续传输其捕获的最后一个事件的更改。通过这种方式连接器可以动态调整以适应副本集成员资格的变化并自动处理通信中断。
其他资源
复制机制副本集副本集选举分片集群分片添加分片清除改变流
三、阅读偏好
可以在连接器属性中指定连接的 MongoDB 读取首选项。用于设置读取首选项的方法取决于 MongoDB 拓扑和 mongodb.connection.mode。
副本集拓扑
在 mongodb.connection.string 中设置读取首选项。
分片集群拓扑
根据连接方式设置读取优先级如下表
表 1. 根据 mongodb.connection.mode 设置分片集群的读取首选项
连接方式用于指定读取首选项的属性shardedmongodb.connection.stringreplica_setmongodb.connection.string.shard.params
在分片集群中连接器首先启动与 mongodb.connection.string 中指定的 mongos 路由器的连接。对于该初始连接无论连接模式如何连接器都会遵循 mongodb.connection.string 中指定的读取首选项。当连接模式设置为replica_set时连接器建立初始路由器连接后会从路由器的config.shards中检索拓扑信息。然后它使用检索到的分片地址连接到集群中的各个分片构建使用 mongodb.connection.string.shard.params 中的连接参数的连接字符串。对于特定于分片的连接连接器会忽略 mongodb.connection.string 中设置的读取首选项。
四、MongoDB 连接器的工作原理
连接器支持的 MongoDB 拓扑概述对于规划您的应用程序非常有用。
配置和部署 MongoDB 连接器时它首先连接到种子地址处的 MongoDB 服务器并确定有关每个可用副本集的详细信息。由于每个副本集都有自己独立的oplog因此连接器将尝试为每个副本集使用单独的任务。连接器可以限制它将使用的最大任务数如果没有足够的任务可用连接器将为每个任务分配多个副本集尽管该任务仍将为每个副本集使用单独的线程。
针对分片集群运行连接器时请使用大于副本集数量的tasks.max 值。这将允许连接器为每个副本集创建一个任务并让 Kafka Connect 协调、分配和管理所有可用工作进程中的任务。
五、支持的 MongoDB 拓扑
MongoDB 连接器支持以下 MongoDB 拓扑 MongoDB 副本集 Debezium MongoDB 连接器可以捕获单个 MongoDB 副本集的更改。生产副本集至少需要三个成员。 要将 MongoDB 连接器与副本集结合使用必须将连接器配置中的mongodb.connection.string 属性的值设置为副本集连接字符串。当连接器准备好开始从 MongoDB 更改流捕获更改时它会启动连接任务。然后连接任务使用指定的连接字符串建立与可用副本集成员的连接。
MongoDB 分片集群 MongoDB 分片集群包括 一个或多个分片每个分片部署为一个副本集 充当集群配置服务器的单独副本集 客户端连接的一个或多个路由器也称为 mongos并将请求路由到适当的分片 要将 MongoDB 连接器与分片集群结合使用请在连接器配置中将 mongodb.connection.string 属性的值设置为分片集群连接字符串。
mongodb.connection.string 属性替换了已删除的 mongodb.hosts 属性该属性用于为早期版本的连接器提供配置服务器副本的主机地址。在当前版本中使用 mongodb.connection.string 为连接器提供 MongoDB 路由器也称为 mongos的地址。
当连接器连接到分片集群时它会发现有关代表集群中分片的每个副本集的信息。连接器使用单独的任务来捕获每个分片的更改。当在集群中添加或删除分片时连接器会动态调整任务数量以补偿变化。
MongoDB 独立服务器
MongoDB 连接器无法监视独立 MongoDB 服务器的更改因为独立服务器没有 oplog。如果独立服务器转换为具有一名成员的副本集则连接器将起作用。
MongoDB 不建议在生产中运行独立服务器。
六、所需的用户权限
为了从 MongoDB 捕获数据Debezium 以 MongoDB 用户身份连接到数据库。您为 Debezium 创建的 MongoDB 用户帐户需要特定的数据库权限才能从数据库中读取数据。连接器用户需要以下权限
从数据库中读取。运行 ping 命令。
连接器用户可能还需要以下权限
从 config.shards 系统集合中读取。
数据库读取权限
连接器用户必须能够从所有数据库读取或从特定数据库读取具体取决于连接器的 capture.scope 属性的值。根据 capture.scope 设置向用户分配以下权限之一
capture.scope 设置为部署授予用户读取任何数据库的权限。capture.scope 设置为数据库授予用户读取连接器的 capture.target 属性指定的数据库的权限。
使用 MongoDB ping 命令的权限
无论 capture.scope 设置如何用户都需要权限才能运行 MongoDB ping 命令。
读取 config.shards 集合的权限
对于从分片 MongoDB 集群集群更改且 mongodb.connection.mode 属性设置为replica_set 的连接器必须配置用户读取 config.shards 系统集合的权限。
七、逻辑连接器名称
连接器配置属性 topic.prefix 用作 MongoDB 副本集或分片集群的逻辑名称。连接器以多种方式使用逻辑名称作为所有主题名称的前缀以及在记录每个副本集的更改流位置时作为唯一标识符。
应该为每个 MongoDB 连接器指定一个唯一的逻辑名称以有意义地描述源 MongoDB 系统。我们建议逻辑名称以字母或下划线字符开头其余字符为字母数字或下划线。
八、执行快照
当 Debezium 任务开始使用副本集时它使用连接器的逻辑名称和副本集名称来查找描述连接器先前停止读取更改的位置的偏移量。如果可以找到偏移量并且它仍然存在于 oplog 中则任务立即从记录的偏移位置开始继续进行流式更改。
但是如果未找到偏移量或者 oplog 不再包含该位置则任务必须首先通过执行快照来获取副本集内容的当前状态。该过程首先记录 oplog 的当前位置并将其记录为偏移量以及表示快照已启动的标志。然后该任务继续复制每个集合生成尽可能多的线程最多为 snapshot.max.threads 配置属性的值以并行执行此工作。连接器为其看到的每个文档记录一个单独的读取事件。每个读取事件都包含对象的标识符、对象的完整状态以及有关找到该对象的 MongoDB 副本集的源信息。源信息还包括一个标志表示该事件是在快照期间生成的。
此快照将继续直到复制了与连接器的过滤器匹配的所有集合。如果连接器在任务快照完成之前停止则连接器重新启动后将再次开始快照。
当连接器执行任何副本集的快照时尽量避免任务重新分配和重新配置。连接器生成日志消息来报告快照的进度。为了提供最大程度的控制请为每个连接器运行单独的 Kafka Connect 集群。
九、临时快照
默认情况下连接器仅在首次启动后运行初始快照操作。在此初始快照之后在正常情况下连接器不会重复快照过程。连接器捕获的任何未来更改事件数据仅通过流处理传入。
但是在某些情况下连接器在初始快照期间获取的数据可能会过时、丢失或不完整。为了提供重新捕获收集数据的机制Debezium 包含一个执行临时快照的选项。在 Debezium 环境中发生以下任何更改后可能需要执行临时快照
修改连接器配置以捕获一组不同的集合。Kafka 主题已删除必须重建。由于配置错误或其他一些问题会发生数据损坏。
可以通过启动所谓的临时快照为之前捕获快照的集合重新运行快照。特别快照需要使用信令集合。可以通过向 Debezium 信令集合发送信号请求来启动临时快照。
当启动现有集合的临时快照时连接器会将内容附加到该集合已存在的主题中。如果删除了先前存在的主题并且启用了自动主题创建Debezium 可以自动创建主题。
即席快照信号指定要包含在快照中的集合。快照可以捕获数据库的全部内容也可以仅捕获数据库中集合的子集。
可以通过向信令集合发送执行快照消息来指定要捕获的集合。将执行快照信号的类型设置为增量或阻塞并提供要包含在快照中的集合的名称如下表所述
表 2. 即席执行快照信号记录示例
字段默认值值typeincremental指定要运行的快照的类型。目前您可以请求增量快照或阻塞快照。data-collectionsN/A包含与要快照的集合的完全限定名称匹配的正则表达式的数组。名称的格式与 signal.data.collection 配置选项相同。
触发临时增量快照
您可以通过将具有执行快照信号类型的条目添加到信令集合来启动临时增量快照。连接器处理消息后开始快照操作。快照进程读取第一个和最后一个主键值并将这些值用作每个集合的起点和终点。根据集合中的条目数和配置的块大小Debezium 将集合划分为块并继续对每个块进行快照一次一个。
触发临时阻塞快照
您可以通过将具有执行快照信号类型的条目添加到信令集合来启动临时阻塞快照。连接器处理消息后开始快照操作。连接器暂时停止流式传输然后启动指定集合的快照遵循初始快照期间使用的相同过程。快照完成后连接器将恢复流式传输。
十、增量快照
为了提供管理快照的灵活性Debezium 包含一个补充快照机制称为增量快照。增量快照依赖 Debezium 机制向 Debezium 连接器发送信号。增量快照基于DDD-3设计文档。
在增量快照中Debezium 不像初始快照那样一次性捕获数据库的完整状态而是以一系列可配置块的形式分阶段捕获每个集合。您可以指定希望快照捕获的集合以及每个块的大小。块大小确定快照在数据库上的每个提取操作期间收集的行数。增量快照的默认块大小为 1024 行。
随着增量快照的进行Debezium 使用水印来跟踪其进度维护其捕获的每个集合行的记录。与标准初始快照过程相比这种分阶段捕获数据的方法具有以下优点
您可以与流数据捕获并行运行增量快照而不是推迟流数据直到快照完成。连接器在整个快照过程中持续从更改日志中捕获近乎实时的事件并且两个操作都不会阻塞另一个操作。如果增量快照的进度中断您可以恢复增量快照而不会丢失任何数据。进程恢复后快照会从停止点开始而不是从头开始重新捕获集合。您可以随时按需运行增量快照并根据需要重复该过程以适应数据库更新。例如您可以在修改连接器配置以将集合添加到其 collection.include.list 属性后重新运行快照。
1.增量快照流程
当您运行增量快照时Debezium 按主键对每个集合进行排序然后根据配置的块大小将集合拆分为块。逐块工作然后捕获块中的每个集合行。对于它捕获的每一行快照都会发出一个 READ 事件。该事件表示块快照开始时行的值。
随着快照的进行其他进程可能会继续访问数据库从而可能修改集合记录。为了反映此类更改INSERT、UPDATE 或 DELETE 操作将照常提交到事务日志。同样正在进行的 Debezium 流处理继续检测这些更改事件并将相应的更改事件记录发送到 Kafka。
2.Debezium 如何解决具有相同主键的记录之间的冲突
在某些情况下流处理发出的 UPDATE 或 DELETE 事件的接收顺序不正确。也就是说在快照捕获包含该行的 READ 事件的块之前流处理可能会发出一个修改集合行的事件。当快照最终发出该行相应的 READ 事件时其值已被取代。为了确保按正确的逻辑顺序处理不按顺序到达的增量快照事件Debezium 采用缓冲方案来解决冲突。仅当快照事件和流式事件之间的冲突得到解决后Debezium 才会向 Kafka 发送事件记录。
3.快照窗口
为了帮助解决迟到的 READ 事件和修改同一集合行的流式事件之间的冲突Debezium 采用了所谓的快照窗口。快照窗口划定了增量快照捕获指定收集块的数据的时间间隔。在块的快照窗口打开之前Debezium 会遵循其通常的行为并将事件从事务日志直接向下游发送到目标 Kafka 主题。但从特定块的快照打开的那一刻起直到其关闭Debezium 都会执行重复数据删除步骤来解决具有相同主键的事件之间的冲突。
对于每个数据收集Debezium 会发出两种类型的事件并将它们的记录存储在单个目标 Kafka 主题中。它直接从表中捕获的快照记录作为 READ 操作发出。同时随着用户继续更新数据集合中的记录并且更新事务日志以反映每次提交Debezium 会针对每次更改发出 UPDATE 或 DELETE 操作。
当快照窗口打开时Debezium 开始处理快照块它将快照记录传送到内存缓冲区。在快照窗口期间缓冲区中 READ 事件的主键与传入流事件的主键进行比较。如果未找到匹配项则流式事件记录将直接发送到 Kafka。如果 Debezium 检测到匹配它会丢弃缓冲的 READ 事件并将流式记录写入目标主题因为流式事件在逻辑上取代静态快照事件。块的快照窗口关闭后缓冲区仅包含不存在相关事务日志事件的 READ 事件。 Debezium 将这些剩余的 READ 事件发送到集合的 Kafka 主题。
连接器对每个快照块重复该过程。
增量快照需要主键稳定有序。但是字符串可能无法保证稳定的排序因为编码和特殊字符可能会导致意外行为Mongo 排序字符串。执行增量快照时请考虑使用其他类型的主键。
4.分片集群的增量快照
要将增量快照与分片 MongoDB 集群一起使用您必须为以下属性设置特定值
将 mongodb.connection.mode 设置为分片。将incremental.snapshot.chunk.size设置为足够高的值以补偿变更流管道增加的复杂性。
5.触发增量快照
目前启动增量快照的唯一方法是将临时快照信号发送到源数据库上的信令集合。
您可以使用 MongoDB insert() 方法向信令集合提交信号。
Debezium 检测到信号集合中的变化后它会读取信号并运行请求的快照操作。
您提交的查询指定要包含在快照中的集合并且可以选择指定快照操作的类型。目前快照操作的唯一有效选项是默认值增量。
要指定要包含在快照中的集合请提供一个列出集合的数据集合数组或用于匹配集合的正则表达式数组例如{“data-collections”: [“public.Collection1”, “public.Collection2”]}
增量快照信号的数据收集数组没有默认值。如果数据收集数组为空Debezium 会检测到不需要执行任何操作并且不会执行快照。
如果要包含在快照中的集合的名称在数据库、架构或表的名称中包含点 (.)则要将该集合添加到数据集合数组中必须转义该集合的每个部分名称用双引号引起来。
例如要包含公共数据库中存在且名称为 My.Collection 的数据集合请使用以下格式“public”.“My.Collection”。
先决条件
信令已启用。 源数据库中存在信令数据集合。信令数据收集在 signal.data.collection 属性中指定。
使用源信令通道触发增量快照
将快照信号文档插入到信令集合中
signalDataCollection.insert({id : _idNumber,type : snapshotType, data : {data-collections [collectionName, collectionName],type: snapshotType}});例如
db.debeziumSignal.insert({
type : execute-snapshot,
data : {
data-collections [\public\.\Collection1\, \public\.\Collection2\],
type: incremental}
});命令中的id、type、data参数的取值与信令集合的字段相对应。
示例中的参数说明如下表
表 3. 用于将增量快照信号发送到信令集合的 MongoDB insert() 命令中的字段描述
1db.debeziumSignal指定源数据库上信令集合的完全限定名称。2null_id 参数指定指定为信号请求的 id 标识符的任意字符串。前面示例中的 insert 方法省略了可选 _id 参数的使用。由于文档没有显式地为参数分配值因此 MongoDB 自动分配给文档的任意 id 就成为信号请求的 id 标识符。使用此字符串来标识信令集合中条目的日志消息。 Debezium 不使用此标识符字符串。相反在快照期间Debezium 会生成自己的 id 字符串作为水印信号。3execute-snapshot指定类型参数指定信号要触发的操作。4data-collections信号数据字段的必需组件指定集合名称或正则表达式的数组以匹配要包含在快照中的集合名称。该数组列出了通过完全限定名称匹配集合的正则表达式使用与在 signal.data.collection 配置属性中指定连接器信令集合名称相同的格式。5incremental信号数据字段的可选类型组件指定要运行的快照操作的类型。目前唯一有效的选项是默认值增量。如果不指定值连接器将运行增量快照。
以下示例显示了连接器捕获的增量快照事件的 JSON。
示例增量快照事件消息
{before:null,after: {pk:1,value:New data},source: {...snapshot:incremental 1},op:r, 2ts_ms:1620393591654,transaction:null
}选项字段名称描述1snapshot指定要运行的快照操作的类型。目前唯一有效的选项是默认值增量。在提交到信令集合的 SQL 查询中指定类型值是可选的。如果不指定值连接器将运行增量快照。2op指定事件类型。快照事件的值为r表示READ操作。
6.使用Kafka信令通道触发增量快照
您可以向配置的 Kafka 主题发送消息请求连接器运行临时增量快照。
Kafka 消息的键必须与 topic.prefix 连接器配置选项的值匹配。
消息的值是一个带有类型和数据字段的 JSON 对象。
信号类型为execute-snapshot数据字段必须有以下字段
表 4. 执行快照数据字段
字段默认值值typeincremental要执行的快照的类型。目前 Debezium 仅支持增量类型。data-collectionsN/A一组以逗号分隔的正则表达式与要包含在快照中的表的完全限定名称相匹配。使用与 signal.data.collection 配置选项所需的格式相同的格式指定名称。additional-conditionN/A一个可选字符串指定连接器评估的条件以指定要包含在快照中的记录子集。此属性已弃用应由附加条件属性替换。additional-conditionsN/A附加条件的可选数组指定连接器评估的条件以指定要包含在快照中的记录子集。每个附加条件都是一个对象指定过滤临时快照捕获的数据的条件。您可以为每个附加条件设置以下参数 data-collection:: 过滤器应用到的集合的完全限定名称。您可以对每个集合应用不同的过滤器。 filter:: 指定数据库记录中必须存在的列值快照才能包含该列值例如“color‘blue’”。您分配给过滤器参数的值与您在为阻塞快照设置 snapshot.select.statement.overrides 属性时可能在 SELECT 语句的 WHERE 子句中指定的值类型相同。在早期 Debezium 版本中没有为快照信号定义显式过滤器参数相反过滤条件是由为现已弃用的附加条件参数指定的值隐含的。
执行快照 Kafka 消息的示例
Key test_connectorValue {type:execute-snapshot,data: {data-collections: [schema1.table1, schema1.table2], type: INCREMENTAL}}7.具有附加条件的临时增量快照
Debezium 使用附加条件字段来选择集合内容的子集。
通常当 Debezium 运行快照时它会运行 SQL 查询例如
SELECT * FROM tableName ….当快照请求包含附加条件属性时该属性的数据收集和过滤参数将附加到 SQL 查询中例如
SELECT * FROM data-collection WHERE filter ….例如给定一个包含 id主键、颜色和品牌列的产品集合如果您希望快照仅包含 color‘blue’ 的内容则当您请求快照时您可以添加附加 -用于过滤内容的条件属性
Key test_connectorValue {type:execute-snapshot,data: {data-collections: [schema1.products], type: INCREMENTAL, additional-conditions: [{data-collection: schema1.products ,filter:colorblue}]}}您可以使用additional-conditions 属性来传递基于多列的条件。例如使用与上一示例中相同的产品集合如果您希望快照仅包含产品集合中 color‘blue’、brand‘MyBrand’ 的内容则可以发送以下请求
Key test_connectorValue {type:execute-snapshot,data: {data-collections: [schema1.products], type: INCREMENTAL, additional-conditions: [{data-collection: schema1.products ,filter:colorblue AND brandMyBrand}]}}8.停止增量快照
您还可以通过向源数据库上的集合发送信号来停止增量快照。您可以通过将文档插入信号集合来提交停止快照信号。 Debezium 检测到信号集合中的变化后会读取信号并停止正在进行的增量快照操作。
您提交的查询指定增量快照操作以及可选要删除的当前运行快照的集合。
先决条件
信令已启用。 源数据库中存在信令数据集合。信令数据收集在 signal.data.collection 属性中指定。
使用源信令通道停止增量快照
将停止快照信号文档插入到信号集合中
signalDataCollection.insert({id : _idNumber,type : stop-snapshot, data : {data-collections [collectionName, collectionName],type: incremental}});例如
db.debeziumSignal.insert({
type : stop-snapshot,
data : {
data-collections [\public\.\Collection1\, \public\.\Collection2\],
type: incremental}
});signal命令中的id、type、data参数的取值对应于信令集合的字段。
示例中的参数说明如下表
表5 向信令集合发送停止增量快照文档的插入命令字段说明
选项值描述1db.debeziumSignal指定源数据库上信令集合的完全限定名称。2null前面示例中的 insert 方法省略了可选 _id 参数的使用。由于文档没有显式地为参数分配值因此 MongoDB 自动分配给文档的任意 id 就成为信号请求的 id 标识符。使用此字符串来标识信令集合中条目的日志消息。 Debezium 不使用此标识符字符串。3stop-snapshot类型参数指定信号要触发的操作。4data-collections信号数据字段的可选组件指定集合名称或正则表达式的数组以匹配要从快照中删除的集合名称。该数组列出了通过完全限定名称匹配集合的正则表达式使用与在 signal.data.collection 配置属性中指定连接器信令集合名称相同的格式。如果省略数据字段的该组成部分则该信号将停止正在进行的整个增量快照。5incremental信号数据字段的必需组成部分指定要停止的快照操作类型。目前唯一有效的选项是增量选项。如果不指定类型值则信号无法停止增量快照。
9.使用Kafka信令通道停止增量快照
您可以向配置的 Kafka 信令主题发送信号消息以停止即席增量快照。
Kafka 消息的键必须与 topic.prefix 连接器配置选项的值匹配。
消息的值是一个带有类型和数据字段的 JSON 对象。
信号类型为stop-snapshot数据字段必须有以下字段
表 6. 执行快照数据字段
字段默认值值typeincremental要执行的快照的类型。目前 Debezium 仅支持增量类型。data-collectionsN/A一个可选的逗号分隔正则表达式数组与要包含在快照中的表的完全限定名称相匹配。使用与 signal.data.collection 配置选项所需的格式相同的格式指定名称。
以下示例显示了典型的停止快照 Kafka 消息
Key test_connectorValue {type:stop-snapshot,data: {data-collections: [schema1.table1, schema1.table2], type: INCREMENTAL}}10.阻止快照
为了在管理快照方面提供更大的灵活性Debezium 包含一个补充的临时快照机制称为阻塞快照。阻止快照依赖 Debezium 机制向 Debezium 连接器发送信号。
阻塞快照的行为就像初始快照一样只是您可以在运行时触发它。
在以下情况下您可能希望运行阻塞快照而不是使用标准初始快照进程
您添加了一个新集合并且希望在连接器运行时完成快照。您添加了一个大型集合并且希望快照在比增量快照更短的时间内完成。
11.阻塞快照进程
当您运行阻塞快照时Debezium 会停止流式传输然后启动指定集合的快照遵循初始快照期间使用的相同流程。快照完成后流将恢复。
12.配置快照
您可以在信号的数据组件中设置以下属性
data-collections指定哪些集合必须是快照附加条件您可以为不同的集合指定不同的过滤器。 data-collection 属性是要应用过滤器的集合的完全限定名称。过滤器属性将具有与 snapshot.select.statement.overrides 中使用的相同值
例如
{type: blocking, data-collections: [schema1.table1, schema1.table2], additional-conditions: [{data-collection: schema1.table1, filter: SELECT * FROM [schema1].[table1] WHERE column1 0 ORDER BY column2 DESC}, {data-collection: schema1.table2, filter: SELECT * FROM [schema1].[table2] WHERE column2 0}]}13.可能重复
发送信号以触发快照的时间与流停止和快照开始的时间之间可能存在延迟。由于此延迟在快照完成后连接器可能会发出一些与快照捕获的记录重复的事件记录。
十一、流变化
副本集的连接器任务记录偏移量后它使用该偏移量来确定 oplog 中应开始流式传输更改的位置。然后该任务取决于配置要么连接到副本集的主节点要么连接到副本集范围的更改流并从该位置开始流式传输更改。它处理所有创建、插入和删除操作并将它们转换为 Debezium 更改事件。每个更改事件都包含 oplog 中找到操作的位置并且连接器会定期将其记录为最近的偏移量。记录偏移量的时间间隔由 offset.flush.interval.ms 控制这是 Kafka Connect 工作线程配置属性。
当连接器正常停止时会记录最后处理的偏移量以便在重新启动时连接器将准确地从其停止的位置继续。但是如果连接器的任务意外终止则任务可能在最后记录偏移量之后但在记录最后偏移量之前处理并生成事件重新启动后连接器从最后记录的偏移量开始可能会生成一些与崩溃之前生成的事件相同的事件。
注意当 Kafka 管道中的所有组件正常运行时Kafka 消费者只会接收每条消息一次。然而当出现问题时Kafka 只能保证消费者至少收到每条消息一次。为了避免意外结果消费者必须能够处理重复的消息。
如前所述连接器任务始终使用副本集的主节点来传输来自 oplog 的更改从而确保连接器尽可能看到最新的操作并且能够以比辅助节点更低的延迟捕获更改。代替使用。当副本集选择新的主节点时连接器立即停止流式传输更改连接到新的主节点并开始从新主节点的同一位置流式传输更改。同样如果连接器在与副本集成员通信时遇到任何问题它会尝试使用指数退避来重新连接以免淹没副本集并且一旦连接它就会从上次停止的位置继续流式传输更改。通过这种方式连接器能够动态调整以适应副本集成员资格的变化并自动处理通信故障。
总而言之MongoDB 连接器在大多数情况下都会继续运行。通信问题可能会导致连接器等待问题解决。
十二、原像支持
在 MongoDB 6.0 及更高版本中您可以配置更改流以发出文档的原像状态以填充 MongoDB 更改事件的 before 字段。要在 MongoDB 中使用原像您必须使用 db.createCollection()、create 或 collMod 为集合设置changeStreamPreAndPostImages。要使 Debezium MongoDB 能够在更改事件中包含原像请将连接器的 capture.mode 设置为 *_with_pre_image 选项之一。
注意MongoDB 更改流事件的大小限制
MongoDB 更改流事件的大小限制为 16 MB。因此原像的使用增加了超过该阈值的可能性这可能导致失败。
十三、主题名称
MongoDB 连接器将每个集合中文档的所有插入、更新和删除操作的事件写入单个 Kafka 主题。 Kafka 主题的名称始终采用逻辑名称.数据库名称.集合名称的形式其中逻辑名称是使用 topic.prefix 配置属性指定的连接器的逻辑名称数据库名称是发生操作的数据库的名称集合名称是受影响文档所在的 MongoDB 集合的名称。
例如考虑一个 MongoDB 副本集其库存数据库包含四个集合产品、现有产品、客户和订单。如果监控此数据库的连接器被赋予了fulfillment的逻辑名称那么连接器将生成关于这四个 Kafka 主题的事件
fulfillment.inventory.productsfulfillment.inventory.products_on_handfulfillment.inventory.customersfulfillment.inventory.orders
请注意主题名称不包含副本集名称或分片名称。因此对分片集合其中每个分片包含集合文档的子集的所有更改都将转到同一个 Kafka 主题。
您可以将 Kafka 设置为根据需要自动创建主题。如果没有则必须在启动连接器之前使用 Kafka 管理工具创建主题。
十四、分区
MongoDB 连接器不会明确确定如何对事件主题进行分区。相反它允许 Kafka 根据事件键确定如何对主题进行分区。您可以通过在 Kafka Connect 工作配置中定义 Partitioner 实现的名称来更改 Kafka 的分区逻辑。
Kafka 仅维护写入单个主题分区的事件的总顺序。按键对事件进行分区确实意味着具有相同键的所有事件始终进入同一分区。这可确保特定文档的所有事件始终完全有序。
十五、交易元数据
Debezium 可以生成代表事务元数据边界的事件并丰富变更数据事件消息。
Debezium 接收交易元数据的时间限制 Debezium 仅注册和接收部署连接器后发生的事务的元数据。部署连接器之前发生的事务的元数据不可用。
对于每笔交易的 BEGIN 和 ENDDebezium 都会生成一个包含以下字段的事件
状态
开始或结束
ID
唯一交易标识符的字符串表示形式。
event_count对于 END 事件
事务发出的事件总数。
data_collections对于 END 事件
data_collection 和 event_count 对的数组提供源自给定数据集合的更改所发出的事件数。
以下示例显示了一条典型消息
{status: BEGIN,id: 1462833718356672513,event_count: null,data_collections: null
}{status: END,id: 1462833718356672513,event_count: 2,data_collections: [{data_collection: rs0.testDB.collectiona,event_count: 1},{data_collection: rs0.testDB.collectionb,event_count: 1}]
}除非通过 topic.transaction 选项覆盖否则事务事件将写入名为 topic.prefix.transaction 的主题。
十六、变更数据事件丰富
启用事务元数据后数据消息信封将通过新的事务字段进行丰富。该字段以字段组合的形式提供有关每个事件的信息
ID
唯一交易标识符的字符串表示形式。
总订单数
该事件在事务生成的所有事件中的绝对位置。
数据收集顺序
事件在事务发出的所有事件中的每个数据收集位置。
以下是消息的示例
{after: {\_id\ : {\$numberLong\ : \1004\},\first_name\ : \Anne\,\last_name\ : \Kretchmar\,\email\ : \anneknoanswer.org\},source: {
...},op: c,ts_ms: 1580390884335,transaction: {id: 1462833718356672513,total_order: 1,data_collection_order: 1}
}十七、数据变更事件
Debezium MongoDB 连接器为插入、更新或删除数据的每个文档级操作生成数据更改事件。每个事件都包含一个键和一个值。键和值的结构取决于更改的集合。
Debezium 和 Kafka Connect 是围绕连续的事件消息流而设计的。然而这些事件的结构可能会随着时间的推移而改变这对消费者来说可能很难处理。为了解决这个问题每个事件都包含其内容的架构或者如果您使用架构注册表则还包含消费者可用于从注册表获取架构的架构 ID。这使得每个事件都是独立的。
以下 JSON 框架显示了更改事件的基本四个部分。但是您选择在应用程序中使用的 Kafka Connect 转换器的配置方式决定了这四个部分在更改事件中的表示。仅当您配置转换器来生成模式字段时模式字段才会处于更改事件中。同样仅当您配置转换器来生成事件键和事件负载时事件键和事件负载才会出现在更改事件中。如果您使用 JSON 转换器并将其配置为生成所有四个基本更改事件部分则更改事件具有以下结构
{schema: { 1...},payload: { 2...},schema: { 3...},payload: { 4...},
}表 7. 变更事件基本内容概述
选项字段名称描述1schema第一个架构字段是事件键的一部分。它指定了一个 Kafka Connect 架构该架构描述了事件键的有效负载部分中的内容。换句话说第一个模式字段描述了已更改的文档的键的结构。2payload第一个有效负载字段是事件键的一部分。它具有先前架构字段描述的结构并且包含已更改文档的键。3schema第二个架构字段是事件值的一部分。它指定 Kafka Connect 架构该架构描述事件值的有效负载部分中的内容。换句话说第二个架构描述了已更改的文档的结构。通常此模式包含嵌套模式。4payload第二个有效负载字段是事件值的一部分。它具有先前架构字段描述的结构并且包含已更改文档的实际数据。
默认情况下连接器将事件记录流更改为名称与事件的原始集合相同的主题。
MongoDB 连接器确保所有 Kafka Connect 架构名称都遵循 Avro 架构名称格式。这意味着逻辑服务器名称必须以拉丁字母或下划线开头即 a-z、A-Z 或 _。逻辑服务器名称中的每个剩余字符以及数据库和集合名称中的每个字符都必须是拉丁字母、数字或下划线即 a-z、A-Z、0-9 或 _。如果存在无效字符则将其替换为下划线字符。
如果逻辑服务器名称、数据库名称或集合名称包含无效字符并且唯一区分名称的字符无效并用下划线替换则可能会导致意外冲突。
十八、更改事件键
更改事件的键包含已更改文档的键的架构和已更改文档的实际键。对于给定的集合模式及其相应的负载都包含单个 id 字段。该字段的值是文档的标识符表示为从 MongoDB 扩展 JSON 序列化严格模式派生的字符串。
考虑一个逻辑名称为“fulfillment”的连接器、一个包含库存数据库的副本集以及一个包含如下文档的客户集合。
示例文档
{_id: 1004,first_name: Anne,last_name: Kretchmar,email: anneknoanswer.org
}更改事件键示例
捕获客户集合更改的每个更改事件都具有相同的事件键架构。只要客户集合具有先前的定义捕获客户集合更改的每个更改事件都具有以下关键结构。在 JSON 中它看起来像这样
{schema: { 1type: struct,name: fulfillment.inventory.customers.Key, 2optional: false, 3fields: [ 4{field: id,type: string,optional: false}]},payload: { 5id: 1004}
}表 8. 更改事件键的说明
选项字段描述1schema密钥的架构部分指定 Kafka Connect 架构该架构描述密钥的有效负载部分中的内容。2fulfillment.inventory.customers.Key定义密钥有效负载结构的架构名称。此架构描述了已更改文档的键的结构。键架构名称的格式为连接器名称.数据库名称.集合名称.Key。在这个例子中fulfillment 是生成此事件的连接器的名称。inventory 是包含已更改的集合的数据库。customers 是包含已更新文档的集合。3optional指示事件键是否必须在其负载字段中包含值。在此示例中需要密钥有效负载中的值。当文档没有密钥时密钥有效负载字段中的值是可选的。4fields指定负载中预期的每个字段包括每个字段的名称、类型以及是否必需。5payload包含为其生成此更改事件的文档的键。在此示例中键包含一个字符串类型的 id 字段其值为 1004。
此示例使用带有整数标识符的文档但任何有效的 MongoDB 文档标识符都以相同的方式工作包括文档标识符。对于文档标识符事件键的 Payload.id 值是一个字符串表示更新文档的原始 _id 字段作为使用严格模式的 MongoDB 扩展 JSON 序列化。下表提供了如何表示不同类型的 _id 字段的示例。
表 9. 事件密钥有效负载中表示文档 _id 字段的示例
类型MongoDB _id ValueKey’s payloadInteger1234{ “id” : “1234” }Float12.34{ “id” : “12.34” }String“1234”{ “id” : “1234” }Document{ “hi” : “kafka”, “nums” : [10.0, 100.0, 1000.0] }{ “id” : “{“hi” : “kafka”, “nums” : [10.0, 100.0, 1000.0]}” }ObjectIdObjectId(“596e275826f08b2730779e1f”){ “id” : “{”$oid : “596e275826f08b2730779e1f”} }BinaryBinData(“a2Fma2E”,0){ “id” : “{“KaTeX parse error: Expected group as argument to \ at position 9: binary\ ̲: \a2Fma2E\,…type” : “00”}” }
十九、更改事件值
更改事件中的值比键稍微复杂一些。与键一样值也具有模式部分和有效负载部分。模式部分包含描述有效负载部分的 Envelope 结构的模式包括其嵌套字段。创建、更新或删除数据的操作的更改事件都具有带有信封结构的值有效负载。
考虑用于显示更改事件键示例的相同示例文档
示例文档
{_id: 1004,first_name: Anne,last_name: Kretchmar,email: anneknoanswer.org
}针对每种事件类型描述了对此文档的更改的更改事件的值部分
创建事件更新事件删除事件墓碑事件
二十、创建事件
以下示例显示连接器为在客户集合中创建数据的操作生成的更改事件的值部分
{schema: { 1type: struct,fields: [{type: string,optional: true,name: io.debezium.data.Json, 2version: 1,field: after},{type: string,optional: true,name: io.debezium.data.Json, version: 1,field: patch},{type: struct,fields: [{type: string,optional: false,field: version},{type: string,optional: false,field: connector},{type: string,optional: false,field: name},{type: int64,optional: false,field: ts_ms},{type: boolean,optional: true,default: false,field: snapshot},{type: string,optional: false,field: db},{type: string,optional: false,field: rs},{type: string,optional: false,field: collection},{type: int32,optional: false,field: ord},{type: int64,optional: true,field: h}],optional: false,name: io.debezium.connector.mongo.Source, 3field: source},{type: string,optional: true,field: op},{type: int64,optional: true,field: ts_ms}],optional: false,name: dbserver1.inventory.customers.Envelope 4},payload: { 5after: {\_id\ : {\$numberLong\ : \1004\},\first_name\ : \Anne\,\last_name\ : \Kretchmar\,\email\ : \anneknoanswer.org\}, 6source: { 7version: 2.4.0.Final,connector: mongodb,name: fulfillment,ts_ms: 1558965508000,snapshot: false,db: inventory,rs: rs0,collection: customers,ord: 31,h: 1546547425148721999},op: c, 8ts_ms: 1558965515240 9}}表 10. 创建事件值字段的描述
选项字段名称描述1schema值的架构描述值有效负载的结构。连接器为特定集合生成的每个更改事件中更改事件的值架构都是相同的。2name在架构部分中每个名称字段指定值的有效负载中字段的架构。io.debezium.data.Json 是有效负载的 after、patch 和 filter 字段的架构。此架构特定于客户集合。创建事件是唯一一种包含 after 字段的事件。更新事件包含过滤字段和补丁字段。删除事件包含过滤字段但不包含 after 字段或 patch 字段。3nameio.debezium.connector.mongo.Source 是有效负载源字段的架构。此架构特定于 MongoDB 连接器。连接器将其用于它生成的所有事件。4namedbserver1.inventory.customers.Envelope 是负载整体结构的架构其中 dbserver1 是连接器名称inventory 是数据库customers 是集合。此架构特定于集合。5payload该值是实际数据。这是更改事件提供的信息。事件的 JSON 表示形式可能比它们描述的文档大得多。这是因为 JSON 表示必须包含消息的架构和有效负载部分。但是通过使用 Avro 转换器您可以显着减小连接器流式传输到 Kafka 主题的消息的大小。6after一个可选字段指定事件发生后文档的状态。在此示例中after 字段包含新文档的 _id、first_name、last_name 和 email 字段的值。之后的值始终是一个字符串。按照惯例它包含文档的 JSON 表示形式。当 capture.mode 选项设置为change_streams_update_full时MongoDB oplog条目仅包含_create_事件和更新事件的文档的完整状态换句话说无论 capture.mode 选项如何create 事件是唯一包含 after 字段的事件。7source描述事件源元数据的必填字段。此字段包含可用于将此事件与其他事件进行比较的信息包括事件的来源、事件发生的顺序以及事件是否属于同一事务的一部分。源元数据包括debezium版本。生成事件的连接器的名称。MongoDB 副本集的逻辑名称它形成生成事件的命名空间并在连接器写入的 Kafka 主题名称中使用。包含新文档的集合和数据库的名称。如果事件是快照的一部分。数据库中发生更改的时间戳以及时间戳内事件的序号。MongoDB操作的唯一标识符oplog事件中的h字段。MongoDB 会话 lsid 和事务号 txnNumber 的唯一标识符以防在事务内执行更改仅限更改流捕获模式。8op强制字符串描述导致连接器生成事件的操作类型。在此示例中c 表示该操作创建了一个文档。有效值为c创建u更新d 删除r 读取仅适用于快照9ts_ms显示连接器处理事件的时间的可选字段。该时间基于运行 Kafka Connect 任务的 JVM 中的系统时钟。在源对象中ts_ms 表示数据库中进行更改的时间。通过将payload.source.ts_ms的值与payload.ts_ms的值进行比较您可以确定源数据库更新与Debezium之间的滞后。
二十一、更新事件
更改流捕获模式
示例客户集合中更新的更改事件的值与该集合的创建事件具有相同的架构。同样事件值的有效负载具有相同的结构。但是事件值有效负载在更新事件中包含不同的值。仅当 capture.mode 选项设置为change_streams_update_full 时更新事件才包含后值。如果 capture.mode 选项设置为 *_with_pre_image 选项之一则提供之前值。在本例中有一个新的结构化字段 updateDescription 以及一些附加字段
UpdatedFields 是一个字符串字段其中包含已更新文档字段及其值的 JSON 表示形式removedFields 是从文档中删除的字段名称的列表truncatedArrays 是文档中被截断的数组列表
以下是连接器为客户集合中的更新生成的事件中的更改事件值的示例
{schema: { ... },payload: {op: u, 1ts_ms: 1465491461815, 2before:{\_id\: {\$numberLong\: \1004\},\first_name\: 3\unknown\,\last_name\: \Kretchmar\,\email\: \anneknoanswer.org\}, after:{\_id\: {\$numberLong\: \1004\},\first_name\: \Anne Marie\,\last_name\: \Kretchmar\,\email\: \anneknoanswer.org\}, 4updateDescription: {removedFields: null,updatedFields: {\first_name\: \Anne Marie\}, 5truncatedArrays: null},source: { 6version: 2.4.0.Final,connector: mongodb,name: fulfillment,ts_ms: 1558965508000,snapshot: false,db: inventory,rs: rs0,collection: customers,ord: 1,h: null,tord: null,stxnid: null,lsid:{\id\: {\$binary\: \FA7YEzXgQXSX9OxmzllH2w\,\$type\: \04\},\uid\: {\$binary\: \47DEQpj8HBSa/TImW5JCeuQeRkm5NMpJWZG3hSuFU\,\$type\: \00\}},txnNumber:1}}}表 11. 更新事件值字段说明
选项字段名描述1op强制字符串描述导致连接器生成事件的操作类型。在此示例中u 表示该操作更新了文档。2ts_ms显示连接器处理事件的时间的可选字段。该时间基于运行 Kafka Connect 任务的 JVM 中的系统时钟。在源对象中ts_ms 表示数据库中进行更改的时间。通过将payload.source.ts_ms的值与payload.ts_ms的值进行比较您可以确定源数据库更新与Debezium之间的滞后。3before包含更改前实际 MongoDB 文档的 JSON 字符串表示形式。 如果捕获模式未设置为 *_with_preimage 选项之一则更新事件值不包含 before 字段。4after包含实际 MongoDB 文档的 JSON 字符串表示形式。如果捕获模式未设置为change_streams_update_full则更新事件值不包含after字段5updatedFields包含文档更新的字段值的 JSON 字符串表示形式。在此示例中更新将first_name 字段更改为新值。6source描述事件源元数据的必填字段。该字段包含与同一集合的创建事件相同的信息但值不同因为该事件来自 oplog 中的不同位置。源元数据包括debezium版本。生成事件的连接器的名称。MongoDB 副本集的逻辑名称它形成生成事件的命名空间并在连接器写入的 Kafka 主题名称中使用。包含更新文档的集合和数据库的名称。如果事件是快照的一部分。数据库中发生更改的时间戳以及时间戳内事件的序号。MongoDB 会话 lsid 和事务号 txnNumber 的唯一标识符以防在事务内执行更改。
注意 事件中的后值应作为文档的时间点值进行处理。该值不是动态计算的而是从集合中获取的。因此如果多个更新紧密地一个接一个地进行则所有更新更新事件将包含相同的后值该后值将表示存储在文档中的最后一个值。
如果您的应用程序依赖于渐变演变那么您应该仅依赖 updateDescription。
二十二、删除事件
删除更改事件中的值与同一集合的创建和更新事件具有相同的架构部分。删除事件中的有效负载部分包含与同一集合的创建和更新事件不同的值。特别是删除事件既不包含后值也不包含 updateDescription 值。以下是客户集合中文档的删除事件的示例
{schema: { ... },payload: {op: d, 1ts_ms: 1465495462115, 2before:{\_id\: {\$numberLong\: \1004\},\first_name\: \Anne Marie\,\last_name\: \Kretchmar\,\email\: \anneknoanswer.org\}, 3source: { 4version: 2.4.0.Final,connector: mongodb,name: fulfillment,ts_ms: 1558965508000,snapshot: true,db: inventory,rs: rs0,collection: customers,ord: 6,h: 1546547425148721999}}}表 12. 删除事件值字段说明
选项字段值描述1op描述操作类型的强制字符串。 op字段值为d表示该文档已被删除。2ts_ms显示连接器处理事件的时间的可选字段。该时间基于运行 Kafka Connect 任务的 JVM 中的系统时钟。在源对象中ts_ms 表示数据库中进行更改的时间。通过将payload.source.ts_ms的值与payload.ts_ms的值进行比较您可以确定源数据库更新与Debezium之间的滞后。3before包含更改前实际 MongoDB 文档的 JSON 字符串表示形式。 如果捕获模式未设置为 *_with_preimage 选项之一则更新事件值不包含 before 字段。4source描述事件源元数据的必填字段。该字段包含与同一集合的创建或更新事件相同的信息但值不同因为该事件来自 oplog 中的不同位置。源元数据包括debezium版本。生成事件的连接器的名称。MongoDB 副本集的逻辑名称它形成生成事件的命名空间并在连接器写入的 Kafka 主题名称中使用。包含已删除文档的集合和数据库的名称。如果事件是快照的一部分。数据库中发生更改的时间戳以及时间戳内事件的序号。MongoDB操作的唯一标识符oplog事件中的h字段。MongoDB 会话 lsid 和事务号 txnNumber 的唯一标识符以防在事务内执行更改仅限更改流捕获模式。
二十三、墓碑事件
MongoDB 连接器事件旨在与 Kafka 日志压缩配合使用。只要保留每个键的最新消息日志压缩就可以删除一些较旧的消息。这使得 Kafka 可以回收存储空间同时确保主题包含完整的数据集并且可用于重新加载基于键的状态。
二十四、设置mongodb
MongoDB 连接器使用 MongoDB 的更改流来捕获更改因此该连接器仅适用于 MongoDB 副本集或分片集群其中每个分片都是一个单独的副本集。有关设置副本集或分片集群的信息请参阅 MongoDB 文档。另外请务必了解如何使用副本集启用访问控制和身份验证。
您还必须有一个 MongoDB 用户该用户具有适当的角色来读取可以读取操作日志的管理数据库。此外用户还必须能够读取分片集群的配置服务器中的配置数据库并且必须具有 listDatabases 权限操作。当使用变更流默认时用户还必须具有集群范围的权限操作 find 和 changeStream。
当您打算使用原像并填充 before 字段时您需要首先使用 db.createCollection()、create 或 collMod 为集合启用changeStreamPreAndPostImages。
云中的 MongoDB
您可以将 MongoDB 的 Debezium 连接器与 MongoDB Atlas 结合使用。请注意MongoDB Atlas 仅支持通过 SSL 的安全连接即 mongodb.ssl.enabled 连接器选项必须设置为 true。
二十五、最佳 Oplog 配置
Debezium MongoDB 连接器读取更改流以获取副本集的 oplog 数据。因为 oplog 是一个固定大小、有上限的集合所以如果它超过其最大配置大小它就会开始覆盖其最旧的条目。如果连接器因任何原因停止当它重新启动时它会尝试从最后一个 oplog 流位置恢复流式传输。但是如果从 oplog 中删除了最后一个流位置则根据连接器的 snapshot.mode 属性中指定的值连接器可能无法启动并报告无效的恢复令牌错误。如果发生故障您必须创建一个新的连接器以使 Debezium 能够继续从数据库捕获记录。如果 snapshot.mode 设置为初始连接器在长时间停止后会失败。
为了确保 oplog 保留 Debezium 恢复流式传输所需的偏移值您可以使用以下方法之一
增加 oplog 的大小。根据您的典型工作负载将 oplog 大小设置为大于每小时 oplog 条目峰值数的值。增加 oplog 条目保留的最短小时数MongoDB 4.4 及更高版本。此设置是基于时间的这样即使 oplog 达到其最大配置大小也保证过去 n 小时内的条目可用。尽管这通常是首选选项但对于具有接近容量的高工作负载的集群请指定最大 oplog 大小。
为了帮助防止与丢失 oplog 条目相关的故障跟踪报告复制行为的指标并优化 oplog 大小以支持 Debezium 非常重要。特别是您应该监视 Oplog GB/Hour 和 Replication Oplog Window 的值。如果 Debezium 离线的时间间隔超过了复制 oplog 窗口的值并且主 oplog 的增长速度快于 Debezium 消耗条目的速度则可能会导致连接器故障。
有关如何监控这些指标的信息请参阅 MongoDB 文档。
最好将最大 oplog 大小设置为基于 oplog 的预期每小时增长Oplog GB/小时的值乘以解决 Debezium 故障可能需要的时间。
那是
Oplog GB/Hour X average reaction time to Debezium failure例如如果oplog大小限制设置为1GB并且oplog每小时增长3GB则oplog条目每小时会被清除3次。如果 Debezium 在这段时间内失败它的最后一个 oplog 位置可能会被删除。
如果 oplog 以 3GB/小时的速度增长并且 Debezium 离线两个小时那么您可以将 oplog 大小设置为 3GB/小时 X 2 小时即 6GB。
二十六、部署
下载mongodb数据库的debezium2.4版本插件
mongodb数据库的debezium2.4插件下载地址
部署加载mongodb数据库debezium2.4版本的详细步骤
Debezium系列之安装部署debezium详细步骤并把debezium服务托管到systemctl
二十七、MongoDB 连接器配置示例
以下是连接器实例的配置示例该实例从 192.168.99.100 端口 27017 上的 MongoDB 副本集 rs0 捕获数据我们在逻辑上将其命名为 fullfillment。通常您可以通过设置连接器可用的配置属性在 JSON 文件中配置 Debezium MongoDB 连接器。
您可以选择为特定 MongoDB 副本集或分片集群生成事件。或者您可以过滤掉不需要的集合。
{name: inventory-connector, 1config: {connector.class: io.debezium.connector.mongodb.MongoDbConnector, 2mongodb.connection.string: mongodb://192.168.99.100:27017/?replicaSetrs0, 3topic.prefix: fullfillment, 4collection.include.list: inventory[.]* 5}
}1.当我们向 Kafka Connect 服务注册连接器时的名称。2.MongoDB 连接器类的名称。3.用于连接到 MongoDB 副本集的连接字符串。4.MongoDB 副本集的逻辑名称它形成生成事件的命名空间并用于连接器写入的所有 Kafka 主题的名称、Kafka Connect 架构名称以及 Avro 启动时对应的 Avro 架构的命名空间使用转换器。5.与要监视的所有集合的集合命名空间例如.匹配的正则表达式列表。这是可选的。
您可以使用 POST 命令将此配置发送到正在运行的 Kafka Connect 服务。该服务记录配置并启动一个执行以下操作的连接器任务
连接到 MongoDB 副本集或分片集群。为每个副本集分配任务。如有必要执行快照。读取变更流。流将事件记录更改为 Kafka 主题。
完整案例
Debezium系列之打通Debezium对低版本MongoDB数据库4.0版本的数据采集技术Debezium系列之Debezium2.X版本Mysql数据库、Sqlserver数据库、MongoDB数据库debezium connector最新完整的参数配置并详细介绍参数含义
二十八、连接器属性
Debezium MongoDB 连接器具有许多配置属性您可以使用它们来为您的应用程序实现正确的连接器行为。许多属性都有默认值。有关属性的信息组织如下
必需的 Debezium MongoDB 连接器配置属性高级 Debezium MongoDB 连接器配置属性
除非有默认值否则需要以下配置属性。
表 13. 所需的 Debezium MongoDB 连接器配置属性
属性默认值描述name无默认值连接器的唯一名称。尝试使用相同名称再次注册将会失败。 所有 Kafka Connect 连接器都需要此属性。connector.class无默认值连接器的 Java 类的名称。始终对 MongoDB 连接器使用 io.debezium.connector.mongodb.MongoDbConnector 值。mongodb.connection.string无默认值指定连接器用于连接到 MongoDB 副本集的连接字符串。此属性替换了 MongoDB 连接器早期版本中提供的 mongodb.hosts 属性。仅当 mongodb.connection.mode 设置为replica_set 时从分片 MongoDB 集群捕获更改的连接器仅在初始分片发现过程中使用此连接字符串。在初始发现过程之后将为每个单独的分片生成连接字符串。mongodb.connection.string.shard.params无默认值指定连接器用于连接到 MongoDB 分片集群的各个分片的连接字符串的 URL 参数包括读取首选项。仅当 mongodb.connection.mode 设置为replica_set 时此属性才适用。mongodb.connection.modereplica_set指定连接器连接到分片 MongoDB 集群时使用的策略。将此属性设置为以下值之一副本集连接器为每个分片建立与副本集的单独连接。分片的连接器根据 mongodb.connection.string 的值建立与数据库的单个连接。副本集选项允许连接器跨多个连接器任务分配分片处理。但是在此配置中连接器在连接到各个分片时会绕过 MongoDB 路由器MongoDB 不建议这样做。连接模式之间的切换会使存储的偏移量失效从而触发新的快照。topic.prefix无默认值标识该连接器监控的连接器和/或 MongoDB 副本集或分片集群的唯一名称。每台服务器最多应由一个 Debezium 连接器监控因为该服务器名称为来自 MongoDB 副本集或集群的所有持久 Kafka 主题添加前缀。仅使用字母数字字符、连字符、点和下划线来构成名称。逻辑名称在所有其他连接器中应该是唯一的因为该名称用作命名从该连接器接收记录的 Kafka 主题的前缀。不要更改此属性的值。如果您更改名称值则在重新启动后连接器不会继续向原始主题发出事件而是向名称基于新值的主题发出后续事件。mongodb.authentication.classDefaultMongoDbAuthProvider完整的 Java 类名是 io.debezium.connector.mongodb.connection.MongoDbAuthProvider 接口的实现。此类处理 MongoDB 连接上的凭据设置在每次应用程序启动时调用。默认行为根据每个文档使用 mongodb.user、mongodb.password 和 mongodb.authsource 属性但其他实现可能会以不同方式使用它们或完全忽略它们。请注意mongodb.connection.string 中的任何设置都将覆盖此类设置的设置mongodb.user无默认值使用默认 mongodb.authentication.class 时连接到 MongoDB 时使用的数据库用户的名称。仅当 MongoDB 配置为使用身份验证时才需要这样做。mongodb.passwordNo default使用默认 mongodb.authentication.class 时连接到 MongoDB 时使用的密码。仅当 MongoDB 配置为使用身份验证时才需要这样做。mongodb.authsourceadmin使用默认 mongodb.authentication.class 时包含 MongoDB 凭据的数据库身份验证源。仅当 MongoDB 配置为使用 admin 之外的其他身份验证数据库进行身份验证时才需要这样做。mongodb.ssl.enabledfalse连接器将使用 SSL 连接到 MongoDB 实例。mongodb.ssl.invalid.hostname.allowedfalse启用 SSL 后此设置控制是否在连接阶段禁用严格的主机名检查。如果为 true连接将不会阻止中间人攻击。filters.match.moderegex用于根据包含/排除的数据库和集合名称来匹配事件的模式。将属性设置为以下值之一正则表达式数据库和集合包含/排除被评估为逗号分隔的正则表达式列表。文字数据库和集合包含/排除被评估为逗号分隔的字符串文字列表。这些文字周围的空白字符将被删除。database.include.listempty string与要监视的数据库名称匹配的可选逗号分隔正则表达式或文字列表。默认情况下所有数据库都会受到监控。设置database.include.list 后连接器仅监视该属性指定的数据库。其他数据库被排除在监控之外。为了匹配数据库的名称Debezium 根据filters.match.mode 属性的值执行以下操作之一应用您指定为锚定正则表达式的正则表达式。也就是说指定的表达式与数据库的整个名称字符串进行匹配它与数据库名称中可能存在的子字符串不匹配。将您指定的文字与数据库的整个名称字符串进行比较如果您在配置中包含此属性则不要同时设置database.exclude.list 属性。database.exclude.listempty string可选的以逗号分隔的正则表达式或文字列表与要从监视中排除的数据库名称相匹配。设置 database.exclude.list 后连接器将监视除该属性指定的数据库之外的每个数据库。为了匹配数据库的名称Debezium 根据filters.match.mode 属性的值执行以下操作之一应用您指定为锚定正则表达式的正则表达式。也就是说指定的表达式与数据库的整个名称字符串进行匹配它与数据库名称中可能存在的子字符串不匹配。将您指定的文字与数据库的整个名称字符串进行比较如果您在配置中包含此属性请不要设置database.include.list 属性。collection.include.listempty string可选的以逗号分隔的正则表达式或文字列表与要监视的 MongoDB 集合的完全限定命名空间相匹配。默认情况下连接器监视除本地和管理数据库中的集合之外的所有集合。设置 collection.include.list 后连接器仅监视该属性指定的集合。其他集合被排除在监控之外。集合标识符的格式为databaseName.collectionName。为了匹配命名空间的名称Debezium 根据filters.match.mode 属性的值执行以下操作之一应用您指定为锚定正则表达式的正则表达式。也就是说指定的表达式与命名空间的整个名称字符串进行匹配它与名称中的子字符串不匹配。将您指定的文字与命名空间的整个名称字符串进行比较如果您在配置中包含此属性则不要同时设置 collection.exclude.list 属性。collection.exclude.listempty string可选的以逗号分隔的正则表达式或文字列表与要从监视中排除的 MongoDB 集合的完全限定命名空间相匹配。设置 collection.exclude.list 后连接器将监视除该属性指定的集合之外的每个集合。集合标识符的格式为databaseName.collectionName。为了匹配命名空间的名称Debezium 根据filters.match.mode 属性的值执行以下操作之一应用您指定为锚定正则表达式的正则表达式。也就是说指定的表达式与命名空间的整个名称字符串进行匹配它与数据库名称中可能存在的子字符串不匹配。将您指定的文字与命名空间的整个名称字符串进行比较如果您在配置中包含此属性请不要设置 collection.include.list 属性。capture.modechange_streams_update_full指定连接器用于从 MongoDB 服务器捕获更新事件更改的方法。将此属性设置为以下值之一change_streams更新事件消息不包含完整文档。消息不包含表示更改之前文档状态的字段。change_streams_update_full更新事件消息包括完整的文档。消息不包含表示更新之前文档状态的 before 字段。事件消息在 after 字段中返回文档的完整状态。在某些情况下当 capture.mode 配置为返回完整文档时更新事件消息的 updateDescription 和 after 字段可能会报告不一致的值。对文档快速连续应用多个更新后可能会导致此类差异。连接器仅在收到事件 updateDescription 字段中描述的更新后才从 MongoDB 数据库请求完整文档。如果稍后的更新在连接器从数据库中检索源文档之前修改了源文档则连接器会收到由该后续更新修改的文档。change_streams_update_full_with_pre_image更新事件事件消息包括完整文档并包括表示更改之前文档状态的字段。change_streams_with_pre_image更新事件不包括完整文档但包括表示更改之前文档状态的字段。capture.scopedeployment指定连接器打开的变更流的范围。将此属性设置为以下值之一deployment打开部署副本集或分片集群的更改流游标以监视所有数据库管理、本地和配置除外中所有非系统集合的更改。database打开单个数据库的更改流游标以监视其所有非系统集合的更改。要支持 Debezium 信令如果将 capture.scope 设置为数据库则信令数据集合必须驻留在 capture.target 属性指定的数据库中。capture.target指定连接器监视更改的数据库。仅当 capture.scope 设置为数据库时此属性才适用。field.exclude.listempty string应从更改事件消息值中排除的字段的完全限定名称的可选逗号分隔列表。字段的完全限定名称的格式为databaseName.collectionName.fieldName.nestedFieldName其中databaseName 和collectionName 可以包含与任何字符匹配的通配符(*)。field.renamesempty string字段的完全限定替换的可选逗号分隔列表应用于重命名更改事件消息值中的字段。字段的完全限定替换的形式为databaseName.collectionName.fieldName.nestedFieldName:newNestedFieldName其中databaseName和collectionName可以包含匹配任何字符的通配符(*)冒号字符(:)用于确定字段的重命名映射。下一个字段替换将应用于列表中上一个字段替换的结果因此在重命名同一路径中的多个字段时请记住这一点。tasks.max1指定连接器用于连接到分片集群的最大任务数。当您将连接器与单个 MongoDB 副本集一起使用时默认值是可以接受的。但是当集群包含多个分片时为了使 Kafka Connect 能够为每个副本集分配工作请指定一个等于或大于集群中分片数量的值。然后MongoDB 连接器可以使用单独的任务连接到集群中每个分片的副本集。仅当连接器连接到分片 MongoDB 集群并且 mongodb.connection.mode 属性设置为replica_set 时此属性才有效。当 mongodb.connection.mode 设置为 sharded 时或者连接器连接到未分片的 MongoDB 副本集部署时连接器会忽略此设置并默认仅使用单个任务。tombstones.on.deletetrue控制删除事件后是否发生逻辑删除事件。true - 删除操作由删除事件和后续逻辑删除事件表示。false - 仅发出删除事件。删除源记录后如果为主题启用了日志压缩则发出逻辑删除事件默认行为允许 Kafka 完全删除与已删除行的键相关的所有事件。schema.name.adjustment.modenone指定应如何调整架构名称以与连接器使用的消息转换器兼容。可能的设置none 不应用任何调整。avro 将 Avro 类型名称中不能使用的字符替换为下划线。avro_unicode 将 Avro 类型名称中不能使用的下划线或字符替换为相应的 unicode如 _uxxxx。注意_是转义序列类似于Java中的反斜杠field.name.adjustment.modenone指定应如何调整字段名称以与连接器使用的消息转换器兼容。可能的设置none 不应用任何调整。avro 将 Avro 类型名称中不能使用的字符替换为下划线。avro_unicode 将 Avro 类型名称中不能使用的下划线或字符替换为相应的 unicode如 _uxxxx。注意_是转义序列类似于Java中的反斜杠
以下高级配置属性具有良好的默认值适用于大多数情况因此很少需要在连接器的配置中指定。
表 14. Debezium MongoDB 连接器高级配置属性
属性默认值描述max.batch.size2048正整数值指定在此连接器的每次迭代期间应处理的每批事件的最大大小。默认为 2048。max.queue.size8192正整数值指定阻塞队列可以保存的最大记录数。当 Debezium 读取从数据库流式传输的事件时它会将事件放入阻塞队列中然后再将其写入 Kafka。如果连接器摄取消息的速度快于将消息写入 Kafka 的速度或者当 Kafka 不可用时阻塞队列可以为从数据库读取更改事件提供反压。当连接器定期记录偏移量时队列中保存的事件将被忽略。始终将 max.queue.size 的值设置为大于 max.batch.size 的值。max.queue.size.in.bytes0一个长整数值指定阻塞队列的最大容量以字节为单位。默认情况下没有为阻塞队列指定卷限制。要指定队列可以使用的字节数请将此属性设置为正 long 值。如果还设置了 max.queue.size则当队列大小达到任一属性指定的限制时写入队列将被阻止。例如如果设置 max.queue.size1000并且 max.queue.size.in.bytes5000则在队列包含 1000 条记录后或在队列中的记录量达到后写入队列将被阻止达到5000字节。poll.interval.ms1000正整数值指定连接器在每次迭代期间应等待新更改事件出现的毫秒数。默认为 500 毫秒或 0.5 秒。connect.backoff.initial.delay.ms1000正整数值指定在第一次连接尝试失败后或没有可用的主节点时尝试重新连接到主节点时的初始延迟。默认为 1 秒1000 毫秒。connect.backoff.max.delay.ms1000正整数值指定在重复尝试连接失败后或没有可用的主节点时尝试重新连接到主节点时的最大延迟。默认为 120 秒120,000 毫秒。connect.max.attempts16正整数值指定在发生异常和任务中止之前尝试连接到主副本集失败的最大次数。默认为 16使用 connect.backoff.initial.delay.ms 和 connect.backoff.max.delay.ms 的默认值会导致尝试 20 多分钟后失败。source.struct.versionv2CDC 事件中源块的架构版本。 Debezium 0.10 引入了一些突破更改源块的结构以便统一所有连接器上的公开结构。通过将此选项设置为 v1可以生成早期版本中使用的结构。请注意不建议使用此设置并计划在未来的 Debezium 版本中删除此设置。heartbeat.interval.ms0控制发送心跳消息的频率。此属性包含一个时间间隔以毫秒为单位用于定义连接器将消息发送到检测信号主题的频率。这可用于监视连接器是否仍在接收来自数据库的更改事件。如果仅非捕获集合中的记录在较长时间内发生更改您还应该利用心跳消息。在这种情况下连接器将继续从数据库读取 oplog/更改流但不会将任何更改消息发送到 Kafka这又意味着不会向 Kafka 提交任何偏移更新。这将导致 oplog 文件被轮换但连接器不会注意到它因此重新启动时某些事件不再可用这导致需要重新执行初始快照。将此参数设置为 0 则根本不发送心跳消息。默认禁用。skipped.operationst在流式传输过程中将跳过的以逗号分隔的操作类型列表。这些操作包括c 表示插入/创建u 表示更新/替换d 表示删除t 表示截断none 表示不跳过任何上述操作。默认情况下为了与其他 Debezium 连接器保持一致会跳过截断操作此连接器不发出。但是由于 MongoDB 不支持截断更改事件因此这实际上与指定 none 相同。snapshot.collection.filter.overridesNo default控制快照中包含哪些集合项。此属性仅影响快照。以databaseName.collectionName 的形式指定以逗号分隔的集合名称列表。对于您指定的每个集合还指定另一个配置属性snapshot.collection.filter.overrides.databaseName.collectionName。例如其他配置属性的名称可能是snapshot.collection.filter.overrides.customers.orders。将此属性设置为有效的筛选表达式该表达式仅检索快照中所需的项目。当连接器执行快照时它仅检索与过滤器表达式匹配的项目。snapshot.delay.msNo default连接器在启动后拍摄快照之前应等待的时间间隔以毫秒为单位可用于避免在集群中启动多个连接器时发生快照中断这可能会导致连接器重新平衡。snapshot.fetch.size0指定在拍摄快照时应从每个集合一次性读取的最大文档数。连接器将按此大小分批读取集合内容。默认为 0表示服务器选择合适的获取大小。snapshot.include.collection.listcollection.include.list 中指定的所有集合可选的、以逗号分隔的正则表达式列表与要包含在快照中的架构的完全限定名称 (.) 相匹配。指定的项目必须在连接器的 collection.include.list 属性中命名。仅当连接器的 snapshot.mode 属性设置为 never 以外的值时此属性才会生效。此属性不会影响增量快照的行为。为了匹配模式的名称Debezium 应用您指定为锚定正则表达式的正则表达式。也就是说指定的表达式与模式的整个名称字符串进行匹配它与模式名称中可能存在的子字符串不匹配。snapshot.max.threads1正整数值指定用于执行副本集中集合的初始同步的最大线程数。默认为 1。snapshot.modeinitial指定连接器启动时执行快照的条件。将属性设置为以下值之一initial当连接器启动时如果它没有检测到其 offsets 主题中的值它将执行数据库快照。never当连接器启动时它会跳过快照过程并立即开始将数据库记录到 oplog 的操作的更改事件流式传输。provide.transaction.metadatafalse当设置为 true 时Debezium 会生成具有事务边界的事件并使用事务元数据丰富数据事件包络。retriable.restart.connector.wait.ms10000 (10 seconds)发生可重试错误后重新启动连接器之前要等待的毫秒数。mongodb.poll.interval.ms30000连接器轮询新的、已删除的或更改的副本集的时间间隔。mongodb.connect.timeout.ms10000 (10 seconds)中止新连接尝试之前驱动程序将等待的毫秒数。mongodb.heartbeat.frequency.ms10000 (10 seconds)集群监视器尝试访问每台服务器的频率。mongodb.socket.timeout.ms0发生超时之前套接字上发送/接收所需的毫秒数。值为 0 会禁用此行为。mongodb.server.selection.timeout.ms30000 (30 seconds)驱动程序在超时并抛出错误之前等待选择服务器的毫秒数。cursor.pipeline无默认值当流变化时此设置将处理更改流事件作为标准 MongoDB 聚合流管道的一部分。管道是 MongoDB 聚合管道由对数据库进行过滤或转换数据的指令组成。这可用于自定义连接器消耗的数据。此属性的值必须是 JSON 格式的允许聚合管道阶段的数组。请注意它附加在用于支持连接器的内部管道之后例如过滤操作类型、数据库名称、集合名称等。cursor.pipeline.orderinternal_first用于构造有效的 MongoDB 聚合流管道的顺序。将属性设置为以下值之一internal_first首先应用由连接器定义的内部阶段。这意味着只有应该由连接器捕获的事件才会被馈送到用户定义的阶段通过设置cursor.pipeline进行配置。user_first首先应用由“cursor.pipeline”属性定义的阶段。在此模式下所有事件包括连接器未捕获的事件都将馈送到用户定义的管道阶段。如果cursor.pipeline 的值包含复杂的操作此模式可能会对性能产生负面影响。user_only由“cursor.pipeline”属性定义的阶段将替换由连接器定义的内部阶段。此模式仅适用于专家用户因为所有事件仅由用户定义的管道阶段处理。此模式会对连接器的性能和整体功能产生负面影响cursor.oversize.handling.modefail用于处理超过指定 BSON 大小的文档的更改事件的策略。将属性设置为以下值之一fail如果更改事件的总大小超过最大 BSON 大小连接器将失败。skip超过最大大小由cursor.oversize.skip.threshold 属性指定的文档的任何更改事件都将被忽略cursor.oversize.skip.threshold0处理更改事件的存储文档的最大允许大小以字节为单位。这包括数据库操作之前和之后的大小更具体地说这限制了 MongoDB 更改事件的 fullDocument 和 fullDocumentBeforeChange 字段的大小。cursor.max.await.time.ms0指定 oplog/更改流游标在导致执行超时异常之前等待服务器生成结果的最大毫秒数。值 0 表示使用服务器/驱动程序默认等待超时。signal.data.collection无默认值用于向连接器发送信号的数据集合的完全限定名称。使用以下格式指定集合名称数据库名称.集合名称signal.enabled.channelssource为连接器启用的信令通道名称列表。默认情况下以下通道可用source、kafka、file、jmx 您还可以选择实现自定义信号通道。notification.enabled.channels无默认值为连接器启用的通知通道名称列表。默认情况下以下通道可用sink、log、jmx 您还可以选择实现自定义通知通道。incremental.snapshot.chunk.size1024连接器在增量快照块期间获取并读入内存的最大文档数。增加块大小可以提高效率因为快照运行的快照查询数量越大查询的数量就越少。然而较大的块大小也需要更多的内存来缓冲快照数据。将块大小调整为可在您的环境中提供最佳性能的值。topic.naming.strategyio.debezium.schema.DefaultTopicNamingStrategyTopicNamingStrategy 类的名称应用于确定数据更改、架构更改、事务、心跳事件等的主题名称默认为DefaultTopicNamingStrategy。topic.delimiter.指定主题名称的分隔符默认为…topic.cache.size10000用于在有界并发哈希图中保存主题名称的大小。该缓存将有助于确定与给定数据集合对应的主题名称。topic.heartbeat.prefix__debezium-heartbeat控制连接器向其发送心跳消息的主题的名称。主题名称具有以下模式topic.heartbeat.prefix.topic.prefix例如如果主题前缀为fulfillment则默认主题名称为__debezium-heartbeat.fulfillment。topic.transactiontransaction控制连接器向其发送事务元数据消息的主题的名称。主题名称具有以下模式topic.prefix.topic.transaction例如如果主题前缀是fulfillment则默认主题名称是fulfillment.transaction。custom.metric.tagsNo default自定义指标标签将接受键值对来自定义 MBean 对象名称该名称应附加在常规名称的末尾每个键代表 MBean 对象名称的一个标签相应的值将是该标签的值关键群岛。例如k1v1k2v2。errors.max.retries-1失败前可重试错误例如连接错误的最大重试次数-1 无限制0 禁用 0 重试次数。
二十九、Debezium 连接器 Kafka 信号配置属性
Debezium 提供了一组 signal.* 属性用于控制连接器如何与 Kafka 信号主题交互。
下表描述了 Kafka 信号属性。
表 15. Kafka 信号配置属性
属性默认值描述signal.kafka.topictopic.prefix-signal连接器监视临时信号的 Kafka 主题的名称。如果禁用自动创建主题则必须手动创建所需的信令主题。需要一个信令主题来保留信号顺序。信令主题必须具有单个分区。signal.kafka.groupIdkafka-signalKafka 消费者使用的组 ID 的名称。signal.kafka.bootstrap.serversNo default连接器用于建立与 Kafka 集群的初始连接的主机/端口对列表。每对都引用 Debezium Kafka Connect 进程使用的 Kafka 集群。signal.kafka.poll.timeout.ms100一个整数值指定连接器在轮询信号时等待的最大毫秒数。
Debezium 连接器传递信号 Kafka 消费者客户端配置属性
Debezium 连接器提供信号 Kafka 消费者的直通配置。直通信号属性以前缀 Signals.consumer.* 开头。例如连接器将 signal.consumer.security.protocolSSL 等属性传递给 Kafka 使用者。
Debezium 在将属性传递给 Kafka 信号使用者之前会去除属性中的前缀。
Debezium 连接器接收器通知配置属性
下表描述了通知属性。
表 16. 接收器通知配置属性
属性默认值描述notification.sink.topic.nameNo default从 Debezium 接收通知的主题的名称。当您配置 notification.enabled.channels 属性以将接收器包含为启用的通知通道之一时需要此属性。
三十、监控
Debezium系列之安装jmx导出器监控debezium指标Debezium系列之深入解读Debezium重要的jmx指标Debezium系列之prometheus采集debezium的jmx数据grafana通过dashboard展示debezium的jmx数据