Skip to content

Commit 24e90d3

Browse files
committed
PCSM-219: Integrate target compressors into Viper config
1 parent 9deb753 commit 24e90d3

File tree

5 files changed

+24
-120
lines changed

5 files changed

+24
-120
lines changed

config/config.go

Lines changed: 12 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ package config
33

44
import (
55
"math"
6-
"os"
76
"slices"
87
"strings"
98

@@ -36,11 +35,11 @@ func Load(cmd *cobra.Command) (*Config, error) {
3635
return nil, errors.Wrap(err, "unmarshal config")
3736
}
3837

38+
cfg.MongoDB.TargetCompressors = parseCompressors(cfg.MongoDB.TargetCompressorsRaw)
39+
3940
return &cfg, nil
4041
}
4142

42-
// bindEnvVars binds environment variable names to Viper keys.
43-
// Note: Clone tuning options are CLI/HTTP only (no env var support).
4443
func bindEnvVars() {
4544
_ = viper.BindEnv("port", "PCSM_PORT")
4645

@@ -54,23 +53,23 @@ func bindEnvVars() {
5453
_ = viper.BindEnv("mongodb-operation-timeout", "PCSM_MONGODB_OPERATION_TIMEOUT")
5554

5655
_ = viper.BindEnv("use-collection-bulk-write", "PCSM_USE_COLLECTION_BULK_WRITE")
56+
57+
_ = viper.BindEnv("dev-target-client-compressors", "PCSM_DEV_TARGET_CLIENT_COMPRESSORS")
5758
}
5859

59-
// UseTargetClientCompressors returns a list of enabled compressors (from "zstd", "zlib", "snappy")
60-
// for the target MongoDB client connection, as specified by the comma-separated environment
61-
// variable PCSM_DEV_TARGET_CLIENT_COMPRESSORS. If unset or empty, returns nil.
62-
func UseTargetClientCompressors() []string {
63-
s := strings.TrimSpace(os.Getenv("PCSM_DEV_TARGET_CLIENT_COMPRESSORS"))
60+
//nolint:gochecknoglobals
61+
var allowedCompressors = []string{"zstd", "zlib", "snappy"}
62+
63+
func parseCompressors(raw string) []string {
64+
s := strings.TrimSpace(raw)
6465
if s == "" {
6566
return nil
6667
}
6768

68-
allowCompressors := []string{"zstd", "zlib", "snappy"}
69-
70-
rv := make([]string, 0, min(len(s), len(allowCompressors)))
69+
rv := make([]string, 0, len(allowedCompressors))
7170
for a := range strings.SplitSeq(s, ",") {
7271
a = strings.TrimSpace(a)
73-
if slices.Contains(allowCompressors, a) && !slices.Contains(rv, a) {
72+
if slices.Contains(allowedCompressors, a) && !slices.Contains(rv, a) {
7473
rv = append(rv, a)
7574
}
7675
}
@@ -95,7 +94,7 @@ func ParseAndValidateCloneSegmentSize(value string) (int64, error) {
9594
}
9695

9796
// ParseAndValidateCloneReadBatchSize parses a byte size string and validates it.
98-
// It allows 0 (auto) or values within [MinCloneReadBatchSizeBytes, MaxCloneReadBatchSizeBytes].
97+
// It allows 0 (auto) or values within [[MinCloneReadBatchSizeBytes], [MaxCloneReadBatchSizeBytes]].
9998
func ParseAndValidateCloneReadBatchSize(value string) (int32, error) {
10099
sizeBytes, err := humanize.ParseBytes(value)
101100
if err != nil {

config/config_test.go

Lines changed: 0 additions & 81 deletions
Original file line numberDiff line numberDiff line change
@@ -146,84 +146,3 @@ func TestParseAndValidateCloneReadBatchSize(t *testing.T) {
146146
})
147147
}
148148
}
149-
150-
func TestUseTargetClientCompressors(t *testing.T) {
151-
tests := []struct {
152-
name string
153-
envVal string
154-
want []string
155-
wantNil bool
156-
}{
157-
{
158-
name: "empty env - returns nil",
159-
envVal: "",
160-
want: nil,
161-
wantNil: true,
162-
},
163-
{
164-
name: "single valid compressor zstd",
165-
envVal: "zstd",
166-
want: []string{"zstd"},
167-
},
168-
{
169-
name: "single valid compressor zlib",
170-
envVal: "zlib",
171-
want: []string{"zlib"},
172-
},
173-
{
174-
name: "single valid compressor snappy",
175-
envVal: "snappy",
176-
want: []string{"snappy"},
177-
},
178-
{
179-
name: "multiple valid compressors",
180-
envVal: "zstd,zlib,snappy",
181-
want: []string{"zstd", "zlib", "snappy"},
182-
},
183-
{
184-
name: "compressors with spaces",
185-
envVal: " zstd , zlib , snappy ",
186-
want: []string{"zstd", "zlib", "snappy"},
187-
},
188-
{
189-
name: "invalid compressor ignored",
190-
envVal: "zstd,invalid,zlib",
191-
want: []string{"zstd", "zlib"},
192-
},
193-
{
194-
name: "all invalid compressors - returns empty slice",
195-
envVal: "invalid,gzip,lz4",
196-
want: []string{},
197-
},
198-
{
199-
name: "duplicate compressors - deduplicated",
200-
envVal: "zstd,zstd,zlib,zstd",
201-
want: []string{"zstd", "zlib"},
202-
},
203-
{
204-
name: "whitespace only - returns nil",
205-
envVal: " ",
206-
want: nil,
207-
wantNil: true,
208-
},
209-
{
210-
name: "mixed valid and invalid with spaces",
211-
envVal: " zstd , invalid , snappy ",
212-
want: []string{"zstd", "snappy"},
213-
},
214-
}
215-
216-
for _, tt := range tests {
217-
t.Run(tt.name, func(t *testing.T) {
218-
t.Setenv("PCSM_DEV_TARGET_CLIENT_COMPRESSORS", tt.envVal)
219-
220-
got := config.UseTargetClientCompressors()
221-
222-
if tt.wantNil {
223-
assert.Nil(t, got)
224-
} else {
225-
assert.Equal(t, tt.want, got)
226-
}
227-
})
228-
}
229-
}

config/schema.go

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,21 +6,17 @@ import (
66

77
// Config holds all PCSM configuration.
88
type Config struct {
9-
// Connection
109
Port int `mapstructure:"port"`
1110
Source string `mapstructure:"source"`
1211
Target string `mapstructure:"target"`
1312

14-
// Logging (squash keeps flat keys)
1513
Log LogConfig `mapstructure:",squash"`
1614

17-
// MongoDB client options
1815
MongoDB MongoDBConfig `mapstructure:",squash"`
1916

20-
// Internal options
2117
UseCollectionBulkWrite bool `mapstructure:"use-collection-bulk-write"`
2218

23-
// Hidden startup flags
19+
// hidden startup flags
2420
Start bool `mapstructure:"start"`
2521
ResetState bool `mapstructure:"reset-state"`
2622
PauseOnInitialSync bool `mapstructure:"pause-on-initial-sync"`
@@ -36,6 +32,14 @@ type LogConfig struct {
3632
// MongoDBConfig holds MongoDB client configuration.
3733
type MongoDBConfig struct {
3834
OperationTimeout string `mapstructure:"mongodb-operation-timeout"`
35+
36+
// TargetCompressorsRaw is the raw comma-separated env var value (dev option).
37+
// Parsed into TargetCompressors during Load().
38+
TargetCompressorsRaw string `mapstructure:"dev-target-client-compressors"`
39+
40+
// TargetCompressors is the parsed list of compressors for the target connection.
41+
// Populated from TargetCompressorsRaw during Load().
42+
TargetCompressors []string `mapstructure:"-"`
3943
}
4044

4145
// OperationTimeoutDuration returns the parsed timeout or default.

main.go

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -514,9 +514,7 @@ func createServer(ctx context.Context, cfg *config.Config) (*Server, error) {
514514
lg.Infof("Connected to source cluster [%s]: %s://%s",
515515
sourceVersion.FullString(), cs.Scheme, strings.Join(cs.Hosts, ","))
516516

517-
target, err := topo.ConnectWithOptions(ctx, cfg.Target, cfg, &topo.ConnectOptions{
518-
Compressors: config.UseTargetClientCompressors(),
519-
})
517+
target, err := topo.Connect(ctx, cfg.Target, cfg)
520518
if err != nil {
521519
return nil, errors.Wrap(err, "connect to target cluster")
522520
}

topo/connect.go

Lines changed: 2 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -19,24 +19,8 @@ import (
1919
"github.com/percona/percona-clustersync-mongodb/util"
2020
)
2121

22-
type ConnectOptions struct {
23-
Compressors []string
24-
}
25-
2622
// Connect establishes a connection to a MongoDB instance using the provided URI.
27-
// If the URI is empty, it returns an error.
2823
func Connect(ctx context.Context, uri string, cfg *config.Config) (*mongo.Client, error) {
29-
return ConnectWithOptions(ctx, uri, cfg, &ConnectOptions{})
30-
}
31-
32-
// ConnectWithOptions establishes a connection to a MongoDB instance using the provided URI and options.
33-
// If the URI is empty, it returns an error.
34-
func ConnectWithOptions(
35-
ctx context.Context,
36-
uri string,
37-
cfg *config.Config,
38-
connOpts *ConnectOptions,
39-
) (*mongo.Client, error) {
4024
if uri == "" {
4125
return nil, errors.New("invalid MongoDB URI")
4226
}
@@ -61,8 +45,8 @@ func ConnectWithOptions(
6145
SetWriteConcern(writeconcern.Majority()).
6246
SetTimeout(cfg.MongoDB.OperationTimeoutDuration())
6347

64-
if connOpts != nil && connOpts.Compressors != nil {
65-
opts.SetCompressors(connOpts.Compressors)
48+
if uri == cfg.Target && len(cfg.MongoDB.TargetCompressors) > 0 {
49+
opts.SetCompressors(cfg.MongoDB.TargetCompressors)
6650
}
6751

6852
if config.MongoLogEnabled {

0 commit comments

Comments
 (0)