Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
62 commits
Select commit Hold shift + click to select a range
687a57e
move to common
FGasper Nov 11, 2025
61b89e3
move
FGasper Nov 11, 2025
3d35d29
fix lint
FGasper Nov 11, 2025
1c672f9
unused
FGasper Nov 11, 2025
aefe4d6
pointer receivers
FGasper Nov 11, 2025
1cb493b
renames/refacotrs
FGasper Nov 11, 2025
08d5633
cleanup
FGasper Nov 11, 2025
c9e4610
tidy
FGasper Nov 11, 2025
c7d9edd
string
FGasper Nov 11, 2025
196f887
more renames
FGasper Nov 11, 2025
06f7794
move
FGasper Nov 11, 2025
0b13627
metadata
FGasper Nov 11, 2025
25b7e61
comment
FGasper Nov 11, 2025
3284468
comments
FGasper Nov 11, 2025
15a7636
remove persistor error
FGasper Nov 12, 2025
df29cde
remove doneChan
FGasper Nov 12, 2025
fc6c8ce
remove reader error channel
FGasper Nov 12, 2025
d47c40a
allow eventual to accept nil -- MUST TEST!
FGasper Nov 12, 2025
4e38f04
allow premature exit
FGasper Nov 12, 2025
0d4e406
rename & move
FGasper Nov 12, 2025
95129ef
Merge branch 'main' into REP-6804-scratch
FGasper Nov 12, 2025
4e82706
Merge branch 'REP-6804-scratch' into REP-6804-fix-context
FGasper Nov 12, 2025
e23a503
move
FGasper Nov 12, 2025
d61a8f1
handling
FGasper Nov 12, 2025
b05ffe3
add oplog reader & alia
FGasper Nov 12, 2025
c6025f5
dedupe retry of change reader
FGasper Nov 12, 2025
8751b11
format
FGasper Nov 12, 2025
d3070e2
oops
FGasper Nov 12, 2025
06bc31b
nolint
FGasper Nov 12, 2025
1cbb232
fix test
FGasper Nov 12, 2025
51f4163
ddl allowance
FGasper Nov 12, 2025
6918fda
token
FGasper Nov 12, 2025
eb0464d
test 2nd time
FGasper Nov 12, 2025
7b30bff
save
FGasper Nov 12, 2025
4df242e
add comment
FGasper Nov 12, 2025
d2b9e18
Merge branch 'REP-6804-fix-context' into REP-6804-mongod-oplog-final
FGasper Nov 12, 2025
e5a1596
tolerate later startAtTs
FGasper Nov 12, 2025
90f9123
collmod test
FGasper Nov 12, 2025
11140ae
empty ts
FGasper Nov 12, 2025
8f6871b
read oplog for 4.2
FGasper Nov 12, 2025
3c6380c
Merge branch 'main' into REP-6804-fix-context
FGasper Nov 12, 2025
5337613
Merge branch 'REP-6804-fix-context' into REP-6804-mongod-oplog-final
FGasper Nov 12, 2025
0fbc28e
maybe fix test?
FGasper Nov 12, 2025
6052ecb
avoid $switch for 4.2.
FGasper Nov 13, 2025
491e8f8
4.4
FGasper Nov 13, 2025
6c0aa33
compat
FGasper Nov 13, 2025
8c0a1d3
switch is OK
FGasper Nov 13, 2025
1bfbe36
no oplog for 4.2 for now
FGasper Nov 13, 2025
e80cba3
projection is only 4.4 anyway
FGasper Nov 13, 2025
f3bf387
allow non-expr oplog
FGasper Nov 13, 2025
f07b8de
handle applyOps
FGasper Nov 13, 2025
7840134
clone
FGasper Nov 13, 2025
ed11123
Merge branch 'main' into REP-6804-mongod-oplog-final
FGasper Nov 13, 2025
acbbbbe
options
FGasper Nov 13, 2025
ddeb8f3
github ci
FGasper Nov 13, 2025
e32835b
err check
FGasper Nov 13, 2025
bb78043
array
FGasper Nov 13, 2025
bf719a6
quotes
FGasper Nov 13, 2025
a74853e
quotes again
FGasper Nov 13, 2025
1f1fdb4
support ns filter in oplog
FGasper Nov 13, 2025
0622651
show Max the wonky
FGasper Nov 13, 2025
0b68637
wth
FGasper Nov 14, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 56 additions & 23 deletions .github/workflows/all.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,12 @@ on:
pull_request:
workflow_dispatch:

env:
replsetSrcConnStr: mongodb://localhost:27020,localhost:27021,localhost:27022
replsetDstConnStr: mongodb://localhost:27030,localhost:27031,localhost:27032
shardedSrcConnStr: mongodb://localhost:27020
shardedDstConnStr: mongodb://localhost:27030

jobs:
basics:
strategy:
Expand All @@ -17,10 +23,37 @@ jobs:
# Testing fallback when `hello` isn’t implemented
# (but appendOplogNote is).
- mongodb_versions: [ '4.2.5', '6.0' ]
topology:
name: replset
srcConnStr: mongodb://localhost:27020,localhost:27021,localhost:27022
dstConnStr: mongodb://localhost:27030,localhost:27031,localhost:27032
topology: replset

- mongodb_versions: [ '4.2', '8.0' ]
topology: replset
srcChangeReader: tailOplog
dstChangeReader: tailOplog

- mongodb_versions: [ '4.4', '8.0' ]
topology: replset
srcChangeReader: tailOplog
dstChangeReader: tailOplog

- mongodb_versions: [ '5.0', '8.0' ]
topology: replset
srcChangeReader: tailOplog
dstChangeReader: tailOplog

- mongodb_versions: [ '6.0', '8.0' ]
topology: replset
srcChangeReader: tailOplog
dstChangeReader: tailOplog

- mongodb_versions: [ '7.0', '8.0' ]
topology: replset
srcChangeReader: tailOplog
dstChangeReader: tailOplog

- mongodb_versions: [ '8.0', '8.0' ]
topology: replset
srcChangeReader: tailOplog
dstChangeReader: tailOplog

exclude:
- mongodb_versions: [ '4.2', '4.2' ]
Expand All @@ -31,13 +64,16 @@ jobs:
toHashedIndexKey: true
- mongodb_versions: [ '4.2', '6.0' ]
toHashedIndexKey: true
- mongodb_versions: [ '4.2', '8.0' ]
toHashedIndexKey: true

# versions are: source, destination
mongodb_versions:
- [ '4.2', '4.2' ]
- [ '4.2', '4.4' ]
- [ '4.2', '5.0' ]
- [ '4.2', '6.0' ]
- [ '4.2', '8.0' ]

- [ '4.4', '4.4' ]
- [ '4.4', '5.0' ]
Expand All @@ -60,27 +96,19 @@ jobs:

toHashedIndexKey: [true, false]

topology:
- name: replset
srcConnStr: mongodb://localhost:27020,localhost:27021,localhost:27022
dstConnStr: mongodb://localhost:27030,localhost:27031,localhost:27032

- name: replset-to-sharded
dstArgs: --sharded 2
srcConnStr: mongodb://localhost:27020,localhost:27021,localhost:27022
dstConnStr: mongodb://localhost:27030
srcChangeReader: [changeStream]
dstChangeReader: [changeStream]

- name: sharded
srcArgs: --sharded 2
dstArgs: --sharded 2
srcConnStr: mongodb://localhost:27020
dstConnStr: mongodb://localhost:27030
topology:
- replset
- replset-to-sharded
- sharded

# Ubuntu 24 lacks OpenSSL 1.1.1’s libcrypto, which pre-v6 MongoDB
# versions need.
runs-on: ubuntu-22.04

name: ${{ matrix.mongodb_versions[0] }} to ${{ matrix.mongodb_versions[1] }}, ${{ matrix.topology.name }}${{ matrix.toHashedIndexKey && ', hashed doc compare' || '' }}
name: ${{ matrix.mongodb_versions[0] }} to ${{ matrix.mongodb_versions[1] }}, ${{ matrix.topology }}${{ matrix.toHashedIndexKey && ', hashed doc compare' || '' }}, srcChangeReader=${{ matrix.srcChangeReader }}, dstChangeReader=${{ matrix.dstChangeReader }}

steps:
- run: uname -a
Expand Down Expand Up @@ -115,15 +143,20 @@ jobs:
run: |-
{
echo ./build.sh
echo mlaunch init --binarypath $(cat .srcpath) --port 27020 --dir src --replicaset ${{ matrix.topology.srcArgs }}
echo mlaunch init --binarypath $(cat .dstpath) --port 27030 --dir dst --replicaset ${{ matrix.topology.dstArgs }}
echo mlaunch init --binarypath $(cat .srcpath) --port 27020 --dir src --replicaset ${{ (matrix.topology == 'sharded') && '--sharded 2' || '' }}
echo mlaunch init --binarypath $(cat .dstpath) --port 27030 --dir dst --replicaset ${{ (matrix.topology == 'sharded' || matrix.topology == 'replset-to-sharded') && '--sharded 2' || '' }}
echo mlaunch init --binarypath $(cat .metapath) --port 27040 --dir meta --replicaset --nodes 1
} | parallel

- name: Test
run: go test -v ./... -race
env:
MVTEST_DOC_COMPARE_METHOD: ${{matrix.toHashedIndexKey && 'toHashedIndexKey' || ''}}
MVTEST_SRC: ${{matrix.topology.srcConnStr}}
MVTEST_DST: ${{matrix.topology.dstConnStr}}

MVTEST_SRC_CHANGE_READER: ${{matrix.srcChangeReader}}
MVTEST_DST_CHANGE_READER: ${{matrix.dstChangeReader}}

MVTEST_SRC: ${{ (matrix.topology == 'sharded') && env.shardedSrcConnStr || env.replsetSrcConnStr }}
MVTEST_DST: ${{ (matrix.topology == 'sharded' || matrix.topology == 'replset-to-sharded') && env.shardedDstConnStr || env.replsetDstConnStr }}

MVTEST_META: mongodb://localhost:27040
185 changes: 185 additions & 0 deletions agg/agg.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
package agg

import (
"go.mongodb.org/mongo-driver/v2/bson"
)

func Eq(comparands ...any) bson.D {
return bson.D{{"$eq", comparands}}
}

func In[T any](needle any, haystack ...T) bson.D {
return bson.D{{"$in", bson.A{needle, haystack}}}
}

func BSONSize(ref any) bson.D {
return bson.D{{"$bsonSize", ref}}
}

func Type(ref any) bson.D {
return bson.D{{"$type", ref}}
}

func Concat(refs ...any) bson.D {
return bson.D{{"$concat", refs}}
}

// ---------------------------------------------

type Not struct {
Ref any
}

var _ bson.Marshaler = Not{}

func (n Not) MarshalBSON() ([]byte, error) {
return bson.Marshal(bson.D{
{"$not", n.Ref},
})
}

// ---------------------------------------------

type And []any

var _ bson.Marshaler = And{}

func (a And) MarshalBSON() ([]byte, error) {
return bson.Marshal(bson.D{
{"$and", []any(a)},
})
}

// ---------------------------------------------

type Or []any

var _ bson.Marshaler = Or{}

func (o Or) MarshalBSON() ([]byte, error) {
return bson.Marshal(bson.D{
{"$or", []any(o)},
})
}

// ---------------------------------------------

type MergeObjects []any

var _ bson.Marshaler = MergeObjects{}

func (m MergeObjects) MarshalBSON() ([]byte, error) {
return bson.Marshal(bson.D{
{"$mergeObjects", []any(m)},
})
}

// ---------------------------------------------

type Cond struct {
If, Then, Else any
}

var _ bson.Marshaler = Cond{}

func (c Cond) D() bson.D {
return bson.D{
{"$cond", bson.D{
{"if", c.If},
{"then", c.Then},
{"else", c.Else},
}},
}
}

func (c Cond) MarshalBSON() ([]byte, error) {
return bson.Marshal(c.D())
}

// ---------------------------------------------

type Switch struct {
Branches []SwitchCase
Default any
}

type SwitchCase struct {
Case any
Then any
}

func (s Switch) D() bson.D {
return bson.D{{"$switch", bson.D{
{"branches", s.Branches},
{"default", s.Default},
}}}
}

func (s Switch) MarshalBSON() ([]byte, error) {
return bson.Marshal(s.D())
}

// ---------------------------------------------

type ArrayElemAt struct {
Array any
Index int
}

func (a ArrayElemAt) D() bson.D {
return bson.D{{"$arrayElemAt", bson.A{
a.Array,
a.Index,
}}}
}

func (a ArrayElemAt) MarshalBSON() ([]byte, error) {
return bson.Marshal(a.D())
}

// ---------------------------------------------

type Map struct {
Input, As, In any
}

var _ bson.Marshaler = Map{}

func (m Map) D() bson.D {
return bson.D{
{"$map", bson.D{
{"input", m.Input},
{"as", m.As},
{"in", m.In},
}},
}
}

func (m Map) MarshalBSON() ([]byte, error) {
return bson.Marshal(m.D())
}

// ------------------------------------------

type Filter struct {
Input, As, Cond, Limit any
}

var _ bson.Marshaler = Filter{}

func (f Filter) D() bson.D {
d := bson.D{
{"input", f.Input},
{"as", f.As},
{"cond", f.Cond},
}

if f.Limit != nil {
d = append(d, bson.E{"limit", f.Limit})
}
return bson.D{{"$filter", d}}
}

func (f Filter) MarshalBSON() ([]byte, error) {
return bson.Marshal(f.D())
}
35 changes: 35 additions & 0 deletions agg/helpers/string.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package helpers

import (
"go.mongodb.org/mongo-driver/v2/bson"
)

type StringHasPrefix struct {
FieldRef any
Prefix string
}

func (sp StringHasPrefix) MarshalBSON() ([]byte, error) {
return bson.Marshal(bson.D{
{"$eq", bson.A{
0,
bson.D{{"$indexOfCP", bson.A{
sp.FieldRef,
sp.Prefix,
0,
1,
}}},
}},
})

/*
return bson.Marshal(agg.Eq(
sp.Prefix,
agg.SubstrBytes{
sp.FieldRef,
0,
len(sp.Prefix),
},
))
*/
}
12 changes: 11 additions & 1 deletion internal/retry/retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,18 +180,28 @@ func (r *Retryer) runRetryLoop(

// Not a transient error? Fail immediately.
if !r.shouldRetryWithSleep(logger, sleepTime, descriptions, cbErr) {
if descr, has := r.description.Get(); has {
cbErr = errors.Wrap(cbErr, descr)
}

return cbErr
}

// Our error is transient. If we've exhausted the allowed time
// then fail.

if failedFuncInfo.GetDurationSoFar() > li.durationLimit {
return RetryDurationLimitExceededErr{
var err error = RetryDurationLimitExceededErr{
attempts: li.attemptsSoFar,
duration: failedFuncInfo.GetDurationSoFar(),
lastErr: groupErr.errFromCallback,
}

if descr, has := r.description.Get(); has {
err = errors.Wrap(err, descr)
}

return err
}

// Sleep and increase the sleep time for the next retry,
Expand Down
Loading
Loading