Skip to content

Commit 26a7e1b

Browse files
committed
mongo version check
1 parent 16bd254 commit 26a7e1b

File tree

4 files changed

+132
-41
lines changed

4 files changed

+132
-41
lines changed

flow/connectors/mongo/mongo.go

Lines changed: 6 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"github.com/PeerDB-io/peerdb/flow/model"
2222
"github.com/PeerDB-io/peerdb/flow/otel_metrics"
2323
"github.com/PeerDB-io/peerdb/flow/shared"
24+
peerdb_mongo "github.com/PeerDB-io/peerdb/flow/shared/mongo"
2425
"github.com/PeerDB-io/peerdb/flow/shared/types"
2526
)
2627

@@ -45,7 +46,7 @@ func NewMongoConnector(ctx context.Context, config *protos.MongoConfig) (*MongoC
4546

4647
client, err := mongo.Connect(options.Client().
4748
SetAppName("PeerDB Mongo Connector").
48-
SetReadPreference(readpref.Primary()).
49+
SetReadPreference(readpref.SecondaryPreferred()).
4950
SetCompressors([]string{"zstd", "snappy"}).
5051
ApplyURI(config.Uri))
5152
if err != nil {
@@ -80,47 +81,11 @@ func (c *MongoConnector) ConnectionActive(ctx context.Context) error {
8081
}
8182

8283
func (c *MongoConnector) GetVersion(ctx context.Context) (string, error) {
83-
db := c.client.Database("admin")
84-
85-
var buildInfoDoc bson.M
86-
if err := db.RunCommand(ctx, bson.D{bson.E{Key: "buildInfo", Value: 1}}).Decode(&buildInfoDoc); err != nil {
87-
return "", fmt.Errorf("failed to run buildInfo command: %w", err)
88-
}
89-
90-
version, ok := buildInfoDoc["version"].(string)
91-
if !ok {
92-
return "", fmt.Errorf("buildInfo.version is not a string, but %T", buildInfoDoc["version"])
93-
}
94-
return version, nil
95-
}
96-
97-
func (c *MongoConnector) ValidateMirrorSource(ctx context.Context, cfg *protos.FlowConnectionConfigs) error {
98-
if cfg.DoInitialSnapshot && cfg.InitialSnapshotOnly {
99-
return nil
100-
}
101-
102-
// Check if MongoDB is configured as a replica set
103-
var result bson.M
104-
if err := c.client.Database("admin").RunCommand(ctx, bson.D{
105-
{Key: "replSetGetStatus", Value: 1},
106-
}).Decode(&result); err != nil {
107-
return fmt.Errorf("failed to get replica set status: %w", err)
108-
}
109-
110-
// Check if this is a replica set
111-
if _, ok := result["set"]; !ok {
112-
return errors.New("MongoDB is not configured as a replica set, which is required for CDC")
113-
}
114-
115-
if myState, ok := result["myState"]; !ok {
116-
return errors.New("myState not found in response")
117-
} else if myStateInt, ok := myState.(int32); !ok {
118-
return fmt.Errorf("failed to convert myState %v to int32", myState)
119-
} else if myStateInt != 1 {
120-
return fmt.Errorf("MongoDB is not the primary node in the replica set, current state: %d", myState)
84+
buildInfo, err := peerdb_mongo.GetBuildInfo(ctx, c.client)
85+
if err != nil {
86+
return "", err
12187
}
122-
123-
return nil
88+
return buildInfo.Version, nil
12489
}
12590

12691
func (c *MongoConnector) GetTableSchema(

flow/connectors/mongo/validate.go

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
package connmongo
2+
3+
import (
4+
"context"
5+
"fmt"
6+
7+
"github.com/PeerDB-io/peerdb/flow/generated/protos"
8+
shared_mongo "github.com/PeerDB-io/peerdb/flow/shared/mongo"
9+
)
10+
11+
func (c *MongoConnector) ValidateCheck(ctx context.Context) error {
12+
version, err := c.GetVersion(ctx)
13+
if err != nil {
14+
return err
15+
}
16+
cmp, err := shared_mongo.CompareServerVersions(version, shared_mongo.MinSupportedVersion)
17+
if err != nil {
18+
return err
19+
}
20+
if cmp == -1 {
21+
return fmt.Errorf("require minimum mongo version %s", shared_mongo.MinSupportedVersion)
22+
}
23+
return nil
24+
}
25+
26+
func (c *MongoConnector) ValidateMirrorSource(ctx context.Context, cfg *protos.FlowConnectionConfigs) error {
27+
if cfg.DoInitialSnapshot && cfg.InitialSnapshotOnly {
28+
return nil
29+
}
30+
31+
_, err := shared_mongo.GetReplSetGetStatus(ctx, c.client)
32+
if err != nil {
33+
return fmt.Errorf("failed to get replica set status: %w", err)
34+
}
35+
return nil
36+
}

flow/shared/mongo/util.go

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
package mongo
2+
3+
import (
4+
"cmp"
5+
"fmt"
6+
"strconv"
7+
"strings"
8+
)
9+
10+
func CompareServerVersions(a, b string) (int, error) {
11+
aMajor, aRest, _ := strings.Cut(a, ".")
12+
bMajor, bRest, _ := strings.Cut(b, ".")
13+
14+
if majorCompare, err := compareSubVersion("major", aMajor, bMajor, a, b); err != nil || majorCompare != 0 {
15+
return majorCompare, err
16+
}
17+
18+
aMinor, aPatch, _ := strings.Cut(aRest, ".")
19+
bMinor, bPatch, _ := strings.Cut(bRest, ".")
20+
21+
if minorCompare, err := compareSubVersion("minor", aMinor, bMinor, a, b); err != nil || minorCompare != 0 {
22+
return minorCompare, err
23+
}
24+
25+
return compareSubVersion("patch", aPatch, bPatch, a, b)
26+
}
27+
28+
func compareSubVersion(typ, a, b, aFull, bFull string) (int, error) {
29+
if a == "" || b == "" {
30+
return 0, nil
31+
}
32+
33+
var aNum, bNum int
34+
var err error
35+
36+
if aNum, err = strconv.Atoi(a); err != nil {
37+
return 0, fmt.Errorf("cannot parse %s version %s of %s", typ, a, aFull)
38+
}
39+
if bNum, err = strconv.Atoi(b); err != nil {
40+
return 0, fmt.Errorf("cannot parse %s version %s of %s", typ, b, bFull)
41+
}
42+
43+
return cmp.Compare(aNum, bNum), nil
44+
}

flow/shared/mongo/validation.go

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
package mongo
2+
3+
import (
4+
"context"
5+
"fmt"
6+
7+
"go.mongodb.org/mongo-driver/v2/bson"
8+
"go.mongodb.org/mongo-driver/v2/mongo"
9+
)
10+
11+
const MinSupportedVersion = "5.1.0"
12+
13+
type BuildInfo struct {
14+
Version string `bson:"version"`
15+
}
16+
17+
type ReplSetGetStatus struct {
18+
Set string `bson:"set"`
19+
MyState int `bson:"myState"`
20+
}
21+
22+
func GetBuildInfo(ctx context.Context, client *mongo.Client) (*BuildInfo, error) {
23+
singleResult := client.Database("admin").RunCommand(ctx, bson.D{bson.E{Key: "buildInfo", Value: 1}})
24+
if singleResult.Err() != nil {
25+
return nil, fmt.Errorf("failed to run 'buildInfo' command: %w", singleResult.Err())
26+
}
27+
var info BuildInfo
28+
if err := singleResult.Decode(&info); err != nil {
29+
return nil, fmt.Errorf("failed to decode BuildInfo: %w", err)
30+
}
31+
return &info, nil
32+
}
33+
34+
func GetReplSetGetStatus(ctx context.Context, client *mongo.Client) (*ReplSetGetStatus, error) {
35+
singleResult := client.Database("admin").RunCommand(ctx, bson.D{
36+
bson.E{Key: "replSetGetStatus", Value: 1},
37+
})
38+
if singleResult.Err() != nil {
39+
return nil, fmt.Errorf("failed to run 'replSetGetStatus' command: %w", singleResult.Err())
40+
}
41+
var status ReplSetGetStatus
42+
if err := singleResult.Decode(&status); err != nil {
43+
return nil, fmt.Errorf("failed to decode ReplSetGetStatus: %w", err)
44+
}
45+
return &status, nil
46+
}

0 commit comments

Comments
 (0)