Skip to content

Commit 145cad0

Browse files
committed
Fix PR comments.
1 parent a60989b commit 145cad0

File tree

7 files changed

+39
-69
lines changed

7 files changed

+39
-69
lines changed

plm/catalog.go

Lines changed: 4 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,7 @@ type collectionCatalog struct {
9898

9999
type indexCatalogEntry struct {
100100
*topo.IndexSpecification
101+
101102
Incomplete bool `bson:"incomplete"`
102103
Failed bool `bson:"failed"`
103104
}
@@ -561,7 +562,7 @@ func (c *Catalog) ModifyChangeStreamPreAndPostImages(
561562
}) //nolint:wrapcheck
562563
}
563564

564-
// ModifyCappedCollection modifies a capped collection in the target MongoDB.
565+
// ModifyValidation modifies a capped collection in the target MongoDB.
565566
func (c *Catalog) ModifyValidation(
566567
ctx context.Context,
567568
db string,
@@ -1143,29 +1144,17 @@ func (c *Catalog) ShardCollection(
11431144
shardKey bson.D,
11441145
unique bool,
11451146
) error {
1146-
enableCmd := bson.D{{"enableSharding", db}}
1147-
1148-
err := runWithRetry(ctx, func(ctx context.Context) error {
1149-
err := c.target.Database("admin").RunCommand(ctx, enableCmd).Err()
1150-
1151-
return errors.Wrap(err, "enable sharding")
1152-
})
1153-
if err != nil {
1154-
return err //nolint:wrapcheck
1155-
}
1156-
11571147
cmd := bson.D{
11581148
{"shardCollection", db + "." + coll},
11591149
{"key", shardKey},
1160-
// {"collation", bson.D{{"locale", "simple"}}},
11611150
}
11621151

11631152
if unique {
11641153
cmd = append(cmd, bson.E{"unique", true})
11651154
}
11661155

1167-
err = runWithRetry(ctx, func(ctx context.Context) error {
1168-
err = c.target.Database("admin").RunCommand(ctx, cmd).Err()
1156+
err := runWithRetry(ctx, func(ctx context.Context) error {
1157+
err := c.target.Database("admin").RunCommand(ctx, cmd).Err()
11691158

11701159
return errors.Wrap(err, "shard collection")
11711160
})

plm/clone.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ import (
55
"context"
66
"runtime"
77
"slices"
8-
"strings"
98
"sync"
109
"sync/atomic"
1110
"time"
@@ -419,7 +418,7 @@ func (c *Clone) doCollectionClone(
419418
}
420419

421420
shInfo, err := topo.GetCollectionShardingInfo(ctx, c.source, ns.Database, ns.Collection)
422-
if err != nil && !strings.Contains(err.Error(), "no documents in result") {
421+
if err != nil && !errors.Is(err, topo.ErrNotFound) {
423422
return errors.Wrap(err, "get sharding info")
424423
}
425424

@@ -442,6 +441,7 @@ func (c *Clone) doCollectionClone(
442441
lg.Infof("Collection %q has been created", ns.String())
443442

444443
c.catalog.SetCollectionTimestamp(ctx, ns.Database, ns.Collection, capturedAt)
444+
445445
if spec.UUID != nil {
446446
c.catalog.SetCollectionUUID(ctx, ns.Database, ns.Collection, spec.UUID)
447447
}
@@ -451,7 +451,8 @@ func (c *Clone) doCollectionClone(
451451
updateC := copyManager.Do(nsCtx, ns, spec)
452452

453453
for update := range updateC {
454-
if err := update.Err; err != nil {
454+
err := update.Err
455+
if err != nil {
455456
switch {
456457
case topo.IsCollectionDropped(err):
457458
lg.Warnf("Collection %q has been dropped during clone: %s", ns, err)
@@ -659,6 +660,7 @@ func (c *Clone) collectSizeMap(ctx context.Context) error {
659660

660661
type namespaceInfo struct {
661662
Namespace
663+
662664
UUID *bson.Binary
663665
}
664666

plm/events.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -538,6 +538,7 @@ func (e ParsingError) Unwrap() error {
538538

539539
type ChangeEvent struct {
540540
EventHeader
541+
541542
Event any
542543
}
543544

tests/test_collections_sharded.py

Lines changed: 1 addition & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,13 @@
11
# pylint: disable=missing-docstring,redefined-outer-name
2-
import random
32
from datetime import datetime
43

5-
import time
64
import pytest
7-
import testing
8-
from plm import PLM, Runner
9-
from pymongo import MongoClient
5+
from plm import Runner
106
from testing import Testing
11-
from bson.decimal128 import Decimal128
127

138

149
@pytest.mark.parametrize("phase", [Runner.Phase.APPLY, Runner.Phase.CLONE])
1510
def test_shard_collection(t: Testing, phase: Runner.Phase):
16-
t.source.admin.command("enableSharding", "db_1")
17-
1811
with t.run(phase):
1912
t.source["db_1"].create_collection("coll_1")
2013
t.source.admin.command("shardCollection", "db_1.coll_1", key={"_id": 1})
@@ -27,7 +20,6 @@ def test_shard_collection(t: Testing, phase: Runner.Phase):
2720

2821
@pytest.mark.parametrize("phase", [Runner.Phase.APPLY, Runner.Phase.CLONE])
2922
def test_drop_sharded(t: Testing, phase: Runner.Phase):
30-
t.source.admin.command("enableSharding", "db_1")
3123
t.source["db_1"].drop_collection("coll_1")
3224
t.source["db_1"].create_collection("coll_1")
3325
t.source.admin.command("shardCollection", "db_1.coll_1", key={"_id": 1})
@@ -40,8 +32,6 @@ def test_drop_sharded(t: Testing, phase: Runner.Phase):
4032

4133
@pytest.mark.parametrize("phase", [Runner.Phase.APPLY, Runner.Phase.CLONE])
4234
def test_rename_sharded(t: Testing, phase: Runner.Phase):
43-
t.source.admin.command("enableSharding", "db_1")
44-
4535
with t.run(phase):
4636
t.source["db_1"].create_collection("coll_1")
4737
t.source.admin.command("shardCollection", "db_1.coll_1", key={"_id": 1})

tests/test_selective_sharded.py

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,6 @@ def test_create_collection_with_include_only(t: testing.Testing, phase: Runner.P
4040
include_ns=["db_0.*", "db_1.coll_0", "db_1.coll_1", "db_2.coll_0", "db_2.coll_1"],
4141
):
4242
for db in range(3):
43-
t.source.admin.command("enableSharding", f"db_{db}")
4443
for coll in range(3):
4544
t.source["db_1"]["coll_1"].create_index({"i": 1})
4645
t.source.admin.command("shardCollection", f"db_{db}.coll_{coll}", key={"_id": 1})
@@ -68,7 +67,6 @@ def test_create_collection_with_exclude_only(t: testing.Testing, phase: Runner.P
6867
t.source, t.plm, phase, exclude_ns=["db_0.*", "db_1.coll_0", "db_1.coll_1"]
6968
):
7069
for db in range(3):
71-
t.source.admin.command("enableSharding", f"db_{db}")
7270
for coll in range(3):
7371
t.source["db_1"]["coll_1"].create_index({"i": 1})
7472
t.source.admin.command("shardCollection", f"db_{db}.coll_{coll}", key={"_id": 1})
@@ -100,7 +98,6 @@ def test_create_collection(t: testing.Testing, phase: Runner.Phase):
10098
exclude_ns=["db_0.*", "db_1.coll_0", "db_3.coll_1"],
10199
):
102100
for db in range(4):
103-
t.source.admin.command("enableSharding", f"db_{db}")
104101
for coll in range(3):
105102
t.source["db_1"]["coll_1"].create_index({"i": 1})
106103
t.source.admin.command("shardCollection", f"db_{db}.coll_{coll}", key={"_id": 1})

tests/test_transactions_sharded.py

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ def test_shard_collection_in_tx(t: Testing):
99
with t.run(phase=Runner.Phase.APPLY):
1010
with t.source.start_session() as sess:
1111
sess.start_transaction()
12-
t.source.admin.command("enableSharding", "db_1")
1312
t.source["db_1"].create_collection("coll_1")
1413
t.source.admin.command("shardCollection", "db_1.coll_1", key={"_id": 1})
1514
t.source["db_1"]["coll_1"].insert_one({"i": 1}, session=sess)
@@ -23,8 +22,6 @@ def test_shard_collection_in_tx(t: Testing):
2322

2423

2524
def test_simple(t: Testing):
26-
t.source.admin.command("enableSharding", "db_1")
27-
t.source.admin.command("enableSharding", "db_2")
2825
t.source["db_1"].create_collection("coll_1")
2926
t.source.admin.command("shardCollection", "db_1.coll_1", key={"_id": 1})
3027
t.source["db_2"].create_collection("coll_2")
@@ -46,7 +43,6 @@ def test_simple(t: Testing):
4643

4744

4845
def test_simple_mix(t: Testing):
49-
t.source.admin.command("enableSharding", "db_1")
5046
t.source["db_1"].create_collection("coll_1")
5147
t.source.admin.command("shardCollection", "db_1.coll_1", key={"_id": 1})
5248

@@ -68,8 +64,6 @@ def test_simple_mix(t: Testing):
6864

6965

7066
def test_simple_aborted(t: Testing):
71-
t.source.admin.command("enableSharding", "db_1")
72-
t.source.admin.command("enableSharding", "db_2")
7367
t.source["db_1"].create_collection("coll_1")
7468
t.source.admin.command("shardCollection", "db_1.coll_1", key={"_id": 1})
7569
t.source["db_2"].create_collection("coll_2")
@@ -90,7 +84,6 @@ def test_simple_aborted(t: Testing):
9084

9185

9286
def test_simple_aborted_mix(t: Testing):
93-
t.source.admin.command("enableSharding", "db_1")
9487
t.source["db_1"].create_collection("coll_1")
9588
t.source.admin.command("shardCollection", "db_1.coll_1", key={"_id": 1})
9689
t.source["db_2"].create_collection("coll_2")
@@ -110,7 +103,6 @@ def test_simple_aborted_mix(t: Testing):
110103

111104

112105
def test_in_progress(t: Testing):
113-
t.source.admin.command("enableSharding", "db_1")
114106
t.source["db_1"].create_collection("coll_1")
115107
t.source.admin.command("shardCollection", "db_1.coll_1", key={"_id": 1})
116108

@@ -130,7 +122,6 @@ def test_in_progress(t: Testing):
130122

131123

132124
def test_in_progress_aborted(t: Testing):
133-
t.source.admin.command("enableSharding", "db_1")
134125
t.source["db_1"].create_collection("coll_1")
135126
t.source.admin.command("shardCollection", "db_1.coll_1", key={"_id": 1})
136127

topo/schema.go

Lines changed: 28 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -197,9 +197,9 @@ func ListInProgressIndexBuilds(
197197
}
198198

199199
type ChunkInfo struct {
200-
Shard string
201-
Min bson.M
202-
Max bson.M
200+
Shard string `bson:"shard"`
201+
Min bson.M `bson:"min"`
202+
Max bson.M `bson:"max"`
203203
}
204204

205205
type ShardingInfo struct {
@@ -216,48 +216,48 @@ func (s ShardingInfo) IsSharded() bool {
216216
return s.ShardKey != nil
217217
}
218218

219-
func GetCollectionShardingInfo(ctx context.Context, m *mongo.Client, dbName, collName string) (*ShardingInfo, error) {
219+
func GetCollectionShardingInfo(
220+
ctx context.Context,
221+
m *mongo.Client,
222+
dbName, collName string,
223+
) (*ShardingInfo, error) {
220224
collNS := dbName + "." + collName
221225

222-
// Look up collection spec in config.collections
223226
info := &ShardingInfo{}
224227

225228
err := m.Database("config").
226229
Collection("collections").
227230
FindOne(ctx, bson.M{"_id": collNS}).
228231
Decode(info)
229232
if err != nil {
233+
if errors.Is(err, mongo.ErrNoDocuments) {
234+
return nil, ErrNotFound
235+
}
236+
230237
return nil, errors.Wrapf(err, "find collection %s in config.collections", collNS)
231238
}
232239

233-
if info.IsSharded() {
234-
chunksColl := m.Database("config").Collection("chunks")
240+
if !info.IsSharded() {
241+
return info, nil
242+
}
235243

236-
cur, err := chunksColl.Find(ctx, bson.M{"ns": collNS})
237-
if err != nil {
238-
return nil, errors.Wrapf(err, "find chunks for %s", collNS)
239-
}
240-
defer cur.Close(ctx)
244+
chunksColl := m.Database("config").Collection("chunks")
241245

242-
if err := cur.Err(); err != nil { // nolint:noinlineerr
243-
return nil, errors.Wrap(err, "iterate chunks")
244-
}
246+
cur, err := chunksColl.Find(ctx, bson.M{"ns": collNS})
247+
if err != nil {
248+
return nil, errors.Wrapf(err, "find chunks for %s", collNS)
249+
}
250+
defer cur.Close(ctx)
245251

246-
for cur.Next(ctx) {
247-
var c bson.M
252+
if err := cur.Err(); err != nil { // nolint:noinlineerr
253+
return nil, errors.Wrap(err, "iterate chunks")
254+
}
248255

249-
err := cur.Decode(&c)
250-
if err != nil {
251-
return nil, errors.Wrap(err, "decode chunk")
252-
}
256+
var chunks []ChunkInfo
253257

254-
ci := ChunkInfo{ //nolint:forcetypeassert
255-
Shard: c["shard"].(string),
256-
Min: c["min"].(bson.M),
257-
Max: c["max"].(bson.M),
258-
}
259-
info.Chunks = append(info.Chunks, ci)
260-
}
258+
err = cur.All(ctx, &chunks)
259+
if err != nil {
260+
return nil, errors.Wrap(err, "read chunks")
261261
}
262262

263263
return info, nil

0 commit comments

Comments
 (0)