Skip to content

Commit 5853366

Browse files
committed
Update getMultiTypeIDKeyRanges function to exclute NaN _id values.
1 parent b2356c1 commit 5853366

File tree

2 files changed

+39
-23
lines changed

2 files changed

+39
-23
lines changed

plm/copy.go

Lines changed: 32 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -654,24 +654,25 @@ func NewSegmenter(
654654
return s, nil
655655
}
656656

657-
keyRangeByType, err := getIDKeyRangeByType(ctx, mcoll)
657+
multiTypeIDkeyRanges, err := getMultiTypeIDKeyRanges(ctx, mcoll)
658658
if err != nil {
659659
return nil, errors.Wrap(err, "get ID key range by type")
660660
}
661661

662-
if len(keyRangeByType) == 0 {
662+
if len(multiTypeIDkeyRanges) == 0 {
663663
return nil, errEOC // empty collection
664664
}
665665

666-
currIDRange := keyRangeByType[0]
667-
keyRanges := keyRangeByType[1:]
666+
currIDRange := multiTypeIDkeyRanges[0]
667+
remainingKeyRanges := multiTypeIDkeyRanges[1:]
668668

669669
s := &Segmenter{
670670
mcoll: mcoll,
671671
segmentSize: segmentSize,
672672
batchSize: batchSize,
673-
keyRanges: keyRanges,
673+
keyRanges: remainingKeyRanges,
674674
currIDRange: currIDRange,
675+
nanDoc: *nanDoc,
675676
}
676677

677678
return s, nil
@@ -848,30 +849,44 @@ func getIDKeyRange(ctx context.Context, mcoll *mongo.Collection) (keyRange, *bso
848849
return ret, &nanDoc, nil
849850
}
850851

851-
// getIDKeyRangeByType returns a slice of keyRange grouped by the BSON type of the _id field.
852+
// getMultiTypeIDKeyRanges returns a slice of keyRange grouped by the BSON type of the _id field.
852853
// It performs an aggregation that groups documents by _id type, computing the min and max _id
853854
// for each group. This allows the Segmenter to handle collections with heterogeneous _id types
854855
// by processing each type range sequentially.
855-
func getIDKeyRangeByType(ctx context.Context, mcoll *mongo.Collection) ([]keyRange, error) {
856-
cur, err := mcoll.Aggregate(ctx, mongo.Pipeline{
857-
bson.D{{"$group", bson.D{
858-
{"_id", bson.D{{"type", bson.D{{"$type", "$_id"}}}}},
859-
{"minKey", bson.D{{"$min", "$_id"}}},
860-
{"maxKey", bson.D{{"$max", "$_id"}}},
861-
}}},
862-
})
856+
func getMultiTypeIDKeyRanges(ctx context.Context, mcoll *mongo.Collection) ([]keyRange, error) {
857+
cur, err := mcoll.Aggregate(ctx,
858+
mongo.Pipeline{
859+
// Match only numeric types that are not NaN
860+
bson.D{{"$match", bson.D{
861+
{"$expr", bson.D{
862+
// Only allow if _id is not NaN
863+
{"$ne", bson.A{"$_id", bson.D{{"$literal", math.NaN()}}}},
864+
}},
865+
}}},
866+
// Group by type and find min/max
867+
bson.D{{"$group", bson.D{
868+
{"_id", bson.D{{"type", bson.D{{"$type", "$_id"}}}}},
869+
{"minKey", bson.D{{"$min", "$_id"}}},
870+
{"maxKey", bson.D{{"$max", "$_id"}}},
871+
}}},
872+
})
863873
if err != nil {
864874
return nil, errors.Wrap(err, "query")
865875
}
866876

867-
var segmentRanges []keyRange
877+
var keyRanges []keyRange
868878

869-
err = cur.All(ctx, &segmentRanges)
879+
err = cur.All(ctx, &keyRanges)
870880
if err != nil {
871881
return nil, errors.Wrap(err, "all")
872882
}
873883

874-
return segmentRanges, nil
884+
for i := range keyRanges {
885+
log.Ctx(ctx).Debugf("Keyrange %d: type: %s, range [%v <=> %v]", i+1,
886+
keyRanges[i].Min.Type.String(), keyRanges[i].Min, keyRanges[i].Max)
887+
}
888+
889+
return keyRanges, nil
875890
}
876891

877892
// CappedSegmenter provides sequential cursor access for capped collections.

tests/test_collections.py

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -632,13 +632,13 @@ def test_plm_110_rename_during_clone_and_repl(t: Testing):
632632
t.compare_all()
633633

634634

635-
def test_plm_126_clone_with_nan_id_document(t: Testing):
635+
def test_clone_with_nan_id_document(t: Testing):
636636
t.source["db_1"]["coll_1"].insert_one({"_id": float("nan"), "i": 100})
637637
t.source["db_1"]["coll_1"].insert_many(
638638
[{"_id": random.uniform(1e5, 1e10), "i": i} for i in range(50)]
639639
)
640640

641-
with t.run(phase=Runner.Phase.CLONE) as r:
641+
with t.run(phase=Runner.Phase.MANUAL) as r:
642642
r.start()
643643
r.wait_for_clone_completed()
644644

@@ -647,7 +647,6 @@ def test_plm_126_clone_with_nan_id_document(t: Testing):
647647
assert sourceDocCount == targetDocCount
648648

649649

650-
@pytest.mark.skip(reason="Clone with NaN _id is not supported for multi-id types")
651650
def test_clone_with_nan_id_document_multi_id_types(t: Testing):
652651
t.source["db_1"]["coll_1"].insert_one({"_id": Decimal128("NaN"), "i": 200})
653652
t.source["db_1"]["coll_1"].insert_many(
@@ -657,11 +656,13 @@ def test_clone_with_nan_id_document_multi_id_types(t: Testing):
657656
[{"_id": Decimal128(str(random.uniform(1e5, 1e10))), "i": i} for i in range(50)]
658657
)
659658
t.source["db_1"]["coll_1"].insert_many(
660-
[{"_id": "inel" + str(random.uniform(1e5, 1e10)), "i": i} for i in range(50)]
659+
[{"_id": str(random.uniform(1e5, 1e10)), "i": i} for i in range(50)]
661660
)
662661

663-
with t.run(phase=Runner.Phase.CLONE) as r:
662+
with t.run(phase=Runner.Phase.MANUAL) as r:
664663
r.start()
665664
r.wait_for_clone_completed()
666665

667-
t.compare_all()
666+
sourceDocCount = t.source["db_1"]["coll_1"].count_documents({})
667+
targetDocCount = t.target["db_1"]["coll_1"].count_documents({})
668+
assert sourceDocCount == targetDocCount

0 commit comments

Comments
 (0)