@@ -1144,15 +1144,6 @@ struct writer_thread_ctx {
11441144 Vbid vbid;
11451145};
11461146
1147- struct continuous_dcp_ctx {
1148- EngineIface* h;
1149- const void *cookie;
1150- Vbid vbid;
1151- const std::string &name;
1152- uint64_t start_seqno;
1153- std::unique_ptr<TestDcpConsumer> dcpConsumer;
1154- };
1155-
11561147// Forward declaration required for dcp_thread_func
11571148static uint32_t add_stream_for_consumer (EngineIface* h,
11581149 CookieIface* cookie,
@@ -1276,22 +1267,6 @@ extern "C" {
12761267 " Failed to store value" );
12771268 }
12781269 }
1279-
1280- static void continuous_dcp_thread (void *args) {
1281- auto *cdc = static_cast <continuous_dcp_ctx *>(args);
1282-
1283- DcpStreamCtx ctx;
1284- ctx.vbucket = cdc->vbid ;
1285- std::string vbuuid_entry (" vb_" + std::to_string (cdc->vbid .get ()) +
1286- " :0:id" );
1287- ctx.vb_uuid = get_ull_stat (cdc->h , vbuuid_entry.c_str (), " failovers" );
1288- ctx.seqno = {cdc->start_seqno , std::numeric_limits<uint64_t >::max ()};
1289- ctx.snapshot = {cdc->start_seqno , cdc->start_seqno };
1290- ctx.skip_verification = true ;
1291-
1292- cdc->dcpConsumer ->addStreamCtx (ctx);
1293- cdc->dcpConsumer ->run ();
1294- }
12951270}
12961271
12971272/* DCP step thread that keeps running till it reads upto 'exp_mutations'.
@@ -2555,27 +2530,28 @@ static enum test_result test_dcp_producer_stream_latest(EngineIface* h) {
25552530}
25562531
25572532static enum test_result test_dcp_producer_keep_stream_open (EngineIface* h) {
2558- const std::string conn_name (" unittest" );
2559- const int num_items = 2 , vb = 0 ;
2560-
2533+ const int num_items = 2 ;
25612534 write_items (h, num_items);
2562-
25632535 wait_for_flusher_to_settle (h);
25642536 verify_curr_items (h, num_items, " Wrong amount of items" );
25652537
2538+ // We want to stream items till end and keep the stream open. Then we want
2539+ // to verify the stream is still open
25662540 auto * cookie = testHarness->create_cookie (h);
2567-
2568- /* We want to stream items till end and keep the stream open. Then we want
2569- to verify the stream is still open */
2570- struct continuous_dcp_ctx cdc = {
2571- h,
2572- cookie,
2573- Vbid (0 ),
2574- conn_name,
2575- 0 ,
2576- std::make_unique<TestDcpConsumer>(conn_name, cookie, h)};
2577- auto dcp_thread = create_thread ([&cdc]() { continuous_dcp_thread (&cdc); },
2578- " dcp_thread" );
2541+ const std::string conn_name = " test-consumer" ;
2542+ auto consumer = std::make_unique<TestDcpConsumer>(conn_name, cookie, h);
2543+ auto dcp_thread = create_thread (
2544+ [&]() {
2545+ DcpStreamCtx ctx;
2546+ ctx.vbucket = Vbid (0 );
2547+ ctx.vb_uuid = get_ull_stat (h, " vb_0:0:id" , " failovers" );
2548+ ctx.seqno = {0 , std::numeric_limits<uint64_t >::max ()};
2549+ ctx.snapshot = {0 , 0 };
2550+ ctx.skip_verification = true ;
2551+ consumer->addStreamCtx (ctx);
2552+ consumer->run ();
2553+ },
2554+ " dcp_thread" );
25792555
25802556 /* Wait for producer to be created */
25812557 wait_for_stat_to_be (h, " ep_dcp_producer_count" , 1 , " dcp" );
@@ -2587,19 +2563,17 @@ static enum test_result test_dcp_producer_keep_stream_open(EngineIface* h) {
25872563
25882564 /* Wait for the dcp test client to receive upto highest seqno we have */
25892565 cb::RelaxedAtomic<uint64_t > exp_items (num_items);
2590- wait_for_val_to_be (" last_sent_seqno" ,
2591- cdc.dcpConsumer ->producers .last_byseqno ,
2592- exp_items);
2566+ wait_for_val_to_be (
2567+ " last_sent_seqno" , consumer->producers .last_byseqno , exp_items);
25932568
2594- /* Check if the stream is still open after sending out latest items */
2595- std::string stat_stream_state (" eq_dcpq:" + conn_name + " :stream_" +
2596- std::to_string (vb) + " _state" );
2569+ // Check if the stream is still open after sending out latest items
2570+ std::string stat_stream_state (" eq_dcpq:" + conn_name + " :stream_0_state" );
25972571 std::string state = get_str_stat (h, stat_stream_state.c_str (), " dcp" );
25982572 checkeq (state.compare (" in-memory" ), 0 , " Stream is not open" );
25992573
2600- /* Before closing the connection stop the thread that continuously polls
2601- for dcp data */
2602- cdc. dcpConsumer ->stop ();
2574+ // Before closing the connection stop the thread that continuously polls for
2575+ // dcp data
2576+ consumer ->stop ();
26032577 cookie->notifyIoComplete (cb::engine_errc::success);
26042578 dcp_thread.join ();
26052579 testHarness->destroy_cookie (cookie);
@@ -2692,16 +2666,20 @@ static enum test_result test_dcp_producer_keep_stream_open_replica(
26922666 /* We want to stream items till end and keep the stream open. Then we want
26932667 to verify the stream is still open */
26942668 auto * cookie1 = testHarness->create_cookie (h);
2695- const std::string conn_name1 (" unittest1" );
2696- struct continuous_dcp_ctx cdc = {
2697- h,
2698- cookie1,
2699- Vbid (0 ),
2700- conn_name1,
2701- 0 ,
2702- std::make_unique<TestDcpConsumer>(conn_name1, cookie1, h)};
2703- auto dcp_thread = create_thread ([&cdc]() { continuous_dcp_thread (&cdc); },
2704- " dcp_thread" );
2669+ const std::string conn_name1 = " test-consumer" ;
2670+ auto consumer = std::make_unique<TestDcpConsumer>(conn_name1, cookie1, h);
2671+ auto dcp_thread = create_thread (
2672+ [&]() {
2673+ DcpStreamCtx ctx;
2674+ ctx.vbucket = Vbid (0 );
2675+ ctx.vb_uuid = get_ull_stat (h, " vb_0:0:id" , " failovers" );
2676+ ctx.seqno = {0 , std::numeric_limits<uint64_t >::max ()};
2677+ ctx.snapshot = {0 , 0 };
2678+ ctx.skip_verification = true ;
2679+ consumer->addStreamCtx (ctx);
2680+ consumer->run ();
2681+ },
2682+ " dcp_thread" );
27052683
27062684 /* Wait for producer to be created */
27072685 wait_for_stat_to_be (h, " ep_dcp_producer_count" , 1 , " dcp" );
@@ -2713,15 +2691,13 @@ static enum test_result test_dcp_producer_keep_stream_open_replica(
27132691
27142692 /* Wait for the dcp test client to receive upto highest seqno we have */
27152693 cb::RelaxedAtomic<uint64_t > exp_items (3 * num_items);
2716- wait_for_val_to_be (" last_sent_seqno" ,
2717- cdc.dcpConsumer ->producers .last_byseqno ,
2718- exp_items);
2694+ wait_for_val_to_be (
2695+ " last_sent_seqno" , consumer->producers .last_byseqno , exp_items);
27192696
27202697 /* Check if correct snap end seqno is sent */
2721- std::string stat_stream_last_sent_snap_end_seqno (" eq_dcpq:" + conn_name1 +
2722- " :stream_" +
2723- std::to_string (vb) +
2724- " _last_sent_snap_end_seqno" );
2698+ std::string stat_stream_last_sent_snap_end_seqno (
2699+ " eq_dcpq:" + conn_name1 + " :stream_" + std::to_string (vb) +
2700+ " _last_sent_snap_end_seqno" );
27252701 wait_for_stat_to_be (h,
27262702 stat_stream_last_sent_snap_end_seqno.c_str (),
27272703 3 * num_items,
@@ -2735,7 +2711,7 @@ static enum test_result test_dcp_producer_keep_stream_open_replica(
27352711
27362712 /* Before closing the connection stop the thread that continuously polls
27372713 for dcp data */
2738- cdc. dcpConsumer ->stop ();
2714+ consumer ->stop ();
27392715 cookie1->notifyIoComplete (cb::engine_errc::success);
27402716 dcp_thread.join ();
27412717
@@ -2747,7 +2723,6 @@ static enum test_result test_dcp_producer_keep_stream_open_replica(
27472723
27482724static enum test_result test_dcp_producer_stream_cursor_movement (
27492725 EngineIface* h) {
2750- const std::string conn_name (" unittest" );
27512726 const int num_items = 30 ;
27522727 for (int j = 0 ; j < num_items; ++j) {
27532728 if (j % 10 == 0 ) {
@@ -2765,19 +2740,23 @@ static enum test_result test_dcp_producer_stream_cursor_movement(
27652740 wait_for_flusher_to_settle (h);
27662741 verify_curr_items (h, num_items, " Wrong amount of items" );
27672742
2743+ // We want to stream items till end and keep the stream open. We want to
2744+ // verify if the DCP cursor has moved to new open checkpoint
27682745 auto * cookie = testHarness->create_cookie (h);
2769-
2770- /* We want to stream items till end and keep the stream open. We want to
2771- verify if the DCP cursor has moved to new open checkpoint */
2772- struct continuous_dcp_ctx cdc = {
2773- h,
2774- cookie,
2775- Vbid (0 ),
2776- conn_name,
2777- 20 ,
2778- std::make_unique<TestDcpConsumer>(conn_name, cookie, h)};
2779- auto dcp_thread = create_thread ([&cdc]() { continuous_dcp_thread (&cdc); },
2780- " dcp_thread" );
2746+ const std::string conn_name = " test-consumer" ;
2747+ auto consumer = std::make_unique<TestDcpConsumer>(conn_name, cookie, h);
2748+ auto dcp_thread = create_thread (
2749+ [&]() {
2750+ DcpStreamCtx ctx;
2751+ ctx.vbucket = Vbid (0 );
2752+ ctx.vb_uuid = get_ull_stat (h, " vb_0:0:id" , " failovers" );
2753+ ctx.seqno = {20 , std::numeric_limits<uint64_t >::max ()};
2754+ ctx.snapshot = {20 , 20 };
2755+ ctx.skip_verification = true ;
2756+ consumer->addStreamCtx (ctx);
2757+ consumer->run ();
2758+ },
2759+ " dcp_thread" );
27812760
27822761 /* Wait for producer to be created */
27832762 wait_for_stat_to_be (h, " ep_dcp_producer_count" , 1 , " dcp" );
@@ -2789,9 +2768,8 @@ static enum test_result test_dcp_producer_stream_cursor_movement(
27892768
27902769 /* Wait for the dcp test client to receive upto highest seqno we have */
27912770 cb::RelaxedAtomic<uint64_t > exp_items (num_items);
2792- wait_for_val_to_be (" last_sent_seqno" ,
2793- cdc.dcpConsumer ->producers .last_byseqno ,
2794- exp_items);
2771+ wait_for_val_to_be (
2772+ " last_sent_seqno" , consumer->producers .last_byseqno , exp_items);
27952773
27962774 /* We want to make sure that no cursors are lingering on any of the previous
27972775 checkpoints. For that we wait for checkpoint remover to remove all but
@@ -2800,7 +2778,7 @@ static enum test_result test_dcp_producer_stream_cursor_movement(
28002778
28012779 /* Before closing the connection stop the thread that continuously polls
28022780 for dcp data */
2803- cdc. dcpConsumer ->stop ();
2781+ consumer ->stop ();
28042782 cookie->notifyIoComplete (cb::engine_errc::success);
28052783 dcp_thread.join ();
28062784 testHarness->destroy_cookie (cookie);
@@ -7045,19 +7023,22 @@ static enum test_result test_dcp_multiple_streams(EngineIface* h) {
70457023}
70467024
70477025static enum test_result test_dcp_on_vbucket_state_change (EngineIface* h) {
7048- const std::string conn_name = " unittest" ;
7049- auto * cookie = testHarness->create_cookie (h);
7050-
70517026 // Set up a DcpTestConsumer that would remain in in-memory mode
7052- struct continuous_dcp_ctx cdc = {
7053- h,
7054- cookie,
7055- Vbid (0 ),
7056- conn_name,
7057- 0 ,
7058- std::make_unique<TestDcpConsumer>(conn_name, cookie, h)};
7059- auto dcp_thread = create_thread ([&cdc]() { continuous_dcp_thread (&cdc); },
7060- " dcp_thread" );
7027+ auto * cookie = testHarness->create_cookie (h);
7028+ const std::string conn_name = " test-consumer" ;
7029+ auto consumer = std::make_unique<TestDcpConsumer>(conn_name, cookie, h);
7030+ auto dcp_thread = create_thread (
7031+ [&]() {
7032+ DcpStreamCtx ctx;
7033+ ctx.vbucket = Vbid (0 );
7034+ ctx.vb_uuid = get_ull_stat (h, " vb_0:0:id" , " failovers" );
7035+ ctx.seqno = {0 , std::numeric_limits<uint64_t >::max ()};
7036+ ctx.snapshot = {0 , 0 };
7037+ ctx.skip_verification = true ;
7038+ consumer->addStreamCtx (ctx);
7039+ consumer->run ();
7040+ },
7041+ " dcp_thread" );
70617042
70627043 // Wait for producer to be created
70637044 wait_for_stat_to_be (h, " ep_dcp_producer_count" , 1 , " dcp" );
@@ -7081,8 +7062,8 @@ static enum test_result test_dcp_on_vbucket_state_change(EngineIface* h) {
70817062 // Expect producers->last_end_status to carry StateChanged as reason
70827063 // for stream closure
70837064 check_expression (cb::mcbp::DcpStreamEndStatus::StateChanged ==
7084- cdc. dcpConsumer ->producers .last_end_status ,
7085- " Last DCP flag not StateChanged" );
7065+ consumer ->producers .last_end_status ,
7066+ " Last DCP flag not StateChanged" );
70867067
70877068 testHarness->destroy_cookie (cookie);
70887069
0 commit comments