Commit ac8909c

Merge branch 'main' into release-0.7.0
2 parents dc90349 + 54840e7 commit ac8909c

17 files changed (+1889, -521 lines)


.vscode/settings.json

Lines changed: 27 additions & 2 deletions
@@ -23,7 +23,6 @@
   "go.formatFlags": [
     "-extra"
   ],
-  "go.formatTool": "gofumpt",
   "go.lintTool": "golangci-lint-v2",
   "go.useLanguageServer": true,
   "gopls": {
@@ -40,5 +39,31 @@
     "tests"
   ],
   "python.testing.pytestEnabled": true,
-  "python.testing.unittestEnabled": false
+  "python.testing.unittestEnabled": false,
+  "cSpell.words": [
+    "bson",
+    "clustersync",
+    "cmdutil",
+    "codegen",
+    "colls",
+    "connstring",
+    "contextcheck",
+    "Debugf",
+    "errgroup",
+    "errorlint",
+    "Infof",
+    "keygen",
+    "mapstructure",
+    "nolint",
+    "opencode",
+    "pcsm",
+    "pipefail",
+    "readconcern",
+    "readpref",
+    "Warnf",
+    "wrapcheck",
+    "Wrapf",
+    "writeconcern",
+    "zerolog"
+  ]
 }

config/config.go

Lines changed: 205 additions & 0 deletions
@@ -0,0 +1,205 @@
// Package config provides configuration management for PCSM using Viper.
package config

import (
    "context"
    "math"
    "os"
    "slices"
    "strings"
    "time"

    "github.com/dustin/go-humanize"
    "github.com/go-viper/mapstructure/v2"
    "github.com/spf13/cobra"
    "github.com/spf13/viper"

    "github.com/percona/percona-clustersync-mongodb/errors"
    "github.com/percona/percona-clustersync-mongodb/log"
)

// Config holds all PCSM configuration.
type Config struct {
    Port   int    `mapstructure:"port"`
    Source string `mapstructure:"source"`
    Target string `mapstructure:"target"`

    Log LogConfig `mapstructure:",squash"`

    MongoDB MongoDBConfig `mapstructure:",squash"`

    UseCollectionBulkWrite bool `mapstructure:"use-collection-bulk-write"`

    Clone CloneConfig `mapstructure:",squash"`

    // hidden startup flags
    Start              bool `mapstructure:"start"`
    ResetState         bool `mapstructure:"reset-state"`
    PauseOnInitialSync bool `mapstructure:"pause-on-initial-sync"`
}

// LogConfig holds logging configuration.
type LogConfig struct {
    Level   string `mapstructure:"log-level"`
    JSON    bool   `mapstructure:"log-json"`
    NoColor bool   `mapstructure:"log-no-color"`
}

// MongoDBConfig holds MongoDB client configuration.
type MongoDBConfig struct {
    OperationTimeout  time.Duration `mapstructure:"mongodb-operation-timeout"`
    TargetCompressors []string      `mapstructure:"dev-target-client-compressors"`
}

// CloneConfig holds clone operation configuration.
type CloneConfig struct {
    // NumParallelCollections is the number of collections to clone in parallel.
    // 0 means auto (calculated at runtime).
    NumParallelCollections int `mapstructure:"clone-num-parallel-collections"`
    // NumReadWorkers is the number of read workers during clone.
    // 0 means auto (calculated at runtime).
    NumReadWorkers int `mapstructure:"clone-num-read-workers"`
    // NumInsertWorkers is the number of insert workers during clone.
    // 0 means auto (calculated at runtime).
    NumInsertWorkers int `mapstructure:"clone-num-insert-workers"`
    // SegmentSize is the segment size for clone operations (e.g., "500MB", "1GiB").
    // Empty string means auto (calculated at runtime for each collection).
    SegmentSize string `mapstructure:"clone-segment-size"`
    // ReadBatchSize is the read batch size during clone (e.g., "16MiB", "100MB").
    // Empty string means auto (calculated at runtime for each collection).
    ReadBatchSize string `mapstructure:"clone-read-batch-size"`
}

// Load initializes Viper and populates the provided Config.
func Load(cmd *cobra.Command, cfg *Config) error {
    viper.SetEnvPrefix("PCSM")
    viper.SetEnvKeyReplacer(strings.NewReplacer("-", "_"))
    viper.AutomaticEnv()

    if cmd.PersistentFlags() != nil {
        _ = viper.BindPFlags(cmd.PersistentFlags())
    }

    if cmd.Flags() != nil {
        _ = viper.BindPFlags(cmd.Flags())
    }

    bindEnvVars()

    err := viper.Unmarshal(cfg, viper.DecodeHook(
        mapstructure.ComposeDecodeHookFunc(
            mapstructure.StringToTimeDurationHookFunc(),
            mapstructure.StringToSliceHookFunc(","),
        ),
    ))
    if err != nil {
        return errors.Wrap(err, "unmarshal config")
    }

    cfg.MongoDB.TargetCompressors = filterCompressors(cfg.MongoDB.TargetCompressors)

    if viper.GetBool("no-color") {
        cfg.Log.NoColor = true
    }

    return nil
}

// WarnDeprecatedEnvVars logs warnings for any deprecated environment variables that are set.
// Expects the logger to be initialized.
func WarnDeprecatedEnvVars(ctx context.Context) {
    deprecated := map[string]string{
        "PLM_MONGODB_CLI_OPERATION_TIMEOUT": "PCSM_MONGODB_OPERATION_TIMEOUT",
        "PCSM_NO_COLOR":                     "PCSM_LOG_NO_COLOR",
    }

    for old, replacement := range deprecated {
        if _, ok := os.LookupEnv(old); ok {
            log.Ctx(ctx).Warnf(
                "Environment variable %s is deprecated; use %s instead",
                old, replacement,
            )
        }
    }
}

func bindEnvVars() {
    _ = viper.BindEnv("port", "PCSM_PORT")

    _ = viper.BindEnv("source", "PCSM_SOURCE_URI")
    _ = viper.BindEnv("target", "PCSM_TARGET_URI")

    _ = viper.BindEnv("log-level", "PCSM_LOG_LEVEL")
    _ = viper.BindEnv("log-json", "PCSM_LOG_JSON")
    _ = viper.BindEnv("log-no-color",
        "PCSM_LOG_NO_COLOR",
        "PCSM_NO_COLOR", // deprecated
    )

    _ = viper.BindEnv("mongodb-operation-timeout",
        "PCSM_MONGODB_OPERATION_TIMEOUT",
        "PLM_MONGODB_CLI_OPERATION_TIMEOUT", // deprecated
    )

    _ = viper.BindEnv("use-collection-bulk-write", "PCSM_USE_COLLECTION_BULK_WRITE")

    _ = viper.BindEnv("dev-target-client-compressors", "PCSM_DEV_TARGET_CLIENT_COMPRESSORS")

    _ = viper.BindEnv("clone-num-parallel-collections", "PCSM_CLONE_NUM_PARALLEL_COLLECTIONS")
    _ = viper.BindEnv("clone-num-read-workers", "PCSM_CLONE_NUM_READ_WORKERS")
    _ = viper.BindEnv("clone-num-insert-workers", "PCSM_CLONE_NUM_INSERT_WORKERS")
    _ = viper.BindEnv("clone-segment-size", "PCSM_CLONE_SEGMENT_SIZE")
    _ = viper.BindEnv("clone-read-batch-size", "PCSM_CLONE_READ_BATCH_SIZE")
}

//nolint:gochecknoglobals
var allowedCompressors = []string{"zstd", "zlib", "snappy"}

func filterCompressors(compressors []string) []string {
    if len(compressors) == 0 {
        return nil
    }

    filtered := make([]string, 0, len(allowedCompressors))

    for _, c := range compressors {
        c = strings.TrimSpace(c)
        if slices.Contains(allowedCompressors, c) && !slices.Contains(filtered, c) {
            filtered = append(filtered, c)
        }
    }

    return filtered
}

// ParseAndValidateCloneSegmentSize parses a byte size string and validates it.
// It allows 0 (auto) or values within [MinCloneSegmentSizeBytes, MaxCloneSegmentSizeBytes].
func ParseAndValidateCloneSegmentSize(value string) (int64, error) {
    sizeBytes, err := humanize.ParseBytes(value)
    if err != nil {
        return 0, errors.Wrapf(err, "invalid cloneSegmentSize value: %s", value)
    }

    err = ValidateCloneSegmentSize(sizeBytes)
    if err != nil {
        return 0, err
    }

    return int64(min(sizeBytes, math.MaxInt64)), nil //nolint:gosec
}

// ParseAndValidateCloneReadBatchSize parses a byte size string and validates it.
// It allows 0 (auto) or values within [[MinCloneReadBatchSizeBytes], [MaxCloneReadBatchSizeBytes]].
func ParseAndValidateCloneReadBatchSize(value string) (int32, error) {
    sizeBytes, err := humanize.ParseBytes(value)
    if err != nil {
        return 0, errors.Wrapf(err, "invalid cloneReadBatchSize value: %s", value)
    }

    err = ValidateCloneReadBatchSize(sizeBytes)
    if err != nil {
        return 0, err
    }

    return int32(min(sizeBytes, math.MaxInt32)), nil //nolint:gosec
}

config/config_test.go

Lines changed: 148 additions & 0 deletions
@@ -0,0 +1,148 @@
package config_test

import (
    "fmt"
    "testing"

    "github.com/dustin/go-humanize"
    "github.com/stretchr/testify/assert"
    "github.com/stretchr/testify/require"

    "github.com/percona/percona-clustersync-mongodb/config"
)

func TestParseAndValidateCloneSegmentSize(t *testing.T) {
    t.Parallel()

    tests := []struct {
        name    string
        value   string
        want    int64
        wantErr string
    }{
        {
            name:  "valid size 500MB (above minimum)",
            value: "500MB",
            want:  500 * humanize.MByte,
        },
        {
            name:  "valid size 1GiB",
            value: "1GiB",
            want:  humanize.GiByte,
        },
        {
            name:  "zero value (auto)",
            value: "0",
            want:  0,
        },
        {
            name:    "below minimum (100MB)",
            value:   "100MB",
            wantErr: "cloneSegmentSize must be at least",
        },
        {
            name:    "above maximum",
            value:   "100GiB",
            wantErr: "cloneSegmentSize must be at most",
        },
        {
            name:  "at minimum boundary (using exact bytes)",
            value: fmt.Sprintf("%dB", config.MinCloneSegmentSizeBytes),
            want:  int64(config.MinCloneSegmentSizeBytes),
        },
        {
            name:  "at maximum boundary",
            value: "64GiB",
            want:  int64(config.MaxCloneSegmentSizeBytes),
        },
        {
            name:    "invalid format",
            value:   "abc",
            wantErr: "invalid cloneSegmentSize value",
        },
        {
            name:    "empty string",
            value:   "",
            wantErr: "invalid cloneSegmentSize value",
        },
    }

    for _, tt := range tests {
        t.Run(tt.name, func(t *testing.T) {
            t.Parallel()

            got, err := config.ParseAndValidateCloneSegmentSize(tt.value)

            if tt.wantErr == "" {
                require.NoError(t, err)
                assert.Equal(t, tt.want, got)
            } else {
                require.Error(t, err)
                assert.Contains(t, err.Error(), tt.wantErr)
            }
        })
    }
}

func TestParseAndValidateCloneReadBatchSize(t *testing.T) {
    t.Parallel()

    tests := []struct {
        name    string
        value   string
        want    int32
        wantErr string
    }{
        {
            name:  "valid size 16MiB",
            value: "16MiB",
            want:  16 * humanize.MiByte,
        },
        {
            name:  "valid size 48MB",
            value: "48MB",
            want:  48 * humanize.MByte,
        },
        {
            name:  "zero value (auto)",
            value: "0",
            want:  0,
        },
        {
            name:    "below minimum",
            value:   "1KB",
            wantErr: "cloneReadBatchSize must be at least",
        },
        {
            name:  "at minimum boundary (using exact bytes)",
            value: fmt.Sprintf("%dB", config.MinCloneReadBatchSizeBytes),
            want:  config.MinCloneReadBatchSizeBytes,
        },
        {
            name:    "invalid format",
            value:   "xyz",
            wantErr: "invalid cloneReadBatchSize value",
        },
        {
            name:    "empty string",
            value:   "",
            wantErr: "invalid cloneReadBatchSize value",
        },
    }

    for _, tt := range tests {
        t.Run(tt.name, func(t *testing.T) {
            t.Parallel()

            got, err := config.ParseAndValidateCloneReadBatchSize(tt.value)

            if tt.wantErr == "" {
                require.NoError(t, err)
                assert.Equal(t, tt.want, got)
            } else {
                require.Error(t, err)
                assert.Contains(t, err.Error(), tt.wantErr)
            }
        })
    }
}
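
The tables above pin down the boundary behavior of the two exported helpers. As a complement, here is a small standalone sketch of calling them directly; the program is illustrative only (it is not part of this change), and the expected outputs in the comments follow from the test cases above.

// Standalone sketch showing how the exported parse helpers behave.
package main

import (
    "fmt"

    "github.com/percona/percona-clustersync-mongodb/config"
)

func main() {
    // "1GiB" is within the allowed segment-size range per the tests above.
    segBytes, err := config.ParseAndValidateCloneSegmentSize("1GiB")
    if err != nil {
        fmt.Println("segment size rejected:", err)
        return
    }
    fmt.Println("segment size (bytes):", segBytes) // 1073741824

    // "0" is accepted and means "auto": the size is computed at runtime.
    autoBatch, _ := config.ParseAndValidateCloneReadBatchSize("0")
    fmt.Println("read batch size (bytes):", autoBatch) // 0

    // Out-of-range or malformed values return a descriptive error,
    // e.g. "cloneReadBatchSize must be at least ..." for "1KB".
    if _, err := config.ParseAndValidateCloneReadBatchSize("1KB"); err != nil {
        fmt.Println("read batch size rejected:", err)
    }
}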
