Expose xrepl_origin_id on DML change events#400
Expose xrepl_origin_id on DML change events#400kgalieva wants to merge 5 commits intoyugabyte:mainfrom
Conversation
Surface the replication origin ID from YugabyteDB CDC RowMessage proto onto every DML (INSERT/UPDATE/DELETE) Debezium change event. The field appears as `xrepl_origin_id` in the `source` block of the Kafka message envelope, only when non-zero.
| public static final String TABLE_ID = "table_id"; | ||
| public static final String TABLET_ID = "tablet_id"; | ||
| public static final String PARTITION_ID_KEY = "partition_id"; | ||
| public static final String XREPL_ORIGIN_ID = "xrepl_origin_id"; |
There was a problem hiding this comment.
Change this to below to match the upstream Debezium behaviour:
public static final String XREPL_ORIGIN_ID = "origin";
https://github.com/debezium/debezium/pull/7009/files
| private String tabletId; | ||
| private Long commitTime; | ||
| private Long recordTime; | ||
| private int xreplOriginId; |
There was a problem hiding this comment.
Wondering if we should keep it as String instead of int so that we are similar to Upstream Debezium. Thoughts?
There was a problem hiding this comment.
Good question. In upstream Postgres Debezium, origin is a string because Postgres's CDC stream provides the replication origin name. In YugabyteDB, the CDC proto only surfaces xrepl_origin_id as a uint32 — the origin name lives in the PG catalog (pg_replication_origin) and isn't accessible from the CDC/DocDB layer.
We could stringify the int to match the upstream schema type, but then consumers would get "42" instead of a meaningful name like "my_origin" - which feels misleading. I'd prefer to keep it as int since that's what YB actually provides, and rename the field to "origin" for partial alignment with upstream.
There was a problem hiding this comment.
Hmm true, then I believe we should not use the below as it may cause confusion if a cluster is consuming data from both Logical and gRPC
public static final String XREPL_ORIGIN_ID = "origin";
Maybe we can change it to below?
public static final String XREPL_ORIGIN_ID = "origin_id";
|
Minor comments but overall looks good |
- Rename source info field from "xrepl_origin_id" to "origin" to align with upstream Debezium naming convention - Add xreplOriginId to SourceInfo.toString() when non-zero for debugging - Add YugabyteDBReplicationOriginTest with integration tests covering origin propagation on DML events, omission when zero, and multiple origins in the same session
|
@kgalieva, which YB-client version are you using? Compiling your PR changes throws me this error |
|
@shishir2001-yb I updated the client version, ready to run the tests |
|
@kgalieva, just one comment needs to be addressed #400 (comment) |
| sb.append(", table=").append(tableName); | ||
| } | ||
| if (xreplOriginId != 0) { | ||
| sb.append(", origin=").append(xreplOriginId); |
There was a problem hiding this comment.
Change this to origin_id
Expose
xrepl_origin_idon CDC change eventsSummary
xrepl_origin_idfield from YugabyteDB CDCRowMessageproto onto every DML (INSERT/UPDATE/DELETE) and COMMIT Debezium change eventxrepl_origin_id(optional int32) in thesourceblock of the Kafka message envelope, only when non-zeroxrepl_origin_idto every DML CDC event on the server sideMotivation
Previously,
xrepl_origin_idwas only set on COMMIT records in the CDC stream and the Debezium connector did not surface it at all. The upstream YugabyteDB change now attaches it to every DML record as well. This connector change threads that value through to the Debezium output on both DML and COMMIT events so consumers can use the replication origin to filter or route events (e.g. for loop-prevention in bidirectional replication).Changes
pom.xmlxrepl_origin_idin proto)ReplicationMessage.javagetXreplOriginId()method returning 0YbProtoReplicationMessage.javarawMessage.getXreplOriginId()SourceInfo.javaxreplOriginIdfield, constant, accessor, and parameter toupdate()YugabyteDBOffsetContext.javaupdateRecordPosition()acceptingxreplOriginIdYugabyteDBStreamingChangeEventSource.javamessage.getXreplOriginId()YugabyteDBConsistentStreamingSource.javamessage.getXreplOriginId()YugabyteDBSourceInfoStructMaker.javaxrepl_origin_id(optional int32) to schema and structSourceInfoTest.javaupdate()signature and schema