Skip to content

Commit ed70fe1

Browse files
committed
mongo version check
1 parent d875058 commit ed70fe1

File tree

6 files changed

+103
-42
lines changed

6 files changed

+103
-42
lines changed

flow/connectors/mongo/mongo.go

Lines changed: 5 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"github.com/PeerDB-io/peerdb/flow/model"
2222
"github.com/PeerDB-io/peerdb/flow/otel_metrics"
2323
"github.com/PeerDB-io/peerdb/flow/shared"
24+
peerdb_mongo "github.com/PeerDB-io/peerdb/flow/shared/mongo"
2425
"github.com/PeerDB-io/peerdb/flow/shared/types"
2526
)
2627

@@ -80,47 +81,11 @@ func (c *MongoConnector) ConnectionActive(ctx context.Context) error {
8081
}
8182

8283
func (c *MongoConnector) GetVersion(ctx context.Context) (string, error) {
83-
db := c.client.Database("admin")
84-
85-
var buildInfoDoc bson.M
86-
if err := db.RunCommand(ctx, bson.D{bson.E{Key: "buildInfo", Value: 1}}).Decode(&buildInfoDoc); err != nil {
87-
return "", fmt.Errorf("failed to run buildInfo command: %w", err)
88-
}
89-
90-
version, ok := buildInfoDoc["version"].(string)
91-
if !ok {
92-
return "", fmt.Errorf("buildInfo.version is not a string, but %T", buildInfoDoc["version"])
93-
}
94-
return version, nil
95-
}
96-
97-
func (c *MongoConnector) ValidateMirrorSource(ctx context.Context, cfg *protos.FlowConnectionConfigs) error {
98-
if cfg.DoInitialSnapshot && cfg.InitialSnapshotOnly {
99-
return nil
100-
}
101-
102-
// Check if MongoDB is configured as a replica set
103-
var result bson.M
104-
if err := c.client.Database("admin").RunCommand(ctx, bson.D{
105-
{Key: "replSetGetStatus", Value: 1},
106-
}).Decode(&result); err != nil {
107-
return fmt.Errorf("failed to get replica set status: %w", err)
108-
}
109-
110-
// Check if this is a replica set
111-
if _, ok := result["set"]; !ok {
112-
return errors.New("MongoDB is not configured as a replica set, which is required for CDC")
113-
}
114-
115-
if myState, ok := result["myState"]; !ok {
116-
return errors.New("myState not found in response")
117-
} else if myStateInt, ok := myState.(int32); !ok {
118-
return fmt.Errorf("failed to convert myState %v to int32", myState)
119-
} else if myStateInt != 1 {
120-
return fmt.Errorf("MongoDB is not the primary node in the replica set, current state: %d", myState)
84+
buildInfo, err := peerdb_mongo.GetBuildInfo(ctx, c.client)
85+
if err != nil {
86+
return "", err
12187
}
122-
123-
return nil
88+
return buildInfo.Version, nil
12489
}
12590

12691
func (c *MongoConnector) GetTableSchema(

flow/connectors/mongo/validate.go

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
package connmongo
2+
3+
import (
4+
"context"
5+
"fmt"
6+
7+
"github.com/PeerDB-io/peerdb/flow/generated/protos"
8+
shared_mongo "github.com/PeerDB-io/peerdb/flow/shared/mongo"
9+
)
10+
11+
const MinSupportedVersion = "5.1.0"
12+
13+
func (c *MongoConnector) ValidateCheck(ctx context.Context) error {
14+
cmp, err := shared_mongo.CompareServerVersion(ctx, c.client, MinSupportedVersion)
15+
if err != nil {
16+
return err
17+
}
18+
if cmp == -1 {
19+
return fmt.Errorf("require minimum mongo version %s", MinSupportedVersion)
20+
}
21+
return nil
22+
}
23+
24+
func (c *MongoConnector) ValidateMirrorSource(ctx context.Context, cfg *protos.FlowConnectionConfigs) error {
25+
if cfg.DoInitialSnapshot && cfg.InitialSnapshotOnly {
26+
return nil
27+
}
28+
29+
_, err := shared_mongo.GetReplSetGetStatus(ctx, c.client)
30+
if err != nil {
31+
return fmt.Errorf("failed to get replica set status: %w", err)
32+
}
33+
return nil
34+
}

flow/e2eshared/e2eshared.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ func RunSuite[T Suite](t *testing.T, setup func(t *testing.T) T) {
3333
subtest.Parallel()
3434
suite := setup(subtest)
3535
subtest.Cleanup(func() {
36-
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
36+
ctx, cancel := context.WithTimeout(t.Context(), time.Minute)
3737
defer cancel()
3838
suite.Teardown(ctx)
3939
})
@@ -56,7 +56,7 @@ func RunSuiteNoParallel[T Suite](t *testing.T, setup func(t *testing.T) T) {
5656
t.Run(m.Name, func(subtest *testing.T) {
5757
suite := setup(subtest)
5858
subtest.Cleanup(func() {
59-
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
59+
ctx, cancel := context.WithTimeout(t.Context(), time.Minute)
6060
defer cancel()
6161
suite.Teardown(ctx)
6262
})

flow/go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ require (
1212
github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/eventhub/armeventhub v1.3.0
1313
github.com/ClickHouse/ch-go v0.66.0
1414
github.com/ClickHouse/clickhouse-go/v2 v2.37.1
15+
github.com/Masterminds/semver v1.5.0
1516
github.com/PeerDB-io/glua64 v1.0.1
1617
github.com/PeerDB-io/gluabit32 v1.0.2
1718
github.com/PeerDB-io/gluajson v1.0.2

flow/go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,8 @@ github.com/GoogleCloudPlatform/opentelemetry-operations-go/internal/cloudmock v0
8282
github.com/GoogleCloudPlatform/opentelemetry-operations-go/internal/cloudmock v0.53.0/go.mod h1:jUZ5LYlw40WMd07qxcQJD5M40aUxrfwqQX1g7zxYnrQ=
8383
github.com/GoogleCloudPlatform/opentelemetry-operations-go/internal/resourcemapping v0.53.0 h1:Ron4zCA/yk6U7WOBXhTJcDpsUBG9npumK6xw2auFltQ=
8484
github.com/GoogleCloudPlatform/opentelemetry-operations-go/internal/resourcemapping v0.53.0/go.mod h1:cSgYe11MCNYunTnRXrKiR/tHc0eoKjICUuWpNZoVCOo=
85+
github.com/Masterminds/semver v1.5.0 h1:H65muMkzWKEuNDnfl9d70GUjFniHKHRbFPGBuZ3QEww=
86+
github.com/Masterminds/semver v1.5.0/go.mod h1:MB6lktGJrhw8PrUyiEoblNEGEQ+RzHPF078ddwwvV3Y=
8587
github.com/PeerDB-io/glua64 v1.0.1 h1:biXLlFF/L5pnJCwDon7hkWkuQPozC8NjKS3J7Wzi69I=
8688
github.com/PeerDB-io/glua64 v1.0.1/go.mod h1:UHmAhniv61bJPMhQvxkpC7jXbn353dSbQviu83bgQVg=
8789
github.com/PeerDB-io/gluabit32 v1.0.2 h1:AGI1Z7dwDVotakpuOOuyTX4/QGi5HUYsipL/VfodmO4=

flow/shared/mongo/validation.go

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
package mongo
2+
3+
import (
4+
"context"
5+
"fmt"
6+
7+
"github.com/Masterminds/semver"
8+
"go.mongodb.org/mongo-driver/v2/bson"
9+
"go.mongodb.org/mongo-driver/v2/mongo"
10+
)
11+
12+
type BuildInfo struct {
13+
Version string `bson:"version"`
14+
}
15+
16+
type ReplSetGetStatus struct {
17+
Set string `bson:"set"`
18+
MyState int `bson:"myState"`
19+
}
20+
21+
func GetBuildInfo(ctx context.Context, client *mongo.Client) (*BuildInfo, error) {
22+
singleResult := client.Database("admin").RunCommand(ctx, bson.D{bson.E{Key: "buildInfo", Value: 1}})
23+
if singleResult.Err() != nil {
24+
return nil, fmt.Errorf("failed to run 'buildInfo' command: %w", singleResult.Err())
25+
}
26+
var info BuildInfo
27+
if err := singleResult.Decode(&info); err != nil {
28+
return nil, fmt.Errorf("failed to decode BuildInfo: %w", err)
29+
}
30+
return &info, nil
31+
}
32+
33+
func CompareServerVersion(ctx context.Context, client *mongo.Client, versionString string) (int, error) {
34+
buildInfo, err := GetBuildInfo(ctx, client)
35+
if err != nil {
36+
return 0, fmt.Errorf("failed to get mongo version: %w", err)
37+
}
38+
parsedVersion, err := semver.NewVersion(buildInfo.Version)
39+
if err != nil {
40+
return 0, fmt.Errorf("failed to parse mongo version %w", err)
41+
}
42+
compareVersion := semver.MustParse(versionString)
43+
return parsedVersion.Compare(compareVersion), nil
44+
}
45+
46+
func GetReplSetGetStatus(ctx context.Context, client *mongo.Client) (*ReplSetGetStatus, error) {
47+
db := client.Database("admin")
48+
singleResult := db.RunCommand(ctx, bson.D{
49+
bson.E{Key: "replSetGetStatus", Value: 1},
50+
})
51+
if singleResult.Err() != nil {
52+
return nil, fmt.Errorf("failed to run 'replSetGetStatus' command: %w", singleResult.Err())
53+
}
54+
var status ReplSetGetStatus
55+
if err := singleResult.Decode(&status); err != nil {
56+
return nil, fmt.Errorf("failed to decode ReplSetGetStatus: %w", err)
57+
}
58+
return &status, nil
59+
}

0 commit comments

Comments
 (0)