Copyright Debezium Authors. Licensed under the Apache License, Version 2.0.
A Debezium connector for capturing changes from CockroachDB databases.
The Debezium CockroachDB connector processes row-level changes from CockroachDB databases that have been captured and streamed to Apache Kafka topics by CockroachDB's native changefeed mechanism.
The connector uses a two-stage Kafka architecture:
- CockroachDB changefeed -> Intermediate Kafka: The connector creates a single CockroachDB changefeed covering all configured tables (`CREATE CHANGEFEED FOR table1, table2, ...`) with the `enriched` envelope format. CockroachDB automatically routes events to per-table Kafka topics in the intermediate cluster. The enriched format includes both schema metadata and the full before/after row state.
- Intermediate Kafka -> Debezium -> Output Kafka: The connector subscribes to all per-table Kafka topics in a single `KafkaConsumer`, routes each event to the correct table based on the topic name, transforms the enriched changefeed events into the standard Debezium envelope format (with `before`, `after`, `source`, and `op` fields), and produces them to the final output Kafka topics.
Using a single multi-table changefeed is the recommended approach to stay within CockroachDB's limit of approximately 80 changefeed jobs per cluster.
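As an illustration of the second stage, the topic-based routing described above might look like the following minimal sketch. `TopicRouter` and the exact topic naming scheme are assumptions for illustration, not the connector's actual code:

```java
// Illustrative sketch: derive the final Debezium output topic from an
// intermediate changefeed topic name. Assumes intermediate topics are named
// "<prefix>.<table>" and output topics follow Debezium's usual
// "<topic.prefix>.<schema>.<table>" convention.
class TopicRouter {

    // Strip the intermediate prefix (e.g. "cockroachdb.") to recover the table name.
    public static String tableFromTopic(String intermediateTopic, String sinkPrefix) {
        return intermediateTopic.startsWith(sinkPrefix)
                ? intermediateTopic.substring(sinkPrefix.length())
                : intermediateTopic;
    }

    // Build the output topic name from the configured prefix, schema, and table.
    public static String outputTopic(String topicPrefix, String schema, String table) {
        return topicPrefix + "." + schema + "." + table;
    }
}
```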
Status: This connector is currently in the incubation phase and is under active development and testing.
- CockroachDB v25.2+ with rangefeed enabled (enriched envelope support introduced in v25.2)
- CockroachDB v24.2+ for pgvector-compatible VECTOR type support
- Kafka Connect
- JDK 21+
- Maven 3.9.8 or later
The database user must have one of the following to create changefeeds:
- `CHANGEFEED` privilege on monitored tables, or
- `ALL` privilege on monitored tables, or
- Membership in the `admin` role
Additionally, the `kv.rangefeed.enabled` cluster setting must be `true` (the connector checks this at startup).
Grant `VIEWCLUSTERSETTING` if the user is not an admin so the connector can verify the setting:
```sql
GRANT CHANGEFEED ON TABLE mydb.public.* TO myuser;
GRANT VIEWCLUSTERSETTING TO myuser;
SET CLUSTER SETTING kv.rangefeed.enabled = true;
```

Build the connector with:

```shell
./mvnw clean package -Passembly
```

Example connector configuration:
```json
{
  "name": "cockroachdb-connector",
  "config": {
    "connector.class": "io.debezium.connector.cockroachdb.CockroachDBConnector",
    "database.hostname": "cockroachdb",
    "database.port": "26257",
    "database.user": "testuser",
    "database.password": "",
    "database.dbname": "testdb",
    "database.server.name": "cockroachdb",
    "topic.prefix": "cockroachdb",
    "table.include.list": "public.orders,public.customers",
    "cockroachdb.changefeed.envelope": "enriched",
    "cockroachdb.changefeed.enriched.properties": "source,schema",
    "cockroachdb.changefeed.sink.type": "kafka",
    "cockroachdb.changefeed.sink.uri": "kafka://kafka-test:9092",
    "cockroachdb.changefeed.sink.options": "",
    "cockroachdb.changefeed.resolved.interval": "10s",
    "cockroachdb.changefeed.include.updated": true,
    "cockroachdb.changefeed.include.diff": true,
    "snapshot.mode": "initial",
    "cockroachdb.changefeed.cursor": "now",
    "cockroachdb.changefeed.batch.size": 1000,
    "cockroachdb.changefeed.poll.interval.ms": 100,
    "connection.timeout.ms": 30000,
    "connection.retry.delay.ms": 1000,
    "connection.max.retries": 3
  }
}
```

| Option | Default | Description |
|---|---|---|
| `database.hostname` | - | CockroachDB host |
| `database.port` | `26257` | CockroachDB port |
| `database.user` | - | Database user |
| `database.password` | - | Database password |
| `database.dbname` | - | Database name |
| `database.server.name` | - | Unique server name for topic prefix |
| Option | Default | Description |
|---|---|---|
| `table.include.list` | - | Comma-separated list of tables to monitor |
| Option | Default | Description |
|---|---|---|
| `cockroachdb.changefeed.envelope` | `enriched` | Envelope type: `enriched`, `wrapped`, `bare` |
| `cockroachdb.changefeed.enriched.properties` | `source` | Comma-separated enriched properties |
| `cockroachdb.changefeed.sink.type` | `kafka` | Sink type (`kafka`, `webhook`, `pubsub`, etc.) |
| `cockroachdb.changefeed.sink.uri` | - | Sink URI (required), e.g. `kafka://host:port` |
| `cockroachdb.changefeed.sink.topic.prefix` | `""` | Prefix for intermediate changefeed Kafka topic names. If empty, defaults to `topic.prefix` |
| `cockroachdb.changefeed.sink.options` | `""` | Additional sink options in `key=value` format |
| `cockroachdb.changefeed.resolved.interval` | `10s` | Resolved timestamp interval |
| `cockroachdb.changefeed.include.updated` | `false` | Include updated column information |
| `cockroachdb.changefeed.include.diff` | `false` | Include before/after diff information |
| `cockroachdb.changefeed.cursor` | `now` | Start cursor position |
| `cockroachdb.changefeed.batch.size` | `1000` | Batch size for changefeed processing |
| `cockroachdb.changefeed.poll.interval.ms` | `100` | Poll interval in milliseconds |
The connector uses CockroachDB's native changefeed `initial_scan` option to backfill existing rows instead of a separate JDBC-based snapshot phase. During the initial scan, events are marked with `op=r` (read). Once the scan completes and streaming begins, events use the standard `op=c`/`u`/`d` operation types.
| Option | Default | Description |
|---|---|---|
| `snapshot.mode` | `initial` | Controls whether existing rows are backfilled on startup. See the mapping table below for all supported modes. |

Snapshot mode to CockroachDB `initial_scan` mapping:
| Snapshot Mode | initial_scan | Behavior |
|---|---|---|
| `initial` (default) | `yes` / `no` | On first start (no prior offset), backfills all rows. On restart, resumes from stored cursor. |
| `always` | `yes` | Always backfills all existing rows, even on restart. |
| `initial_only` | `only` | Backfills all existing rows, then stops the connector. Useful for one-time data migration. |
| `no_data` / `never` | `no` | Skips the initial scan. Only ongoing changes are captured. |
| `when_needed` | `yes` / `no` | Like `initial`, but also re-snapshots if the stored offset is no longer valid (e.g. GC TTL expired). |
The connector supports Debezium's signal-based incremental snapshots. This allows you to re-snapshot existing table data on demand, without stopping the connector or missing any in-flight changes.
Setup:
- Create a signaling table in CockroachDB:
```sql
CREATE TABLE debezium_signal (
    id STRING PRIMARY KEY,
    type STRING NOT NULL,
    data STRING
);
```

- Configure the connector to monitor the signaling table:
```json
{
  "signal.data.collection": "mydb.public.debezium_signal",
  "table.include.list": "public.my_table,public.debezium_signal"
}
```

The signaling table must be included in `table.include.list` so the changefeed delivers signal events to the connector.
Triggering a snapshot:
Insert a row into the signaling table to trigger an incremental snapshot of one or more tables:
```sql
INSERT INTO debezium_signal (id, type, data) VALUES
  ('snap-1', 'execute-snapshot',
   '{"data-collections": ["mydb.public.my_table"]}');
```

The connector will re-read all rows from the specified table(s) and emit them as `op=r` (read) events, while continuing to capture any concurrent DML changes without interruption.
Use cases:
- Re-populate a downstream consumer that lost data
- Backfill a newly added sink or topic
- Verify source-target consistency by re-snapshotting and comparing
| Option | Default | Description |
|---|---|---|
| `cockroachdb.changefeed.kafka.bootstrap.servers` | - | Consumer bootstrap servers. When not set, derived from the sink URI. Use when CockroachDB connects to Kafka via internal DNS but the connector JVM requires an external address. |
| `cockroachdb.changefeed.kafka.consumer.group.prefix` | `cockroachdb-connector` | Prefix for the Kafka consumer group ID |
| `cockroachdb.changefeed.kafka.poll.timeout.ms` | `100` | Kafka consumer poll timeout in milliseconds |
| `cockroachdb.changefeed.kafka.auto.offset.reset` | `earliest` | Kafka consumer auto offset reset policy |
| Option | Default | Description |
|---|---|---|
| `connection.timeout.ms` | `30000` | Connection timeout in milliseconds |
| `connection.retry.delay.ms` | `1000` | Delay between connection retries in ms |
| `connection.max.retries` | `3` | Maximum number of connection retry attempts |
| `connection.validation.timeout.seconds` | `5` | Timeout for validating JDBC connections |
The connector maps CockroachDB column types to Kafka Connect schema types:
| CockroachDB Type | Kafka Connect Type | Notes |
|---|---|---|
| `BOOL` | `BOOLEAN` | |
| `INT2`, `SMALLINT` | `INT16` | |
| `INT4`, `INT`, `INTEGER` | `INT32` | |
| `INT8`, `BIGINT`, `SERIAL` | `INT64` | |
| `FLOAT4`, `REAL` | `FLOAT32` | |
| `FLOAT8`, `DOUBLE PRECISION` | `FLOAT64` | |
| `NUMERIC`, `DECIMAL` | `STRING` | Preserves precision |
| `VARCHAR`, `TEXT`, `STRING` | `STRING` | |
| `BYTEA`, `BYTES` | `BYTES` | |
| `DATE` | `io.debezium.time.Date` | Days since epoch |
| `TIMESTAMP`, `TIMESTAMPTZ` | `io.debezium.time.MicroTimestamp` | Microseconds since epoch |
| `JSON`, `JSONB` | `io.debezium.data.Json` | |
| `UUID` | `io.debezium.data.Uuid` | |
| `VECTOR` | `io.debezium.data.DoubleVector` | pgvector-compatible (CockroachDB 24.2+) |
| `GEOGRAPHY`, `GEOMETRY` | `STRING` | Spatial types |
| `INET` | `STRING` | |
| `INTERVAL` | `STRING` | |
| `ENUM` | `STRING` | |
| Array types (`INT[]`, `TEXT[]`, etc.) | `STRING` | JSON array representation |
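Because `NUMERIC`/`DECIMAL` values arrive as strings to preserve precision, a downstream consumer can restore exact values with `BigDecimal` instead of a lossy `double` parse. This is a consumer-side sketch, not part of the connector:

```java
import java.math.BigDecimal;

// Consumer-side sketch: decode a NUMERIC/DECIMAL field delivered as a string.
// BigDecimal keeps the exact value and scale (e.g. "99.9900" stays scale 4),
// unlike Double.parseDouble, which can lose precision.
class DecimalDecoding {
    public static BigDecimal decode(String numericField) {
        return new BigDecimal(numericField);
    }
}
```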
Events are produced in the standard Debezium envelope format, transformed from CockroachDB's enriched changefeed events. For details on the changefeed message format, see the CockroachDB changefeed messages documentation.
```json
{
  "before": null,
  "after": {
    "id": "...",
    "name": "...",
    "...": "..."
  },
  "source": {
    "changefeed_sink": "kafka",
    "cluster_id": "...",
    "database_name": "testdb",
    "table_name": "products",
    "...": "..."
  },
  "op": "c",
  "ts_ns": 1751407136710963868
}
```

For CockroachDB Cloud (Serverless or Dedicated), use `verify-full` SSL mode:
```json
{
  "database.hostname": "my-cluster-1234.abc.cockroachlabs.cloud",
  "database.port": "26257",
  "database.sslmode": "verify-full",
  "database.user": "myuser",
  "database.password": "mypassword",
  "database.dbname": "defaultdb"
}
```

CockroachDB Cloud clusters use publicly trusted certificates, so no `sslrootcert` is needed.
CockroachDB changefeed resolved timestamps serve as natural heartbeats. When the connector receives a resolved timestamp it updates the stored offset cursor and dispatches a Debezium heartbeat event, ensuring offsets advance even during idle periods with no data changes.
To emit heartbeat records to the `__debezium-heartbeat.<topic.prefix>` Kafka topic, set:
```json
{
  "heartbeat.interval.ms": "10000"
}
```

The `cockroachdb.changefeed.resolved.interval` property (default `10s`) controls how frequently CockroachDB emits resolved timestamps.
The connector automatically detects DDL changes (`ALTER TABLE ... ADD COLUMN`, `DROP COLUMN`, `RENAME COLUMN`) without requiring a restart. When an incoming changefeed event contains fields that don't match the registered table schema, the connector:

- Detects the mismatch by comparing event field names against registered column names
- Re-queries `information_schema` to get the updated table definition
- Refreshes the internal schema and continues processing with the new schema
CockroachDB changefeeds handle schema changes natively by performing a backfill (re-emitting all rows with the new schema), so no events are lost during the transition.
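The mismatch detection described above amounts to a set comparison. A rough sketch, using a hypothetical helper rather than the connector's actual class:

```java
import java.util.Set;

// Sketch of the schema-drift check: a refresh is needed when the incoming
// event carries a field that the registered schema does not know about.
class SchemaDrift {
    public static boolean needsRefresh(Set<String> eventFields, Set<String> registeredColumns) {
        return !registeredColumns.containsAll(eventFields);
    }
}
```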
Run all unit tests:

```shell
./mvnw clean test
```

Run integration tests (requires Docker for Testcontainers):

```shell
./mvnw clean test -Dtest="*IT"
```

Run against a specific CockroachDB version (default is v25.4.6):

```shell
./mvnw clean test -Dtest="*IT" -Dcockroachdb.version=v25.2.3
```

Run CockroachDB Cloud connectivity tests (requires a Cloud instance):

```shell
CRDB_CLOUD_URL="postgresql://user:pass@host:26257/defaultdb?sslmode=verify-full" \
./mvnw test -Dtest=CockroachDBCloudConnectionIT
```

The Cloud IT is guarded by `@EnabledIfEnvironmentVariable` and will be skipped in CI when the env var is absent.
For docker-compose based testing with a specific version:

```shell
COCKROACHDB_VERSION=v25.2.3 docker-compose -f src/test/scripts/docker-compose.yml up
```

- Single changefeed job: The connector creates a single multi-table changefeed (`CREATE CHANGEFEED FOR table1, table2, ...`) and consumes all per-table Kafka topics concurrently in a single `KafkaConsumer`. This is the recommended approach to stay within CockroachDB's ~80 changefeed job limit per cluster.
- Kafka-only sink: Only Kafka sinks are supported. Webhook, Pub/Sub, and cloud storage sinks are planned.
- Permission Errors: Ensure `CHANGEFEED` and `SELECT` privileges are granted on all monitored tables.
- Rangefeed Disabled: Enable with `SET CLUSTER SETTING kv.rangefeed.enabled = true;`
- No Events: Check connector logs and changefeed job status.
- Configuration Issues: Verify all required changefeed parameters are properly configured.