|
31 | 31 |
|
32 | 32 | class STDcpTest : public STParameterizedBucketTest { |
33 | 33 | protected: |
| 34 | + struct StreamRequestResult { |
| 35 | + ENGINE_ERROR_CODE status; |
| 36 | + uint64_t rollbackSeqno; |
| 37 | + }; |
| 38 | + |
| 39 | + // callbackCount needs to be static as its used inside of the static |
| 40 | + // function fakeDcpAddFailoverLog. |
| 41 | + // static int callbackCount; |
| 42 | + |
| 43 | + /* |
| 44 | + * Fake callback emulating dcp_add_failover_log |
| 45 | + */ |
| 46 | + static ENGINE_ERROR_CODE fakeDcpAddFailoverLog( |
| 47 | + vbucket_failover_t* entry, |
| 48 | + size_t nentries, |
| 49 | + gsl::not_null<const void*> cookie) { |
| 50 | + // callbackCount++; |
| 51 | + return ENGINE_SUCCESS; |
| 52 | + } |
| 53 | + |
| 54 | + StreamRequestResult doStreamRequest(DcpProducer& producer, |
| 55 | + uint64_t startSeqno = 0, |
| 56 | + uint64_t endSeqno = ~0, |
| 57 | + uint64_t snapStart = 0, |
| 58 | + uint64_t snapEnd = ~0, |
| 59 | + uint64_t vbUUID = 0) { |
| 60 | + StreamRequestResult result; |
| 61 | + result.status = producer.streamRequest(/*flags*/ 0, |
| 62 | + /*opaque*/ 0, |
| 63 | + vbid, |
| 64 | + startSeqno, |
| 65 | + endSeqno, |
| 66 | + vbUUID, |
| 67 | + snapStart, |
| 68 | + snapEnd, |
| 69 | + &result.rollbackSeqno, |
| 70 | + fakeDcpAddFailoverLog, |
| 71 | + {}); |
| 72 | + return result; |
| 73 | + } |
| 74 | + |
34 | 75 | /** |
35 | 76 | * @param producerState Are we simulating a negotiation against a Producer |
36 | 77 | * that enables IncludeDeletedUserXattrs? |
@@ -565,6 +606,63 @@ TEST_P(STDcpTest, |
565 | 606 | processConsumerMutationsNearThreshold(false); |
566 | 607 | } |
567 | 608 |
|
| 609 | +/* Checks that the DCP producer does an async stream close when the DCP client |
| 610 | + expects "DCP_STREAM_END" msg. */ |
| 611 | +TEST_P(STDcpTest, test_producer_stream_end_on_client_close_stream) { |
| 612 | + setVBucketStateAndRunPersistTask(vbid, vbucket_state_active); |
| 613 | + |
| 614 | + const void* cookie = create_mock_cookie(engine.get()); |
| 615 | + auto& mockConnMap = static_cast<MockDcpConnMap&>(engine->getDcpConnMap()); |
| 616 | + |
| 617 | + /* Create a new Dcp producer */ |
| 618 | + auto producer = std::make_shared<MockDcpProducer>(*engine, |
| 619 | + cookie, |
| 620 | + "test_producer", |
| 621 | + /*flags*/ 0); |
| 622 | + mockConnMap.addConn(cookie, producer); |
| 623 | + EXPECT_TRUE(mockConnMap.doesConnHandlerExist("test_producer")); |
| 624 | + |
| 625 | + /* Send a control message to the producer indicating that the DCP client |
| 626 | + expects a "DCP_STREAM_END" upon stream close */ |
| 627 | + const std::string sendStreamEndOnClientStreamCloseCtrlMsg( |
| 628 | + "send_stream_end_on_client_close_stream"); |
| 629 | + const std::string sendStreamEndOnClientStreamCloseCtrlValue("true"); |
| 630 | + EXPECT_EQ(ENGINE_SUCCESS, |
| 631 | + producer->control(0, |
| 632 | + sendStreamEndOnClientStreamCloseCtrlMsg, |
| 633 | + sendStreamEndOnClientStreamCloseCtrlValue)); |
| 634 | + |
| 635 | + /* Open stream */ |
| 636 | + EXPECT_EQ(ENGINE_SUCCESS, doStreamRequest(*producer).status); |
| 637 | + EXPECT_TRUE(mockConnMap.doesVbConnExist(vbid, "test_producer")); |
| 638 | + |
| 639 | + /* Close stream */ |
| 640 | + EXPECT_EQ(ENGINE_SUCCESS, producer->closeStream(0, vbid)); |
| 641 | + |
| 642 | + /* Expect a stream end message */ |
| 643 | + MockDcpMessageProducers producers(engine.get()); |
| 644 | + EXPECT_EQ(ENGINE_SUCCESS, producer->step(&producers)); |
| 645 | + EXPECT_EQ(cb::mcbp::ClientOpcode::DcpStreamEnd, producers.last_op); |
| 646 | + EXPECT_EQ(cb::mcbp::DcpStreamEndStatus::Closed, producers.last_end_status); |
| 647 | + |
| 648 | + /* Re-open stream for the same vbucket on the conn */ |
| 649 | + EXPECT_EQ(ENGINE_SUCCESS, doStreamRequest(*producer).status); |
| 650 | + |
| 651 | + /* Check that the new stream is opened properly */ |
| 652 | + auto stream = producer->findStream(vbid); |
| 653 | + auto* as = static_cast<ActiveStream*>(stream.get()); |
| 654 | + EXPECT_TRUE(as->isInMemory()); |
| 655 | + |
| 656 | + // MB-27769: Prior to the fix, this would fail here because we would skip |
| 657 | + // adding the connhandler into the connmap vbConns vector, causing the |
| 658 | + // stream to never get notified. |
| 659 | + EXPECT_TRUE(mockConnMap.doesVbConnExist(vbid, "test_producer")); |
| 660 | + |
| 661 | + mockConnMap.disconnect(cookie); |
| 662 | + EXPECT_FALSE(mockConnMap.doesVbConnExist(vbid, "test_producer")); |
| 663 | + mockConnMap.manageConnections(); |
| 664 | +} |
| 665 | + |
568 | 666 | INSTANTIATE_TEST_SUITE_P(PersistentAndEphemeral, |
569 | 667 | STDcpTest, |
570 | 668 | STParameterizedBucketTest::allConfigValues()); |
0 commit comments