|
6 | 6 | "time" |
7 | 7 |
|
8 | 8 | "github.com/pkg/errors" |
| 9 | + "github.com/samber/lo" |
9 | 10 | "github.com/stretchr/testify/require" |
10 | 11 | "go.mongodb.org/mongo-driver/bson" |
11 | 12 | "go.mongodb.org/mongo-driver/bson/primitive" |
@@ -241,3 +242,68 @@ func (suite *IntegrationTestSuite) TestWithChangeEventsBatching() { |
241 | 242 | "the verifier should flush a recheck doc after a batch", |
242 | 243 | ) |
243 | 244 | } |
| 245 | + |
| 246 | +func (suite *IntegrationTestSuite) TestEventBeforeWritesOff() { |
| 247 | + ctx := suite.Context() |
| 248 | + |
| 249 | + verifier := suite.BuildVerifier() |
| 250 | + |
| 251 | + checkDoneChan := make(chan struct{}) |
| 252 | + checkContinueChan := make(chan struct{}) |
| 253 | + |
| 254 | + // start verifier |
| 255 | + verifierDoneChan := make(chan struct{}) |
| 256 | + go func() { |
| 257 | + err := verifier.CheckDriver(ctx, nil, checkDoneChan, checkContinueChan) |
| 258 | + suite.Require().NoError(err) |
| 259 | + |
| 260 | + close(verifierDoneChan) |
| 261 | + }() |
| 262 | + |
| 263 | + // wait for generation 1 |
| 264 | + <-checkDoneChan |
| 265 | + |
| 266 | + db := suite.srcMongoClient.Database(suite.DBNameForTest()) |
| 267 | + coll := db.Collection("mycoll") |
| 268 | + |
| 269 | + docsCount := 10_000 |
| 270 | + docs := lo.RepeatBy(docsCount, func(_ int) bson.D { return bson.D{} }) |
| 271 | + _, err := coll.InsertMany( |
| 272 | + ctx, |
| 273 | + lo.ToAnySlice(docs), |
| 274 | + ) |
| 275 | + suite.Require().NoError(err) |
| 276 | + |
| 277 | + verifier.WritesOff(ctx) |
| 278 | + |
| 279 | + verifierDone := false |
| 280 | + for !verifierDone { |
| 281 | + select { |
| 282 | + case <-verifierDoneChan: |
| 283 | + verifierDone = true |
| 284 | + case <-checkDoneChan: |
| 285 | + case checkContinueChan <- struct{}{}: |
| 286 | + } |
| 287 | + } |
| 288 | + |
| 289 | + generation := verifier.generation |
| 290 | + failedTasks, incompleteTasks, err := FetchFailedAndIncompleteTasks( |
| 291 | + ctx, |
| 292 | + verifier.verificationTaskCollection(), |
| 293 | + verificationTaskVerifyDocuments, |
| 294 | + generation, |
| 295 | + ) |
| 296 | + suite.Require().NoError(err) |
| 297 | + |
| 298 | + suite.Require().Empty(incompleteTasks, "all tasks should be finished") |
| 299 | + |
| 300 | + totalFailed := lo.Reduce( |
| 301 | + failedTasks, |
| 302 | + func(sofar int, task VerificationTask, _ int) int { |
| 303 | + return sofar + len(task.Ids) |
| 304 | + }, |
| 305 | + 0, |
| 306 | + ) |
| 307 | + |
| 308 | + suite.Assert().Equal(docsCount, totalFailed, "all source docs should be missing") |
| 309 | +} |
0 commit comments