Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
69f0cfa
PCSM-219: Add Viper dependency and create config/config.go
chupe Dec 23, 2025
ffb345e
PCSM-219: Add CLI flag for MongoDB operation timeout (visible)
chupe Dec 23, 2025
3f2ae63
PCSM-219: Add hidden CLI flag for bulk write option
chupe Dec 23, 2025
94a83ca
PCSM-219: Add hidden CLI flags for clone tuning options (no env var)
chupe Dec 23, 2025
230d759
PCSM-219: Add validator/v10 and create validation package
chupe Dec 23, 2025
7b41462
PCSM-219: Add config/config.go
chupe Dec 23, 2025
fe51882
PCSM-219: Extend StartOptions and HTTP startRequest with validation
chupe Dec 23, 2025
74b3a89
PCSM-219: Update README with configuration documentation
chupe Dec 23, 2025
9ef4de1
Fix .vscode/settings.json formatting conflict
chupe Dec 23, 2025
e358ba1
PCSM-219: Simplify port flag handling with Viper
chupe Dec 23, 2025
cd7ffc1
PCSM-219: Consolidate configuration documentation
chupe Dec 23, 2025
be65433
PCSM-219: Remove external decision references from comments
chupe Dec 23, 2025
7547453
PCSM-219: Remove unnecessary Viper wrappers
chupe Dec 23, 2025
7133a98
PCSM-219: Move to mapstruct for config
chupe Dec 24, 2025
1ae84b6
PCSM-219: Use embedded options for Clone and Repl
chupe Dec 24, 2025
2b30ea2
PCSM-219: Undo README changes
chupe Dec 24, 2025
2a0e34c
PCSM-219: Undoing using the validator
chupe Dec 26, 2025
aed7710
PCSM-219: Add integration tests for the CLI client
chupe Dec 29, 2025
c401994
PCSM-219: Move clone flags to start subcommand
chupe Dec 30, 2025
37ac3df
Merge branch 'main' into pcsm-219
inelpandzic Dec 31, 2025
9deb753
PCSM-219: Align config options with Notion spec
chupe Dec 31, 2025
0504347
PCSM-219: Integrate target compressors into Viper config
chupe Dec 31, 2025
da314d6
PCSM-219: Remove manual timeout parsing
chupe Dec 31, 2025
eed97ff
PCSM-219: Reintroduce env vars for clone options
chupe Jan 5, 2026
9c12910
PCSM-219: Consolidate naming for log no-color option
chupe Jan 5, 2026
1255d8f
PCSM-219: Add backward compatible --no-color
chupe Jan 5, 2026
e0db093
PCSM-219: Address code location comments
chupe Jan 7, 2026
b8793b0
PCSM-219: Move config validation from runServer to rootCmd.RunE
chupe Jan 7, 2026
492fb3c
PCSM-219: Remove viper calls from main.go
chupe Jan 7, 2026
6a3ccfa
PCSM-219: Remove clone options flag reading
chupe Jan 7, 2026
1bb77f5
PCSM-219: Add tests for start subcommand
chupe Jan 7, 2026
14affd6
Merge branch 'main' into pcsm-219
inelpandzic Jan 7, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 27 additions & 2 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
"go.formatFlags": [
"-extra"
],
"go.formatTool": "gofumpt",
"go.lintTool": "golangci-lint-v2",
"go.useLanguageServer": true,
"gopls": {
Expand All @@ -40,5 +39,31 @@
"tests"
],
"python.testing.pytestEnabled": true,
"python.testing.unittestEnabled": false
"python.testing.unittestEnabled": false,
"cSpell.words": [
"bson",
"clustersync",
"cmdutil",
"codegen",
"colls",
"connstring",
"contextcheck",
"Debugf",
"errgroup",
"errorlint",
"Infof",
"keygen",
"mapstructure",
"nolint",
"opencode",
"pcsm",
"pipefail",
"readconcern",
"readpref",
"Warnf",
"wrapcheck",
"Wrapf",
"writeconcern",
"zerolog"
]
}
9 changes: 6 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ When starting the PCSM server, you can use the following options:
- `--target`: The MongoDB connection string for the target cluster
- `--log-level`: The log level (default: "info")
- `--log-json`: Output log in JSON format with disabled color
- `--no-color`: Disable log ASCI color
- `--log-no-color`: Disable log ASCI color

Example:

Expand Down Expand Up @@ -213,7 +213,11 @@ Example:

```json
{
"includeNamespaces": ["dbName.*", "anotherDB.collName1", "anotherDB.collName2"],
"includeNamespaces": [
"dbName.*",
"anotherDB.collName1",
"anotherDB.collName2"
],
"excludeNamespaces": ["dbName.collName"]
}
```
Expand Down Expand Up @@ -303,7 +307,6 @@ The /status endpoint provides the current state of the PCSM replication process,
- `lastReplicatedOpTime.ts`: op time timestamp
- `lastReplicatedOpTime.isoDate`: op time ts in human-readable form


- `initialSync.completed`: indicates if the initial sync is completed.
- `initialSync.lagTimeSeconds`: the lag time in logical seconds until the initial sync completed.

Expand Down
180 changes: 180 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
// Package config provides configuration management for PCSM using Viper.
package config

import (
"math"
"slices"
"strings"
"time"

"github.com/dustin/go-humanize"
"github.com/go-viper/mapstructure/v2"
"github.com/spf13/cobra"
"github.com/spf13/viper"

"github.com/percona/percona-clustersync-mongodb/errors"
)

// Config holds all PCSM configuration.
type Config struct {
Port int `mapstructure:"port"`
Source string `mapstructure:"source"`
Target string `mapstructure:"target"`

Log LogConfig `mapstructure:",squash"`

MongoDB MongoDBConfig `mapstructure:",squash"`

UseCollectionBulkWrite bool `mapstructure:"use-collection-bulk-write"`

Clone CloneConfig `mapstructure:",squash"`

// hidden startup flags
Start bool `mapstructure:"start"`
ResetState bool `mapstructure:"reset-state"`
PauseOnInitialSync bool `mapstructure:"pause-on-initial-sync"`
}

// LogConfig holds logging configuration.
type LogConfig struct {
Level string `mapstructure:"log-level"`
JSON bool `mapstructure:"log-json"`
NoColor bool `mapstructure:"log-no-color"`
}

// MongoDBConfig holds MongoDB client configuration.
type MongoDBConfig struct {
OperationTimeout time.Duration `mapstructure:"mongodb-operation-timeout"`
TargetCompressors []string `mapstructure:"dev-target-client-compressors"`
}

// CloneConfig holds clone operation configuration.
type CloneConfig struct {
// NumParallelCollections is the number of collections to clone in parallel.
// 0 means auto (calculated at runtime).
NumParallelCollections int `mapstructure:"clone-num-parallel-collections"`
// NumReadWorkers is the number of read workers during clone.
// 0 means auto (calculated at runtime).
NumReadWorkers int `mapstructure:"clone-num-read-workers"`
// NumInsertWorkers is the number of insert workers during clone.
// 0 means auto (calculated at runtime).
NumInsertWorkers int `mapstructure:"clone-num-insert-workers"`
// SegmentSize is the segment size for clone operations (e.g., "500MB", "1GiB").
// Empty string means auto (calculated at runtime for each collection).
SegmentSize string `mapstructure:"clone-segment-size"`
// ReadBatchSize is the read batch size during clone (e.g., "16MiB", "100MB").
// Empty string means auto (calculated at runtime for each collection).
ReadBatchSize string `mapstructure:"clone-read-batch-size"`
}

// Load initializes Viper and returns a validated Config.
func Load(cmd *cobra.Command) (*Config, error) {
viper.SetEnvPrefix("PCSM")
viper.SetEnvKeyReplacer(strings.NewReplacer("-", "_"))
viper.AutomaticEnv()

if cmd.PersistentFlags() != nil {
_ = viper.BindPFlags(cmd.PersistentFlags())
}

if cmd.Flags() != nil {
_ = viper.BindPFlags(cmd.Flags())
}

bindEnvVars()

var cfg Config

err := viper.Unmarshal(&cfg, viper.DecodeHook(
mapstructure.ComposeDecodeHookFunc(
mapstructure.StringToTimeDurationHookFunc(),
mapstructure.StringToSliceHookFunc(","),
),
))
if err != nil {
return nil, errors.Wrap(err, "unmarshal config")
}

cfg.MongoDB.TargetCompressors = filterCompressors(cfg.MongoDB.TargetCompressors)

if viper.GetBool("no-color") {
cfg.Log.NoColor = true
}

return &cfg, nil
}

func bindEnvVars() {
_ = viper.BindEnv("port", "PCSM_PORT")

_ = viper.BindEnv("source", "PCSM_SOURCE_URI")
_ = viper.BindEnv("target", "PCSM_TARGET_URI")

_ = viper.BindEnv("log-level", "PCSM_LOG_LEVEL")
_ = viper.BindEnv("log-json", "PCSM_LOG_JSON")
_ = viper.BindEnv("log-no-color", "PCSM_LOG_NO_COLOR", "PCSM_NO_COLOR")

_ = viper.BindEnv("mongodb-operation-timeout", "PCSM_MONGODB_OPERATION_TIMEOUT")

_ = viper.BindEnv("use-collection-bulk-write", "PCSM_USE_COLLECTION_BULK_WRITE")

_ = viper.BindEnv("dev-target-client-compressors", "PCSM_DEV_TARGET_CLIENT_COMPRESSORS")

_ = viper.BindEnv("clone-num-parallel-collections", "PCSM_CLONE_NUM_PARALLEL_COLLECTIONS")
_ = viper.BindEnv("clone-num-read-workers", "PCSM_CLONE_NUM_READ_WORKERS")
_ = viper.BindEnv("clone-num-insert-workers", "PCSM_CLONE_NUM_INSERT_WORKERS")
_ = viper.BindEnv("clone-segment-size", "PCSM_CLONE_SEGMENT_SIZE")
_ = viper.BindEnv("clone-read-batch-size", "PCSM_CLONE_READ_BATCH_SIZE")
}

//nolint:gochecknoglobals
var allowedCompressors = []string{"zstd", "zlib", "snappy"}

func filterCompressors(compressors []string) []string {
if len(compressors) == 0 {
return nil
}

filtered := make([]string, 0, len(allowedCompressors))

for _, c := range compressors {
c = strings.TrimSpace(c)
if slices.Contains(allowedCompressors, c) && !slices.Contains(filtered, c) {
filtered = append(filtered, c)
}
}

return filtered
}

// ParseAndValidateCloneSegmentSize parses a byte size string and validates it.
// It allows 0 (auto) or values within [MinCloneSegmentSizeBytes, MaxCloneSegmentSizeBytes].
func ParseAndValidateCloneSegmentSize(value string) (int64, error) {
sizeBytes, err := humanize.ParseBytes(value)
if err != nil {
return 0, errors.Wrapf(err, "invalid cloneSegmentSize value: %s", value)
}

err = ValidateCloneSegmentSize(sizeBytes)
if err != nil {
return 0, err
}

return int64(min(sizeBytes, math.MaxInt64)), nil //nolint:gosec
}

// ParseAndValidateCloneReadBatchSize parses a byte size string and validates it.
// It allows 0 (auto) or values within [[MinCloneReadBatchSizeBytes], [MaxCloneReadBatchSizeBytes]].
func ParseAndValidateCloneReadBatchSize(value string) (int32, error) {
sizeBytes, err := humanize.ParseBytes(value)
if err != nil {
return 0, errors.Wrapf(err, "invalid cloneReadBatchSize value: %s", value)
}

err = ValidateCloneReadBatchSize(sizeBytes)
if err != nil {
return 0, err
}

return int32(min(sizeBytes, math.MaxInt32)), nil //nolint:gosec
}
Loading
Loading