Skip to content

Commit 3e7c5d5

Browse files
authored
PLM-195: Clone sharded collections (#135)
1 parent 6758ad0 commit 3e7c5d5

File tree

12 files changed

+497
-9
lines changed

12 files changed

+497
-9
lines changed

.github/workflows/e2etests.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ jobs:
5151
export TEST_PLM_URL=http://127.0.0.1:2242
5252
export TEST_PLM_BIN=./bin/plm_test
5353
54-
poetry run pytest
54+
poetry run pytest tests/test_collections.py tests/test_documents.py tests/test_indexes.py tests/test_selective.py tests/test_transactions.py
5555
5656
- name: Teardown Docker Compose
5757
if: always()

.vscode/settings.json

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,9 @@
2828
"go.useLanguageServer": true,
2929
"gopls": {
3030
"analyses": {
31-
"composites": false
31+
"composites": false,
32+
"ST1000": false,
33+
"wsl_v5": false
3234
},
3335
"formatting.gofumpt": true,
3436
"formatting.local": "github.com/percona",

plm/catalog.go

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,7 @@ type collectionCatalog struct {
9898

9999
type indexCatalogEntry struct {
100100
*topo.IndexSpecification
101+
101102
Incomplete bool `bson:"incomplete"`
102103
Failed bool `bson:"failed"`
103104
}
@@ -561,7 +562,7 @@ func (c *Catalog) ModifyChangeStreamPreAndPostImages(
561562
}) //nolint:wrapcheck
562563
}
563564

564-
// ModifyCappedCollection modifies a capped collection in the target MongoDB.
565+
// ModifyValidation modifies the validation options of a collection in the target MongoDB.
565566
func (c *Catalog) ModifyValidation(
566567
ctx context.Context,
567568
db string,
@@ -1136,6 +1137,36 @@ func (c *Catalog) renameCollectionInCatalog(
11361137
lg.Debugf("Collection renamed in catalog %s.%s to %s.%s", db, coll, targetDB, targetColl)
11371138
}
11381139

1140+
func (c *Catalog) ShardCollection(
1141+
ctx context.Context,
1142+
db string,
1143+
coll string,
1144+
shardKey bson.D,
1145+
unique bool,
1146+
) error {
1147+
cmd := bson.D{
1148+
{"shardCollection", db + "." + coll},
1149+
{"key", shardKey},
1150+
}
1151+
1152+
if unique {
1153+
cmd = append(cmd, bson.E{"unique", true})
1154+
}
1155+
1156+
err := runWithRetry(ctx, func(ctx context.Context) error {
1157+
err := c.target.Database("admin").RunCommand(ctx, cmd).Err()
1158+
1159+
return errors.Wrap(err, "shard collection")
1160+
})
1161+
if err != nil {
1162+
return err //nolint:wrapcheck
1163+
}
1164+
1165+
log.Ctx(ctx).Infof("Sharded collection %s.%s", db, coll)
1166+
1167+
return nil
1168+
}
1169+
11391170
func runWithRetry(
11401171
ctx context.Context,
11411172
fn func(context.Context) error,

plm/clone.go

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -417,7 +417,12 @@ func (c *Clone) doCollectionClone(
417417
return ErrTimeseriesUnsupported
418418
}
419419

420-
err = c.createCollection(ctx, ns, spec)
420+
shInfo, err := topo.GetCollectionShardingInfo(ctx, c.source, ns.Database, ns.Collection)
421+
if err != nil && !errors.Is(err, topo.ErrNotFound) {
422+
return errors.Wrap(err, "get sharding info")
423+
}
424+
425+
err = c.createCollection(ctx, ns, spec, shInfo)
421426
if err != nil {
422427
if !errors.Is(err, context.Canceled) {
423428
lg.Errorf(err, "Failed to create %q collection", ns.String())
@@ -436,6 +441,7 @@ func (c *Clone) doCollectionClone(
436441
lg.Infof("Collection %q has been created", ns.String())
437442

438443
c.catalog.SetCollectionTimestamp(ctx, ns.Database, ns.Collection, capturedAt)
444+
439445
if spec.UUID != nil {
440446
c.catalog.SetCollectionUUID(ctx, ns.Database, ns.Collection, spec.UUID)
441447
}
@@ -445,7 +451,8 @@ func (c *Clone) doCollectionClone(
445451
updateC := copyManager.Do(nsCtx, ns, spec)
446452

447453
for update := range updateC {
448-
if err := update.Err; err != nil {
454+
err := update.Err
455+
if err != nil {
449456
switch {
450457
case topo.IsCollectionDropped(err):
451458
lg.Warnf("Collection %q has been dropped during clone: %s", ns, err)
@@ -653,6 +660,7 @@ func (c *Clone) collectSizeMap(ctx context.Context) error {
653660

654661
type namespaceInfo struct {
655662
Namespace
663+
656664
UUID *bson.Binary
657665
}
658666

@@ -686,6 +694,7 @@ func (c *Clone) createCollection(
686694
ctx context.Context,
687695
ns Namespace,
688696
spec *topo.CollectionSpecification,
697+
shInfo *topo.ShardingInfo,
689698
) error {
690699
if spec.Type == topo.TypeTimeseries {
691700
return ErrTimeseriesUnsupported
@@ -708,6 +717,13 @@ func (c *Clone) createCollection(
708717
return errors.Wrap(err, "create collection")
709718
}
710719

720+
if shInfo != nil && shInfo.IsSharded() {
721+
err := c.catalog.ShardCollection(ctx, ns.Database, ns.Collection, shInfo.ShardKey, shInfo.Unique)
722+
if err != nil {
723+
return errors.Wrap(err, "shard collection")
724+
}
725+
}
726+
711727
return nil
712728
}
713729

plm/events.go

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -487,6 +487,31 @@ type RenameEvent struct {
487487
OperationDescription renameOpDesc `bson:"operationDescription"`
488488
}
489489

490+
// ShardCollectionEvent represents a shardCollection change stream event.
type ShardCollectionEvent struct {
	// OperationDescription is additional information on the change operation.
	//
	// This document and its subfields only appear when the change stream uses
	// expanded events.
	//
	// New in version 6.0.
	OperationDescription shardCollectionOpDesc `bson:"operationDescription"`
}

// shardCollectionOpDesc represents the description of the shard collection operation.
type shardCollectionOpDesc struct {
	// NumInitialChunks is the number of initial chunks created.
	NumInitialChunks *int32 `bson:"numInitialChunks,omitempty"`

	// PresplitHashedZones indicates whether the collection was presplit.
	PresplitHashedZones *bool `bson:"presplitHashedZones,omitempty"`

	// ShardKey is the shard key used for the collection.
	ShardKey bson.D `bson:"shardKey"`

	// Unique indicates whether the shard key is unique.
	Unique bool `bson:"unique,omitempty"`
}
514+
490515
// renameOpDesc represents the description of the rename operation.
491516
type renameOpDesc struct {
492517
// DropTarget is UUID of the collection that was dropped in the rename operation.
@@ -513,6 +538,7 @@ func (e ParsingError) Unwrap() error {
513538

514539
type ChangeEvent struct {
515540
EventHeader
541+
516542
Event any
517543
}
518544

@@ -575,7 +601,10 @@ func parseChangeEvent(data bson.Raw, change *ChangeEvent) error {
575601
change.Event = e
576602

577603
case ShardCollection:
578-
return errors.ErrUnsupported
604+
var e ShardCollectionEvent
605+
err = bson.Unmarshal(data, &e)
606+
change.Event = e
607+
579608
case ReshardCollection:
580609
return errors.ErrUnsupported
581610
case RefineCollectionShardKey:

plm/repl.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -678,7 +678,6 @@ func (r *Repl) applyDDLChange(ctx context.Context, change *ChangeEvent) error {
678678
lg.Infof("Database %q has been dropped", change.Namespace)
679679

680680
case CreateIndexes:
681-
// TODO: check the indexes status
682681
event := change.Event.(CreateIndexesEvent) //nolint:forcetypeassert
683682
err = r.catalog.CreateIndexes(ctx,
684683
change.Namespace.Database,
@@ -721,7 +720,13 @@ func (r *Repl) applyDDLChange(ctx context.Context, change *ChangeEvent) error {
721720
return ErrInvalidateEvent
722721

723722
case ShardCollection:
724-
fallthrough
723+
event := change.Event.(ShardCollectionEvent) //nolint:forcetypeassert
724+
err = r.catalog.ShardCollection(ctx,
725+
change.Namespace.Database,
726+
change.Namespace.Collection,
727+
event.OperationDescription.ShardKey,
728+
event.OperationDescription.Unique)
729+
725730
case ReshardCollection:
726731
fallthrough
727732
case RefineCollectionShardKey:

tests/test_collections_sharded.py

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
# pylint: disable=missing-docstring,redefined-outer-name
2+
from datetime import datetime
3+
4+
import pytest
5+
from plm import Runner
6+
from testing import Testing
7+
8+
9+
@pytest.mark.parametrize("phase", [Runner.Phase.APPLY, Runner.Phase.CLONE])
def test_shard_collection(t: Testing, phase: Runner.Phase):
    # Shard collections with the common shard-key shapes (ranged, hashed,
    # compound, compound with a hashed prefix) while PLM runs in the given
    # phase, then verify the target matches the source.
    with t.run(phase):
        # coll_1 is created explicitly before sharding; coll_2..coll_4 are
        # only referenced by shardCollection (presumably created implicitly
        # by the command — confirm against server behavior).
        t.source["db_1"].create_collection("coll_1")
        t.source.admin.command("shardCollection", "db_1.coll_1", key={"_id": 1})
        t.source.admin.command("shardCollection", "db_1.coll_2", key={"_id": "hashed"})
        t.source.admin.command("shardCollection", "db_1.coll_3", key={"a": 1, "b": 1})
        t.source.admin.command("shardCollection", "db_1.coll_4", key={"a": "hashed", "b": 1})

    t.compare_all_sharded()
19+
20+
21+
@pytest.mark.parametrize("phase", [Runner.Phase.APPLY, Runner.Phase.CLONE])
def test_drop_sharded(t: Testing, phase: Runner.Phase):
    # Create and shard coll_1 before PLM starts, then drop it while PLM is
    # running; the drop must propagate so the collection is absent on the
    # target afterwards.
    t.source["db_1"].drop_collection("coll_1")
    t.source["db_1"].create_collection("coll_1")
    t.source.admin.command("shardCollection", "db_1.coll_1", key={"_id": 1})

    with t.run(phase):
        t.source["db_1"].drop_collection("coll_1")

    assert "coll_1" not in t.target["db_1"].list_collection_names()
31+
32+
33+
@pytest.mark.parametrize("phase", [Runner.Phase.APPLY, Runner.Phase.CLONE])
def test_rename_sharded(t: Testing, phase: Runner.Phase):
    # Shard a collection and then rename it while PLM is running; the rename
    # must be replicated so source and target stay identical.
    with t.run(phase):
        t.source["db_1"].create_collection("coll_1")
        t.source.admin.command("shardCollection", "db_1.coll_1", key={"_id": 1})
        t.source["db_1"]["coll_1"].rename("coll_2")

    t.compare_all()

tests/test_selective_sharded.py

Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
# pylint: disable=missing-docstring,redefined-outer-name
2+
import pytest
3+
import testing
4+
from plm import Runner
5+
from pymongo import MongoClient
6+
7+
8+
def perform_with_options(source, plm, phase: Runner.Phase, include_ns=None, exclude_ns=None):
    """Build a Runner for the PLM operation, forwarding only the options that are set."""
    options = {
        name: value
        for name, value in (
            ("include_namespaces", include_ns),
            ("exclude_namespaces", exclude_ns),
        )
        if value
    }

    return Runner(source, plm, phase, options)
17+
18+
19+
def check_if_target_is_subset(source: MongoClient, target: MongoClient):
    """Check if the target MongoDB is a subset of the source MongoDB.

    Every database and collection present on the target must also exist on
    the source, and each shared namespace must compare equal.
    """
    source_dbs = set(testing.list_databases(source))
    target_dbs = set(testing.list_databases(target))
    # target_dbs is already a set; no need to re-wrap it in set().
    assert target_dbs.issubset(source_dbs)

    for db in target_dbs:
        source_colls = set(testing.list_collections(source, db))
        target_colls = set(testing.list_collections(target, db))
        assert target_colls.issubset(source_colls)

        for coll in target_colls:
            testing.compare_namespace(source, target, db, coll)
32+
33+
34+
@pytest.mark.parametrize("phase", [Runner.Phase.APPLY, Runner.Phase.CLONE])
def test_create_collection_with_include_only(t: testing.Testing, phase: Runner.Phase):
    # Shard and populate db_{0..2}.coll_{0..2} while PLM runs with an
    # include-list; only the included namespaces may reach the target.
    with perform_with_options(
        t.source,
        t.plm,
        phase,
        include_ns=["db_0.*", "db_1.coll_0", "db_1.coll_1", "db_2.coll_0", "db_2.coll_1"],
    ):
        # The index spec is loop-invariant, so create it once up front
        # instead of re-issuing the identical create_index on every
        # iteration (it was previously called 9 times).
        t.source["db_1"]["coll_1"].create_index({"i": 1})
        for db in range(3):
            for coll in range(3):
                t.source.admin.command("shardCollection", f"db_{db}.coll_{coll}", key={"_id": 1})
                t.source[f"db_{db}"][f"coll_{coll}"].insert_one({})

    expected = {
        "db_0.coll_0",
        "db_0.coll_1",
        "db_0.coll_2",
        "db_1.coll_0",
        "db_1.coll_1",
        # "db_1.coll_2",
        "db_2.coll_0",
        "db_2.coll_1",
        # "db_2.coll_2",
    }

    assert expected == set(testing.list_all_namespaces(t.target))
    check_if_target_is_subset(t.source, t.target)
62+
63+
64+
@pytest.mark.parametrize("phase", [Runner.Phase.APPLY, Runner.Phase.CLONE])
def test_create_collection_with_exclude_only(t: testing.Testing, phase: Runner.Phase):
    # Shard and populate db_{0..2}.coll_{0..2} while PLM runs with an
    # exclude-list; the excluded namespaces must not reach the target.
    with perform_with_options(
        t.source, t.plm, phase, exclude_ns=["db_0.*", "db_1.coll_0", "db_1.coll_1"]
    ):
        # The index spec is loop-invariant, so create it once up front
        # instead of re-issuing the identical create_index on every
        # iteration (it was previously called 9 times).
        t.source["db_1"]["coll_1"].create_index({"i": 1})
        for db in range(3):
            for coll in range(3):
                t.source.admin.command("shardCollection", f"db_{db}.coll_{coll}", key={"_id": 1})
                t.source[f"db_{db}"][f"coll_{coll}"].insert_one({})

    expected = {
        # "db_0.coll_0",
        # "db_0.coll_1",
        # "db_0.coll_2",
        # "db_1.coll_0",
        # "db_1.coll_1",
        "db_1.coll_2",
        "db_2.coll_0",
        "db_2.coll_1",
        "db_2.coll_2",
    }

    assert expected == set(testing.list_all_namespaces(t.target))
    check_if_target_is_subset(t.source, t.target)
89+
90+
91+
@pytest.mark.parametrize("phase", [Runner.Phase.APPLY, Runner.Phase.CLONE])
def test_create_collection(t: testing.Testing, phase: Runner.Phase):
    # Combine include- and exclude-lists: a namespace must match the
    # include-list AND not match the exclude-list to reach the target.
    with perform_with_options(
        t.source,
        t.plm,
        phase,
        include_ns=["db_0.*", "db_1.coll_0", "db_1.coll_1", "db_2.coll_0", "db_2.coll_1"],
        exclude_ns=["db_0.*", "db_1.coll_0", "db_3.coll_1"],
    ):
        # The index spec is loop-invariant, so create it once up front
        # instead of re-issuing the identical create_index on every
        # iteration (it was previously called 12 times).
        t.source["db_1"]["coll_1"].create_index({"i": 1})
        for db in range(4):
            for coll in range(3):
                t.source.admin.command("shardCollection", f"db_{db}.coll_{coll}", key={"_id": 1})
                t.source[f"db_{db}"][f"coll_{coll}"].insert_one({})

    expected = {
        # "db_0.coll_0",
        # "db_0.coll_1",
        # "db_0.coll_2",

        # "db_1.coll_0",
        "db_1.coll_1",
        # "db_1.coll_2",

        "db_2.coll_0",
        "db_2.coll_1",
        # "db_2.coll_2",

        "db_3.coll_0",
        # "db_3.coll_1",
        "db_3.coll_2",
    }

    assert expected == set(testing.list_all_namespaces(t.target))
    check_if_target_is_subset(t.source, t.target)

tests/test_transactions.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@ def test_simple(t: Testing):
1616
t.source["db_2"]["coll_2"].insert_one({"i": 2}, session=sess)
1717
sess.commit_transaction()
1818

19-
2019
t.source["db_1"]["coll_1"].insert_one({"i": 2})
2120

2221
assert t.source["db_1"]["coll_1"].count_documents({}) == 2

0 commit comments

Comments
 (0)