Skip to content

Commit 35bdcd0

Browse files
authored
mongo: move schema logic to shared logic (#3357)
Moved ListDatabases and ListSchema to shared logic for reuse. Test: validated logic locally in peerdb, system dbs/tables and views are not included.
1 parent 75c670e commit 35bdcd0

File tree

2 files changed

+74
-45
lines changed

2 files changed

+74
-45
lines changed

flow/connectors/mongo/schema.go

Lines changed: 7 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -3,24 +3,22 @@ package connmongo
33
import (
44
"context"
55
"fmt"
6-
"slices"
7-
8-
"go.mongodb.org/mongo-driver/v2/bson"
96

107
"github.com/PeerDB-io/peerdb/flow/generated/protos"
8+
shared_mongo "github.com/PeerDB-io/peerdb/flow/shared/mongo"
119
)
1210

1311
func (c *MongoConnector) GetAllTables(ctx context.Context) (*protos.AllTablesResponse, error) {
1412
tableNames := make([]string, 0)
1513

16-
dbNames, err := c.getAllDatabaseNames(ctx)
14+
dbNames, err := shared_mongo.GetDatabaseNames(ctx, c.client)
1715
if err != nil {
1816
return nil, fmt.Errorf("failed to get databases: %w", err)
1917
}
2018
for _, dbName := range dbNames {
21-
collNames, err := c.getCollectionNames(ctx, dbName)
19+
collNames, err := shared_mongo.GetCollectionNames(ctx, c.client, dbName)
2220
if err != nil {
23-
return nil, err
21+
return nil, fmt.Errorf("failed to get collections: %w", err)
2422
}
2523
for _, collName := range collNames {
2624
tableNames = append(tableNames, fmt.Sprintf("%s.%s", dbName, collName))
@@ -32,7 +30,7 @@ func (c *MongoConnector) GetAllTables(ctx context.Context) (*protos.AllTablesRes
3230
}
3331

3432
func (c *MongoConnector) GetSchemas(ctx context.Context) (*protos.PeerSchemasResponse, error) {
35-
dbNames, err := c.getAllDatabaseNames(ctx)
33+
dbNames, err := shared_mongo.GetDatabaseNames(ctx, c.client)
3634
if err != nil {
3735
return nil, fmt.Errorf("failed to get databases: %w", err)
3836
}
@@ -42,9 +40,9 @@ func (c *MongoConnector) GetSchemas(ctx context.Context) (*protos.PeerSchemasRes
4240
}
4341

4442
func (c *MongoConnector) GetTablesInSchema(ctx context.Context, schema string, cdcEnabled bool) (*protos.SchemaTablesResponse, error) {
45-
collectionNames, err := c.getCollectionNames(ctx, schema)
43+
collectionNames, err := shared_mongo.GetCollectionNames(ctx, c.client, schema)
4644
if err != nil {
47-
return nil, err
45+
return nil, fmt.Errorf("failed to get collections: %w", err)
4846
}
4947

5048
response := protos.SchemaTablesResponse{
@@ -69,39 +67,3 @@ func (c *MongoConnector) GetColumns(ctx context.Context, version uint32, schema
6967
Columns: []*protos.ColumnsItem{},
7068
}, nil
7169
}
72-
73-
func (c *MongoConnector) getCollectionNames(ctx context.Context, databaseName string) ([]string, error) {
74-
collectionNames, err := c.client.Database(databaseName).ListCollectionNames(ctx, bson.M{
75-
"name": bson.M{
76-
"$not": bson.Regex{
77-
Pattern: "^system\\.",
78-
},
79-
},
80-
"type": bson.M{
81-
"$ne": "view",
82-
},
83-
})
84-
if err != nil {
85-
return nil, fmt.Errorf("failed to get collections: %w", err)
86-
}
87-
slices.Sort(collectionNames)
88-
return collectionNames, nil
89-
}
90-
91-
// Get all database names, but excluding MongoDB's default databases
92-
func (c *MongoConnector) getAllDatabaseNames(ctx context.Context) ([]string, error) {
93-
dbs, err := c.client.ListDatabaseNames(ctx, bson.M{
94-
"name": bson.M{
95-
"$not": bson.Regex{
96-
Pattern: "^(admin|local|config)$",
97-
},
98-
},
99-
})
100-
if err != nil {
101-
return nil, err
102-
}
103-
filteredDbNames := make([]string, 0, len(dbs))
104-
filteredDbNames = append(filteredDbNames, dbs...)
105-
slices.Sort(filteredDbNames)
106-
return filteredDbNames, nil
107-
}

flow/shared/mongo/schema.go

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
package mongo
2+
3+
import (
4+
"context"
5+
"slices"
6+
"strings"
7+
8+
"go.mongodb.org/mongo-driver/v2/bson"
9+
"go.mongodb.org/mongo-driver/v2/mongo"
10+
)
11+
12+
// Filter out system databases after listing all databases.
13+
// Some MongoDB Atlas tiers (e.g., free tier) don't support regex filters in ListDatabases.
14+
func GetDatabaseNames(ctx context.Context, client *mongo.Client) ([]string, error) {
15+
dbs, err := client.ListDatabases(ctx, bson.M{})
16+
if err != nil {
17+
return nil, err
18+
}
19+
20+
filteredDbNames := make([]string, 0, len(dbs.Databases))
21+
for _, db := range dbs.Databases {
22+
if db.Name == "admin" || db.Name == "local" || db.Name == "config" {
23+
continue
24+
}
25+
filteredDbNames = append(filteredDbNames, db.Name)
26+
}
27+
slices.Sort(filteredDbNames)
28+
return filteredDbNames, nil
29+
}
30+
31+
// Filter out views and system collections after listing all collections.
32+
// Some MongoDB Atlas tiers (e.g., free tier) don't support regex filters in ListCollections.
33+
func GetCollectionNames(ctx context.Context, client *mongo.Client, databaseName string) ([]string, error) {
34+
db := client.Database(databaseName)
35+
cur, err := db.ListCollections(ctx, bson.D{})
36+
if err != nil {
37+
return nil, err
38+
}
39+
defer cur.Close(ctx)
40+
41+
type CollectionSpec struct {
42+
Name string `bson:"name"`
43+
Type string `bson:"type"` // "collection" | "view" | etc
44+
}
45+
46+
filteredCollNames := make([]string, 0, 100)
47+
for cur.Next(ctx) {
48+
if err := cur.Err(); err != nil {
49+
return nil, err
50+
}
51+
52+
var coll CollectionSpec
53+
if err := cur.Decode(&coll); err != nil {
54+
return nil, err
55+
}
56+
57+
if strings.HasPrefix(coll.Name, "system.") {
58+
continue
59+
}
60+
if strings.EqualFold(coll.Type, "view") {
61+
continue
62+
}
63+
filteredCollNames = append(filteredCollNames, coll.Name)
64+
}
65+
slices.Sort(filteredCollNames)
66+
return filteredCollNames, nil
67+
}

0 commit comments

Comments
 (0)