Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions .github/workflows/flow.yml
Original file line number Diff line number Diff line change
Expand Up @@ -328,10 +328,18 @@ jobs:
</keeper_server>
</clickhouse>
EOF
mkdir ch1 ch2 chkeep
mkdir chkeep ch1 ch2
(cd chkeep && ../clickhouse keeper -C ../config-keeper.xml) &
while true; do
if echo "ruok" | nc -w 3 127.0.0.1 2181 2>/dev/null | grep -q "imok"; then
break
fi
echo "Waiting for keeper..."
sleep 1
done
sleep 5
(cd ch1 && ../clickhouse server -C ../config1.xml) &
(cd ch2 && ../clickhouse server -C ../config2.xml) &
(cd chkeep && ../clickhouse keeper -C ../config-keeper.xml) &

- name: Install Temporal CLI
uses: temporalio/setup-temporal@1059a504f87e7fa2f385e3fa40d1aa7e62f1c6ca # v0
Expand Down
47 changes: 6 additions & 41 deletions flow/connectors/mongo/mongo.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/PeerDB-io/peerdb/flow/model"
"github.com/PeerDB-io/peerdb/flow/otel_metrics"
"github.com/PeerDB-io/peerdb/flow/shared"
peerdb_mongo "github.com/PeerDB-io/peerdb/flow/shared/mongo"
"github.com/PeerDB-io/peerdb/flow/shared/types"
)

Expand All @@ -45,7 +46,7 @@ func NewMongoConnector(ctx context.Context, config *protos.MongoConfig) (*MongoC

client, err := mongo.Connect(options.Client().
SetAppName("PeerDB Mongo Connector").
SetReadPreference(readpref.Primary()).
SetReadPreference(readpref.SecondaryPreferred()).
SetCompressors([]string{"zstd", "snappy"}).
ApplyURI(config.Uri))
if err != nil {
Expand Down Expand Up @@ -80,47 +81,11 @@ func (c *MongoConnector) ConnectionActive(ctx context.Context) error {
}

func (c *MongoConnector) GetVersion(ctx context.Context) (string, error) {
db := c.client.Database("admin")

var buildInfoDoc bson.M
if err := db.RunCommand(ctx, bson.D{bson.E{Key: "buildInfo", Value: 1}}).Decode(&buildInfoDoc); err != nil {
return "", fmt.Errorf("failed to run buildInfo command: %w", err)
}

version, ok := buildInfoDoc["version"].(string)
if !ok {
return "", fmt.Errorf("buildInfo.version is not a string, but %T", buildInfoDoc["version"])
}
return version, nil
}

func (c *MongoConnector) ValidateMirrorSource(ctx context.Context, cfg *protos.FlowConnectionConfigs) error {
if cfg.DoInitialSnapshot && cfg.InitialSnapshotOnly {
return nil
}

// Check if MongoDB is configured as a replica set
var result bson.M
if err := c.client.Database("admin").RunCommand(ctx, bson.D{
{Key: "replSetGetStatus", Value: 1},
}).Decode(&result); err != nil {
return fmt.Errorf("failed to get replica set status: %w", err)
}

// Check if this is a replica set
if _, ok := result["set"]; !ok {
return errors.New("MongoDB is not configured as a replica set, which is required for CDC")
}

if myState, ok := result["myState"]; !ok {
return errors.New("myState not found in response")
} else if myStateInt, ok := myState.(int32); !ok {
return fmt.Errorf("failed to convert myState %v to int32", myState)
} else if myStateInt != 1 {
return fmt.Errorf("MongoDB is not the primary node in the replica set, current state: %d", myState)
buildInfo, err := peerdb_mongo.GetBuildInfo(ctx, c.client)
if err != nil {
return "", err
}

return nil
return buildInfo.Version, nil
}

func (c *MongoConnector) GetTableSchema(
Expand Down
36 changes: 36 additions & 0 deletions flow/connectors/mongo/validate.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package connmongo

import (
"context"
"fmt"

"github.com/PeerDB-io/peerdb/flow/generated/protos"
shared_mongo "github.com/PeerDB-io/peerdb/flow/shared/mongo"
)

func (c *MongoConnector) ValidateCheck(ctx context.Context) error {
version, err := c.GetVersion(ctx)
if err != nil {
return err
}
cmp, err := shared_mongo.CompareServerVersions(version, shared_mongo.MinSupportedVersion)
if err != nil {
return err
}
if cmp == -1 {
return fmt.Errorf("require minimum mongo version %s", shared_mongo.MinSupportedVersion)
}
return nil
}

func (c *MongoConnector) ValidateMirrorSource(ctx context.Context, cfg *protos.FlowConnectionConfigs) error {
if cfg.DoInitialSnapshot && cfg.InitialSnapshotOnly {
return nil
}

_, err := shared_mongo.GetReplSetGetStatus(ctx, c.client)
if err != nil {
return fmt.Errorf("failed to get replica set status: %w", err)
}
return nil
}
44 changes: 44 additions & 0 deletions flow/shared/mongo/util.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package mongo

import (
"cmp"
"fmt"
"strconv"
"strings"
)

func CompareServerVersions(a, b string) (int, error) {
aMajor, aRest, _ := strings.Cut(a, ".")
bMajor, bRest, _ := strings.Cut(b, ".")

if majorCompare, err := compareSubVersion("major", aMajor, bMajor, a, b); err != nil || majorCompare != 0 {
return majorCompare, err
}

aMinor, aPatch, _ := strings.Cut(aRest, ".")
bMinor, bPatch, _ := strings.Cut(bRest, ".")

if minorCompare, err := compareSubVersion("minor", aMinor, bMinor, a, b); err != nil || minorCompare != 0 {
return minorCompare, err
}

return compareSubVersion("patch", aPatch, bPatch, a, b)
}

func compareSubVersion(typ, a, b, aFull, bFull string) (int, error) {
if a == "" || b == "" {
return 0, nil
}

var aNum, bNum int
var err error

if aNum, err = strconv.Atoi(a); err != nil {
return 0, fmt.Errorf("cannot parse %s version %s of %s", typ, a, aFull)
}
if bNum, err = strconv.Atoi(b); err != nil {
return 0, fmt.Errorf("cannot parse %s version %s of %s", typ, b, bFull)
}

return cmp.Compare(aNum, bNum), nil
}
46 changes: 46 additions & 0 deletions flow/shared/mongo/validation.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package mongo

import (
"context"
"fmt"

"go.mongodb.org/mongo-driver/v2/bson"
"go.mongodb.org/mongo-driver/v2/mongo"
)

const MinSupportedVersion = "5.1.0"

type BuildInfo struct {
Version string `bson:"version"`
}

type ReplSetGetStatus struct {
Set string `bson:"set"`
MyState int `bson:"myState"`
}

func GetBuildInfo(ctx context.Context, client *mongo.Client) (*BuildInfo, error) {
singleResult := client.Database("admin").RunCommand(ctx, bson.D{bson.E{Key: "buildInfo", Value: 1}})
if singleResult.Err() != nil {
return nil, fmt.Errorf("failed to run 'buildInfo' command: %w", singleResult.Err())
}
var info BuildInfo
if err := singleResult.Decode(&info); err != nil {
return nil, fmt.Errorf("failed to decode BuildInfo: %w", err)
}
return &info, nil
}

func GetReplSetGetStatus(ctx context.Context, client *mongo.Client) (*ReplSetGetStatus, error) {
singleResult := client.Database("admin").RunCommand(ctx, bson.D{
bson.E{Key: "replSetGetStatus", Value: 1},
})
if singleResult.Err() != nil {
return nil, fmt.Errorf("failed to run 'replSetGetStatus' command: %w", singleResult.Err())
}
var status ReplSetGetStatus
if err := singleResult.Decode(&status); err != nil {
return nil, fmt.Errorf("failed to decode ReplSetGetStatus: %w", err)
}
return &status, nil
}
Loading