diff --git a/.github/workflows/all.yml b/.github/workflows/all.yml index 7a082756..458e6abd 100644 --- a/.github/workflows/all.yml +++ b/.github/workflows/all.yml @@ -11,6 +11,8 @@ jobs: strategy: fail-fast: false matrix: + + # versions are: source, destination mongodb_versions: - [ '4.2', '4.2' ] - [ '4.2', '4.4' ] @@ -58,8 +60,12 @@ jobs: with: go-version: stable - - name: Install m - run: npm install -g m mongosh + - name: Install m and mtools + run: |- + { + echo npm install -g m + echo pipx install 'mtools[all]' + } | parallel - name: Install MongoDB ${{ matrix.mongodb_versions[0] }} (source) run: yes | m ${{ matrix.mongodb_versions[0] }} && dirname $(readlink $(which mongod)) > .srcpath @@ -70,18 +76,15 @@ jobs: - name: Install latest stable MongoDB (metadata) run: yes | m stable && dirname $(readlink $(which mongod)) > .metapath - - name: Install mtools - run: pipx install 'mtools[all]' - - name: Build run: go build main/migration_verifier.go - name: Start clusters run: |- { - echo "mlaunch init --binarypath $(cat .srcpath) --port 27020 --dir src --replicaset ${{ matrix.topology.args }}" - echo "mlaunch init --binarypath $(cat .dstpath) --port 27030 --dir dst --replicaset ${{ matrix.topology.args }}" - echo "mlaunch init --binarypath $(cat .metapath) --port 27040 --dir meta --replicaset --nodes 1" + echo mlaunch init --binarypath $(cat .srcpath) --port 27020 --dir src --replicaset ${{ matrix.topology.args }} + echo mlaunch init --binarypath $(cat .dstpath) --port 27030 --dir dst --replicaset ${{ matrix.topology.args }} + echo mlaunch init --binarypath $(cat .metapath) --port 27040 --dir meta --replicaset --nodes 1 } | parallel - name: Test diff --git a/.gitignore b/.gitignore index b7c3c335..e5fbc730 100644 --- a/.gitignore +++ b/.gitignore @@ -1,5 +1,4 @@ dev-bin/ -go.sum golangci-lint migration_verifier internal/verifier/mongodb_exec/ diff --git a/go.mod b/go.mod index 0f8ca4b7..fa9f2646 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/10gen/migration-verifier -go 1.20 +go 1.22 require ( github.com/cespare/permute/v2 v2.0.0-beta2 diff --git a/go.sum b/go.sum new file mode 100644 index 00000000..d5301ade --- /dev/null +++ b/go.sum @@ -0,0 +1,169 @@ +github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ= +github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= +github.com/cespare/permute/v2 v2.0.0-beta2 h1:iiJWgDsInCbnwfDZiLJX7cVlJevgIM1pUiArcUylUMc= +github.com/cespare/permute/v2 v2.0.0-beta2/go.mod h1:7e2Tqe0fjn1T4rTjDLJjQ+kNtt+UW4B9lg10asqWw0A= +github.com/coreos/go-systemd/v22 v22.3.3-0.20220203105225-a9a7ef127534/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc= +github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU= +github.com/cpuguy83/go-md2man/v2 v2.0.2 h1:p1EgwI/C7NhT0JmVkwCD2ZBK8j4aeHQX2pMHHBfMQ6w= +github.com/cpuguy83/go-md2man/v2 v2.0.2/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= +github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/deckarep/golang-set/v2 v2.3.0 h1:qs18EKUfHm2X9fA50Mr/M5hccg2tNnVqsiBImnyDs0g= +github.com/deckarep/golang-set/v2 v2.3.0/go.mod h1:VAky9rY/yGXJOLEDv3OMci+7wtDpOF4IN+y82NBOac4= +github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY= +github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto= +github.com/gin-contrib/sse v0.1.0 h1:Y/yl/+YNO8GZSjAhjMsSuLt29uWRFHdHYUb5lYOV9qE= +github.com/gin-contrib/sse v0.1.0/go.mod h1:RHrZQHXnP2xjPF+u1gW/2HnVO7nvIa9PG3Gm+fLHvGI= +github.com/gin-gonic/gin v1.8.1 h1:4+fr/el88TOO3ewCmQr8cx/CtZ/umlIRIs5M4NTNjf8= +github.com/gin-gonic/gin v1.8.1/go.mod h1:ji8BvRH1azfM+SYow9zQ6SZMvR8qOMZHmsCuWR9tTTk= +github.com/go-playground/assert/v2 v2.0.1 h1:MsBgLAaY856+nPRTKrp3/OZK38U/wa0CcBYNjji3q3A= +github.com/go-playground/assert/v2 v2.0.1/go.mod h1:VDjEfimB/XKnb+ZQfWdccd7VUvScMdVu0Titje2rxJ4= +github.com/go-playground/locales v0.14.0 h1:u50s323jtVGugKlcYeyzC0etD1HifMjqmJqb8WugfUU= +github.com/go-playground/locales v0.14.0/go.mod h1:sawfccIbzZTqEDETgFXqTho0QybSa7l++s0DH+LDiLs= +github.com/go-playground/universal-translator v0.18.0 h1:82dyy6p4OuJq4/CByFNOn/jYrnRPArHwAcmLoJZxyho= +github.com/go-playground/universal-translator v0.18.0/go.mod h1:UvRDBj+xPUEGrFYl+lu/H90nyDXpg0fqeB/AQUGNTVA= +github.com/go-playground/validator/v10 v10.10.0 h1:I7mrTYv78z8k8VXa/qJlOlEXn/nBh+BF8dHX5nt/dr0= +github.com/go-playground/validator/v10 v10.10.0/go.mod h1:74x4gJWsvQexRdW8Pn3dXSGrTK4nAUsbPlLADvpJkos= +github.com/goccy/go-json v0.9.7 h1:IcB+Aqpx/iMHu5Yooh7jEzJk1JZ7Pjtmys2ukPr7EeM= +github.com/goccy/go-json v0.9.7/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I= +github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= +github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= +github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= +github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= +github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= +github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= +github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= +github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= +github.com/klauspost/compress v1.13.6 h1:P76CopJELS0TiO2mebmnzgWaajssP/EszplttgQxcgc= +github.com/klauspost/compress v1.13.6/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk= +github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= +github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= +github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0= +github.com/kr/pretty v0.3.0/go.mod h1:640gp4NfQd8pI5XOwp5fnNeVWj67G7CFk/SaSQn7NBk= +github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= +github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/leodido/go-urn v1.2.1 h1:BqpAaACuzVSgi/VLzGZIobT2z4v53pjosyNd9Yv6n/w= +github.com/leodido/go-urn v1.2.1/go.mod h1:zt4jvISO2HfUBqxjfIshjdMTYS56ZS/qv49ictyFfxY= +github.com/mattn/go-colorable v0.1.12 h1:jF+Du6AlPIjs2BiUiQlKOX0rt3SujHxPnksPKZbaA40= +github.com/mattn/go-colorable v0.1.12/go.mod h1:u5H1YNBxpqRaxsYJYSkiCWKzEfiAb1Gb520KVy5xxl4= +github.com/mattn/go-isatty v0.0.14 h1:yVuAays6BHfxijgZPzw+3Zlu5yQgKGP2/hcQbHb7S9Y= +github.com/mattn/go-isatty v0.0.14/go.mod h1:7GGIvUiUoEMVVmxf/4nioHXj79iQHKdU27kJ6hsGG94= +github.com/mattn/go-runewidth v0.0.9 h1:Lm995f3rfxdpd6TSmuVCHVb/QhupuXlYr8sCI/QdE+0= +github.com/mattn/go-runewidth v0.0.9/go.mod h1:H031xJmbD/WCDINGzjvQ9THkh0rPKHF+m2gUSrubnMI= +github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421 h1:ZqeYNhU3OHLH3mGKHDcjJRFFRrJa6eAM5H+CtDdOsPc= +github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M= +github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= +github.com/montanaflynn/stats v0.7.1 h1:etflOAAHORrCC44V+aR6Ftzort912ZU+YLiSTuV8eaE= +github.com/montanaflynn/stats v0.7.1/go.mod h1:etXPPgVO6n31NxCd9KQUMvCM+ve0ruNzt6R8Bnaayow= +github.com/olekukonko/tablewriter v0.0.5 h1:P2Ga83D34wi1o9J6Wh1mRuqd4mF/x/lgBS7N7AbDhec= +github.com/olekukonko/tablewriter v0.0.5/go.mod h1:hPp6KlRPjbx+hW8ykQs1w3UBbZlj6HuIJcUGPhkA7kY= +github.com/pelletier/go-toml/v2 v2.0.1 h1:8e3L2cCQzLFi2CR4g7vGFuFxX7Jl1kKX8gW+iV0GUKU= +github.com/pelletier/go-toml/v2 v2.0.1/go.mod h1:r9LEWfGN8R5k0VXJ+0BkIe7MYkRdwZOjgMj2KwnJFUo= +github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA= +github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= +github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc= +github.com/rogpeppe/go-internal v1.8.0 h1:FCbCCtXNOY3UtUuHUYaghJg4y7Fd14rXifAYUAtL9R8= +github.com/rogpeppe/go-internal v1.8.0/go.mod h1:WmiCO8CzOY8rg0OYDC4/i/2WRWAB6poM+XZ2dLUbcbE= +github.com/rs/xid v1.4.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg= +github.com/rs/zerolog v1.28.0 h1:MirSo27VyNi7RJYP3078AA1+Cyzd2GB66qy3aUHvsWY= +github.com/rs/zerolog v1.28.0/go.mod h1:NILgTygv/Uej1ra5XxGf82ZFSLk58MFGAUS2o6usyD0= +github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= +github.com/russross/blackfriday/v2 v2.1.0 h1:JIOH55/0cWyOuilr9/qlrm0BSXldqnqwMsf35Ld67mk= +github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= +github.com/samber/lo v1.47.0 h1:z7RynLwP5nbyRscyvcD043DWYoOcYRv3mV8lBeqOCLc= +github.com/samber/lo v1.47.0/go.mod h1:RmDH9Ct32Qy3gduHQuKJ3gW1fMHAnE/fAzQuf6He5cU= +github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk= +github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= +github.com/ugorji/go v1.2.7/go.mod h1:nF9osbDWLy6bDVv/Rtoh6QgnvNDpmCalQV5urGCCS6M= +github.com/ugorji/go/codec v1.2.7 h1:YPXUKf7fYbp/y8xloBqZOw2qaVggbfwMlI8WM3wZUJ0= +github.com/ugorji/go/codec v1.2.7/go.mod h1:WGN1fab3R1fzQlVQTkfxVtIBhWDRqOviHU95kRgeqEY= +github.com/urfave/cli v1.22.9 h1:cv3/KhXGBGjEXLC4bH0sLuJ9BewaAbpk5oyMOveu4pw= +github.com/urfave/cli v1.22.9/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0= +github.com/xdg-go/pbkdf2 v1.0.0 h1:Su7DPu48wXMwC3bs7MCNG+z4FhcyEuz5dlvchbq0B0c= +github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI= +github.com/xdg-go/scram v1.1.2 h1:FHX5I5B4i4hKRVRBCFRxq1iQRej7WO3hhBuJf+UUySY= +github.com/xdg-go/scram v1.1.2/go.mod h1:RT/sEzTbU5y00aCK8UOx6R7YryM0iF1N2MOmC3kKLN4= +github.com/xdg-go/stringprep v1.0.4 h1:XLI/Ng3O1Atzq0oBs3TWm+5ZVgkq2aqdlvP9JtoZ6c8= +github.com/xdg-go/stringprep v1.0.4/go.mod h1:mPGuuIYwz7CmR2bT9j4GbQqutWS1zV24gijq1dTyGkM= +github.com/youmark/pkcs8 v0.0.0-20240726163527-a2c0da244d78 h1:ilQV1hzziu+LLM3zUTJ0trRztfwgjqKnBWNtSRkbmwM= +github.com/youmark/pkcs8 v0.0.0-20240726163527-a2c0da244d78/go.mod h1:aL8wCCfTfSfmXjznFBSZNN13rSJjlIOI1fUNAtF7rmI= +github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= +go.mongodb.org/mongo-driver v1.17.1 h1:Wic5cJIwJgSpBhe3lx3+/RybR5PiYRMpVFgO7cOHyIM= +go.mongodb.org/mongo-driver v1.17.1/go.mod h1:wwWm/+BuOddhcq3n68LKRmgk2wXzmF6s0SFOa0GINL4= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20210711020723-a769d52b0f97/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= +golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= +golang.org/x/crypto v0.26.0 h1:RrRspgV4mU+YwB4FYnuBoKsUapNIL5cohGAmSH3azsw= +golang.org/x/crypto v0.26.0/go.mod h1:GY7jblb9wI+FOo5y8/S2oY4zWP07AkOJ4+jxCqdqn54= +golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56 h1:2dVuKD2vS7b0QIHQbpyTISPd0LeHDbnYEryqj5Q1ug8= +golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56/go.mod h1:M4RDyNAINzryxdtnbRXRL/OHtkFuWGRjvuhBJpk2IlY= +golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= +golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= +golang.org/x/net v0.21.0 h1:AQyQV4dYCvJ7vGmJyKki9+PBdyvhkSd8EIx/qb0AYv4= +golang.org/x/net v0.21.0/go.mod h1:bIjVDfnllIU7BJ2DNgfnXvpSvtn8VRwhlsaeUTyUS44= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ= +golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20210806184541-e5e7981a1069/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20210927094055-39ccf1dd6fa6/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.23.0 h1:YfKFowiIMvtgl1UERQoTPPToxltDeZfbj4H7dVUCwmM= +golang.org/x/sys v0.23.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= +golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= +golang.org/x/text v0.3.8/go.mod h1:E6s5w1FMmriuDzIBO73fBruAKo1PCIq6d2Q6DHfQ8WQ= +golang.org/x/text v0.17.0 h1:XtiM5bkSOt+ewxlOE/aE/AKEHibwj/6gvWMl9Rsh0Qc= +golang.org/x/text v0.17.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= +golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= +google.golang.org/protobuf v1.28.0 h1:w43yiav+6bVFTBQFZX0r7ipe9JQ1QsbMgHwbBziscLw= +google.golang.org/protobuf v1.28.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= +gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI= +gopkg.in/natefinch/lumberjack.v2 v2.0.0 h1:1Lc07Kr7qY4U2YPouBjpCLxpiyxIVoxqXgkXLknAOE8= +gopkg.in/natefinch/lumberjack.v2 v2.0.0/go.mod h1:l0ndWWf7gzL7RNwBG7wST/UCcT4T24xpD6X8LsfU/+k= +gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= +gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/internal/verifier/change_stream.go b/internal/verifier/change_stream.go index ddf8f5ba..c4c4c9cc 100644 --- a/internal/verifier/change_stream.go +++ b/internal/verifier/change_stream.go @@ -12,7 +12,6 @@ import ( "go.mongodb.org/mongo-driver/bson/primitive" "go.mongodb.org/mongo-driver/mongo" "go.mongodb.org/mongo-driver/mongo/options" - "golang.org/x/exp/constraints" ) const fauxDocSizeForDeleteEvents = 1024 @@ -47,7 +46,7 @@ type UnknownEventError struct { } func (uee UnknownEventError) Error() string { - return fmt.Sprintf("Received event with unknown optype: %+v", uee.Event) + return fmt.Sprintf("received event with unknown optype: %+v", uee.Event) } // HandleChangeStreamEvents performs the necessary work for change stream events after receiving a batch. @@ -76,7 +75,7 @@ func (verifier *Verifier) HandleChangeStreamEvents(ctx context.Context, batch [] fallthrough case "update": if err := verifier.eventRecorder.AddEvent(&changeEvent); err != nil { - return errors.Wrapf(err, "failed to augment stats with change event: %+v", changeEvent) + return errors.Wrapf(err, "failed to augment stats with change event (%+v)", changeEvent) } dbNames[i] = changeEvent.Ns.DB collNames[i] = changeEvent.Ns.Coll @@ -105,6 +104,11 @@ func (verifier *Verifier) HandleChangeStreamEvents(ctx context.Context, batch [] // GetChangeStreamFilter returns an aggregation pipeline that filters // namespaces as per configuration. // +// Note that this omits verifier.globalFilter because we still need to +// recheck any out-filter documents that may have changed in order to +// account for filter traversals (i.e., updates that change whether a +// document matches the filter). +// // NB: Ideally we could make the change stream give $bsonSize(fullDocument) // and omit fullDocument, but $bsonSize was new in MongoDB 4.4, and we still // want to verify migrations from 4.2. fullDocument is unlikely to be a @@ -122,60 +126,76 @@ func (verifier *Verifier) GetChangeStreamFilter() []bson.D { return []bson.D{stage} } -func (verifier *Verifier) iterateChangeStream(ctx context.Context, cs *mongo.ChangeStream) { - defer cs.Close(ctx) +// This function reads a single `getMore` response into a slice. +// +// Note that this doesn’t care about the writesOff timestamp. Thus, +// if writesOff has happened and a `getMore` response’s events straddle +// the writesOff timestamp (i.e., some events precede it & others follow it), +// the verifier will enqueue rechecks from those post-writesOff events. This +// is unideal but shouldn’t impede correctness since post-writesOff events +// shouldn’t really happen anyway by definition. +func (verifier *Verifier) readAndHandleOneChangeEventBatch( + ctx context.Context, + cs *mongo.ChangeStream, +) error { + eventsRead := 0 + var changeEventBatch []ParsedEvent + + for hasEventInBatch := true; hasEventInBatch; hasEventInBatch = cs.RemainingBatchLength() > 0 { + gotEvent := cs.TryNext(ctx) + + if cs.Err() != nil { + return errors.Wrap(cs.Err(), "change stream iteration failed") + } - var lastPersistedTime time.Time + if !gotEvent { + break + } - persistResumeTokenIfNeeded := func() error { - if time.Since(lastPersistedTime) <= minChangeStreamPersistInterval { - return nil + if changeEventBatch == nil { + changeEventBatch = make([]ParsedEvent, cs.RemainingBatchLength()+1) } - err := verifier.persistChangeStreamResumeToken(ctx, cs) - if err == nil { - lastPersistedTime = time.Now() + if err := cs.Decode(&changeEventBatch[eventsRead]); err != nil { + return errors.Wrapf(err, "failed to decode change event to %T", changeEventBatch[eventsRead]) } - return err + eventsRead++ } - readAndHandleOneChangeEventBatch := func() (bool, error) { - eventsRead := 0 - var changeEventBatch []ParsedEvent + if eventsRead == 0 { + return nil + } - for hasEventInBatch := true; hasEventInBatch; hasEventInBatch = cs.RemainingBatchLength() > 0 { - gotEvent := cs.TryNext(ctx) + err := verifier.HandleChangeStreamEvents(ctx, changeEventBatch) + if err != nil { + return errors.Wrap(err, "failed to handle change events") + } - if !gotEvent || cs.Err() != nil { - break - } + return nil +} - if changeEventBatch == nil { - changeEventBatch = make([]ParsedEvent, cs.RemainingBatchLength()+1) - } +func (verifier *Verifier) iterateChangeStream(ctx context.Context, cs *mongo.ChangeStream) { + defer cs.Close(ctx) - if err := cs.Decode(&changeEventBatch[eventsRead]); err != nil { - return false, errors.Wrap(err, "failed to decode change event") - } + var lastPersistedTime time.Time - eventsRead++ + persistResumeTokenIfNeeded := func() error { + if time.Since(lastPersistedTime) <= minChangeStreamPersistInterval { + return nil } - if eventsRead > 0 { - verifier.logger.Debug().Int("eventsCount", eventsRead).Msgf("Received a batch of events.") - err := verifier.HandleChangeStreamEvents(ctx, changeEventBatch) - if err != nil { - return false, errors.Wrap(err, "failed to handle change events") - } + err := verifier.persistChangeStreamResumeToken(ctx, cs) + if err == nil { + lastPersistedTime = time.Now() } - return eventsRead > 0, errors.Wrap(cs.Err(), "change stream iteration failed") + return err } for { var err error - var changeStreamEnded bool + var gotwritesOffTimestamp bool select { @@ -189,29 +209,45 @@ func (verifier *Verifier) iterateChangeStream(ctx context.Context, cs *mongo.Cha // If the changeStreamEnderChan has a message, the user has indicated that // source writes are ended. This means we should exit rather than continue // reading the change stream since there should be no more events. - case <-verifier.changeStreamEnderChan: + case writesOffTs := <-verifier.changeStreamWritesOffTsChan: verifier.logger.Debug(). - Msg("Change stream thread received shutdown request.") + Interface("writesOffTimestamp", writesOffTs). + Msg("Change stream thread received writesOff timestamp. Finalizing change stream.") - changeStreamEnded = true + gotwritesOffTimestamp = true // Read all change events until the source reports no events. // (i.e., the `getMore` call returns empty) for { - var gotEvent bool - gotEvent, err = readAndHandleOneChangeEventBatch() + var curTs primitive.Timestamp + curTs, err = extractTimestampFromResumeToken(cs.ResumeToken()) + if err != nil { + err = errors.Wrap(err, "failed to extract timestamp from change stream's resume token") + break + } + + if curTs == writesOffTs || curTs.After(writesOffTs) { + verifier.logger.Debug(). + Interface("currentTimestamp", curTs). + Interface("writesOffTimestamp", writesOffTs). + Msg("Change stream has reached the writesOff timestamp. Shutting down.") - if !gotEvent || err != nil { + break + } + + err = verifier.readAndHandleOneChangeEventBatch(ctx, cs) + + if err != nil { break } } default: - _, err = readAndHandleOneChangeEventBatch() - } + err = verifier.readAndHandleOneChangeEventBatch(ctx, cs) - if err == nil { - err = persistResumeTokenIfNeeded() + if err == nil { + err = persistResumeTokenIfNeeded() + } } if err != nil && !errors.Is(err, context.Canceled) { @@ -221,12 +257,12 @@ func (verifier *Verifier) iterateChangeStream(ctx context.Context, cs *mongo.Cha verifier.changeStreamErrChan <- err - if !changeStreamEnded { + if !gotwritesOffTimestamp { break } } - if changeStreamEnded { + if gotwritesOffTimestamp { verifier.mux.Lock() verifier.changeStreamRunning = false if verifier.lastChangeEventTime != nil { @@ -242,9 +278,9 @@ func (verifier *Verifier) iterateChangeStream(ctx context.Context, cs *mongo.Cha infoLog := verifier.logger.Info() if verifier.lastChangeEventTime == nil { - infoLog = infoLog.Str("changeStreamStopTime", "none") + infoLog = infoLog.Str("lastEventTime", "none") } else { - infoLog = infoLog.Interface("changeStreamStopTime", *verifier.lastChangeEventTime) + infoLog = infoLog.Interface("lastEventTime", *verifier.lastChangeEventTime) } infoLog.Msg("Change stream is done.") @@ -270,7 +306,7 @@ func (verifier *Verifier) StartChangeStream(ctx context.Context) error { ts, err := extractTimestampFromResumeToken(savedResumeToken) if err == nil { - logEvent = addUnixTimeToLogEvent(ts.T, logEvent) + logEvent = addTimestampToLogEvent(ts, logEvent) } else { verifier.logger.Warn(). Err(err). @@ -323,8 +359,10 @@ func (verifier *Verifier) StartChangeStream(ctx context.Context) error { return nil } -func addUnixTimeToLogEvent[T constraints.Integer](unixTime T, event *zerolog.Event) *zerolog.Event { - return event.Time("timestampTime", time.Unix(int64(unixTime), int64(0))) +func addTimestampToLogEvent(ts primitive.Timestamp, event *zerolog.Event) *zerolog.Event { + return event. + Interface("timestamp", ts). + Time("time", time.Unix(int64(ts.T), int64(0))) } func (v *Verifier) getChangeStreamMetadataCollection() *mongo.Collection { @@ -363,7 +401,7 @@ func (verifier *Verifier) persistChangeStreamResumeToken(ctx context.Context, cs logEvent := verifier.logger.Debug() if err == nil { - logEvent = addUnixTimeToLogEvent(ts.T, logEvent) + logEvent = addTimestampToLogEvent(ts, logEvent) } else { verifier.logger.Warn().Err(err). Msg("failed to extract resume token timestamp") diff --git a/internal/verifier/change_stream_test.go b/internal/verifier/change_stream_test.go index 89bc7877..ce00be68 100644 --- a/internal/verifier/change_stream_test.go +++ b/internal/verifier/change_stream_test.go @@ -6,6 +6,7 @@ import ( "time" "github.com/pkg/errors" + "github.com/samber/lo" "github.com/stretchr/testify/require" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/bson/primitive" @@ -141,16 +142,12 @@ func (suite *IntegrationTestSuite) TestStartAtTimeNoChanges() { err = verifier.StartChangeStream(ctx) suite.Require().NoError(err) suite.Require().Equal(verifier.srcStartAtTs, origStartTs) - verifier.changeStreamEnderChan <- struct{}{} + verifier.changeStreamWritesOffTsChan <- *origStartTs <-verifier.changeStreamDoneChan suite.Require().Equal(verifier.srcStartAtTs, origStartTs) } func (suite *IntegrationTestSuite) TestStartAtTimeWithChanges() { - if suite.GetSrcTopology() == TopologySharded { - suite.T().Skip("Skipping pending REP-5299.") - } - verifier := suite.BuildVerifier() ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -187,13 +184,13 @@ func (suite *IntegrationTestSuite) TestStartAtTimeWithChanges() { "session time after events should exceed the original", ) - verifier.changeStreamEnderChan <- struct{}{} + verifier.changeStreamWritesOffTsChan <- *postEventsSessionTime <-verifier.changeStreamDoneChan - suite.Assert().GreaterOrEqual( - verifier.srcStartAtTs.Compare(*postEventsSessionTime), - 0, - "verifier.srcStartAtTs should now meet or exceed our session timestamp", + suite.Assert().Equal( + *postEventsSessionTime, + *verifier.srcStartAtTs, + "verifier.srcStartAtTs should now be our session timestamp", ) } @@ -243,3 +240,54 @@ func (suite *IntegrationTestSuite) TestWithChangeEventsBatching() { "the verifier should flush a recheck doc after a batch", ) } + +func (suite *IntegrationTestSuite) TestEventBeforeWritesOff() { + ctx := suite.Context() + + verifier := suite.BuildVerifier() + + db := suite.srcMongoClient.Database(suite.DBNameForTest()) + coll := db.Collection("mycoll") + suite.Require().NoError( + db.CreateCollection(ctx, coll.Name()), + ) + + // start verifier + verifierRunner := RunVerifierCheck(suite.Context(), suite.T(), verifier) + + // wait for generation 0 to end + verifierRunner.AwaitGenerationEnd() + + docsCount := 10_000 + docs := lo.RepeatBy(docsCount, func(_ int) bson.D { return bson.D{} }) + _, err := coll.InsertMany( + ctx, + lo.ToAnySlice(docs), + ) + suite.Require().NoError(err) + + suite.Require().NoError(verifier.WritesOff(ctx)) + + suite.Require().NoError(verifierRunner.Await()) + + generation := verifier.generation + failedTasks, incompleteTasks, err := FetchFailedAndIncompleteTasks( + ctx, + verifier.verificationTaskCollection(), + verificationTaskVerifyDocuments, + generation, + ) + suite.Require().NoError(err) + + suite.Require().Empty(incompleteTasks, "all tasks should be finished") + + totalFailed := lo.Reduce( + failedTasks, + func(sofar int, task VerificationTask, _ int) int { + return sofar + len(task.Ids) + }, + 0, + ) + + suite.Assert().Equal(docsCount, totalFailed, "all source docs should be missing") +} diff --git a/internal/verifier/check.go b/internal/verifier/check.go index 5a60bfb5..0ae007c5 100644 --- a/internal/verifier/check.go +++ b/internal/verifier/check.go @@ -25,8 +25,6 @@ var failedStatus = mapset.NewSet( verificationTaskMetadataMismatch, ) -var verificationStatusCheckInterval time.Duration = 15 * time.Second - // Check is the asynchronous entry point to Check, should only be called by the web server. Use // CheckDriver directly for synchronous run. // testChan is a pair of channels for coordinating generations in tests. @@ -42,30 +40,27 @@ func (verifier *Verifier) Check(ctx context.Context, filter map[string]any) { verifier.MaybeStartPeriodicHeapProfileCollection(ctx) } -func (verifier *Verifier) waitForChangeStream() error { - verifier.mux.RLock() - csRunning := verifier.changeStreamRunning - verifier.mux.RUnlock() - if csRunning { - verifier.logger.Debug().Msg("Changestream still running, signalling that writes are done and waiting for change stream to exit") - verifier.changeStreamEnderChan <- struct{}{} - select { - case err := <-verifier.changeStreamErrChan: - verifier.logger.Warn().Err(err). - Msg("Received error from change stream.") - return err - case <-verifier.changeStreamDoneChan: - verifier.logger.Debug(). - Msg("Received completion signal from change stream.") - break - } +func (verifier *Verifier) waitForChangeStream(ctx context.Context) error { + select { + case <-ctx.Done(): + return ctx.Err() + case err := <-verifier.changeStreamErrChan: + verifier.logger.Warn().Err(err). + Msg("Received error from change stream.") + return err + case <-verifier.changeStreamDoneChan: + verifier.logger.Debug(). + Msg("Received completion signal from change stream.") + break } + return nil } func (verifier *Verifier) CheckWorker(ctx context.Context) error { verifier.logger.Debug().Msgf("Starting %d verification workers", verifier.numWorkers) ctx, cancel := context.WithCancel(ctx) + wg := sync.WaitGroup{} for i := 0; i < verifier.numWorkers; i++ { wg.Add(1) @@ -113,7 +108,7 @@ func (verifier *Verifier) CheckWorker(ctx context.Context) error { //wait for task to be created, if none of the tasks found. if verificationStatus.AddedTasks > 0 || verificationStatus.ProcessingTasks > 0 || verificationStatus.RecheckTasks > 0 { waitForTaskCreation++ - time.Sleep(verificationStatusCheckInterval) + time.Sleep(verifier.verificationStatusCheckInterval) } else { verifier.PrintVerificationSummary(ctx, GenerationComplete) verifier.logger.Debug().Msg("Verification tasks complete") @@ -174,7 +169,9 @@ func (verifier *Verifier) CheckDriver(ctx context.Context, filter map[string]any verifier.mux.RLock() csRunning := verifier.changeStreamRunning verifier.mux.RUnlock() - if !csRunning { + if csRunning { + verifier.logger.Debug().Msg("Check: Change stream already running.") + } else { verifier.logger.Debug().Msg("Change stream not running; starting change stream") err = verifier.StartChangeStream(ctx) @@ -242,7 +239,7 @@ func (verifier *Verifier) CheckDriver(ctx context.Context, filter map[string]any // It's necessary to wait for the change stream to finish before incrementing the // generation number, or the last changes will not be checked. verifier.mux.Unlock() - err := verifier.waitForChangeStream() + err := verifier.waitForChangeStream(ctx) if err != nil { return err } @@ -388,12 +385,15 @@ func (verifier *Verifier) Work(ctx context.Context, workerNum int, wg *sync.Wait if errors.Is(err, mongo.ErrNoDocuments) { duration := verifier.workerSleepDelayMillis * time.Millisecond - verifier.logger.Debug(). - Int("workerNum", workerNum). - Stringer("duration", duration). - Msg("No tasks found. Sleeping.") + if duration > 0 { + verifier.logger.Debug(). + Int("workerNum", workerNum). + Stringer("duration", duration). + Msg("No tasks found. Sleeping.") + + time.Sleep(duration) + } - time.Sleep(duration) continue } else if err != nil { panic(err) diff --git a/internal/verifier/check_runner.go b/internal/verifier/check_runner.go new file mode 100644 index 00000000..fe48fd26 --- /dev/null +++ b/internal/verifier/check_runner.go @@ -0,0 +1,55 @@ +package verifier + +import ( + "context" + "testing" +) + +type CheckRunner struct { + checkDoneChan chan error + generationDoneChan chan struct{} + doNextGenerationChan chan struct{} +} + +func RunVerifierCheck(ctx context.Context, t *testing.T, verifier *Verifier) *CheckRunner { + verifierDoneChan := make(chan error) + + generationDoneChan := make(chan struct{}) + doNextGenerationChan := make(chan struct{}) + + go func() { + err := verifier.CheckDriver(ctx, nil, generationDoneChan, doNextGenerationChan) + verifierDoneChan <- err + }() + + return &CheckRunner{ + checkDoneChan: verifierDoneChan, + generationDoneChan: generationDoneChan, + doNextGenerationChan: doNextGenerationChan, + } +} + +// AwaitGenerationEnd blocks until the check’s current generation ends. +func (cr *CheckRunner) AwaitGenerationEnd() { + <-cr.generationDoneChan +} + +// StartNextGeneration blocks until it can tell the check to start +// the next generation. +func (cr *CheckRunner) StartNextGeneration() { + cr.doNextGenerationChan <- struct{}{} +} + +// Await will await generations and start new ones until the check +// finishes. It returns the error that verifier.CheckDriver() returns. +func (cr *CheckRunner) Await() error { + for { + select { + case err := <-cr.checkDoneChan: + return err + + case <-cr.generationDoneChan: + case cr.doNextGenerationChan <- struct{}{}: + } + } +} diff --git a/internal/verifier/clustertime.go b/internal/verifier/clustertime.go new file mode 100644 index 00000000..30a2c579 --- /dev/null +++ b/internal/verifier/clustertime.go @@ -0,0 +1,165 @@ +package verifier + +import ( + "context" + + "github.com/10gen/migration-verifier/internal/logger" + "github.com/10gen/migration-verifier/internal/retry" + "github.com/10gen/migration-verifier/internal/util" + "github.com/10gen/migration-verifier/mbson" + "github.com/pkg/errors" + "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/bson/primitive" + "go.mongodb.org/mongo-driver/mongo" + "go.mongodb.org/mongo-driver/mongo/options" + "go.mongodb.org/mongo-driver/mongo/writeconcern" +) + +const opTimeKeyInServerResponse = "operationTime" + +// GetNewClusterTime advances the cluster time and returns that time. +// All shards’ cluster times will meet or exceed the returned time. +func GetNewClusterTime( + ctx context.Context, + logger *logger.Logger, + client *mongo.Client, +) (primitive.Timestamp, error) { + retryer := retry.New(retry.DefaultDurationLimit) + + var clusterTime primitive.Timestamp + + // First we just fetch the latest cluster time among all shards without + // updating any shards’ oplogs. + err := retryer.RunForTransientErrorsOnly( + ctx, + logger, + func(_ *retry.Info) error { + var err error + clusterTime, err = fetchClusterTime(ctx, client) + return err + }, + ) + + if err != nil { + return primitive.Timestamp{}, err + } + + // fetchClusterTime() will have taught the mongos about the most current + // shard’s cluster time. Now we tell that mongos to update all lagging + // shards to that time. + err = retryer.RunForTransientErrorsOnly( + ctx, + logger, + func(_ *retry.Info) error { + var err error + _, err = syncClusterTimeAcrossShards(ctx, client, clusterTime) + return err + }, + ) + if err != nil { + // This isn't serious enough even for info-level. + logger.Debug().Err(err). + Msg("Failed to append oplog note; change stream may need extra time to finish.") + } + + return clusterTime, nil +} + +// Use this when we just need the correct cluster time without +// actually changing any shards’ oplogs. +func fetchClusterTime( + ctx context.Context, + client *mongo.Client, +) (primitive.Timestamp, error) { + cmd, rawResponse, err := runAppendOplogNote( + ctx, + client, + "expect StaleClusterTime error", + primitive.Timestamp{1, 0}, + ) + + // We expect an error here; if we didn't get one then something is + // amiss on the server. + if err == nil { + return primitive.Timestamp{}, errors.Errorf("server request unexpectedly succeeded: %v", cmd) + } + + if !util.IsStaleClusterTimeError(err) { + return primitive.Timestamp{}, errors.Wrap( + err, + "unexpected error (expected StaleClusterTime) from request", + ) + } + + return getOpTimeFromRawResponse(rawResponse) +} + +func syncClusterTimeAcrossShards( + ctx context.Context, + client *mongo.Client, + maxTime primitive.Timestamp, +) (primitive.Timestamp, error) { + _, rawResponse, err := runAppendOplogNote( + ctx, + client, + "syncing cluster time", + maxTime, + ) + + // If any shard’s cluster time >= maxTime, the mongos will return a + // StaleClusterTime error. This particular error doesn’t indicate a + // failure, so we ignore it. + if err != nil && !util.IsStaleClusterTimeError(err) { + return primitive.Timestamp{}, errors.Wrap( + err, + "failed to append note to oplog", + ) + } + + return getOpTimeFromRawResponse(rawResponse) +} + +func runAppendOplogNote( + ctx context.Context, + client *mongo.Client, + note string, + maxClusterTime primitive.Timestamp, +) (bson.D, bson.Raw, error) { + cmd := bson.D{ + {"appendOplogNote", 1}, + {"maxClusterTime", maxClusterTime}, + {"data", bson.D{ + {"migration-verifier", note}, + }}, + } + + resp := client. + Database( + "admin", + options.Database().SetWriteConcern(writeconcern.Majority()), + ). + RunCommand(ctx, cmd) + + raw, err := resp.Raw() + + return cmd, raw, errors.Wrapf( + err, + "command (%v) failed unexpectedly", + cmd, + ) +} + +func getOpTimeFromRawResponse(rawResponse bson.Raw) (primitive.Timestamp, error) { + // Get the `operationTime` from the response and return it. + var optime primitive.Timestamp + + found, err := mbson.RawLookup(rawResponse, &optime, opTimeKeyInServerResponse) + if err != nil { + return primitive.Timestamp{}, errors.Errorf("failed to read server response (%s)", rawResponse) + } + if !found { + return primitive.Timestamp{}, errors.Errorf("server response (%s) lacks %#q", rawResponse, opTimeKeyInServerResponse) + } + + return optime, nil +} diff --git a/internal/verifier/clustertime_test.go b/internal/verifier/clustertime_test.go new file mode 100644 index 00000000..6d2e5acb --- /dev/null +++ b/internal/verifier/clustertime_test.go @@ -0,0 +1,13 @@ +package verifier + +import "context" + +func (suite *IntegrationTestSuite) TestGetClusterTime() { + ctx := context.Background() + logger, _ := getLoggerAndWriter("stdout") + + ts, err := GetNewClusterTime(ctx, logger, suite.srcMongoClient) + suite.Require().NoError(err) + + suite.Assert().NotZero(ts, "timestamp should be nonzero") +} diff --git a/internal/verifier/integration_test_suite.go b/internal/verifier/integration_test_suite.go index d3a112c8..33e3a336 100644 --- a/internal/verifier/integration_test_suite.go +++ b/internal/verifier/integration_test_suite.go @@ -3,6 +3,7 @@ package verifier import ( "context" "strings" + "time" mapset "github.com/deckarep/golang-set/v2" "github.com/pkg/errors" @@ -11,6 +12,7 @@ import ( "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/mongo" "go.mongodb.org/mongo-driver/mongo/options" + "go.mongodb.org/mongo-driver/mongo/readconcern" "go.mongodb.org/mongo-driver/mongo/writeconcern" ) @@ -47,17 +49,23 @@ func (suite *IntegrationTestSuite) Context() context.Context { func (suite *IntegrationTestSuite) SetupSuite() { ctx := context.Background() - clientOpts := options.Client().ApplyURI(suite.srcConnStr).SetAppName("Verifier Test Suite").SetWriteConcern(writeconcern.Majority()) + clientOpts := options.Client().ApplyURI(suite.srcConnStr).SetAppName("Verifier Test Suite"). + SetWriteConcern(writeconcern.Majority()). + SetReadConcern(readconcern.Majority()) var err error suite.srcMongoClient, err = mongo.Connect(ctx, clientOpts) suite.Require().NoError(err) - clientOpts = options.Client().ApplyURI(suite.dstConnStr).SetAppName("Verifier Test Suite").SetWriteConcern(writeconcern.Majority()) + clientOpts = options.Client().ApplyURI(suite.dstConnStr).SetAppName("Verifier Test Suite"). + SetWriteConcern(writeconcern.Majority()). + SetReadConcern(readconcern.Majority()) suite.dstMongoClient, err = mongo.Connect(ctx, clientOpts) suite.Require().NoError(err) - clientOpts = options.Client().ApplyURI(suite.metaConnStr).SetAppName("Verifier Test Suite") + clientOpts = options.Client().ApplyURI(suite.metaConnStr).SetAppName("Verifier Test Suite"). + SetWriteConcern(writeconcern.Majority()). + SetReadConcern(readconcern.Majority()) suite.metaMongoClient, err = mongo.Connect(ctx, clientOpts) suite.Require().NoError(err) @@ -94,6 +102,19 @@ func (suite *IntegrationTestSuite) SetupTest() { dbname, ) + for _, client := range []*mongo.Client{suite.srcMongoClient, suite.dstMongoClient} { + dbNames, err := client.ListDatabaseNames(ctx, bson.D{}) + suite.Require().NoError(err, "should list database names") + for _, dbName := range dbNames { + if strings.HasPrefix(dbName, suite.DBNameForTest()) { + suite.T().Logf("Dropping database %#q because it seems to be left over from an earlier run of this test.", dbName) + suite.Require().NoError(client.Database(dbName).Drop(ctx)) + } + + suite.initialDbNames.Add(dbName) + } + } + suite.testContext, suite.contextCanceller = ctx, canceller } @@ -146,6 +167,8 @@ func (suite *IntegrationTestSuite) BuildVerifier() *Verifier { verifier.SetGenerationPauseDelayMillis(0) verifier.SetWorkerSleepDelayMillis(0) + verifier.verificationStatusCheckInterval = 10 * time.Millisecond + ctx := suite.Context() suite.Require().NoError( diff --git a/internal/verifier/logging_setup.go b/internal/verifier/logging_setup.go index 051840c0..7f4a3ac7 100644 --- a/internal/verifier/logging_setup.go +++ b/internal/verifier/logging_setup.go @@ -29,3 +29,15 @@ func getLogWriter(logPath string) io.Writer { return zerolog.SyncWriter(writer) } + +func getLoggerAndWriter(logPath string) (*logger.Logger, io.Writer) { + writer := getLogWriter(logPath) + + consoleWriter := zerolog.ConsoleWriter{ + Out: writer, + TimeFormat: timeFormat, + } + + l := zerolog.New(consoleWriter).With().Timestamp().Logger() + return logger.NewLogger(&l, writer), writer +} diff --git a/internal/verifier/migration_verifier.go b/internal/verifier/migration_verifier.go index 809e4fe3..c37eea7d 100644 --- a/internal/verifier/migration_verifier.go +++ b/internal/verifier/migration_verifier.go @@ -24,7 +24,6 @@ import ( "github.com/10gen/migration-verifier/internal/uuidutil" "github.com/olekukonko/tablewriter" "github.com/pkg/errors" - "github.com/rs/zerolog" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/bson/primitive" "go.mongodb.org/mongo-driver/mongo" @@ -49,7 +48,6 @@ const ( SrcNamespaceField = "query_filter.namespace" DstNamespaceField = "query_filter.to" NumWorkers = 10 - refetch = "TODO_CHANGE_ME_REFETCH" Idle = "idle" Check = "check" Recheck = "recheck" @@ -123,12 +121,13 @@ type Verifier struct { metaDBName string srcStartAtTs *primitive.Timestamp - mux sync.RWMutex - changeStreamRunning bool - changeStreamEnderChan chan struct{} - changeStreamErrChan chan error - changeStreamDoneChan chan struct{} - lastChangeEventTime *primitive.Timestamp + mux sync.RWMutex + changeStreamRunning bool + changeStreamWritesOffTsChan chan primitive.Timestamp + changeStreamErrChan chan error + changeStreamDoneChan chan struct{} + lastChangeEventTime *primitive.Timestamp + writesOffTimestamp *primitive.Timestamp readConcernSetting ReadConcernSetting @@ -138,6 +137,8 @@ type Verifier struct { globalFilter map[string]any pprofInterval time.Duration + + verificationStatusCheckInterval time.Duration } // VerificationStatus holds the Verification Status @@ -188,19 +189,21 @@ func NewVerifier(settings VerifierSettings) *Verifier { } return &Verifier{ - phase: Idle, - numWorkers: NumWorkers, - readPreference: readpref.Primary(), - partitionSizeInBytes: 400 * 1024 * 1024, - failureDisplaySize: DefaultFailureDisplaySize, - changeStreamEnderChan: make(chan struct{}), - changeStreamErrChan: make(chan error), - changeStreamDoneChan: make(chan struct{}), - readConcernSetting: readConcern, + phase: Idle, + numWorkers: NumWorkers, + readPreference: readpref.Primary(), + partitionSizeInBytes: 400 * 1024 * 1024, + failureDisplaySize: DefaultFailureDisplaySize, + changeStreamWritesOffTsChan: make(chan primitive.Timestamp), + changeStreamErrChan: make(chan error), + changeStreamDoneChan: make(chan struct{}), + readConcernSetting: readConcern, // This will get recreated once gen0 starts, but we want it // here in case the change streams gets an event before then. eventRecorder: NewEventRecorder(), + + verificationStatusCheckInterval: 15 * time.Second, } } @@ -228,13 +231,39 @@ func (verifier *Verifier) SetFailureDisplaySize(size int64) { verifier.failureDisplaySize = size } -func (verifier *Verifier) WritesOff(ctx context.Context) { +func (verifier *Verifier) WritesOff(ctx context.Context) error { verifier.logger.Debug(). Msg("WritesOff called.") verifier.mux.Lock() verifier.writesOff = true - verifier.mux.Unlock() + + if verifier.writesOffTimestamp == nil { + verifier.logger.Debug().Msg("Change stream still running. Signalling that writes are done.") + + finalTs, err := GetNewClusterTime( + ctx, + verifier.logger, + verifier.srcClient, + ) + + if err != nil { + return errors.Wrapf(err, "failed to fetch source's cluster time") + } + + verifier.writesOffTimestamp = &finalTs + + verifier.mux.Unlock() + + // This has to happen under the lock because the change stream + // might be inserting docs into the recheck queue, which happens + // under the lock. + verifier.changeStreamWritesOffTsChan <- finalTs + } else { + verifier.mux.Unlock() + } + + return nil } func (verifier *Verifier) WritesOn(ctx context.Context) { @@ -315,15 +344,7 @@ func (verifier *Verifier) SetPartitionSizeMB(partitionSizeMB uint32) { } func (verifier *Verifier) SetLogger(logPath string) { - writer := getLogWriter(logPath) - verifier.writer = writer - - consoleWriter := zerolog.ConsoleWriter{ - Out: writer, - TimeFormat: timeFormat, - } - l := zerolog.New(consoleWriter).With().Timestamp().Logger() - verifier.logger = logger.NewLogger(&l, writer) + verifier.logger, verifier.writer = getLoggerAndWriter(logPath) } func (verifier *Verifier) SetSrcNamespaces(arg []string) { @@ -1172,10 +1193,6 @@ func (verifier *Verifier) verificationTaskCollection() *mongo.Collection { return verifier.verificationDatabase().Collection(verificationTasksCollection) } -func (verifier *Verifier) refetchCollection() *mongo.Collection { - return verifier.verificationDatabase().Collection(refetch) -} - func (verifier *Verifier) srcClientDatabase(dbName string) *mongo.Database { db := verifier.srcClient.Database(dbName) // No need to check the write concern because we do not write to the source database. diff --git a/internal/verifier/migration_verifier_bench_test.go b/internal/verifier/migration_verifier_bench_test.go index 900c260f..9732ded4 100644 --- a/internal/verifier/migration_verifier_bench_test.go +++ b/internal/verifier/migration_verifier_bench_test.go @@ -81,10 +81,6 @@ func BenchmarkGeneric(t *testing.B) { if err != nil { t.Fatal(err) } - err = verifier.refetchCollection().Drop(context.Background()) - if err != nil { - t.Fatal(err) - } println("Starting tasks") for _, namespace := range namespaces { diff --git a/internal/verifier/migration_verifier_test.go b/internal/verifier/migration_verifier_test.go index ff125fea..d6e9d8e5 100644 --- a/internal/verifier/migration_verifier_test.go +++ b/internal/verifier/migration_verifier_test.go @@ -14,12 +14,12 @@ import ( "regexp" "sort" "testing" + "time" "github.com/10gen/migration-verifier/internal/partitions" "github.com/10gen/migration-verifier/internal/testutil" "github.com/cespare/permute/v2" "github.com/rs/zerolog" - "github.com/rs/zerolog/log" "github.com/samber/lo" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -28,7 +28,6 @@ import ( "go.mongodb.org/mongo-driver/bson/primitive" "go.mongodb.org/mongo-driver/mongo" "go.mongodb.org/mongo-driver/mongo/options" - "golang.org/x/sync/errgroup" ) func TestIntegration(t *testing.T) { @@ -1152,10 +1151,6 @@ func TestVerifierCompareIndexSpecs(t *testing.T) { } func (suite *IntegrationTestSuite) TestVerifierNamespaceList() { - if suite.GetSrcTopology() == TopologySharded { - suite.T().Skip("Skipping pending REP-5299.") - } - verifier := suite.BuildVerifier() ctx := suite.Context() @@ -1202,8 +1197,12 @@ func (suite *IntegrationTestSuite) TestVerifierNamespaceList() { suite.Require().NoError(err) err = suite.dstMongoClient.Database("testDb4").CreateCollection(ctx, "testColl6") suite.Require().NoError(err) - err = suite.dstMongoClient.Database("local").CreateCollection(ctx, "testColl7") - suite.Require().NoError(err) + + if suite.GetSrcTopology() != TopologySharded { + err = suite.dstMongoClient.Database("local").CreateCollection(ctx, "testColl7") + suite.Require().NoError(err) + } + err = suite.dstMongoClient.Database("mongosync_reserved_for_internal_use").CreateCollection(ctx, "globalState") suite.Require().NoError(err) err = suite.dstMongoClient.Database("mongosync_reserved_for_verification_src_metadata").CreateCollection(ctx, "auditor") @@ -1240,10 +1239,13 @@ func (suite *IntegrationTestSuite) TestVerifierNamespaceList() { suite.ElementsMatch([]string{"testDb1.testColl1", "testDb1.testColl2", "testDb1.testView1"}, verifier.dstNamespaces) // Collections in admin, config, and local should not be found - err = suite.srcMongoClient.Database("local").CreateCollection(ctx, "islocalSrc") - suite.Require().NoError(err) - err = suite.dstMongoClient.Database("local").CreateCollection(ctx, "islocalDest") - suite.Require().NoError(err) + if suite.GetSrcTopology() != TopologySharded { + err = suite.srcMongoClient.Database("local").CreateCollection(ctx, "islocalSrc") + suite.Require().NoError(err) + err = suite.dstMongoClient.Database("local").CreateCollection(ctx, "islocalDest") + suite.Require().NoError(err) + } + err = suite.srcMongoClient.Database("admin").CreateCollection(ctx, "isAdminSrc") suite.Require().NoError(err) err = suite.dstMongoClient.Database("admin").CreateCollection(ctx, "isAdminDest") @@ -1282,17 +1284,13 @@ func (suite *IntegrationTestSuite) TestVerificationStatus() { } func (suite *IntegrationTestSuite) TestGenerationalRechecking() { - if suite.GetSrcTopology() == TopologySharded { - suite.T().Skip("Skipping pending REP-5299.") - } - zerolog.SetGlobalLevel(zerolog.DebugLevel) verifier := suite.BuildVerifier() verifier.SetSrcNamespaces([]string{"testDb1.testColl1"}) verifier.SetDstNamespaces([]string{"testDb2.testColl3"}) verifier.SetNamespaceMap() - ctx := context.Background() + ctx := suite.Context() srcColl := suite.srcMongoClient.Database("testDb1").Collection("testColl1") dstColl := suite.dstMongoClient.Database("testDb2").Collection("testColl3") @@ -1303,27 +1301,20 @@ func (suite *IntegrationTestSuite) TestGenerationalRechecking() { _, err = dstColl.InsertOne(ctx, bson.M{"_id": 1, "x": 42}) suite.Require().NoError(err) - checkDoneChan := make(chan struct{}) - checkContinueChan := make(chan struct{}) - - errGroup, errGrpCtx := errgroup.WithContext(context.Background()) - errGroup.Go(func() error { - checkDriverErr := verifier.CheckDriver(errGrpCtx, nil, checkDoneChan, checkContinueChan) - // Log this as fatal error so that the test doesn't hang. - if checkDriverErr != nil { - log.Fatal().Err(checkDriverErr).Msg("check driver error") - } - return checkDriverErr - }) + runner := RunVerifierCheck(ctx, suite.T(), verifier) waitForTasks := func() *VerificationStatus { status, err := verifier.GetVerificationStatus() suite.Require().NoError(err) - for status.TotalTasks == 0 && verifier.generation < 10 { - suite.T().Logf("TotalTasks is 0 (generation=%d); waiting another generation …", verifier.generation) - checkContinueChan <- struct{}{} - <-checkDoneChan + for status.TotalTasks == 0 && verifier.generation < 50 { + delay := time.Second + + suite.T().Logf("TotalTasks is 0 (generation=%d); waiting %s then will run another generation …", verifier.generation, delay) + + time.Sleep(delay) + runner.StartNextGeneration() + runner.AwaitGenerationEnd() status, err = verifier.GetVerificationStatus() suite.Require().NoError(err) } @@ -1331,7 +1322,7 @@ func (suite *IntegrationTestSuite) TestGenerationalRechecking() { } // wait for one generation to finish - <-checkDoneChan + runner.AwaitGenerationEnd() status := waitForTasks() suite.Require().Equal(VerificationStatus{TotalTasks: 2, FailedTasks: 1, CompletedTasks: 1}, *status) @@ -1340,10 +1331,10 @@ func (suite *IntegrationTestSuite) TestGenerationalRechecking() { suite.Require().NoError(err) // tell check to start the next generation - checkContinueChan <- struct{}{} + runner.StartNextGeneration() // wait for generation to finish - <-checkDoneChan + runner.AwaitGenerationEnd() status = waitForTasks() // there should be no failures now, since they are are equivalent at this point in time suite.Require().Equal(VerificationStatus{TotalTasks: 1, CompletedTasks: 1}, *status) @@ -1353,10 +1344,10 @@ func (suite *IntegrationTestSuite) TestGenerationalRechecking() { suite.Require().NoError(err) // tell check to start the next generation - checkContinueChan <- struct{}{} + runner.StartNextGeneration() // wait for one generation to finish - <-checkDoneChan + runner.AwaitGenerationEnd() status = waitForTasks() // there should be a failure from the src insert @@ -1367,43 +1358,32 @@ func (suite *IntegrationTestSuite) TestGenerationalRechecking() { suite.Require().NoError(err) // continue - checkContinueChan <- struct{}{} + runner.StartNextGeneration() // wait for it to finish again, this should be a clean run - <-checkDoneChan + runner.AwaitGenerationEnd() status = waitForTasks() // there should be no failures now, since they are are equivalent at this point in time - suite.Require().Equal(VerificationStatus{TotalTasks: 1, CompletedTasks: 1}, *status) - - // turn writes off - verifier.WritesOff(ctx) - _, err = srcColl.InsertOne(ctx, bson.M{"_id": 1019, "x": 1019}) - suite.Require().NoError(err) - checkContinueChan <- struct{}{} - <-checkDoneChan - // now write to the source, this should not be seen by the change stream which should have ended - // because of the calls to WritesOff - status, err = verifier.GetVerificationStatus() - suite.Require().NoError(err) - // there should be a failure from the src insert - suite.Require().Equal(VerificationStatus{TotalTasks: 1, FailedTasks: 1}, *status) + suite.Assert().Equal(VerificationStatus{TotalTasks: 1, CompletedTasks: 1}, *status) - checkContinueChan <- struct{}{} - require.NoError(suite.T(), errGroup.Wait()) + // We could just abandon this verifier, but we might as well shut it down + // gracefully. That prevents a spurious error in the log from “drop” + // change events. + suite.Require().NoError(verifier.WritesOff(ctx)) + suite.Require().NoError(runner.Await()) } func (suite *IntegrationTestSuite) TestVerifierWithFilter() { - if suite.GetSrcTopology() == TopologySharded { - suite.T().Skip("Skipping pending REP-5299.") - } - zerolog.SetGlobalLevel(zerolog.DebugLevel) - filter := map[string]any{"inFilter": map[string]any{"$ne": false}} + dbname1 := suite.DBNameForTest("1") + dbname2 := suite.DBNameForTest("2") + + filter := bson.M{"inFilter": bson.M{"$ne": false}} verifier := suite.BuildVerifier() - verifier.SetSrcNamespaces([]string{"testDb1.testColl1"}) - verifier.SetDstNamespaces([]string{"testDb2.testColl3"}) + verifier.SetSrcNamespaces([]string{dbname1 + ".testColl1"}) + verifier.SetDstNamespaces([]string{dbname2 + ".testColl3"}) verifier.SetNamespaceMap() verifier.SetIgnoreBSONFieldOrder(true) // Set this value low to test the verifier with multiple partitions. @@ -1411,8 +1391,8 @@ func (suite *IntegrationTestSuite) TestVerifierWithFilter() { ctx := suite.Context() - srcColl := suite.srcMongoClient.Database("testDb1").Collection("testColl1") - dstColl := suite.dstMongoClient.Database("testDb2").Collection("testColl3") + srcColl := suite.srcMongoClient.Database(dbname1).Collection("testColl1") + dstColl := suite.dstMongoClient.Database(dbname2).Collection("testColl3") // Documents with _id in [0, 100) should match. var docs []interface{} @@ -1443,8 +1423,12 @@ func (suite *IntegrationTestSuite) TestVerifierWithFilter() { status, err := verifier.GetVerificationStatus() suite.Require().NoError(err) - for status.TotalTasks == 0 && verifier.generation < 10 { - suite.T().Logf("TotalTasks is 0 (generation=%d); waiting another generation …", verifier.generation) + for status.TotalTasks == 0 && verifier.generation < 50 { + delay := time.Second + + suite.T().Logf("TotalTasks is 0 (generation=%d); waiting %s then will run another generation …", verifier.generation, delay) + + time.Sleep(delay) checkContinueChan <- struct{}{} <-checkDoneChan status, err = verifier.GetVerificationStatus() @@ -1461,6 +1445,7 @@ func (suite *IntegrationTestSuite) TestVerifierWithFilter() { suite.Require().Equal(status.FailedTasks, 0) // Insert another document that is not in the filter. + // This should trigger a recheck despite being outside the filter. _, err = srcColl.InsertOne(ctx, bson.M{"_id": 200, "x": 200, "inFilter": false}) suite.Require().NoError(err) @@ -1469,11 +1454,13 @@ func (suite *IntegrationTestSuite) TestVerifierWithFilter() { // Wait for generation to finish. <-checkDoneChan + status = waitForTasks() + // There should be no failures, since the inserted document is not in the filter. suite.Require().Equal(VerificationStatus{TotalTasks: 1, CompletedTasks: 1}, *status) - // Now insert in the source, this should come up next generation. + // Now insert in the source. This should come up next generation. _, err = srcColl.InsertOne(ctx, bson.M{"_id": 201, "x": 201, "inFilter": true}) suite.Require().NoError(err) @@ -1502,7 +1489,7 @@ func (suite *IntegrationTestSuite) TestVerifierWithFilter() { suite.Require().Equal(VerificationStatus{TotalTasks: 1, CompletedTasks: 1}, *status) // Turn writes off. - verifier.WritesOff(ctx) + suite.Require().NoError(verifier.WritesOff(ctx)) // Tell CheckDriver to do one more pass. This should terminate the change stream. checkContinueChan <- struct{}{} diff --git a/internal/verifier/web_server.go b/internal/verifier/web_server.go index 9f3925f0..0c15f8be 100644 --- a/internal/verifier/web_server.go +++ b/internal/verifier/web_server.go @@ -22,7 +22,7 @@ const RequestInProgressErrorDescription = "Another request is currently in progr // MigrationVerifierAPI represents the interaction webserver with mongosync type MigrationVerifierAPI interface { Check(ctx context.Context, filter map[string]any) - WritesOff(ctx context.Context) + WritesOff(ctx context.Context) error WritesOn(ctx context.Context) GetProgress(ctx context.Context) (Progress, error) } @@ -214,7 +214,11 @@ func (server *WebServer) writesOffEndpoint(c *gin.Context) { return } - server.Mapi.WritesOff(context.Background()) + err := server.Mapi.WritesOff(context.Background()) + if err != nil { + c.JSON(http.StatusOK, gin.H{"error": err.Error()}) + return + } successResponse(c) } diff --git a/internal/verifier/web_server_test.go b/internal/verifier/web_server_test.go index efa586f6..c7c8b476 100644 --- a/internal/verifier/web_server_test.go +++ b/internal/verifier/web_server_test.go @@ -30,8 +30,8 @@ func NewMockVerifier() *MockVerifier { func (verifier *MockVerifier) Check(ctx context.Context, filter map[string]any) { verifier.filter = filter } -func (verifier *MockVerifier) WritesOff(ctx context.Context) {} -func (verifier *MockVerifier) WritesOn(ctx context.Context) {} +func (verifier *MockVerifier) WritesOff(ctx context.Context) error { return nil } +func (verifier *MockVerifier) WritesOn(ctx context.Context) {} func (verifier *MockVerifier) GetProgress(ctx context.Context) (Progress, error) { return Progress{}, nil } diff --git a/main/migration_verifier.go b/main/migration_verifier.go index cb5a6320..8863bbba 100644 --- a/main/migration_verifier.go +++ b/main/migration_verifier.go @@ -176,7 +176,11 @@ func main() { zerolog.SetGlobalLevel(zerolog.DebugLevel) } if cCtx.Bool(checkOnly) { - verifier.WritesOff(ctx) + err := verifier.WritesOff(ctx) + if err != nil { + return errors.Wrap(err, "failed to set writes off") + } + return verifier.CheckDriver(ctx, nil) } else { return verifier.StartServer() diff --git a/mbson/bson_raw.go b/mbson/bson_raw.go new file mode 100644 index 00000000..0759ffca --- /dev/null +++ b/mbson/bson_raw.go @@ -0,0 +1,29 @@ +package mbson + +import ( + "github.com/pkg/errors" + "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/x/bsonx/bsoncore" +) + +// RawLookup combines bson.Raw’s LookupErr method with an additional +// unmarshal step. The result is a convenient way to extract values from +// bson.Raw. The returned boolean indicates whether the value was found. +func RawLookup[T any](doc bson.Raw, dest *T, keys ...string) (bool, error) { + val, err := doc.LookupErr(keys...) + + if err == nil { + return true, val.Unmarshal(dest) + } else if errors.Is(err, bsoncore.ErrElementNotFound) { + return false, nil + } + + return false, errors.Wrapf(err, "failed to look up %+v in BSON doc", keys) +} + +// RawContains is like RawLookup but makes no effort to unmarshal +// the value. +func RawContains(doc bson.Raw, keys ...string) (bool, error) { + val := any(nil) + return RawLookup(doc, &val, keys...) +} diff --git a/mbson/unit_test.go b/mbson/unit_test.go new file mode 100644 index 00000000..0a66e92d --- /dev/null +++ b/mbson/unit_test.go @@ -0,0 +1,88 @@ +package mbson + +import ( + "testing" + + "github.com/stretchr/testify/suite" + "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/x/bsonx/bsoncore" +) + +type UnitTestSuite struct { + suite.Suite +} + +func TestUnitTestSuite(t *testing.T) { + ts := new(UnitTestSuite) + suite.Run(t, ts) +} + +func (s *UnitTestSuite) Test_RawLookup() { + myDoc := bson.D{ + {"foo", 1}, + {"bar", "baz"}, + {"quux", bson.A{ + 123, + bson.D{{"hey", "ho"}}, + }}, + } + + myRaw, err := bson.Marshal(myDoc) + s.Require().NoError(err) + + var myInt int + var myStr string + + found, err := RawLookup(myRaw, &myInt, "foo") + s.Require().True(found) + s.Require().NoError(err) + s.Assert().EqualValues(1, myInt) + + found, err = RawLookup(myRaw, &myInt, "quux", "0") + s.Require().True(found) + s.Require().NoError(err) + s.Assert().EqualValues(123, myInt) + + found, err = RawLookup(myRaw, &myStr, "quux", "1", "hey") + s.Require().True(found) + s.Require().NoError(err) + s.Assert().EqualValues("ho", myStr) + + found, err = RawLookup(myRaw, &myStr, "not there") + s.Require().NoError(err) + s.Assert().False(found) + + myRaw = myRaw[:len(myRaw)-2] + _, err = RawLookup(myRaw, &myStr, "not there") + s.Assert().ErrorAs(err, &bsoncore.InsufficientBytesError{}) +} + +func (s *UnitTestSuite) Test_RawContains() { + myDoc := bson.D{ + {"foo", 1}, + {"bar", "baz"}, + {"quux", bson.A{ + 123, + bson.D{{"hey", "ho"}}, + }}, + } + + myRaw, err := bson.Marshal(myDoc) + s.Require().NoError(err) + + has, err := RawContains(myRaw, "foo") + s.Require().NoError(err) + s.Assert().True(has, "`foo` should exist") + + has, err = RawContains(myRaw, "quux", "1", "hey") + s.Require().NoError(err) + s.Assert().True(has, "deep lookup should work") + + has, err = RawContains(myRaw, "not there") + s.Require().NoError(err) + s.Assert().False(has, "missing element should not exist") + + myRaw = myRaw[:len(myRaw)-2] + _, err = RawContains(myRaw, "not there") + s.Assert().ErrorAs(err, &bsoncore.InsufficientBytesError{}) +}