Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
064d7c6
Implement document level parallel replication.
inelpandzic Feb 13, 2026
8e1cfe3
fix: drain worker event channels before barrier flush
inelpandzic Feb 13, 2026
ce6f93e
Tune parallel replication config and stagger worker flushes.
inelpandzic Feb 13, 2026
8f3b707
Defer DML parsing to workers and optimize reader pipeline.
inelpandzic Feb 14, 2026
5e5c135
Add Prometheus metrics for parallel replication pipeline.
inelpandzic Feb 14, 2026
6220ecd
Add Prometheus metrics for parallel replication pipeline
inelpandzic Feb 14, 2026
8d87e4a
Merge branch 'main' into pcsm-273-parallel-repl
inelpandzic Feb 15, 2026
1f4e820
Fix
inelpandzic Feb 15, 2026
9223840
Route capped collection events to same worker to preserve insertion o…
inelpandzic Feb 16, 2026
c595f3f
Fix barrier deadlock race conditions in worker pool.
inelpandzic Feb 16, 2026
505b0d3
Handle DDL events within transactions during parallel replication
inelpandzic Feb 16, 2026
d727b95
Logs to trace level.
inelpandzic Feb 16, 2026
fed1387
Add configurable replication tuning options.
inelpandzic Feb 17, 2026
b2b3ad7
Merge branch 'main' into pcsm-273-parallel-repl
inelpandzic Feb 20, 2026
6bbc5b5
Remove transaction metrics.
inelpandzic Feb 17, 2026
55e1051
Replace repl.Options getter methods with applyDefaults.
inelpandzic Feb 20, 2026
8fc17fb
Clearing worker pool when run finishes.
inelpandzic Feb 20, 2026
c371e66
Fix parallel replication event loss and data race.
inelpandzic Feb 23, 2026
4f02fff
Fix poolIdle func and revert checking poolIdle on advanceTimePseudoEv…
inelpandzic Feb 23, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ type Config struct {

UseCollectionBulkWrite bool `mapstructure:"use-collection-bulk-write"`

Repl ReplConfig `mapstructure:",squash"`
Clone CloneConfig `mapstructure:",squash"`

// hidden startup flags
Expand All @@ -51,6 +52,26 @@ type MongoDBConfig struct {
TargetCompressors []string `mapstructure:"dev-target-client-compressors"`
}

// ReplConfig holds replication operation configuration.
type ReplConfig struct {
// NumWorkers is the number of replication workers.
// 0 means auto (defaults to runtime.NumCPU()).
NumWorkers int `mapstructure:"repl-num-workers"`
// ChangeStreamBatchSize is the batch size for MongoDB change streams.
// 0 means auto (defaults to config.ChangeStreamBatchSize).
ChangeStreamBatchSize int `mapstructure:"repl-change-stream-batch-size"`
// EventQueueSize is the buffer size of the channel between the change stream
// reader and the dispatcher.
// 0 means auto (defaults to config.ReplQueueSize).
EventQueueSize int `mapstructure:"repl-event-queue-size"`
// WorkerQueueSize is the per-worker routed event channel buffer size.
// 0 means auto (defaults to config.ReplQueueSize).
WorkerQueueSize int `mapstructure:"repl-worker-queue-size"`
// BulkOpsSize is the maximum number of operations per bulk write.
// 0 means auto (defaults to config.BulkOpsSize).
BulkOpsSize int `mapstructure:"repl-bulk-ops-size"`
}

// CloneConfig holds clone operation configuration.
type CloneConfig struct {
// NumParallelCollections is the number of collections to clone in parallel.
Expand Down Expand Up @@ -143,6 +164,12 @@ func bindEnvVars() {

_ = viper.BindEnv("use-collection-bulk-write", "PCSM_USE_COLLECTION_BULK_WRITE")

_ = viper.BindEnv("repl-num-workers", "PCSM_REPL_NUM_WORKERS")
_ = viper.BindEnv("repl-change-stream-batch-size", "PCSM_REPL_CHANGE_STREAM_BATCH_SIZE")
_ = viper.BindEnv("repl-event-queue-size", "PCSM_REPL_EVENT_QUEUE_SIZE")
_ = viper.BindEnv("repl-worker-queue-size", "PCSM_REPL_WORKER_QUEUE_SIZE")
_ = viper.BindEnv("repl-bulk-ops-size", "PCSM_REPL_BULK_OPS_SIZE")

_ = viper.BindEnv("dev-target-client-compressors", "PCSM_DEV_TARGET_CLIENT_COMPRESSORS")

_ = viper.BindEnv("clone-num-parallel-collections", "PCSM_CLONE_NUM_PARALLEL_COLLECTIONS")
Expand Down
16 changes: 10 additions & 6 deletions config/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,16 +52,20 @@ const (
// Change stream and replication settings.
const (
// ChangeStreamBatchSize is the batch size for MongoDB change streams.
ChangeStreamBatchSize = 1000
// Larger values reduce getMore round-trips to the source.
ChangeStreamBatchSize = 10_000
// ChangeStreamAwaitTime is the maximum amount of time to wait for new change event.
ChangeStreamAwaitTime = time.Second
// ReplQueueSize defines the buffer size of the internal channel used to transfer
// events between the change stream read and the change replication.
ReplQueueSize = ChangeStreamBatchSize
// events between the change stream reader and the dispatcher, and also the buffer
// size of each worker's routed event channel. Larger values absorb flush latency
// spikes and prevent head-of-line blocking across workers.
ReplQueueSize = 5_000
// BulkOpsSize is the maximum number of operations in a bulk write.
BulkOpsSize = ChangeStreamBatchSize
// BulkOpsInterval is the maximum interval between bulk write operations.
BulkOpsInterval = ChangeStreamAwaitTime
// Sized to allow efficient large batches when workers drain accumulated events.
BulkOpsSize = 5_000
// WorkerFlushInterval is the maximum interval between worker bulk write flushes.
WorkerFlushInterval = time.Second
// InitialSyncCheckInterval is the interval for checking the initial sync status.
InitialSyncCheckInterval = 10 * time.Second
// PrintLagTimeInterval is the interval at which the lag time is printed to the logs.
Expand Down
2 changes: 1 addition & 1 deletion hack/metrics/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ services:
container_name: pcsm-grafana
volumes:
- ./provisioning:/etc/grafana/provisioning:ro
- ./graphana-board.json:/var/lib/grafana/dashboards/pcsm.json:ro
- ./grafana-board.json:/var/lib/grafana/dashboards/pcsm.json:ro
environment:
- GF_SECURITY_ADMIN_USER=admin
- GF_SECURITY_ADMIN_PASSWORD=admin
Expand Down
Loading
Loading