Skip to content

Commit 7fbb069

Browse files
feat: Add Debezium-style Snapshot for Initial Data Capture (#37)
* feat: add snapshot feature like debezium for initial data * chore: lint * feat: multiple instance and table snapshot (#38) * feat: distirbuted snapshot support (wip) * refactor: file paths for snapshot * feat: snapshot lsn cdc integration * feat: add retry to the all snapshot db operation * feat: add example test plan etc * chore: wip * chore: wip * feat: first test successful * feat: log update * feat: some testing updates * feat: fix snapshot conn close issue and advisory lock release issue when testing multiple instance * feat: worker transaction wrap and retry * feat: snapshot divide (prepare, execute), lsn slot cdc avoid data loss * feat: refactor ^^ * feat: refactor worker side, fix basic bug * feat: waitForCoordinator refactor, remove workerConn * feat: review notes * feat: add 2 integration test and documentation, fix estimate count and increase coordinator wait timeout * feat: add 4 integration test and increase wal sender, max slot * feat: refactor snapshot tests * feat: linter error * feat: refactor isTransient error method * chore: fix import * chore: fix import * refactor: time format * refactor: disable timeout when opening snapshot conn for exporting * refactor: add keep alive select 1 for snapshot conn * feat: change create table if not exist logic for permission problems * chore: no lint fun len * feat: publication table exist or not support in snapshot * feat: extract replication and normal connections because of preventing max wal sender limit * feat: graceful shutdown snapshot conn * feat: coordinator failure worker suspend fix * chore: remove unused vars * chore: fix lint * chore: fix lint * chore: funlen change * feat: add snapshot only feature * chore: docs ref * feat: introduce snapshot tables field * chore: fix lint * chore: benchmark files of snapshot mode * chore: update benchmark build * feat: add initial benchmark test --------- Co-authored-by: Serhat Karabulut <serhat.karabulut@trendyolgo.com> * chore: benchmark initial updated, grafana etc. * refactor: performance improvements * chore: log * chore: benchmark initial multistage * chore: mem upgrade for debezium * add pk cache and limit offset integration etc * chore: fix lint * chore: benchmark added * chore: docs * chore: docs --------- Co-authored-by: Serhat Karabulut <serhat.karabulut@trendyolgo.com>
1 parent 418a96f commit 7fbb069

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

84 files changed

+16697
-54
lines changed

.golangci.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
linters-settings:
22
funlen:
3-
lines: 70
3+
lines: 85
44

55
linters:
66
disable-all: true

README.md

Lines changed: 42 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -7,17 +7,35 @@ ensuring low resource consumption and high performance.
77

88
[Debezium vs go-pq-cdc benchmark](./benchmark)
99

10+
## 📸 NEW: Snapshot Feature
11+
12+
**Capture existing data before starting CDC!** The new snapshot feature enables initial data synchronization, ensuring downstream systems receive both historical and real-time data.
13+
14+
**Key Highlights:**
15+
- **Zero Data Loss**: Consistent point-in-time snapshot using PostgreSQL's `pg_export_snapshot()`
16+
- **Chunk-Based Processing**: Memory-efficient processing of large tables
17+
- **Multi-Instance Support**: Parallel processing across multiple instances
18+
- **Crash Recovery**: Automatic resume from failures
19+
- **No Duplicates**: Seamless transition from snapshot to CDC
20+
- **Snapshot Only Mode**: One-time data export without CDC (no replication slot required)
21+
22+
📚 **[Read Full Documentation](docs/SNAPSHOT_FEATURE.md)** for detailed architecture, configuration, and best practices.
23+
1024
### Contents
1125

12-
* [Why?](#why)
13-
* [Usage](#usage)
14-
* [Examples](#examples)
15-
* [Availability](#availability)
16-
* [Configuration](#configuration)
17-
* [API](#api)
18-
* [Exposed Metrics](#exposed-metrics)
19-
* [Compatibility](#compatibility)
20-
* [Breaking Changes](#breaking-changes)
26+
- [go-pq-cdc ](#go-pq-cdc---)
27+
- [📸 NEW: Snapshot Feature](#-new-snapshot-feature)
28+
- [Contents](#contents)
29+
- [Why?](#why)
30+
- [Usage](#usage)
31+
- [Examples](#examples)
32+
- [Availability](#availability)
33+
- [Configuration](#configuration)
34+
- [API](#api)
35+
- [Exposed Metrics](#exposed-metrics)
36+
- [Grafana Dashboard](#grafana-dashboard)
37+
- [Compatibility](#compatibility)
38+
- [Breaking Changes](#breaking-changes)
2139

2240
### Why?
2341

@@ -114,6 +132,8 @@ func Handler(ctx *replication.ListenerContext) {
114132

115133
* [Simple](./example/simple)
116134
* [Simple File Config](./example/simple-file-config)
135+
* [Snapshot Mode (Initial Data Capture)](./example/snapshotmode)
136+
* [Snapshot Only Mode (One-Time Export)](./example/snapshotonlymode)
117137
* [PostgreSQL to Elasticsearch](https://github.com/Trendyol/go-pq-cdc-elasticsearch/tree/main/example/simple)
118138
* [PostgreSQL to Kafka](https://github.com/Trendyol/go-pq-cdc-kafka/tree/main/example/simple)
119139
* [PostgreSQL to PostgreSQL](./example/postgresql)
@@ -154,6 +174,13 @@ This setup ensures continuous data synchronization and minimal downtime in captu
154174
| `slot.createIfNotExists` | bool | no | - | Create replication slot if not exists. Otherwise, return `replication slot is not exists` error. | |
155175
| `slot.name` | string | yes | - | Set the logical replication slot name | Should be unique and descriptive. |
156176
| `slot.slotActivityCheckerInterval` | int | no | 1000 | Set the slot activity check interval time in milliseconds | Specify as an integer value in milliseconds (e.g., `1000` for 1 second). |
177+
| `snapshot.enabled` | bool | no | false | Enable initial snapshot feature | When enabled, captures existing data before starting CDC. |
178+
| `snapshot.mode` | string | no | never | Snapshot mode: `initial`, `never`, or `snapshot_only` | **initial:** Take snapshot only if no previous snapshot exists, then start CDC. <br> **never:** Skip snapshot, start CDC immediately. <br> **snapshot_only:** Take snapshot and exit (no CDC, no replication slot required). |
179+
| `snapshot.chunkSize` | int64 | no | 8000 | Number of rows per chunk during snapshot | Adjust based on table size. Larger chunks = fewer chunks but more memory per chunk. |
180+
| `snapshot.claimTimeout` | duration | no | 30s | Timeout to reclaim stale chunks | If a worker doesn't send heartbeat for this duration, chunk is reclaimed by another worker. |
181+
| `snapshot.heartbeatInterval` | duration | no | 5s | Interval for worker heartbeat updates | Workers send heartbeat every N seconds to indicate they're processing a chunk. |
182+
| `snapshot.instanceId` | string | no | auto | Custom instance identifier (optional) | Auto-generated as `hostname-pid` if not specified. Useful for tracking workers. |
183+
| `snapshot.tables` | []Table | no* | - | Tables to snapshot (required for `snapshot_only` mode, optional for `initial` mode) | **snapshot_only:** Must be specified here (independent from publication). <br> **initial:** If specified, must be a subset of publication tables. If not specified, all publication tables are snapshotted. |
157184
| `extensionSupport.enableTimescaleDB` | bool | no | false | Enable support for TimescaleDB hypertables. Ensures proper handling of compressed chunks during replication. | |
158185

159186
### API
@@ -181,6 +208,12 @@ the `/metrics` endpoint.
181208
| go_pq_cdc_replication_slot_slot_is_active | Indicates whether the PostgreSQL replication slot is currently active (1 for active, 0 for inactive). | slot_name, host| Gauge |
182209
| go_pq_cdc_replication_slot_slot_lag | The replication lag measured by the difference between the current LSN and the confirmed flush LSN. | slot_name, host| Gauge |
183210
| go_pq_cdc_replication_slot_slot_retained_wal_size | The size of Write-Ahead Logging (WAL) files retained for the replication slot in bytes. | slot_name, host| Gauge |
211+
| go_pq_cdc_snapshot_in_progress | Indicates whether snapshot is currently in progress (1 for active, 0 for inactive). | slot_name, host| Gauge |
212+
| go_pq_cdc_snapshot_total_tables | Total number of tables to snapshot. | slot_name, host| Gauge |
213+
| go_pq_cdc_snapshot_total_chunks | Total number of chunks to process across all tables. | slot_name, host| Gauge |
214+
| go_pq_cdc_snapshot_completed_chunks | Number of chunks completed in snapshot. | slot_name, host| Gauge |
215+
| go_pq_cdc_snapshot_total_rows | Total number of rows read during snapshot. | slot_name, host| Counter |
216+
| go_pq_cdc_snapshot_duration_seconds | Duration of the last snapshot operation in seconds. | slot_name, host| Gauge |
184217
| runtime metrics | [Prometheus Collector](https://golang.bg/src/runtime/metrics/description.go) | N/A | N/A |
185218

186219
### Grafana Dashboard
File renamed without changes.
File renamed without changes.
File renamed without changes.
18.8 MB
Binary file not shown.

benchmark/go-pq-cdc-kafka/go.mod renamed to benchmark/benchmark_cdc/go-pq-cdc-kafka/go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ module github.com/Trendyol/go-pq-cdc/benchmark/go-pq-cdc-kafka
22

33
go 1.22.4
44

5-
replace github.com/Trendyol/go-pq-cdc => ../../
5+
replace github.com/Trendyol/go-pq-cdc => ../../../
66

77
require (
88
github.com/Trendyol/go-pq-cdc v0.0.5

0 commit comments

Comments
 (0)