|
26 | 26 | #include "bgfetcher.h" |
27 | 27 | #include "checkpoint.h" |
28 | 28 | #include "dcp/dcpconnmap.h" |
| 29 | +#include "ephemeral_tombstone_purger.h" |
29 | 30 | #include "ep_time.h" |
30 | 31 | #include "evp_store_test.h" |
31 | 32 | #include "failover-table.h" |
|
42 | 43 | #include <xattr/utils.h> |
43 | 44 |
|
44 | 45 | #include <thread> |
| 46 | +#include <engines/ep/src/ephemeral_vb.h> |
45 | 47 |
|
46 | 48 | ProcessClock::time_point SingleThreadedKVBucketTest::runNextTask( |
47 | 49 | TaskQueue& taskQ, const std::string& expectedTaskName) { |
@@ -290,6 +292,111 @@ void SingleThreadedKVBucketTest::runCompaction(uint64_t purgeBeforeTime, |
290 | 292 | "Compact DB file 0"); |
291 | 293 | } |
292 | 294 |
|
| 295 | +/* |
| 296 | + * MB-31175 |
| 297 | + * The following test checks to see that when we call handleSlowStream in an |
| 298 | + * in memory state and drop the cursor/schedule a backfill as a result, the |
| 299 | + * resulting backfill checks the purgeSeqno and tells the stream to rollback |
| 300 | + * if purgeSeqno > startSeqno. |
| 301 | + */ |
| 302 | +TEST_P(STParameterizedBucketTest, SlowStreamBackfillPurgeSeqnoCheck) { |
| 303 | + // Make vbucket active. |
| 304 | + setVBucketStateAndRunPersistTask(vbid, vbucket_state_active); |
| 305 | + auto vb = store->getVBuckets().getBucket(vbid); |
| 306 | + ASSERT_TRUE(vb.get()); |
| 307 | + |
| 308 | + // Store two items |
| 309 | + std::array<std::string, 2> initialKeys = {{"k1", "k2"}}; |
| 310 | + for (const auto& key : initialKeys) { |
| 311 | + store_item(vbid, makeStoredDocKey(key), key); |
| 312 | + } |
| 313 | + flushVBucketToDiskIfPersistent(vbid, initialKeys.size()); |
| 314 | + |
| 315 | + // Delete the items so that we can advance the purgeSeqno using |
| 316 | + // compaction later |
| 317 | + for (const auto& key : initialKeys) { |
| 318 | + delete_item(vbid, makeStoredDocKey(key)); |
| 319 | + } |
| 320 | + flushVBucketToDiskIfPersistent(vbid, initialKeys.size()); |
| 321 | + |
| 322 | + auto& ckpt_mgr = *vb->checkpointManager; |
| 323 | + |
| 324 | + // Create a Mock Dcp producer |
| 325 | + // Create the Mock Active Stream with a startSeqno of 1 |
| 326 | + // as a startSeqno is always valid |
| 327 | + auto producer = std::make_shared<MockDcpProducer>( |
| 328 | + *engine, |
| 329 | + cookie, |
| 330 | + "test_producer", |
| 331 | + /*notifyOnly*/ false, |
| 332 | + cb::const_byte_buffer() /*no json*/); |
| 333 | + // Create a Mock Active Stream |
| 334 | + auto mock_stream = std::make_shared<MockActiveStream>( |
| 335 | + static_cast<EventuallyPersistentEngine*>(engine.get()), |
| 336 | + producer, |
| 337 | + /*flags*/ 0, |
| 338 | + /*opaque*/ 0, |
| 339 | + *vb, |
| 340 | + /*st_seqno*/ 1, |
| 341 | + /*en_seqno*/ ~0, |
| 342 | + /*vb_uuid*/ 0xabcd, |
| 343 | + /*snap_start_seqno*/ 0, |
| 344 | + /*snap_end_seqno*/ ~0, |
| 345 | + IncludeValue::Yes, |
| 346 | + IncludeXattrs::Yes); |
| 347 | + |
| 348 | + producer->createCheckpointProcessorTask(); |
| 349 | + producer->scheduleCheckpointProcessorTask(); |
| 350 | + |
| 351 | + mock_stream->transitionStateToBackfilling(); |
| 352 | + ASSERT_TRUE(mock_stream->isInMemory()) |
| 353 | + << "stream state should have transitioned to InMemory"; |
| 354 | + |
| 355 | + // Check number of expected cursors (might not have persistence cursor) |
| 356 | + int expectedCursors = persistent() ? 2 : 1; |
| 357 | + EXPECT_EQ(expectedCursors, ckpt_mgr.getNumOfCursors()); |
| 358 | + |
| 359 | + EXPECT_TRUE(mock_stream->handleSlowStream()); |
| 360 | + EXPECT_TRUE(mock_stream->public_getPendingBackfill()); |
| 361 | + |
| 362 | + // Might not have persistence cursor |
| 363 | + expectedCursors = persistent() ? 1 : 0; |
| 364 | + EXPECT_EQ(expectedCursors, ckpt_mgr.getNumOfCursors()) |
| 365 | + << "stream cursor should have been dropped"; |
| 366 | + |
| 367 | + // This will schedule the backfill |
| 368 | + mock_stream->transitionStateToBackfilling(); |
| 369 | + ASSERT_TRUE(mock_stream->isBackfilling()); |
| 370 | + |
| 371 | + // Advance the purgeSeqno |
| 372 | + if (persistent()) { |
| 373 | + runCompaction(~0, 3); |
| 374 | + } else { |
| 375 | + EphemeralVBucket::HTTombstonePurger purger(0); |
| 376 | + auto vbptr = store->getVBucket(vbid); |
| 377 | + EphemeralVBucket* evb = dynamic_cast<EphemeralVBucket*>(vbptr.get()); |
| 378 | + purger.setCurrentVBucket(*evb); |
| 379 | + evb->ht.visit(purger); |
| 380 | + evb->purgeStaleItems(); |
| 381 | + } |
| 382 | + |
| 383 | + ASSERT_EQ(3, vb->getPurgeSeqno()); |
| 384 | + |
| 385 | + // Run the backfill we scheduled when we transitioned to the backfilling |
| 386 | + // state |
| 387 | + auto& bfm = producer->getBFM(); |
| 388 | + bfm.backfill(); |
| 389 | + |
| 390 | + // The backfill should have set the stream state to dead because |
| 391 | + // purgeSeqno > startSeqno |
| 392 | + EXPECT_TRUE(mock_stream->isDead()); |
| 393 | + |
| 394 | + // Stop Producer checkpoint processor task |
| 395 | + producer->cancelCheckpointCreatorTask(); |
| 396 | + |
| 397 | + cancelAndPurgeTasks(); |
| 398 | +} |
| 399 | + |
293 | 400 | /* |
294 | 401 | * The following test checks to see if we call handleSlowStream when in a |
295 | 402 | * backfilling state, but the backfillTask is not running, we |
@@ -3380,3 +3487,13 @@ INSTANTIATE_TEST_CASE_P(XattrCompressedTest, |
3380 | 3487 | XattrCompressedTest, |
3381 | 3488 | ::testing::Combine(::testing::Bool(), |
3382 | 3489 | ::testing::Bool()), ); |
| 3490 | + |
| 3491 | +static auto allConfigValues = ::testing::Values( |
| 3492 | + std::make_tuple(std::string("ephemeral"), std::string("auto_delete")), |
| 3493 | + std::make_tuple(std::string("ephemeral"), std::string("fail_new_data")), |
| 3494 | + std::make_tuple(std::string("persistent"), std::string{})); |
| 3495 | + |
| 3496 | +// Test cases which run for persistent and ephemeral buckets |
| 3497 | +INSTANTIATE_TEST_CASE_P(EphemeralOrPersistent, |
| 3498 | + STParameterizedBucketTest, |
| 3499 | + allConfigValues, ); |
0 commit comments