Skip to content

Commit 1ae84b6

Browse files
committed
PCSM-219: Use embedded options for Clone and Repl
1 parent 7133a98, commit 1ae84b6

File tree

4 files changed

+66
-50
lines changed

4 files changed

+66
-50
lines changed

main.go

Lines changed: 12 additions & 11 deletions
Original file line number | Diff line number | Diff line change
@@ -712,45 +712,46 @@ func resolveStartOptions(cfg *config.Config, params startRequest) (*pcsm.StartOp
712712
PauseOnInitialSync: params.PauseOnInitialSync,
713713
IncludeNamespaces: params.IncludeNamespaces,
714714
ExcludeNamespaces: params.ExcludeNamespaces,
715+
Repl: pcsm.ReplOptions{
716+
UseCollectionBulkWrite: cfg.UseCollectionBulkWrite,
717+
},
718+
Clone: pcsm.CloneOptions{},
715719
}
716720

717721
// Clone parallelism: HTTP > CLI > default
718722
if params.CloneNumParallelCollections != nil {
719-
options.CloneParallelism = *params.CloneNumParallelCollections
723+
options.Clone.Parallelism = *params.CloneNumParallelCollections
720724
} else {
721-
options.CloneParallelism = cfg.Clone.NumParallelCollections
725+
options.Clone.Parallelism = cfg.Clone.NumParallelCollections
722726
}
723727

724728
// Clone read workers: HTTP > CLI > default
725729
if params.CloneNumReadWorkers != nil {
726-
options.CloneReadWorkers = *params.CloneNumReadWorkers
730+
options.Clone.ReadWorkers = *params.CloneNumReadWorkers
727731
} else {
728-
options.CloneReadWorkers = cfg.Clone.NumReadWorkers
732+
options.Clone.ReadWorkers = cfg.Clone.NumReadWorkers
729733
}
730734

731735
// Clone insert workers: HTTP > CLI > default
732736
if params.CloneNumInsertWorkers != nil {
733-
options.CloneInsertWorkers = *params.CloneNumInsertWorkers
737+
options.Clone.InsertWorkers = *params.CloneNumInsertWorkers
734738
} else {
735-
options.CloneInsertWorkers = cfg.Clone.NumInsertWorkers
739+
options.Clone.InsertWorkers = cfg.Clone.NumInsertWorkers
736740
}
737741

738742
// Clone segment size: HTTP > CLI > default
739743
segmentSize, err := resolveCloneSegmentSize(cfg, params.CloneSegmentSize)
740744
if err != nil {
741745
return nil, err
742746
}
743-
options.CloneSegmentSizeBytes = segmentSize
747+
options.Clone.SegmentSizeBytes = segmentSize
744748

745749
// Clone read batch size: HTTP > CLI > default
746750
batchSize, err := resolveCloneReadBatchSize(cfg, params.CloneReadBatchSize)
747751
if err != nil {
748752
return nil, err
749753
}
750-
options.CloneReadBatchSizeBytes = batchSize
751-
752-
// UseCollectionBulkWrite: internal only, always from config (CLI + env var via Viper)
753-
options.UseCollectionBulkWrite = cfg.UseCollectionBulkWrite
754+
options.Clone.ReadBatchSizeBytes = batchSize
754755

755756
return options, nil
756757
}

pcsm/clone.go

Lines changed: 26 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -22,13 +22,32 @@ import (
2222
"github.com/percona/percona-clustersync-mongodb/topo"
2323
)
2424

25+
// CloneOptions configures the clone behavior.
26+
type CloneOptions struct {
27+
// Parallelism is the number of collections to clone in parallel.
28+
// Default: 2 (config.DefaultCloneNumParallelCollection)
29+
Parallelism int
30+
// ReadWorkers is the number of read workers during clone.
31+
// Default: auto (0 = runtime.NumCPU()/4)
32+
ReadWorkers int
33+
// InsertWorkers is the number of insert workers during clone.
34+
// Default: auto (0 = runtime.NumCPU()*2)
35+
InsertWorkers int
36+
// SegmentSizeBytes is the segment size for clone operations in bytes.
37+
// Default: auto (0 = calculated per collection)
38+
SegmentSizeBytes int64
39+
// ReadBatchSizeBytes is the read batch size during clone in bytes.
40+
// Default: ~47.5MB (config.DefaultCloneReadBatchSizeBytes)
41+
ReadBatchSizeBytes int32
42+
}
43+
2544
// Clone handles the cloning of data from a source MongoDB to a target MongoDB.
2645
type Clone struct {
2746
source *mongo.Client // Source MongoDB client
2847
target *mongo.Client // Target MongoDB client
2948
catalog *Catalog // Catalog for managing collections and indexes
3049
nsFilter sel.NSFilter // Namespace filter
31-
options *StartOptions // Clone options from StartOptions
50+
options *CloneOptions // Clone options
3251

3352
lock sync.Mutex
3453
err error // Error encountered during the cloning process
@@ -80,7 +99,7 @@ func NewClone(
8099
source, target *mongo.Client,
81100
catalog *Catalog,
82101
nsFilter sel.NSFilter,
83-
opts *StartOptions,
102+
opts *CloneOptions,
84103
) *Clone {
85104
return &Clone{
86105
source: source,
@@ -300,18 +319,18 @@ func (c *Clone) run() error {
300319
func (c *Clone) doClone(ctx context.Context, namespaces []namespaceInfo) error {
301320
cloneLogger := log.Ctx(ctx)
302321

303-
numParallelCollections := c.options.CloneParallelism
322+
numParallelCollections := c.options.Parallelism
304323
if numParallelCollections < 1 {
305324
numParallelCollections = config.DefaultCloneNumParallelCollection
306325
}
307326

308327
cloneLogger.Debugf("NumParallelCollections: %d", numParallelCollections)
309328

310329
copyManager := NewCopyManager(c.source, c.target, CopyManagerOptions{
311-
NumReadWorkers: c.options.CloneReadWorkers,
312-
NumInsertWorkers: c.options.CloneInsertWorkers,
313-
SegmentSizeBytes: c.options.CloneSegmentSizeBytes,
314-
ReadBatchSizeBytes: c.options.CloneReadBatchSizeBytes,
330+
NumReadWorkers: c.options.ReadWorkers,
331+
NumInsertWorkers: c.options.InsertWorkers,
332+
SegmentSizeBytes: c.options.SegmentSizeBytes,
333+
ReadBatchSizeBytes: c.options.ReadBatchSizeBytes,
315334
})
316335
defer copyManager.Close()
317336

pcsm/pcsm.go

Lines changed: 11 additions & 22 deletions
Original file line number | Diff line number | Diff line change
@@ -168,9 +168,8 @@ func (ml *PCSM) Recover(ctx context.Context, data []byte) error {
168168
nsFilter := sel.MakeFilter(cp.NSInclude, cp.NSExclude)
169169
catalog := NewCatalog(ml.target)
170170
// Use default options for recovery (clone tuning is less relevant when resuming from checkpoint)
171-
defaultOpts := &StartOptions{}
172-
clone := NewClone(ml.source, ml.target, catalog, nsFilter, defaultOpts)
173-
repl := NewRepl(ml.source, ml.target, catalog, nsFilter, false)
171+
clone := NewClone(ml.source, ml.target, catalog, nsFilter, &CloneOptions{})
172+
repl := NewRepl(ml.source, ml.target, catalog, nsFilter, &ReplOptions{})
174173

175174
if cp.Catalog != nil {
176175
err = catalog.Recover(cp.Catalog)
@@ -285,27 +284,17 @@ func (ml *PCSM) resetError() {
285284

286285
// StartOptions represents the options for starting the PCSM.
287286
type StartOptions struct {
288-
// PauseOnInitialSync indicates whether to finalize after the initial sync.
287+
// PauseOnInitialSync indicates whether to pause after the initial sync completes.
289288
PauseOnInitialSync bool
290-
// IncludeNamespaces are the namespaces to include.
289+
// IncludeNamespaces are the namespaces to include in replication.
291290
IncludeNamespaces []string
292-
// ExcludeNamespaces are the namespaces to exclude.
291+
// ExcludeNamespaces are the namespaces to exclude from replication.
293292
ExcludeNamespaces []string
294293

295-
// Clone tuning options
296-
// CloneParallelism is the number of collections to clone in parallel.
297-
CloneParallelism int
298-
// CloneReadWorkers is the number of read workers during clone.
299-
CloneReadWorkers int
300-
// CloneInsertWorkers is the number of insert workers during clone.
301-
CloneInsertWorkers int
302-
// CloneSegmentSizeBytes is the segment size for clone operations in bytes.
303-
CloneSegmentSizeBytes int64
304-
// CloneReadBatchSizeBytes is the read batch size during clone in bytes.
305-
CloneReadBatchSizeBytes int32
306-
307-
// UseCollectionBulkWrite indicates whether to use collection-level bulk write.
308-
UseCollectionBulkWrite bool
294+
// Clone contains clone tuning options.
295+
Clone CloneOptions
296+
// Repl contains replication behavior options.
297+
Repl ReplOptions
309298
}
310299

311300
// Start starts the replication process with the given options.
@@ -336,8 +325,8 @@ func (ml *PCSM) Start(_ context.Context, options *StartOptions) error {
336325
ml.nsFilter = sel.MakeFilter(ml.nsInclude, ml.nsExclude)
337326
ml.pauseOnInitialSync = options.PauseOnInitialSync
338327
ml.catalog = NewCatalog(ml.target)
339-
ml.clone = NewClone(ml.source, ml.target, ml.catalog, ml.nsFilter, options)
340-
ml.repl = NewRepl(ml.source, ml.target, ml.catalog, ml.nsFilter, options.UseCollectionBulkWrite)
328+
ml.clone = NewClone(ml.source, ml.target, ml.catalog, ml.nsFilter, &options.Clone)
329+
ml.repl = NewRepl(ml.source, ml.target, ml.catalog, ml.nsFilter, &options.Repl)
341330
ml.state = StateRunning
342331

343332
go ml.run()

pcsm/repl.go

Lines changed: 17 additions & 10 deletions
Original file line number | Diff line number | Diff line change
@@ -28,6 +28,13 @@ var (
2828

2929
const advanceTimePseudoEvent = "@tick"
3030

31+
// ReplOptions configures the replication behavior.
32+
type ReplOptions struct {
33+
// UseCollectionBulkWrite indicates whether to use collection-level bulk write
34+
// instead of client bulk write. Default: false (use client bulk write).
35+
UseCollectionBulkWrite bool
36+
}
37+
3138
// Repl handles replication from a source MongoDB to a target MongoDB.
3239
type Repl struct {
3340
source *mongo.Client // Source MongoDB client
@@ -36,7 +43,7 @@ type Repl struct {
3643
nsFilter sel.NSFilter // Namespace filter
3744
catalog *Catalog // Catalog for managing collections and indexes
3845

39-
useCollectionBulkWrite bool // Whether to use collection-level bulk write
46+
options *ReplOptions // Replication options
4047

4148
lastReplicatedOpTime bson.Timestamp
4249

@@ -91,16 +98,16 @@ func NewRepl(
9198
source, target *mongo.Client,
9299
catalog *Catalog,
93100
nsFilter sel.NSFilter,
94-
useCollectionBulkWrite bool,
101+
opts *ReplOptions,
95102
) *Repl {
96103
return &Repl{
97-
source: source,
98-
target: target,
99-
nsFilter: nsFilter,
100-
catalog: catalog,
101-
useCollectionBulkWrite: useCollectionBulkWrite,
102-
pauseC: make(chan struct{}),
103-
doneSig: make(chan struct{}),
104+
source: source,
105+
target: target,
106+
nsFilter: nsFilter,
107+
catalog: catalog,
108+
options: opts,
109+
pauseC: make(chan struct{}),
110+
doneSig: make(chan struct{}),
104111
}
105112
}
106113

@@ -230,7 +237,7 @@ func (r *Repl) Start(ctx context.Context, startAt bson.Timestamp) error {
230237
return errors.Wrap(err, "major version")
231238
}
232239

233-
if topo.Support(targetVer).ClientBulkWrite() && !r.useCollectionBulkWrite {
240+
if topo.Support(targetVer).ClientBulkWrite() && !r.options.UseCollectionBulkWrite {
234241
r.bulkWrite = newClientBulkWrite(config.BulkOpsSize, targetVer.Major() < 8) //nolint:mnd
235242
} else {
236243
r.bulkWrite = newCollectionBulkWrite(config.BulkOpsSize, targetVer.Major() < 8) //nolint:mnd

0 commit comments

Comments
 (0)