Skip to content

Commit e48c0ab

Browse files
authored
mongo: shorten _full_document to doc (#3391)
Shorten default name from `_full_document` to `doc`.
1 parent 787967e commit e48c0ab

File tree

6 files changed

+59
-43
lines changed

6 files changed

+59
-43
lines changed

flow/connectors/mongo/cdc.go

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ type ChangeEvent struct {
3636
func (c *MongoConnector) GetTableSchema(
3737
ctx context.Context,
3838
_ map[string]string,
39-
_ uint32,
39+
internalVersion uint32,
4040
_ protos.TypeSystem,
4141
tableMappings []*protos.TableMapping,
4242
) (map[string]*protos.TableSchema, error) {
@@ -47,8 +47,12 @@ func (c *MongoConnector) GetTableSchema(
4747
TypeModifier: -1,
4848
Nullable: false,
4949
}
50+
fullDocumentColumnName := DefaultFullDocumentColumnName
51+
if internalVersion < shared.IntervalVersion_MongoDBFullDocumentColumnToDoc {
52+
fullDocumentColumnName = LegacyFullDocumentColumnName
53+
}
5054
dataFieldDescription := &protos.FieldDescription{
51-
Name: DefaultFullDocumentColumnName,
55+
Name: fullDocumentColumnName,
5256
Type: string(types.QValueKindJSON),
5357
TypeModifier: -1,
5458
Nullable: false,
@@ -118,6 +122,11 @@ func (c *MongoConnector) PullRecords(
118122
) error {
119123
defer req.RecordStream.Close()
120124

125+
fullDocumentColumnName := DefaultFullDocumentColumnName
126+
if req.InternalVersion < shared.IntervalVersion_MongoDBFullDocumentColumnToDoc {
127+
fullDocumentColumnName = LegacyFullDocumentColumnName
128+
}
129+
121130
c.logger.Info("[mongo] started PullRecords for mirror "+req.FlowJobName,
122131
slog.Any("table_mapping", req.TableNameMapping),
123132
slog.Uint64("max_batch_size", uint64(req.MaxBatchSize)),
@@ -229,14 +238,14 @@ func (c *MongoConnector) PullRecords(
229238
if err != nil {
230239
return fmt.Errorf("failed to convert full document to JSON: %w", err)
231240
}
232-
items.AddColumn(DefaultFullDocumentColumnName, qValue)
241+
items.AddColumn(fullDocumentColumnName, qValue)
233242
} else {
234243
// `fullDocument` field will not exist in the following scenarios:
235244
// 1) operationType is 'delete'
236245
// 2) document is deleted / collection is dropped in between update and lookup
237246
// 3) update changes the values for at least one of the fields in that collection's
238247
// shard key (although sharding is not supported today)
239-
items.AddColumn(DefaultFullDocumentColumnName, types.QValueJSON{Val: "{}"})
248+
items.AddColumn(fullDocumentColumnName, types.QValueJSON{Val: "{}"})
240249
}
241250
return nil
242251
}

flow/connectors/mongo/mongo.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,8 @@ import (
2626

2727
const (
2828
DefaultDocumentKeyColumnName = "_id"
29-
DefaultFullDocumentColumnName = "_full_document"
29+
DefaultFullDocumentColumnName = "doc"
30+
LegacyFullDocumentColumnName = "_full_document"
3031
)
3132

3233
type MongoConnector struct {

flow/connectors/mongo/qrep.go

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -159,7 +159,7 @@ func (c *MongoConnector) PullQRepRecords(
159159
}
160160
collection := c.client.Database(parseWatermarkTable.Schema).Collection(parseWatermarkTable.Table)
161161

162-
stream.SetSchema(GetDefaultSchema())
162+
stream.SetSchema(GetDefaultSchema(config.Version))
163163

164164
c.totalBytesRead.Store(0)
165165
c.deltaBytesRead.Store(0)
@@ -237,7 +237,11 @@ func (c *MongoConnector) PullQRepRecords(
237237
return totalRecords, c.deltaBytesRead.Swap(0), nil
238238
}
239239

240-
func GetDefaultSchema() types.QRecordSchema {
240+
func GetDefaultSchema(internalVersion uint32) types.QRecordSchema {
241+
fullDocumentColumnName := DefaultFullDocumentColumnName
242+
if internalVersion < shared.IntervalVersion_MongoDBFullDocumentColumnToDoc {
243+
fullDocumentColumnName = LegacyFullDocumentColumnName
244+
}
241245
schema := make([]types.QField, 0, 2)
242246
schema = append(schema,
243247
types.QField{
@@ -246,7 +250,7 @@ func GetDefaultSchema() types.QRecordSchema {
246250
Nullable: false,
247251
},
248252
types.QField{
249-
Name: DefaultFullDocumentColumnName,
253+
Name: fullDocumentColumnName,
250254
Type: types.QValueKindJSON,
251255
Nullable: false,
252256
})

flow/e2e/mongo/mongo.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
"github.com/PeerDB-io/peerdb/flow/e2e"
1414
"github.com/PeerDB-io/peerdb/flow/generated/protos"
1515
"github.com/PeerDB-io/peerdb/flow/model"
16+
"github.com/PeerDB-io/peerdb/flow/shared"
1617
)
1718

1819
type MongoSource struct {
@@ -62,7 +63,7 @@ func (s *MongoSource) GetRows(ctx context.Context, suffix, table, cols string) (
6263
}
6364

6465
recordBatch := &model.QRecordBatch{
65-
Schema: connmongo.GetDefaultSchema(),
66+
Schema: connmongo.GetDefaultSchema(shared.IntervalVersion_MongoDBFullDocumentColumnToDoc),
6667
Records: nil,
6768
}
6869

flow/e2e/mongo/mongo_test.go

Lines changed: 34 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,7 @@ func (s MongoClickhouseSuite) Test_Simple_Flow() {
114114
tc := e2e.NewTemporalClient(t)
115115
env := e2e.ExecutePeerflow(t, tc, flowConnConfig)
116116

117-
e2e.EnvWaitForEqualTablesWithNames(env, s, "initial load to match", srcTable, dstTable, "_id,_full_document")
117+
e2e.EnvWaitForEqualTablesWithNames(env, s, "initial load to match", srcTable, dstTable, "_id,doc")
118118

119119
e2e.SetupCDCFlowStatusQuery(t, env, flowConnConfig)
120120
// insert 10 rows into the source table for cdc
@@ -126,7 +126,7 @@ func (s MongoClickhouseSuite) Test_Simple_Flow() {
126126
require.True(t, res.Acknowledged)
127127
}
128128

129-
e2e.EnvWaitForEqualTablesWithNames(env, s, "cdc events to match", srcTable, dstTable, "_id,_full_document")
129+
e2e.EnvWaitForEqualTablesWithNames(env, s, "cdc events to match", srcTable, dstTable, "_id,doc")
130130
env.Cancel(t.Context())
131131
e2e.RequireEnvCanceled(t, env)
132132
}
@@ -161,7 +161,7 @@ func (s MongoClickhouseSuite) Test_Simple_Flow_Partitioned() {
161161
tc := e2e.NewTemporalClient(t)
162162
env := e2e.ExecutePeerflow(t, tc, flowConnConfig)
163163

164-
e2e.EnvWaitForEqualTablesWithNames(env, s, "initial load to match", srcTable, dstTable, "_id,_full_document")
164+
e2e.EnvWaitForEqualTablesWithNames(env, s, "initial load to match", srcTable, dstTable, "_id,doc")
165165

166166
e2e.SetupCDCFlowStatusQuery(t, env, flowConnConfig)
167167
// insert 10 rows into the source table for cdc
@@ -173,7 +173,7 @@ func (s MongoClickhouseSuite) Test_Simple_Flow_Partitioned() {
173173
require.True(t, res.Acknowledged)
174174
}
175175

176-
e2e.EnvWaitForEqualTablesWithNames(env, s, "cdc events to match", srcTable, dstTable, "_id,_full_document")
176+
e2e.EnvWaitForEqualTablesWithNames(env, s, "cdc events to match", srcTable, dstTable, "_id,doc")
177177
env.Cancel(t.Context())
178178
e2e.RequireEnvCanceled(t, env)
179179
}
@@ -210,7 +210,7 @@ func (s MongoClickhouseSuite) Test_Inconsistent_Schema() {
210210

211211
tc := e2e.NewTemporalClient(t)
212212
env := e2e.ExecutePeerflow(t, tc, flowConnConfig)
213-
e2e.EnvWaitForEqualTablesWithNames(env, s, "initial load to match", srcTable, dstTable, "_id,_full_document")
213+
e2e.EnvWaitForEqualTablesWithNames(env, s, "initial load to match", srcTable, dstTable, "_id,doc")
214214

215215
e2e.SetupCDCFlowStatusQuery(t, env, flowConnConfig)
216216

@@ -224,7 +224,7 @@ func (s MongoClickhouseSuite) Test_Inconsistent_Schema() {
224224
require.NoError(t, err)
225225
require.True(t, res.Acknowledged)
226226
}
227-
e2e.EnvWaitForEqualTablesWithNames(env, s, "cdc events to match", srcTable, dstTable, "_id,_full_document")
227+
e2e.EnvWaitForEqualTablesWithNames(env, s, "cdc events to match", srcTable, dstTable, "_id,doc")
228228

229229
env.Cancel(t.Context())
230230
e2e.RequireEnvCanceled(t, env)
@@ -255,7 +255,7 @@ func (s MongoClickhouseSuite) Test_CDC() {
255255
insertRes, err := collection.InsertOne(t.Context(), bson.D{bson.E{Key: "key", Value: 1}}, options.InsertOne())
256256
require.NoError(t, err)
257257
require.True(t, insertRes.Acknowledged)
258-
e2e.EnvWaitForEqualTablesWithNames(env, s, "insert event", srcTable, dstTable, "_id,_full_document")
258+
e2e.EnvWaitForEqualTablesWithNames(env, s, "insert event", srcTable, dstTable, "_id,doc")
259259

260260
updateRes, err := collection.UpdateOne(
261261
t.Context(),
@@ -264,7 +264,7 @@ func (s MongoClickhouseSuite) Test_CDC() {
264264
options.UpdateOne())
265265
require.NoError(t, err)
266266
require.Equal(t, int64(1), updateRes.ModifiedCount)
267-
e2e.EnvWaitForEqualTablesWithNames(env, s, "update event", srcTable, dstTable, "_id,_full_document")
267+
e2e.EnvWaitForEqualTablesWithNames(env, s, "update event", srcTable, dstTable, "_id,doc")
268268

269269
replaceRes, err := collection.ReplaceOne(
270270
t.Context(),
@@ -273,12 +273,12 @@ func (s MongoClickhouseSuite) Test_CDC() {
273273
options.Replace())
274274
require.NoError(t, err)
275275
require.Equal(t, int64(1), replaceRes.ModifiedCount)
276-
e2e.EnvWaitForEqualTablesWithNames(env, s, "replace event", srcTable, dstTable, "_id,_full_document")
276+
e2e.EnvWaitForEqualTablesWithNames(env, s, "replace event", srcTable, dstTable, "_id,doc")
277277

278278
deleteRes, err := collection.DeleteOne(t.Context(), bson.D{bson.E{Key: "key", Value: 3}}, options.DeleteOne())
279279
require.NoError(t, err)
280280
require.Equal(t, int64(1), deleteRes.DeletedCount)
281-
e2e.EnvWaitForEqualTablesWithNames(env, s, "delete event", srcTable, dstTable, "_id,_full_document")
281+
e2e.EnvWaitForEqualTablesWithNames(env, s, "delete event", srcTable, dstTable, "_id,doc")
282282

283283
env.Cancel(t.Context())
284284
e2e.RequireEnvCanceled(t, env)
@@ -317,35 +317,35 @@ func (s MongoClickhouseSuite) Test_Nested_Document_At_Limit() {
317317

318318
tc := e2e.NewTemporalClient(t)
319319
env := e2e.ExecutePeerflow(t, tc, flowConnConfig)
320-
e2e.EnvWaitForEqualTablesWithNames(env, s, "initial load", srcTable, dstTable, "_id,_full_document")
320+
e2e.EnvWaitForEqualTablesWithNames(env, s, "initial load", srcTable, dstTable, "_id,doc")
321321

322322
e2e.SetupCDCFlowStatusQuery(t, env, flowConnConfig)
323323

324324
// insert nested doc for cdc
325325
res, err = collection.InsertOne(t.Context(), nestedDoc("X"), options.InsertOne())
326326
require.NoError(t, err)
327327
require.True(t, res.Acknowledged)
328-
e2e.EnvWaitForEqualTablesWithNames(env, s, "insert events to match", srcTable, dstTable, "_id,_full_document")
328+
e2e.EnvWaitForEqualTablesWithNames(env, s, "insert events to match", srcTable, dstTable, "_id,doc")
329329

330330
oid := bson.D{bson.E{Key: "_id", Value: res.InsertedID}}
331331

332332
// update nested doc for cdc
333333
updateRes, err := collection.UpdateOne(t.Context(), oid, bson.D{bson.E{Key: "$set", Value: nestedDoc("Y")}}, options.UpdateOne())
334334
require.NoError(t, err)
335335
require.Equal(t, int64(1), updateRes.ModifiedCount)
336-
e2e.EnvWaitForEqualTablesWithNames(env, s, "update events to match", srcTable, dstTable, "_id,_full_document")
336+
e2e.EnvWaitForEqualTablesWithNames(env, s, "update events to match", srcTable, dstTable, "_id,doc")
337337

338338
// replace nested doc for cdc
339339
replaceRes, err := collection.ReplaceOne(t.Context(), oid, nestedDoc("Z"), options.Replace())
340340
require.NoError(t, err)
341341
require.Equal(t, int64(1), replaceRes.ModifiedCount)
342-
e2e.EnvWaitForEqualTablesWithNames(env, s, "replace events to match", srcTable, dstTable, "_id,_full_document")
342+
e2e.EnvWaitForEqualTablesWithNames(env, s, "replace events to match", srcTable, dstTable, "_id,doc")
343343

344344
// delete nested doc for cdc
345345
deleteRes, err := collection.DeleteOne(t.Context(), oid, options.DeleteOne())
346346
require.NoError(t, err)
347347
require.Equal(t, int64(1), deleteRes.DeletedCount)
348-
e2e.EnvWaitForEqualTablesWithNames(env, s, "delete events to match", srcTable, dstTable, "_id,_full_document")
348+
e2e.EnvWaitForEqualTablesWithNames(env, s, "delete events to match", srcTable, dstTable, "_id,doc")
349349

350350
env.Cancel(t.Context())
351351
e2e.RequireEnvCanceled(t, env)
@@ -384,35 +384,35 @@ func (s MongoClickhouseSuite) Test_Large_Document_At_Limit() {
384384

385385
tc := e2e.NewTemporalClient(t)
386386
env := e2e.ExecutePeerflow(t, tc, flowConnConfig)
387-
e2e.EnvWaitForEqualTablesWithNames(env, s, "initial load", srcTable, dstTable, "_id,_full_document")
387+
e2e.EnvWaitForEqualTablesWithNames(env, s, "initial load", srcTable, dstTable, "_id,doc")
388388

389389
e2e.SetupCDCFlowStatusQuery(t, env, flowConnConfig)
390390

391391
// insert large doc for cdc (to test change event with "fullDocument")
392392
res, err = collection.InsertOne(t.Context(), largeDoc("X"), options.InsertOne())
393393
require.NoError(t, err)
394394
require.True(t, res.Acknowledged)
395-
e2e.EnvWaitForEqualTablesWithNames(env, s, "insert events to match", srcTable, dstTable, "_id,_full_document")
395+
e2e.EnvWaitForEqualTablesWithNames(env, s, "insert events to match", srcTable, dstTable, "_id,doc")
396396

397397
oid := bson.D{bson.E{Key: "_id", Value: res.InsertedID}}
398398

399399
// update large doc for cdc
400400
updateRes, err := collection.UpdateOne(t.Context(), oid, bson.D{bson.E{Key: "$set", Value: largeDoc("Y")}}, options.UpdateOne())
401401
require.NoError(t, err)
402402
require.Equal(t, int64(1), updateRes.ModifiedCount)
403-
e2e.EnvWaitForEqualTablesWithNames(env, s, "update events to match", srcTable, dstTable, "_id,_full_document")
403+
e2e.EnvWaitForEqualTablesWithNames(env, s, "update events to match", srcTable, dstTable, "_id,doc")
404404

405405
// replace large doc for cdc
406406
replaceRes, err := collection.ReplaceOne(t.Context(), oid, largeDoc("Z"), options.Replace())
407407
require.NoError(t, err)
408408
require.Equal(t, int64(1), replaceRes.ModifiedCount)
409-
e2e.EnvWaitForEqualTablesWithNames(env, s, "replace events to match", srcTable, dstTable, "_id,_full_document")
409+
e2e.EnvWaitForEqualTablesWithNames(env, s, "replace events to match", srcTable, dstTable, "_id,doc")
410410

411411
// delete large doc for cdc
412412
deleteRes, err := collection.DeleteOne(t.Context(), oid, options.DeleteOne())
413413
require.NoError(t, err)
414414
require.Equal(t, int64(1), deleteRes.DeletedCount)
415-
e2e.EnvWaitForEqualTablesWithNames(env, s, "delete events to match", srcTable, dstTable, "_id,_full_document")
415+
e2e.EnvWaitForEqualTablesWithNames(env, s, "delete events to match", srcTable, dstTable, "_id,doc")
416416

417417
env.Cancel(t.Context())
418418
e2e.RequireEnvCanceled(t, env)
@@ -457,8 +457,8 @@ func (s MongoClickhouseSuite) Test_Transactions_Across_Collections() {
457457

458458
tc := e2e.NewTemporalClient(t)
459459
env := e2e.ExecutePeerflow(t, tc, flowConnConfig)
460-
e2e.EnvWaitForEqualTablesWithNames(env, s, "initial load", srcTable1, dstTable1, "_id,_full_document")
461-
e2e.EnvWaitForEqualTablesWithNames(env, s, "initial load", srcTable2, dstTable2, "_id,_full_document")
460+
e2e.EnvWaitForEqualTablesWithNames(env, s, "initial load", srcTable1, dstTable1, "_id,doc")
461+
e2e.EnvWaitForEqualTablesWithNames(env, s, "initial load", srcTable2, dstTable2, "_id,doc")
462462

463463
e2e.SetupCDCFlowStatusQuery(t, env, flowConnConfig)
464464

@@ -480,8 +480,8 @@ func (s MongoClickhouseSuite) Test_Transactions_Across_Collections() {
480480
require.NoError(t, err)
481481
require.Equal(t, int64(1), res.([]*mongo.UpdateResult)[0].ModifiedCount)
482482
require.Equal(t, int64(1), res.([]*mongo.UpdateResult)[1].ModifiedCount)
483-
e2e.EnvWaitForEqualTablesWithNames(env, s, "t1 to match", srcTable1, dstTable1, "_id,_full_document")
484-
e2e.EnvWaitForEqualTablesWithNames(env, s, "t2 to match", srcTable2, dstTable2, "_id,_full_document")
483+
e2e.EnvWaitForEqualTablesWithNames(env, s, "t1 to match", srcTable1, dstTable1, "_id,doc")
484+
e2e.EnvWaitForEqualTablesWithNames(env, s, "t2 to match", srcTable2, dstTable2, "_id,doc")
485485

486486
env.Cancel(t.Context())
487487
e2e.RequireEnvCanceled(t, env)
@@ -511,25 +511,25 @@ func (s MongoClickhouseSuite) Test_Enable_Json() {
511511

512512
tc := e2e.NewTemporalClient(t)
513513
env := e2e.ExecutePeerflow(t, tc, flowConnConfig)
514-
e2e.EnvWaitForEqualTablesWithNames(env, s, "initial load", srcTable, dstTable, "_id,_full_document")
514+
e2e.EnvWaitForEqualTablesWithNames(env, s, "initial load", srcTable, dstTable, "_id,doc")
515515

516516
e2e.SetupCDCFlowStatusQuery(t, env, flowConnConfig)
517517

518518
insertRes, err := collection.InsertOne(t.Context(), bson.D{bson.E{Key: "key2", Value: "val2"}}, options.InsertOne())
519519
require.NoError(t, err)
520520
require.True(t, insertRes.Acknowledged)
521-
e2e.EnvWaitForEqualTablesWithNames(env, s, "insert event", srcTable, dstTable, "_id,_full_document")
521+
e2e.EnvWaitForEqualTablesWithNames(env, s, "insert event", srcTable, dstTable, "_id,doc")
522522
oid := bson.D{bson.E{Key: "_id", Value: res.InsertedID}}
523523

524524
replaceRes, err := collection.ReplaceOne(t.Context(), oid, bson.D{bson.E{Key: "key2", Value: "val2"}}, options.Replace())
525525
require.NoError(t, err)
526526
require.Equal(t, int64(1), replaceRes.ModifiedCount)
527-
e2e.EnvWaitForEqualTablesWithNames(env, s, "replace event", srcTable, dstTable, "_id,_full_document")
527+
e2e.EnvWaitForEqualTablesWithNames(env, s, "replace event", srcTable, dstTable, "_id,doc")
528528

529529
deleteRes, err := collection.DeleteOne(t.Context(), oid, options.DeleteOne())
530530
require.NoError(t, err)
531531
require.Equal(t, int64(1), deleteRes.DeletedCount)
532-
e2e.EnvWaitForEqualTablesWithNames(env, s, "delete event", srcTable, dstTable, "_id,_full_document")
532+
e2e.EnvWaitForEqualTablesWithNames(env, s, "delete event", srcTable, dstTable, "_id,doc")
533533

534534
env.Cancel(t.Context())
535535
e2e.RequireEnvCanceled(t, env)
@@ -566,8 +566,8 @@ func (s MongoClickhouseSuite) Test_Mongo_Can_Resume_After_Delete_Table() {
566566
insertRes, err = db.Collection(srcTable2).InsertOne(t.Context(), bson.D{bson.E{Key: "key", Value: "val"}}, options.InsertOne())
567567
require.NoError(t, err)
568568
require.True(t, insertRes.Acknowledged)
569-
e2e.EnvWaitForEqualTablesWithNames(env, s, "insert event", srcTable1, dstTable1, "_id,_full_document")
570-
e2e.EnvWaitForEqualTablesWithNames(env, s, "insert event", srcTable2, dstTable2, "_id,_full_document")
569+
e2e.EnvWaitForEqualTablesWithNames(env, s, "insert event", srcTable1, dstTable1, "_id,doc")
570+
e2e.EnvWaitForEqualTablesWithNames(env, s, "insert event", srcTable2, dstTable2, "_id,doc")
571571

572572
// pause workflow
573573
e2e.SignalWorkflow(t.Context(), env, model.FlowSignal, model.PauseSignal)
@@ -590,7 +590,7 @@ func (s MongoClickhouseSuite) Test_Mongo_Can_Resume_After_Delete_Table() {
590590
insertRes, err = db.Collection(srcTable1).InsertOne(t.Context(), bson.D{bson.E{Key: "key2", Value: "val2"}}, options.InsertOne())
591591
require.NoError(t, err)
592592
require.True(t, insertRes.Acknowledged)
593-
e2e.EnvWaitForEqualTablesWithNames(env, s, "insert event", srcTable1, dstTable1, "_id,_full_document")
593+
e2e.EnvWaitForEqualTablesWithNames(env, s, "insert event", srcTable1, dstTable1, "_id,doc")
594594

595595
env.Cancel(t.Context())
596596
e2e.RequireEnvCanceled(t, env)
@@ -724,16 +724,16 @@ func (s MongoClickhouseSuite) Test_Json_Types() {
724724

725725
tc := e2e.NewTemporalClient(t)
726726
env := e2e.ExecutePeerflow(t, tc, flowConnConfig)
727-
e2e.EnvWaitForCount(env, s, "initial load", dstTable, "_id,_full_document", 1)
727+
e2e.EnvWaitForCount(env, s, "initial load", dstTable, "_id,doc", 1)
728728

729729
e2e.SetupCDCFlowStatusQuery(t, env, flowConnConfig)
730730

731731
insertRes, err = collection.InsertOne(t.Context(), doc, options.InsertOne())
732732
require.NoError(t, err)
733733
require.True(t, insertRes.Acknowledged)
734-
e2e.EnvWaitForCount(env, s, "cdc", dstTable, "_id,_full_document", 2)
734+
e2e.EnvWaitForCount(env, s, "cdc", dstTable, "_id,doc", 2)
735735

736-
rows, err := s.GetRows(dstTable, "_id,_full_document")
736+
rows, err := s.GetRows(dstTable, "_id,doc")
737737
require.NoError(t, err)
738738
require.Len(t, rows.Records, 2, "Expected 2 rows in destination table")
739739

flow/shared/constants.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ const (
1818
const (
1919
InternalVersion_First uint32 = iota
2020
InternalVersion_PgVectorAsFloatArray
21+
IntervalVersion_MongoDBFullDocumentColumnToDoc
2122

2223
TotalNumberOfInternalVersions
2324
InternalVersion_Latest = TotalNumberOfInternalVersions - 1

0 commit comments

Comments
 (0)