Merged
Commits (38)
69f0cfa
PCSM-219: Add Viper dependency and create config/config.go
chupe Dec 23, 2025
ffb345e
PCSM-219: Add CLI flag for MongoDB operation timeout (visible)
chupe Dec 23, 2025
3f2ae63
PCSM-219: Add hidden CLI flag for bulk write option
chupe Dec 23, 2025
94a83ca
PCSM-219: Add hidden CLI flags for clone tuning options (no env var)
chupe Dec 23, 2025
230d759
PCSM-219: Add validator/v10 and create validation package
chupe Dec 23, 2025
7b41462
PCSM-219: Add config/config.go
chupe Dec 23, 2025
fe51882
PCSM-219: Extend StartOptions and HTTP startRequest with validation
chupe Dec 23, 2025
74b3a89
PCSM-219: Update README with configuration documentation
chupe Dec 23, 2025
9ef4de1
Fix .vscode/settings.json formatting conflict
chupe Dec 23, 2025
e358ba1
PCSM-219: Simplify port flag handling with Viper
chupe Dec 23, 2025
cd7ffc1
PCSM-219: Consolidate configuration documentation
chupe Dec 23, 2025
be65433
PCSM-219: Remove external decision references from comments
chupe Dec 23, 2025
7547453
PCSM-219: Remove unnecessary Viper wrappers
chupe Dec 23, 2025
7133a98
PCSM-219: Move to mapstruct for config
chupe Dec 24, 2025
1ae84b6
PCSM-219: Use embedded options for Clone and Repl
chupe Dec 24, 2025
2b30ea2
PCSM-219: Undo README changes
chupe Dec 24, 2025
2a0e34c
PCSM-219: Undoing using the validator
chupe Dec 26, 2025
aed7710
PCSM-219: Add integration tests for the CLI client
chupe Dec 29, 2025
c401994
PCSM-219: Move clone flags to start subcommand
chupe Dec 30, 2025
37ac3df
Merge branch 'main' into pcsm-219
inelpandzic Dec 31, 2025
9deb753
PCSM-219: Align config options with Notion spec
chupe Dec 31, 2025
0504347
PCSM-219: Integrate target compressors into Viper config
chupe Dec 31, 2025
da314d6
PCSM-219: Remove manual timeout parsing
chupe Dec 31, 2025
eed97ff
PCSM-219: Reintroduce env vars for clone options
chupe Jan 5, 2026
9c12910
PCSM-219: Consolidate naming for log no-color option
chupe Jan 5, 2026
1255d8f
PCSM-219: Add backward compatible --no-color
chupe Jan 5, 2026
e0db093
PCSM-219: Address code location comments
chupe Jan 7, 2026
b8793b0
PCSM-219: Move config validation from runServer to rootCmd.RunE
chupe Jan 7, 2026
492fb3c
PCSM-219: Remove viper calls from main.go
chupe Jan 7, 2026
6a3ccfa
PCSM-219: Remove clone options flag reading
chupe Jan 7, 2026
1bb77f5
PCSM-219: Add tests for start subcommand
chupe Jan 7, 2026
14affd6
Merge branch 'main' into pcsm-219
inelpandzic Jan 7, 2026
35cca5c
PCSM-219: PLM_MONGODB_CLI_OPERATION_TIMEOUT depr
chupe Jan 8, 2026
1e388e1
Merge branch 'main' into pcsm-219
chupe Jan 8, 2026
d291c60
Merge branch 'main' into pcsm-219
inelpandzic Jan 8, 2026
3052aaf
PCSM-219: Use logger package
chupe Jan 8, 2026
d37a534
PCSM-219: Clean up config reassignment
chupe Jan 8, 2026
4f59d32
PCSM-219: PCSM_NO_COLOR depr
chupe Jan 8, 2026
29 changes: 27 additions & 2 deletions .vscode/settings.json
@@ -23,7 +23,6 @@
"go.formatFlags": [
"-extra"
],
"go.formatTool": "gofumpt",
"go.lintTool": "golangci-lint-v2",
"go.useLanguageServer": true,
"gopls": {
@@ -40,5 +39,31 @@
"tests"
],
"python.testing.pytestEnabled": true,
"python.testing.unittestEnabled": false
"python.testing.unittestEnabled": false,
"cSpell.words": [
"bson",
"clustersync",
"cmdutil",
"codegen",
"colls",
"connstring",
"contextcheck",
"Debugf",
"errgroup",
"errorlint",
"Infof",
"keygen",
"mapstructure",
"nolint",
"opencode",
"pcsm",
"pipefail",
"readconcern",
"readpref",
"Warnf",
"wrapcheck",
"Wrapf",
"writeconcern",
"zerolog"
]
}
122 changes: 122 additions & 0 deletions config/config.go
@@ -0,0 +1,122 @@
// Package config provides configuration management for PCSM using Viper.
package config

import (
"math"
"slices"
"strings"

"github.com/dustin/go-humanize"
"github.com/go-viper/mapstructure/v2"
"github.com/spf13/cobra"
"github.com/spf13/viper"

"github.com/percona/percona-clustersync-mongodb/errors"
)

// Load initializes Viper and returns a validated Config.
func Load(cmd *cobra.Command) (*Config, error) {
viper.SetEnvPrefix("PCSM")
viper.SetEnvKeyReplacer(strings.NewReplacer("-", "_"))
viper.AutomaticEnv()

if cmd.PersistentFlags() != nil {
_ = viper.BindPFlags(cmd.PersistentFlags())
}
if cmd.Flags() != nil {
_ = viper.BindPFlags(cmd.Flags())
}

bindEnvVars()

var cfg Config

err := viper.Unmarshal(&cfg, viper.DecodeHook(
mapstructure.ComposeDecodeHookFunc(
mapstructure.StringToTimeDurationHookFunc(),
mapstructure.StringToSliceHookFunc(","),
),
))
if err != nil {
return nil, errors.Wrap(err, "unmarshal config")
}

cfg.MongoDB.TargetCompressors = filterCompressors(cfg.MongoDB.TargetCompressors)

return &cfg, nil
}

func bindEnvVars() {
_ = viper.BindEnv("port", "PCSM_PORT")

_ = viper.BindEnv("source", "PCSM_SOURCE_URI")
_ = viper.BindEnv("target", "PCSM_TARGET_URI")

_ = viper.BindEnv("log-level", "PCSM_LOG_LEVEL")
_ = viper.BindEnv("log-json", "PCSM_LOG_JSON")
_ = viper.BindEnv("no-color", "PCSM_NO_COLOR")

_ = viper.BindEnv("mongodb-operation-timeout", "PCSM_MONGODB_OPERATION_TIMEOUT")

_ = viper.BindEnv("use-collection-bulk-write", "PCSM_USE_COLLECTION_BULK_WRITE")

_ = viper.BindEnv("dev-target-client-compressors", "PCSM_DEV_TARGET_CLIENT_COMPRESSORS")

_ = viper.BindEnv("clone-num-parallel-collections", "PCSM_CLONE_NUM_PARALLEL_COLLECTIONS")
_ = viper.BindEnv("clone-num-read-workers", "PCSM_CLONE_NUM_READ_WORKERS")
_ = viper.BindEnv("clone-num-insert-workers", "PCSM_CLONE_NUM_INSERT_WORKERS")
_ = viper.BindEnv("clone-segment-size", "PCSM_CLONE_SEGMENT_SIZE")
_ = viper.BindEnv("clone-read-batch-size", "PCSM_CLONE_READ_BATCH_SIZE")
}

//nolint:gochecknoglobals
var allowedCompressors = []string{"zstd", "zlib", "snappy"}

func filterCompressors(compressors []string) []string {
if len(compressors) == 0 {
return nil
}

filtered := make([]string, 0, len(allowedCompressors))

for _, c := range compressors {
c = strings.TrimSpace(c)
if slices.Contains(allowedCompressors, c) && !slices.Contains(filtered, c) {
filtered = append(filtered, c)
}
}

return filtered
}

// ParseAndValidateCloneSegmentSize parses a byte size string and validates it.
// It allows 0 (auto) or values within [MinCloneSegmentSizeBytes, MaxCloneSegmentSizeBytes].
func ParseAndValidateCloneSegmentSize(value string) (int64, error) {
sizeBytes, err := humanize.ParseBytes(value)
if err != nil {
return 0, errors.Wrapf(err, "invalid cloneSegmentSize value: %s", value)
}

err = ValidateCloneSegmentSize(sizeBytes)
if err != nil {
return 0, err
}

return int64(min(sizeBytes, math.MaxInt64)), nil //nolint:gosec
}

// ParseAndValidateCloneReadBatchSize parses a byte size string and validates it.
// It allows 0 (auto) or values within [[MinCloneReadBatchSizeBytes], [MaxCloneReadBatchSizeBytes]].
func ParseAndValidateCloneReadBatchSize(value string) (int32, error) {
sizeBytes, err := humanize.ParseBytes(value)
if err != nil {
return 0, errors.Wrapf(err, "invalid cloneReadBatchSize value: %s", value)
}

err = ValidateCloneReadBatchSize(sizeBytes)
if err != nil {
return 0, err
}

return int32(min(sizeBytes, math.MaxInt32)), nil //nolint:gosec
}
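
For illustration only, a minimal sketch (not part of this change set) of how config.Load could be wired into a cobra root command. The flag names mirror the env bindings above; the default values shown here are placeholders, not the project's actual defaults.

package main

import (
	"fmt"
	"os"

	"github.com/spf13/cobra"

	"github.com/percona/percona-clustersync-mongodb/config"
)

func main() {
	rootCmd := &cobra.Command{
		Use: "pcsm",
		RunE: func(cmd *cobra.Command, _ []string) error {
			// Load binds the command's flags plus the PCSM_* env vars and unmarshals into Config.
			cfg, err := config.Load(cmd)
			if err != nil {
				return err
			}
			fmt.Println("port:", cfg.Port, "operation timeout:", cfg.MongoDB.OperationTimeout)
			return nil
		},
	}

	// Flag names must match the mapstructure keys in config.Config so viper.BindPFlags lines up.
	// The defaults below are placeholders for illustration.
	rootCmd.PersistentFlags().Int("port", 8080, "HTTP listen port")
	rootCmd.PersistentFlags().String("source", "", "source MongoDB URI")
	rootCmd.PersistentFlags().String("target", "", "target MongoDB URI")
	rootCmd.PersistentFlags().Duration("mongodb-operation-timeout",
		config.DefaultMongoDBOperationTimeout, "timeout for MongoDB client operations")

	if err := rootCmd.Execute(); err != nil {
		os.Exit(1)
	}
}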
148 changes: 148 additions & 0 deletions config/config_test.go
@@ -0,0 +1,148 @@
package config_test

import (
"fmt"
"testing"

"github.com/dustin/go-humanize"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/percona/percona-clustersync-mongodb/config"
)

func TestParseAndValidateCloneSegmentSize(t *testing.T) {
t.Parallel()

tests := []struct {
name string
value string
want int64
wantErr string
}{
{
name: "valid size 500MB (above minimum)",
value: "500MB",
want: 500 * humanize.MByte,
},
{
name: "valid size 1GiB",
value: "1GiB",
want: humanize.GiByte,
},
{
name: "zero value (auto)",
value: "0",
want: 0,
},
{
name: "below minimum (100MB)",
value: "100MB",
wantErr: "cloneSegmentSize must be at least",
},
{
name: "above maximum",
value: "100GiB",
wantErr: "cloneSegmentSize must be at most",
},
{
name: "at minimum boundary (using exact bytes)",
value: fmt.Sprintf("%dB", config.MinCloneSegmentSizeBytes),
want: int64(config.MinCloneSegmentSizeBytes),
},
{
name: "at maximum boundary",
value: "64GiB",
want: int64(config.MaxCloneSegmentSizeBytes),
},
{
name: "invalid format",
value: "abc",
wantErr: "invalid cloneSegmentSize value",
},
{
name: "empty string",
value: "",
wantErr: "invalid cloneSegmentSize value",
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
t.Parallel()

got, err := config.ParseAndValidateCloneSegmentSize(tt.value)

if tt.wantErr == "" {
require.NoError(t, err)
assert.Equal(t, tt.want, got)
} else {
require.Error(t, err)
assert.Contains(t, err.Error(), tt.wantErr)
}
})
}
}

func TestParseAndValidateCloneReadBatchSize(t *testing.T) {
t.Parallel()

tests := []struct {
name string
value string
want int32
wantErr string
}{
{
name: "valid size 16MiB",
value: "16MiB",
want: 16 * humanize.MiByte,
},
{
name: "valid size 48MB",
value: "48MB",
want: 48 * humanize.MByte,
},
{
name: "zero value (auto)",
value: "0",
want: 0,
},
{
name: "below minimum",
value: "1KB",
wantErr: "cloneReadBatchSize must be at least",
},
{
name: "at minimum boundary (using exact bytes)",
value: fmt.Sprintf("%dB", config.MinCloneReadBatchSizeBytes),
want: config.MinCloneReadBatchSizeBytes,
},
{
name: "invalid format",
value: "xyz",
wantErr: "invalid cloneReadBatchSize value",
},
{
name: "empty string",
value: "",
wantErr: "invalid cloneReadBatchSize value",
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
t.Parallel()

got, err := config.ParseAndValidateCloneReadBatchSize(tt.value)

if tt.wantErr == "" {
require.NoError(t, err)
assert.Equal(t, tt.want, got)
} else {
require.Error(t, err)
assert.Contains(t, err.Error(), tt.wantErr)
}
})
}
}
7 changes: 3 additions & 4 deletions config/const.go
@@ -44,10 +44,9 @@ const (
DisconnectTimeout = 5 * time.Second
// CloseCursorTimeout is the timeout duration for closing cursor.
CloseCursorTimeout = 10 * time.Second
// DefaultMongoDBCliOperationTimeout is the default timeout duration for MongoDB client
// operations like insert, update, delete, etc. It can be overridden via
// environment variable (see config.OperationMongoDBCliTimeout()).
DefaultMongoDBCliOperationTimeout = 5 * time.Minute
// DefaultMongoDBOperationTimeout is the default timeout for MongoDB client operations.
// Override via --mongodb-operation-timeout flag or PCSM_MONGODB_OPERATION_TIMEOUT env var.
DefaultMongoDBOperationTimeout = 5 * time.Minute
)

// Change stream and replication settings.
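
As an aside, a hypothetical helper (not in this PR) showing how DefaultMongoDBOperationTimeout is intended to act as the fallback when neither the --mongodb-operation-timeout flag nor the PCSM_MONGODB_OPERATION_TIMEOUT env var sets the timeout.

package main

import (
	"fmt"
	"time"

	"github.com/percona/percona-clustersync-mongodb/config"
)

// effectiveOperationTimeout falls back to the package default when no
// override was configured. Hypothetical helper, shown only for illustration.
func effectiveOperationTimeout(cfg *config.Config) time.Duration {
	if cfg.MongoDB.OperationTimeout > 0 {
		return cfg.MongoDB.OperationTimeout
	}
	return config.DefaultMongoDBOperationTimeout
}

func main() {
	fmt.Println(effectiveOperationTimeout(&config.Config{})) // 5m0s
}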
58 changes: 58 additions & 0 deletions config/schema.go
@@ -0,0 +1,58 @@
package config

import (
"time"
)

// Config holds all PCSM configuration.
type Config struct {
Port int `mapstructure:"port"`
Source string `mapstructure:"source"`
Target string `mapstructure:"target"`

Log LogConfig `mapstructure:",squash"`

MongoDB MongoDBConfig `mapstructure:",squash"`

UseCollectionBulkWrite bool `mapstructure:"use-collection-bulk-write"`

Clone CloneConfig `mapstructure:",squash"`

// hidden startup flags
Start bool `mapstructure:"start"`
ResetState bool `mapstructure:"reset-state"`
PauseOnInitialSync bool `mapstructure:"pause-on-initial-sync"`
}

// LogConfig holds logging configuration.
type LogConfig struct {
Level string `mapstructure:"log-level"`
JSON bool `mapstructure:"log-json"`
NoColor bool `mapstructure:"no-color"`
}

// MongoDBConfig holds MongoDB client configuration.
type MongoDBConfig struct {
OperationTimeout time.Duration `mapstructure:"mongodb-operation-timeout"`
TargetCompressors []string `mapstructure:"dev-target-client-compressors"`
}

// CloneConfig holds clone operation configuration.
// These options can be set via environment variables and overridden by CLI flags or HTTP params.
type CloneConfig struct {
// NumParallelCollections is the number of collections to clone in parallel.
// 0 means auto (calculated at runtime).
NumParallelCollections int `mapstructure:"clone-num-parallel-collections"`
// NumReadWorkers is the number of read workers during clone.
// 0 means auto (calculated at runtime).
NumReadWorkers int `mapstructure:"clone-num-read-workers"`
// NumInsertWorkers is the number of insert workers during clone.
// 0 means auto (calculated at runtime).
NumInsertWorkers int `mapstructure:"clone-num-insert-workers"`
// SegmentSize is the segment size for clone operations (e.g., "500MB", "1GiB").
// Empty string means auto (calculated at runtime for each collection).
SegmentSize string `mapstructure:"clone-segment-size"`
// ReadBatchSize is the read batch size during clone (e.g., "16MiB", "100MB").
// Empty string means auto (calculated at runtime for each collection).
ReadBatchSize string `mapstructure:"clone-read-batch-size"`
}
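
A standalone sketch (not part of this change set; the input keys and values are made up) showing how the ",squash" mapstructure tags route flat, viper-style keys into the nested Log, MongoDB, and Clone structs. The duration decode hook mirrors the one used in config.Load.

package main

import (
	"fmt"

	"github.com/go-viper/mapstructure/v2"

	"github.com/percona/percona-clustersync-mongodb/config"
)

func main() {
	// Flat keys as viper would supply them after flag/env binding.
	raw := map[string]any{
		"port":                      8080,
		"log-level":                 "debug",
		"mongodb-operation-timeout": "2m",
		"clone-num-insert-workers":  8,
		"clone-segment-size":        "1GiB",
	}

	var cfg config.Config

	// Same duration hook as config.Load; ",squash" flattens the embedded structs.
	dec, err := mapstructure.NewDecoder(&mapstructure.DecoderConfig{
		Result:     &cfg,
		DecodeHook: mapstructure.StringToTimeDurationHookFunc(),
	})
	if err != nil {
		panic(err)
	}
	if err := dec.Decode(raw); err != nil {
		panic(err)
	}

	fmt.Println(cfg.Log.Level, cfg.MongoDB.OperationTimeout, cfg.Clone.NumInsertWorkers, cfg.Clone.SegmentSize)
	// Output: debug 2m0s 8 1GiB
}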