From 26a7e1b3bd1185c60cd5dcf4f8fe3ef060516591 Mon Sep 17 00:00:00 2001 From: Joy Gao <17896160+jgao54@users.noreply.github.com> Date: Mon, 7 Jul 2025 11:43:52 -1000 Subject: [PATCH 1/2] mongo version check --- flow/connectors/mongo/mongo.go | 47 ++++--------------------------- flow/connectors/mongo/validate.go | 36 +++++++++++++++++++++++ flow/shared/mongo/util.go | 44 +++++++++++++++++++++++++++++ flow/shared/mongo/validation.go | 46 ++++++++++++++++++++++++++++++ 4 files changed, 132 insertions(+), 41 deletions(-) create mode 100644 flow/connectors/mongo/validate.go create mode 100644 flow/shared/mongo/util.go create mode 100644 flow/shared/mongo/validation.go diff --git a/flow/connectors/mongo/mongo.go b/flow/connectors/mongo/mongo.go index 7d56a521da..ea370ac804 100644 --- a/flow/connectors/mongo/mongo.go +++ b/flow/connectors/mongo/mongo.go @@ -21,6 +21,7 @@ import ( "github.com/PeerDB-io/peerdb/flow/model" "github.com/PeerDB-io/peerdb/flow/otel_metrics" "github.com/PeerDB-io/peerdb/flow/shared" + peerdb_mongo "github.com/PeerDB-io/peerdb/flow/shared/mongo" "github.com/PeerDB-io/peerdb/flow/shared/types" ) @@ -45,7 +46,7 @@ func NewMongoConnector(ctx context.Context, config *protos.MongoConfig) (*MongoC client, err := mongo.Connect(options.Client(). SetAppName("PeerDB Mongo Connector"). - SetReadPreference(readpref.Primary()). + SetReadPreference(readpref.SecondaryPreferred()). SetCompressors([]string{"zstd", "snappy"}). ApplyURI(config.Uri)) if err != nil { @@ -80,47 +81,11 @@ func (c *MongoConnector) ConnectionActive(ctx context.Context) error { } func (c *MongoConnector) GetVersion(ctx context.Context) (string, error) { - db := c.client.Database("admin") - - var buildInfoDoc bson.M - if err := db.RunCommand(ctx, bson.D{bson.E{Key: "buildInfo", Value: 1}}).Decode(&buildInfoDoc); err != nil { - return "", fmt.Errorf("failed to run buildInfo command: %w", err) - } - - version, ok := buildInfoDoc["version"].(string) - if !ok { - return "", fmt.Errorf("buildInfo.version is not a string, but %T", buildInfoDoc["version"]) - } - return version, nil -} - -func (c *MongoConnector) ValidateMirrorSource(ctx context.Context, cfg *protos.FlowConnectionConfigs) error { - if cfg.DoInitialSnapshot && cfg.InitialSnapshotOnly { - return nil - } - - // Check if MongoDB is configured as a replica set - var result bson.M - if err := c.client.Database("admin").RunCommand(ctx, bson.D{ - {Key: "replSetGetStatus", Value: 1}, - }).Decode(&result); err != nil { - return fmt.Errorf("failed to get replica set status: %w", err) - } - - // Check if this is a replica set - if _, ok := result["set"]; !ok { - return errors.New("MongoDB is not configured as a replica set, which is required for CDC") - } - - if myState, ok := result["myState"]; !ok { - return errors.New("myState not found in response") - } else if myStateInt, ok := myState.(int32); !ok { - return fmt.Errorf("failed to convert myState %v to int32", myState) - } else if myStateInt != 1 { - return fmt.Errorf("MongoDB is not the primary node in the replica set, current state: %d", myState) + buildInfo, err := peerdb_mongo.GetBuildInfo(ctx, c.client) + if err != nil { + return "", err } - - return nil + return buildInfo.Version, nil } func (c *MongoConnector) GetTableSchema( diff --git a/flow/connectors/mongo/validate.go b/flow/connectors/mongo/validate.go new file mode 100644 index 0000000000..3db7d028c2 --- /dev/null +++ b/flow/connectors/mongo/validate.go @@ -0,0 +1,36 @@ +package connmongo + +import ( + "context" + "fmt" + + "github.com/PeerDB-io/peerdb/flow/generated/protos" + shared_mongo "github.com/PeerDB-io/peerdb/flow/shared/mongo" +) + +func (c *MongoConnector) ValidateCheck(ctx context.Context) error { + version, err := c.GetVersion(ctx) + if err != nil { + return err + } + cmp, err := shared_mongo.CompareServerVersions(version, shared_mongo.MinSupportedVersion) + if err != nil { + return err + } + if cmp == -1 { + return fmt.Errorf("require minimum mongo version %s", shared_mongo.MinSupportedVersion) + } + return nil +} + +func (c *MongoConnector) ValidateMirrorSource(ctx context.Context, cfg *protos.FlowConnectionConfigs) error { + if cfg.DoInitialSnapshot && cfg.InitialSnapshotOnly { + return nil + } + + _, err := shared_mongo.GetReplSetGetStatus(ctx, c.client) + if err != nil { + return fmt.Errorf("failed to get replica set status: %w", err) + } + return nil +} diff --git a/flow/shared/mongo/util.go b/flow/shared/mongo/util.go new file mode 100644 index 0000000000..5f0d97adac --- /dev/null +++ b/flow/shared/mongo/util.go @@ -0,0 +1,44 @@ +package mongo + +import ( + "cmp" + "fmt" + "strconv" + "strings" +) + +func CompareServerVersions(a, b string) (int, error) { + aMajor, aRest, _ := strings.Cut(a, ".") + bMajor, bRest, _ := strings.Cut(b, ".") + + if majorCompare, err := compareSubVersion("major", aMajor, bMajor, a, b); err != nil || majorCompare != 0 { + return majorCompare, err + } + + aMinor, aPatch, _ := strings.Cut(aRest, ".") + bMinor, bPatch, _ := strings.Cut(bRest, ".") + + if minorCompare, err := compareSubVersion("minor", aMinor, bMinor, a, b); err != nil || minorCompare != 0 { + return minorCompare, err + } + + return compareSubVersion("patch", aPatch, bPatch, a, b) +} + +func compareSubVersion(typ, a, b, aFull, bFull string) (int, error) { + if a == "" || b == "" { + return 0, nil + } + + var aNum, bNum int + var err error + + if aNum, err = strconv.Atoi(a); err != nil { + return 0, fmt.Errorf("cannot parse %s version %s of %s", typ, a, aFull) + } + if bNum, err = strconv.Atoi(b); err != nil { + return 0, fmt.Errorf("cannot parse %s version %s of %s", typ, b, bFull) + } + + return cmp.Compare(aNum, bNum), nil +} diff --git a/flow/shared/mongo/validation.go b/flow/shared/mongo/validation.go new file mode 100644 index 0000000000..13fc67075d --- /dev/null +++ b/flow/shared/mongo/validation.go @@ -0,0 +1,46 @@ +package mongo + +import ( + "context" + "fmt" + + "go.mongodb.org/mongo-driver/v2/bson" + "go.mongodb.org/mongo-driver/v2/mongo" +) + +const MinSupportedVersion = "5.1.0" + +type BuildInfo struct { + Version string `bson:"version"` +} + +type ReplSetGetStatus struct { + Set string `bson:"set"` + MyState int `bson:"myState"` +} + +func GetBuildInfo(ctx context.Context, client *mongo.Client) (*BuildInfo, error) { + singleResult := client.Database("admin").RunCommand(ctx, bson.D{bson.E{Key: "buildInfo", Value: 1}}) + if singleResult.Err() != nil { + return nil, fmt.Errorf("failed to run 'buildInfo' command: %w", singleResult.Err()) + } + var info BuildInfo + if err := singleResult.Decode(&info); err != nil { + return nil, fmt.Errorf("failed to decode BuildInfo: %w", err) + } + return &info, nil +} + +func GetReplSetGetStatus(ctx context.Context, client *mongo.Client) (*ReplSetGetStatus, error) { + singleResult := client.Database("admin").RunCommand(ctx, bson.D{ + bson.E{Key: "replSetGetStatus", Value: 1}, + }) + if singleResult.Err() != nil { + return nil, fmt.Errorf("failed to run 'replSetGetStatus' command: %w", singleResult.Err()) + } + var status ReplSetGetStatus + if err := singleResult.Decode(&status); err != nil { + return nil, fmt.Errorf("failed to decode ReplSetGetStatus: %w", err) + } + return &status, nil +} From 3ddf089e000ee6de6b8ceef3d3ca001fe9027498 Mon Sep 17 00:00:00 2001 From: Joy Gao <17896160+jgao54@users.noreply.github.com> Date: Mon, 7 Jul 2025 18:40:32 -1000 Subject: [PATCH 2/2] ci --- .github/workflows/flow.yml | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/.github/workflows/flow.yml b/.github/workflows/flow.yml index 959a932177..d30f980fbb 100644 --- a/.github/workflows/flow.yml +++ b/.github/workflows/flow.yml @@ -328,10 +328,18 @@ jobs: EOF - mkdir ch1 ch2 chkeep + mkdir chkeep ch1 ch2 + (cd chkeep && ../clickhouse keeper -C ../config-keeper.xml) & + while true; do + if echo "ruok" | nc -w 3 127.0.0.1 2181 2>/dev/null | grep -q "imok"; then + break + fi + echo "Waiting for keeper..." + sleep 1 + done + sleep 5 (cd ch1 && ../clickhouse server -C ../config1.xml) & (cd ch2 && ../clickhouse server -C ../config2.xml) & - (cd chkeep && ../clickhouse keeper -C ../config-keeper.xml) & - name: Install Temporal CLI uses: temporalio/setup-temporal@1059a504f87e7fa2f385e3fa40d1aa7e62f1c6ca # v0