2525#include " bgfetcher.h"
2626#include " checkpoint_manager.h"
2727#include " checkpoint_remover.h"
28+ #include " checkpoint_utils.h"
2829#include " collections/vbucket_manifest_handles.h"
2930#include " dcp/dcpconnmap.h"
3031#include " ep_bucket.h"
@@ -2648,3 +2649,170 @@ INSTANTIATE_TEST_SUITE_P(EPBucketTestCouchstore,
26482649 EPBucketTestCouchstore,
26492650 STParameterizedBucketTest::couchstoreConfigValues (),
26502651 STParameterizedBucketTest::PrintToStringParamName);
2652+
2653+ #ifdef EP_USE_MAGMA
2654+
2655+ /* *
2656+ * Test fixture for CDC tests - Magma only
2657+ */
2658+ class EPBucketCDCTest : public EPBucketTest {
2659+ protected:
2660+ void SetUp () override {
2661+ // Note: Checkpoint removal isn't under test at all here.
2662+ // Eager checkpoint removal, default prod setting in Neo and post-Neo.
2663+ // That helps in cleaning up the CheckpointManager during the test and
2664+ // we won't need to fix the testsuite when merging into the master
2665+ // branch.
2666+ if (!config_string.empty ()) {
2667+ config_string += " ;" ;
2668+ }
2669+ config_string += " checkpoint_removal_mode=eager" ;
2670+ EPBucketTest::SetUp ();
2671+
2672+ auto vb = store->getVBucket (vbid);
2673+ ASSERT_TRUE (vb);
2674+
2675+ CollectionsManifest manifest;
2676+ manifest.add (CollectionEntry::historical,
2677+ cb::NoExpiryLimit,
2678+ true /* history*/ ,
2679+ ScopeEntry::defaultS);
2680+ vb->updateFromManifest (Collections::Manifest{std::string{manifest}});
2681+ flushVBucketToDiskIfPersistent (vbid, 1 );
2682+ auto & manager = *vb->checkpointManager ;
2683+ manager.createNewCheckpoint (true );
2684+
2685+ ASSERT_EQ (1 , manager.getNumCheckpoints ());
2686+ ASSERT_EQ (1 , manager.getNumItems ()); // [cs
2687+ ASSERT_EQ (0 , manager.getNumOpenChkItems ()); // no mutation
2688+ ASSERT_EQ (1 , manager.getHighSeqno ()); // sysevent
2689+ }
2690+ };
2691+
2692+ TEST_P (EPBucketCDCTest, CollectionNonHistorical) {
2693+ auto vb = store->getVBucket (vbid);
2694+ const uint64_t initialHighSeqno = 1 ;
2695+ ASSERT_EQ (initialHighSeqno, vb->getHighSeqno ()); // From SetUp
2696+
2697+ const auto key = makeStoredDocKey (" key" , CollectionEntry::defaultC);
2698+ store_item (vbid, key, " valueA" );
2699+ store_item (vbid, key, " valueB" );
2700+ EXPECT_EQ (initialHighSeqno + 2 , vb->getHighSeqno ());
2701+
2702+ const auto & manager = *vb->checkpointManager ;
2703+ EXPECT_EQ (1 , manager.getNumCheckpoints ());
2704+ EXPECT_EQ (2 , manager.getNumItems ()); // [cs x m)
2705+ EXPECT_EQ (1 , manager.getNumOpenChkItems ());
2706+ EXPECT_EQ (initialHighSeqno + 2 , manager.getHighSeqno ());
2707+
2708+ constexpr auto statName = " magma_NSets" ;
2709+ size_t nSets = 0 ;
2710+ const auto & underlying = *store->getRWUnderlying (vbid);
2711+ ASSERT_TRUE (underlying.getStat (statName, nSets));
2712+ ASSERT_EQ (1 , nSets);
2713+
2714+ flush_vbucket_to_disk (vbid, 1 );
2715+ ASSERT_TRUE (underlying.getStat (statName, nSets));
2716+ EXPECT_EQ (2 , nSets);
2717+ }
2718+
2719+ TEST_P (EPBucketCDCTest, CollectionHistorical) {
2720+ auto vb = store->getVBucket (vbid);
2721+ const uint64_t initialHighSeqno = 1 ;
2722+ ASSERT_EQ (initialHighSeqno, vb->getHighSeqno ()); // From SetUp
2723+
2724+ const auto key = makeStoredDocKey (" key" , CollectionEntry::historical);
2725+ store_item (vbid, key, " valueA" );
2726+ store_item (vbid, key, " valueB" );
2727+ EXPECT_EQ (initialHighSeqno + 2 , vb->getHighSeqno ());
2728+
2729+ const auto & manager = *vb->checkpointManager ;
2730+ EXPECT_EQ (2 , manager.getNumCheckpoints ());
2731+ EXPECT_EQ (5 , manager.getNumItems ()); // [cs m ce] [cs m)
2732+ EXPECT_EQ (1 , manager.getNumOpenChkItems ());
2733+ EXPECT_EQ (initialHighSeqno + 2 , manager.getHighSeqno ());
2734+
2735+ constexpr auto statName = " magma_NSets" ;
2736+ size_t nSets = 0 ;
2737+ const auto & underlying = *store->getRWUnderlying (vbid);
2738+ ASSERT_TRUE (underlying.getStat (statName, nSets));
2739+ ASSERT_EQ (1 , nSets);
2740+
2741+ flush_vbucket_to_disk (vbid, 2 );
2742+ ASSERT_TRUE (underlying.getStat (statName, nSets));
2743+ EXPECT_EQ (3 , nSets);
2744+ }
2745+
2746+ TEST_P (EPBucketCDCTest, CollectionInterleaved) {
2747+ auto vb = store->getVBucket (vbid);
2748+ const uint64_t initialHighSeqno = 1 ;
2749+ ASSERT_EQ (initialHighSeqno, vb->getHighSeqno ()); // From SetUp
2750+
2751+ const auto keyHistorical =
2752+ makeStoredDocKey (" key" , CollectionEntry::historical);
2753+ const auto keyNonHistorical =
2754+ makeStoredDocKey (" key" , CollectionEntry::defaultC);
2755+ const auto value = " value" ;
2756+ for (uint8_t i = 0 ; i < 2 ; ++i) {
2757+ store_item (vbid, keyHistorical, value);
2758+ store_item (vbid, keyNonHistorical, value);
2759+ }
2760+ EXPECT_EQ (initialHighSeqno + 4 , vb->getHighSeqno ());
2761+
2762+ // Note: It is important to ensure that both revisions for keyNonHistorical
2763+ // survived in-memory deduplication - We are verifying flusher-dedup here
2764+ const auto & manager = *vb->checkpointManager ;
2765+ const auto & list =
2766+ CheckpointManagerTestIntrospector::public_getCheckpointList (
2767+ manager);
2768+ ASSERT_EQ (2 , list.size ());
2769+ // First checkpoint
2770+ auto ckptIt = list.begin ();
2771+ EXPECT_EQ (CHECKPOINT_CLOSED, (*ckptIt)->getState ());
2772+ auto it = (*ckptIt)->begin ();
2773+ ++it;
2774+ ++it;
2775+ EXPECT_EQ (queue_op::mutation, (*it)->getOperation ());
2776+ EXPECT_EQ (initialHighSeqno + 1 , (*it)->getBySeqno ());
2777+ EXPECT_EQ (keyHistorical, (*it)->getDocKey ());
2778+ ++it;
2779+ EXPECT_EQ (queue_op::mutation, (*it)->getOperation ());
2780+ EXPECT_EQ (initialHighSeqno + 2 , (*it)->getBySeqno ());
2781+ EXPECT_EQ (keyNonHistorical, (*it)->getDocKey ());
2782+ ++it;
2783+ EXPECT_EQ (queue_op::checkpoint_end, (*it)->getOperation ());
2784+ // Second checkpoint
2785+ ++ckptIt;
2786+ EXPECT_EQ (CHECKPOINT_OPEN, (*ckptIt)->getState ());
2787+ it = (*ckptIt)->begin ();
2788+ ++it;
2789+ ++it;
2790+ EXPECT_EQ (queue_op::mutation, (*it)->getOperation ());
2791+ EXPECT_EQ (initialHighSeqno + 3 , (*it)->getBySeqno ());
2792+ EXPECT_EQ (keyHistorical, (*it)->getDocKey ());
2793+ ++it;
2794+ EXPECT_EQ (queue_op::mutation, (*it)->getOperation ());
2795+ EXPECT_EQ (initialHighSeqno + 4 , (*it)->getBySeqno ());
2796+ EXPECT_EQ (keyNonHistorical, (*it)->getDocKey ());
2797+
2798+ constexpr auto statName = " magma_NSets" ;
2799+ size_t initialNSets = 0 ;
2800+ const auto & underlying = *store->getRWUnderlying (vbid);
2801+ ASSERT_TRUE (underlying.getStat (statName, initialNSets));
2802+ EXPECT_EQ (1 , initialNSets);
2803+
2804+ // Note:
2805+ // . 2 historical mutations -> both persisted
2806+ // . 2 non-historical mutations -> only 1 persisted
2807+ const auto expectedNumPersisted = 3 ;
2808+ flush_vbucket_to_disk (vbid, expectedNumPersisted);
2809+ size_t nSets = 0 ;
2810+ ASSERT_TRUE (underlying.getStat (statName, nSets));
2811+ EXPECT_EQ (initialNSets + expectedNumPersisted, nSets);
2812+ }
2813+
2814+ INSTANTIATE_TEST_SUITE_P (EPBucketCDCTest,
2815+ EPBucketCDCTest,
2816+ STParameterizedBucketTest::magmaConfigValues (),
2817+ STParameterizedBucketTest::PrintToStringParamName);
2818+ #endif // EP_USE_MAGMA
0 commit comments