PCSM-273: Implement document level parallel replication #187

Merged
inelpandzic merged 19 commits into main from pcsm-273-parallel-repl on Feb 24, 2026
Conversation

inelpandzic (Collaborator) commented on Feb 13, 2026

PCSM-273

Problem

Change replication processes events sequentially in a single goroutine — reading from the change stream, parsing BSON, and writing to the target all happen in the same goroutine. This becomes a bottleneck under high write throughput because bulk write latency on the target blocks the reader from consuming new events, increasing replication lag.

Additionally, all replication tuning parameters (worker count, queue sizes, bulk write batch size, change stream batch size) are hardcoded constants with no way to adjust them without rebuilding the binary.

Solution

Implement document-level parallel replication using a worker pool architecture. Events are read from the change stream by a single reader goroutine, then dispatched to workers based on a consistent hash of the document key. This preserves per-document ordering while parallelizing BSON parsing and bulk writes across multiple workers.

Key components:

  • Worker pool with configurable number of workers (default: runtime.NumCPU())
  • Document key hashing to route events — same document always goes to the same worker, preserving operation order
  • Capped collection routing by namespace instead of document key to preserve insertion order
  • Barrier mechanism for DDL and transaction events — flushes all workers before applying schema changes and transactions, then resumes
  • Transaction handling — collects all events in a transaction and applies them as a single ordered bulk write
  • Deferred DML parsing — raw BSON is passed to workers and parsed in parallel rather than in the single-threaded reader
  • Staggered flush tickers — worker flush intervals are offset to spread writes evenly and avoid contention bursts
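The document-key routing described in the first two bullets can be sketched as follows. This is a minimal illustration: the function name `workerFor`, the `rawEvent`-free signature, and the choice of FNV-1a are all assumptions, since the PR text does not specify the hash function used.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// workerFor picks a worker index for an event by hashing its document key.
// Because the mapping is deterministic, all operations on the same document
// land on the same worker, which preserves per-document operation order
// while different documents are processed in parallel.
// FNV-1a is illustrative; the actual hash used in the PR is not shown here.
func workerFor(docKey []byte, numWorkers int) int {
	h := fnv.New32a()
	h.Write(docKey)
	return int(h.Sum32()) % numWorkers
}

func main() {
	key := []byte(`{"_id":"order-42"}`)
	// Repeated calls with the same key always yield the same worker.
	fmt.Println(workerFor(key, 8) == workerFor(key, 8)) // prints: true
}
```

For capped collections the same idea applies, but the namespace (rather than the document key) would be fed to the hash, so the whole collection stays on one worker and insertion order is preserved.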

The previously hardcoded ReplQueueSize constant was split into two independent options since it controlled two architecturally different buffers: the dispatcher queue (between change stream reader and dispatcher) and the per-worker queue (between dispatcher and each worker).
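A minimal sketch of the two buffers those split options now control. The channel layout and the `rawEvent` type are hypothetical, chosen only to show why one constant could not size both buffers independently:

```go
package main

import "fmt"

// rawEvent stands in for an undecoded BSON change event handed to
// workers for deferred parsing (hypothetical type).
type rawEvent []byte

func main() {
	const (
		numWorkers      = 4
		eventQueueSize  = 5000 // reader -> dispatcher buffer
		workerQueueSize = 5000 // dispatcher -> each worker buffer
	)

	// Dispatcher queue: the single change-stream reader pushes here.
	events := make(chan rawEvent, eventQueueSize)

	// Per-worker queues: the dispatcher routes into one of these
	// based on the document-key hash.
	workerQs := make([]chan rawEvent, numWorkers)
	for i := range workerQs {
		workerQs[i] = make(chan rawEvent, workerQueueSize)
	}

	fmt.Println(cap(events), len(workerQs), cap(workerQs[0])) // prints: 5000 4 5000
}
```

With separate options, a deployment can, for example, keep a large dispatcher buffer to absorb change-stream bursts while using smaller per-worker buffers to bound memory across many workers.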

All replication tuning parameters are now configurable via env vars, CLI flags, and HTTP API, following the same pattern as existing clone options.

New configuration options

All options default to 0, which means "auto" (fall back to the built-in default).

| CLI flag | Env var | HTTP API field | Default | Description |
| --- | --- | --- | --- | --- |
| `--repl-num-workers` | `PCSM_REPL_NUM_WORKERS` | `replNumWorkers` | `runtime.NumCPU()` | Number of replication workers |
| `--repl-change-stream-batch-size` | `PCSM_REPL_CHANGE_STREAM_BATCH_SIZE` | `replChangeStreamBatchSize` | 10000 | Change stream batch size |
| `--repl-event-queue-size` | `PCSM_REPL_EVENT_QUEUE_SIZE` | `replEventQueueSize` | 5000 | Dispatcher event queue buffer size |
| `--repl-worker-queue-size` | `PCSM_REPL_WORKER_QUEUE_SIZE` | `replWorkerQueueSize` | 5000 | Per-worker routed event queue buffer size |
| `--repl-bulk-ops-size` | `PCSM_REPL_BULK_OPS_SIZE` | `replBulkOpsSize` | 5000 | Max operations per bulk write |
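For example, the same settings can be supplied either way. The flag and env var names come from the table above; the binary name `pcsm` is an assumption for illustration:

```shell
# Via environment variables (names from the table above)
export PCSM_REPL_NUM_WORKERS=16
export PCSM_REPL_BULK_OPS_SIZE=2000

# Or via CLI flags (binary name is illustrative)
pcsm --repl-num-workers=16 --repl-bulk-ops-size=2000

# Leaving a value at 0 (or unset) keeps the built-in default
export PCSM_REPL_EVENT_QUEUE_SIZE=0
```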

inelpandzic force-pushed the pcsm-273-parallel-repl branch from dde6bcd to 41292b6 on February 14, 2026 at 20:41
Instrument the replication dispatcher and per-worker flush loop with
Prometheus metrics: event queue backpressure gauges, worker throughput
counters, flush batch size and duration histograms, and transaction
counters with size/duration histograms. Add log.String helper for
string-typed log attributes. Change worker.id from int to string to
avoid repeated strconv conversions in metric labels.

Update the Grafana dashboard with a new Change Replication row (event
queue sizes, worker throughput, flush batch size/duration p99,
transactions) and Process panels (CPU, memory, file descriptors, GC
pauses, heap breakdown). Queue size panels include a red threshold
line at 5000 to indicate max capacity.
inelpandzic force-pushed the pcsm-273-parallel-repl branch from 41292b6 to 6220ecd on February 14, 2026 at 20:42
inelpandzic force-pushed the pcsm-273-parallel-repl branch from 56894ef to 1f4e820 on February 15, 2026 at 09:45
inelpandzic marked this pull request as ready for review on February 15, 2026 at 10:04
inelpandzic requested a review from chupe as a code owner on February 15, 2026 at 10:04
Comment on lines 98 to 99
func New(scope string) Logger {
log := zerolog.Ctx(context.Background()).With().Logger()
Collaborator:
QQ: I know it's not part of this PR but I'm just wondering why are we using a fresh context for each new logger?

inelpandzic force-pushed the pcsm-273-parallel-repl branch from 571eb5d to 4f02fff on February 24, 2026 at 07:08
inelpandzic merged commit f7f06c4 into main on Feb 24, 2026 — 70 of 73 checks passed