Skip to content

Commit f4824c9

Browse files
authored
PCSM-200: Set simple collation for bulk writes on sharded collections for target below 8.0 (#152)
1 parent 5f47aa8 commit f4824c9

File tree

8 files changed

+136
-51
lines changed

8 files changed

+136
-51
lines changed

pcsm/bulk.go

Lines changed: 88 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,9 @@ import (
1919
//nolint:gochecknoglobals
2020
var yes = true // for ref
2121

22+
//nolint:gochecknoglobals
23+
var simpleCollation = &options.Collation{Locale: "simple"}
24+
2225
//nolint:gochecknoglobals
2326
var clientBulkOptions = options.ClientBulkWrite().
2427
SetOrdered(true).
@@ -41,12 +44,14 @@ type bulkWrite interface {
4144
}
4245

4346
type clientBulkWrite struct {
44-
writes []mongo.ClientBulkWrite
47+
useSimpleCollation bool
48+
writes []mongo.ClientBulkWrite
4549
}
4650

47-
func newClientBulkWrite(size int) *clientBulkWrite {
51+
func newClientBulkWrite(size int, useSimpleCollation bool) *clientBulkWrite {
4852
return &clientBulkWrite{
49-
make([]mongo.ClientBulkWrite, 0, size),
53+
useSimpleCollation: useSimpleCollation,
54+
writes: make([]mongo.ClientBulkWrite, 0, size),
5055
}
5156
}
5257

@@ -76,67 +81,93 @@ func (o *clientBulkWrite) Do(ctx context.Context, m *mongo.Client) (int, error)
7681
}
7782

7883
func (o *clientBulkWrite) Insert(ns Namespace, event *InsertEvent) {
84+
m := &mongo.ClientReplaceOneModel{
85+
Filter: event.DocumentKey,
86+
Replacement: event.FullDocument,
87+
Upsert: &yes,
88+
}
89+
90+
if ns.Sharded && o.useSimpleCollation {
91+
m.Collation = simpleCollation
92+
}
93+
7994
bw := mongo.ClientBulkWrite{
8095
Database: ns.Database,
8196
Collection: ns.Collection,
82-
Model: &mongo.ClientReplaceOneModel{
83-
Filter: event.DocumentKey,
84-
Replacement: event.FullDocument,
85-
Upsert: &yes,
86-
},
97+
Model: m,
8798
}
8899

89100
o.writes = append(o.writes, bw)
90101
}
91102

92103
func (o *clientBulkWrite) Update(ns Namespace, event *UpdateEvent) {
104+
m := &mongo.ClientUpdateOneModel{
105+
Filter: event.DocumentKey,
106+
Update: collectUpdateOps(event),
107+
}
108+
109+
if ns.Sharded && o.useSimpleCollation {
110+
m.Collation = simpleCollation
111+
}
112+
93113
bw := mongo.ClientBulkWrite{
94114
Database: ns.Database,
95115
Collection: ns.Collection,
96-
Model: &mongo.ClientUpdateOneModel{
97-
Filter: event.DocumentKey,
98-
Update: collectUpdateOps(event),
99-
},
116+
Model: m,
100117
}
101118

102119
o.writes = append(o.writes, bw)
103120
}
104121

105122
func (o *clientBulkWrite) Replace(ns Namespace, event *ReplaceEvent) {
123+
m := &mongo.ClientReplaceOneModel{
124+
Filter: event.DocumentKey,
125+
Replacement: event.FullDocument,
126+
}
127+
128+
if ns.Sharded && o.useSimpleCollation {
129+
m.Collation = simpleCollation
130+
}
131+
106132
bw := mongo.ClientBulkWrite{
107133
Database: ns.Database,
108134
Collection: ns.Collection,
109-
Model: &mongo.ClientReplaceOneModel{
110-
Filter: event.DocumentKey,
111-
Replacement: event.FullDocument,
112-
},
135+
Model: m,
113136
}
114137

115138
o.writes = append(o.writes, bw)
116139
}
117140

118141
func (o *clientBulkWrite) Delete(ns Namespace, event *DeleteEvent) {
142+
m := &mongo.ClientDeleteOneModel{
143+
Filter: event.DocumentKey,
144+
}
145+
146+
if ns.Sharded && o.useSimpleCollation {
147+
m.Collation = simpleCollation
148+
}
149+
119150
bw := mongo.ClientBulkWrite{
120151
Database: ns.Database,
121152
Collection: ns.Collection,
122-
Model: &mongo.ClientDeleteOneModel{
123-
Filter: event.DocumentKey,
124-
},
153+
Model: m,
125154
}
126155

127156
o.writes = append(o.writes, bw)
128157
}
129158

130159
type collectionBulkWrite struct {
131-
max int
132-
count int
133-
writes map[Namespace][]mongo.WriteModel
160+
useSimpleCollation bool
161+
max int
162+
count int
163+
writes map[Namespace][]mongo.WriteModel
134164
}
135165

136-
func newCollectionBulkWrite(size int) *collectionBulkWrite {
166+
func newCollectionBulkWrite(size int, nonDefaultCollationSupport bool) *collectionBulkWrite {
137167
return &collectionBulkWrite{
138-
max: size,
139-
writes: make(map[Namespace][]mongo.WriteModel),
168+
useSimpleCollation: nonDefaultCollationSupport,
169+
max: size,
170+
writes: make(map[Namespace][]mongo.WriteModel),
140171
}
141172
}
142173

@@ -185,37 +216,61 @@ func (o *collectionBulkWrite) Do(ctx context.Context, m *mongo.Client) (int, err
185216
}
186217

187218
func (o *collectionBulkWrite) Insert(ns Namespace, event *InsertEvent) {
188-
o.writes[ns] = append(o.writes[ns], &mongo.ReplaceOneModel{
219+
m := &mongo.ReplaceOneModel{
189220
Filter: event.DocumentKey,
190221
Replacement: event.FullDocument,
191222
Upsert: &yes,
192-
})
223+
}
224+
225+
if ns.Sharded && o.useSimpleCollation {
226+
m.Collation = simpleCollation
227+
}
228+
229+
o.writes[ns] = append(o.writes[ns], m)
193230

194231
o.count++
195232
}
196233

197234
func (o *collectionBulkWrite) Update(ns Namespace, event *UpdateEvent) {
198-
o.writes[ns] = append(o.writes[ns], &mongo.UpdateOneModel{
235+
m := &mongo.UpdateOneModel{
199236
Filter: event.DocumentKey,
200237
Update: collectUpdateOps(event),
201-
})
238+
}
239+
240+
if ns.Sharded && o.useSimpleCollation {
241+
m.Collation = simpleCollation
242+
}
243+
244+
o.writes[ns] = append(o.writes[ns], m)
202245

203246
o.count++
204247
}
205248

206249
func (o *collectionBulkWrite) Replace(ns Namespace, event *ReplaceEvent) {
207-
o.writes[ns] = append(o.writes[ns], &mongo.ReplaceOneModel{
250+
m := &mongo.ReplaceOneModel{
208251
Filter: event.DocumentKey,
209252
Replacement: event.FullDocument,
210-
})
253+
}
254+
255+
if ns.Sharded && o.useSimpleCollation {
256+
m.Collation = simpleCollation
257+
}
258+
259+
o.writes[ns] = append(o.writes[ns], m)
211260

212261
o.count++
213262
}
214263

215264
func (o *collectionBulkWrite) Delete(ns Namespace, event *DeleteEvent) {
216-
o.writes[ns] = append(o.writes[ns], &mongo.DeleteOneModel{
265+
m := &mongo.DeleteOneModel{
217266
Filter: event.DocumentKey,
218-
})
267+
}
268+
269+
if ns.Sharded && o.useSimpleCollation {
270+
m.Collation = simpleCollation
271+
}
272+
273+
o.writes[ns] = append(o.writes[ns], m)
219274

220275
o.count++
221276
}

pcsm/catalog.go

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,7 @@ type collectionCatalog struct {
9494
AddedAt bson.Timestamp
9595
UUID *bson.Binary
9696
Indexes []indexCatalogEntry
97+
Sharded bool
9798
}
9899

99100
type indexCatalogEntry struct {
@@ -767,7 +768,11 @@ func (c *Catalog) UUIDMap() UUIDMap {
767768
for db, dbCat := range c.Databases {
768769
for coll, collCat := range dbCat.Collections {
769770
if collCat.UUID != nil {
770-
uuidMap[hex.EncodeToString(collCat.UUID.Data)] = Namespace{db, coll}
771+
uuidMap[hex.EncodeToString(collCat.UUID.Data)] = Namespace{
772+
Database: db,
773+
Collection: coll,
774+
Sharded: collCat.Sharded,
775+
}
771776
}
772777
}
773778
}
@@ -1181,7 +1186,15 @@ func (c *Catalog) ShardCollection(
11811186
return err //nolint:wrapcheck
11821187
}
11831188

1184-
log.Ctx(ctx).Infof("Sharded collection %s.%s", db, coll)
1189+
log.Ctx(ctx).Debugf("Sharded collection %s.%s", db, coll)
1190+
1191+
c.lock.Lock()
1192+
databaseEntry := c.Databases[db]
1193+
collectionEntry := databaseEntry.Collections[coll]
1194+
collectionEntry.Sharded = true
1195+
databaseEntry.Collections[coll] = collectionEntry
1196+
c.Databases[db] = databaseEntry
1197+
c.lock.Unlock()
11851198

11861199
return nil
11871200
}

pcsm/clone.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -341,7 +341,7 @@ func (c *Clone) doClone(ctx context.Context, namespaces []namespaceInfo) error {
341341

342342
prevNS := ns
343343
ns = namespaceInfo{
344-
Namespace: Namespace{prevNS.Database, name},
344+
Namespace: Namespace{Database: prevNS.Database, Collection: name},
345345
UUID: prevNS.UUID,
346346
}
347347

@@ -407,7 +407,7 @@ func (c *Clone) doCollectionClone(
407407
spec, err := topo.GetCollectionSpec(ctx, c.source, ns.Database, ns.Collection)
408408
if err != nil {
409409
if errors.Is(err, topo.ErrNotFound) {
410-
return NamespaceNotFoundError(ns)
410+
return NamespaceNotFoundError{ns.Database, ns.Collection}
411411
}
412412

413413
return errors.Wrap(err, "$collStats")
@@ -445,9 +445,9 @@ func (c *Clone) doCollectionClone(
445445
if err != nil {
446446
return errors.Wrap(err, "shard collection")
447447
}
448-
}
449448

450-
lg.Infof("Collection %q sharded", ns.String())
449+
lg.Infof("Collection %q sharded", ns.String())
450+
}
451451

452452
c.catalog.SetCollectionTimestamp(ctx, ns.Database, ns.Collection, capturedAt)
453453

@@ -617,7 +617,7 @@ func (c *Clone) collectSizeMap(ctx context.Context) error {
617617
collGrp.Go(func() error {
618618
if spec.Type == topo.TypeView {
619619
mu.Lock()
620-
sm[Namespace{db, spec.Name}] = sizeMapElem{}
620+
sm[Namespace{Database: db, Collection: spec.Name}] = sizeMapElem{}
621621
mu.Unlock()
622622

623623
return nil
@@ -633,7 +633,7 @@ func (c *Clone) collectSizeMap(ctx context.Context) error {
633633
}
634634

635635
mu.Lock()
636-
sm[Namespace{db, spec.Name}] = sizeMapElem{
636+
sm[Namespace{Database: db, Collection: spec.Name}] = sizeMapElem{
637637
UUID: spec.UUID,
638638
Size: uint64(stats.Size), //nolint:gosec
639639
Count: stats.Count,

pcsm/copy.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -598,7 +598,7 @@ func NewSegmenter(
598598
stats, err := topo.GetCollStats(ctx, m, ns.Database, ns.Collection)
599599
if err != nil {
600600
if errors.Is(err, topo.ErrNotFound) {
601-
return nil, NamespaceNotFoundError(ns)
601+
return nil, NamespaceNotFoundError{ns.Database, ns.Collection}
602602
}
603603

604604
return nil, errors.Wrap(err, "$collStats")
@@ -897,7 +897,7 @@ func NewCappedSegmenter(
897897
stats, err := topo.GetCollStats(ctx, m, ns.Database, ns.Collection)
898898
if err != nil {
899899
if errors.Is(err, topo.ErrNotFound) {
900-
return nil, NamespaceNotFoundError(ns)
900+
return nil, NamespaceNotFoundError{ns.Database, ns.Collection}
901901
}
902902

903903
return nil, errors.Wrap(err, "$collStats")

pcsm/copy_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ func getNamespace() pcsm.Namespace {
3333
panic("PCSM_TEST_NAMESPACE contains invalid namespace")
3434
}
3535

36-
return pcsm.Namespace{db, coll}
36+
return pcsm.Namespace{Database: db, Collection: coll}
3737
}
3838

3939
func getSourceURI() string {

pcsm/events.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,9 @@ type Namespace struct {
9595

9696
// Collection is the name of the collection where the event occurred.
9797
Collection string `bson:"coll"`
98+
99+
// Sharded indicates whether the collection is sharded.
100+
Sharded bool
98101
}
99102

100103
func (ns Namespace) String() string {

pcsm/repl.go

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -154,10 +154,15 @@ func (r *Repl) Recover(cp *replCheckpoint) error {
154154
r.eventsRead.Store(cp.EventsApplied)
155155
r.lastReplicatedOpTime = cp.LastReplicatedOpTime
156156

157+
targetVer, err := topo.Version(context.Background(), r.target)
158+
if err != nil {
159+
return errors.Wrap(err, "major version")
160+
}
161+
157162
if cp.UseClientBulkWrite {
158-
r.bulkWrite = newClientBulkWrite(config.BulkOpsSize)
163+
r.bulkWrite = newClientBulkWrite(config.BulkOpsSize, targetVer.Major() < 8) //nolint:mnd
159164
} else {
160-
r.bulkWrite = newCollectionBulkWrite(config.BulkOpsSize)
165+
r.bulkWrite = newCollectionBulkWrite(config.BulkOpsSize, targetVer.Major() < 8) //nolint:mnd
161166
}
162167

163168
if cp.Error != "" {
@@ -211,15 +216,15 @@ func (r *Repl) Start(ctx context.Context, startAt bson.Timestamp) error {
211216
return errors.New("already started")
212217
}
213218

214-
serverVersion, err := topo.Version(ctx, r.target)
219+
targetVer, err := topo.Version(ctx, r.target)
215220
if err != nil {
216221
return errors.Wrap(err, "major version")
217222
}
218223

219-
if topo.Support(serverVersion).ClientBulkWrite() && !config.UseCollectionBulkWrite() {
220-
r.bulkWrite = newClientBulkWrite(config.BulkOpsSize)
224+
if topo.Support(targetVer).ClientBulkWrite() && !config.UseCollectionBulkWrite() {
225+
r.bulkWrite = newClientBulkWrite(config.BulkOpsSize, targetVer.Major() < 8) //nolint:mnd
221226
} else {
222-
r.bulkWrite = newCollectionBulkWrite(config.BulkOpsSize)
227+
r.bulkWrite = newCollectionBulkWrite(config.BulkOpsSize, targetVer.Major() < 8) //nolint:mnd
223228

224229
log.New("repl").Debug("Use collection-level bulk write")
225230
}
@@ -568,7 +573,7 @@ func (r *Repl) run(opts *options.ChangeStreamOptionsBuilder) {
568573
metrics.AddEventsApplied(1)
569574

570575
switch change.OperationType { //nolint:exhaustive
571-
case Create, Rename, Drop, DropDatabase:
576+
case Create, Rename, Drop, DropDatabase, ShardCollection:
572577
uuidMap = r.catalog.UUIDMap()
573578
}
574579
}
@@ -737,6 +742,8 @@ func (r *Repl) applyDDLChange(ctx context.Context, change *ChangeEvent) error {
737742
event.OperationDescription.ShardKey,
738743
event.OperationDescription.Unique)
739744

745+
lg.Infof("Collection %q has been sharded", change.Namespace)
746+
740747
case ReshardCollection:
741748
fallthrough
742749
case RefineCollectionShardKey:

0 commit comments

Comments
 (0)