Skip to content

Commit 288a42b

Browse files
PCSM-223. Using storageStats with mongos causes data mismatch error (#147)
Co-authored-by: Inel Pandzic <[email protected]>
1 parent f63b3bb commit 288a42b

File tree

3 files changed

+93
-5
lines changed

3 files changed

+93
-5
lines changed

pcsm/copy.go

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -601,7 +601,7 @@ func NewSegmenter(
601601
return nil, NamespaceNotFoundError{ns.Database, ns.Collection}
602602
}
603603

604-
return nil, errors.Wrap(err, "$collStats")
604+
return nil, errors.Wrap(err, "get collection stats")
605605
}
606606

607607
if stats.AvgObjSize == 0 {
@@ -900,7 +900,7 @@ func NewCappedSegmenter(
900900
return nil, NamespaceNotFoundError{ns.Database, ns.Collection}
901901
}
902902

903-
return nil, errors.Wrap(err, "$collStats")
903+
return nil, errors.Wrap(err, "get collection stats")
904904
}
905905

906906
if stats.AvgObjSize == 0 {

tests/test_collections_sharded.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,3 +85,12 @@ def test_create_collection_with_collation_with_shard_key_index_prefix(
8585
)
8686

8787
t.compare_all_sharded()
88+
89+
@pytest.mark.parametrize("phase", [Runner.Phase.CLONE])
def test_clone_document_sharded(t: Testing, phase: Runner.Phase):
    """Clone one document from a collection sharded on a hashed ``_id``.

    Inside ``t.run(phase)`` the source gets a ``db_1.coll_1`` collection,
    the collection is sharded with the key ``{"_id": "hashed"}``, and a single
    document is inserted. Once the run block exits, ``compare_all_sharded`` is
    called.
    """
    db_name, coll_name = "db_1", "coll_1"
    shard_key = {"_id": "hashed"}
    document = {"name": "Alice", "age": 30}

    with t.run(phase):
        source_db = t.source[db_name]
        source_db.create_collection(coll_name)
        t.source.admin.command("shardCollection", f"{db_name}.{coll_name}", key=shard_key)
        source_db[coll_name].insert_one(document)

    t.compare_all_sharded()

topo/topo.go

Lines changed: 82 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -152,16 +152,43 @@ func GetDBStats(ctx context.Context, m *mongo.Client, dbName string) (*DBStats,
152152
return result, err //nolint:wrapcheck
153153
}
154154

155-
// GetCollStats runs the collStats aggregate stage.
155+
// GetCollStats retrieves statistics for a specific collection.
156156
func GetCollStats(ctx context.Context, m *mongo.Client, db, coll string) (*CollStats, error) {
157-
cur, err := m.Database(db).Collection(coll).Aggregate(ctx, mongo.Pipeline{
157+
stats, err := collStatsFromStorageStats(ctx, m, db, coll)
158+
if err != nil {
159+
return nil, err //nolint:wrapcheck
160+
}
161+
162+
// If avgObjSize is 0, $collStats may have stale metadata (common with newly sharded collections).
163+
// Fall back to document-based aggregation for accurate statistics.
164+
if stats.AvgObjSize == 0 {
165+
log.Ctx(ctx).Debugf("Collection %s.%s has avgObjSize=0 from $collStats (possibly stale metadata). "+
166+
"Falling back to document-based bsonSize aggregation for accurate stats", db, coll)
167+
168+
stats, err = collStatsFromDocsAggregation(ctx, m, db, coll)
169+
if err != nil {
170+
return nil, err //nolint:wrapcheck
171+
}
172+
173+
log.Ctx(ctx).Debugf("Collection %s.%s stats from fallback: count=%d, size=%d, avgObjSize=%d",
174+
db, coll, stats.Count, stats.Size, stats.AvgObjSize)
175+
}
176+
177+
return stats, nil
178+
}
179+
180+
// collStatsFromStorageStats retrieves statistics for a specific collection using $collStats.
181+
func collStatsFromStorageStats(ctx context.Context, m *mongo.Client, db, coll string) (*CollStats, error) {
182+
p := mongo.Pipeline{
158183
{{"$collStats", bson.D{{"storageStats", bson.D{}}}}},
159184
{{"$project", bson.D{
160185
{"size", "$storageStats.size"},
161186
{"count", "$storageStats.count"},
162187
{"avgObjSize", "$storageStats.avgObjSize"},
163188
}}},
164-
})
189+
}
190+
191+
cur, err := m.Database(db).Collection(coll).Aggregate(ctx, p)
165192
if err != nil {
166193
if IsNamespaceNotFound(err) {
167194
err = ErrNotFound
@@ -178,6 +205,7 @@ func GetCollStats(ctx context.Context, m *mongo.Client, db, coll string) (*CollS
178205
}()
179206

180207
stats := &CollStats{}
208+
181209
if !cur.Next(ctx) {
182210
err = cur.Err()
183211
if err == nil {
@@ -195,6 +223,57 @@ func GetCollStats(ctx context.Context, m *mongo.Client, db, coll string) (*CollS
195223
return stats, nil
196224
}
197225

226+
// collStatsFromDocsAggregation retrieves statistics for a specific collection by aggregating document sizes.
227+
func collStatsFromDocsAggregation(ctx context.Context, m *mongo.Client, db, coll string) (*CollStats, error) {
228+
p := mongo.Pipeline{
229+
bson.D{{"$group", bson.D{
230+
{"_id", nil},
231+
{"size", bson.D{{"$sum", bson.D{{"$bsonSize", "$$ROOT"}}}}},
232+
{"count", bson.D{{"$sum", 1}}},
233+
}}},
234+
bson.D{{"$project", bson.D{
235+
{"_id", 0},
236+
{"size", 1},
237+
{"count", 1},
238+
{"avgObjSize", bson.D{{"$divide", bson.A{"$size", "$count"}}}},
239+
}}},
240+
}
241+
242+
cur, err := m.Database(db).Collection(coll).Aggregate(ctx, p)
243+
if err != nil {
244+
if IsNamespaceNotFound(err) {
245+
err = ErrNotFound
246+
}
247+
248+
return nil, errors.Wrap(err, "aggregate coll stats")
249+
}
250+
251+
defer func() {
252+
err := util.CtxWithTimeout(context.Background(), config.CloseCursorTimeout, cur.Close)
253+
if err != nil {
254+
log.Ctx(ctx).Errorf(err, "aggregate coll stats: %s: close cursor", db)
255+
}
256+
}()
257+
258+
stats := &CollStats{}
259+
260+
if !cur.Next(ctx) {
261+
err = cur.Err()
262+
if err == nil {
263+
return &CollStats{Count: 0, Size: 0, AvgObjSize: 0}, nil
264+
}
265+
266+
return nil, errors.Wrap(err, "cursor")
267+
}
268+
269+
err = cur.Decode(stats)
270+
if err != nil {
271+
return nil, errors.Wrap(err, "decode")
272+
}
273+
274+
return stats, nil
275+
}
276+
198277
// RunWithRetry executes the provided function with retry logic for transient errors.
199278
// It retries the function up to maxRetries times,
200279
// with an exponential backoff starting from retryInterval.

0 commit comments

Comments
 (0)