@@ -364,9 +364,6 @@ ENGINE_ERROR_CODE DcpConsumer::streamEnd(uint32_t opaque,
364364
365365 lastMessageTime = ep_current_time ();
366366 UpdateFlowControl ufc (*this , StreamEndResponse::baseMsgBytes);
367- if (doDisconnect ()) {
368- return ENGINE_DISCONNECT;
369- }
370367
371368 auto stream = findStream (vbucket);
372369 if (!stream) {
@@ -393,25 +390,12 @@ ENGINE_ERROR_CODE DcpConsumer::streamEnd(uint32_t opaque,
393390
394391 logger->info (" ({}) End stream received with reason {}" , vbucket, flags);
395392
396- ENGINE_ERROR_CODE err = ENGINE_KEY_ENOENT;
397- try {
398- err = stream->messageReceived (std::make_unique<StreamEndResponse>(
399- opaque,
400- static_cast <end_stream_status_t >(flags),
401- vbucket,
402- cb::mcbp::DcpStreamId{}));
403- } catch (const std::bad_alloc&) {
404- return ENGINE_ENOMEM;
405- }
406-
407- // The item was buffered and will be processed later
408- if (err == ENGINE_TMPFAIL) {
409- ufc.release ();
410- notifyVbucketReady (vbucket);
411- return ENGINE_SUCCESS;
412- }
413-
414- return err;
393+ auto msg = std::make_unique<StreamEndResponse>(
394+ opaque,
395+ static_cast <end_stream_status_t >(flags),
396+ vbucket,
397+ cb::mcbp::DcpStreamId{});
398+ return lookupStreamAndDispatchMessage (ufc, vbucket, opaque, std::move (msg));
415399}
416400
417401ENGINE_ERROR_CODE DcpConsumer::processMutationOrPrepare (
@@ -423,45 +407,24 @@ ENGINE_ERROR_CODE DcpConsumer::processMutationOrPrepare(
423407 size_t msgBytes) {
424408 UpdateFlowControl ufc (*this , msgBytes);
425409
426- if (doDisconnect ()) {
427- return ENGINE_DISCONNECT;
428- }
429-
430- ENGINE_ERROR_CODE err = ENGINE_KEY_ENOENT;
431- auto stream = findStream (vbucket);
432- if (stream && stream->getOpaque () == opaque && stream->isActive ()) {
433- std::unique_ptr<ExtendedMetaData> emd;
434- if (meta.size () > 0 ) {
435- emd = std::make_unique<ExtendedMetaData>(meta.data (), meta.size ());
436- if (emd->getStatus () == ENGINE_EINVAL) {
437- return ENGINE_EINVAL;
438- }
439- }
440-
441- try {
442- err = stream->messageReceived (
443- std::make_unique<MutationConsumerMessage>(
444- item,
445- opaque,
446- IncludeValue::Yes,
447- IncludeXattrs::Yes,
448- IncludeDeleteTime::No,
449- key.getEncoding (),
450- emd.release (),
451- cb::mcbp::DcpStreamId{}));
452- } catch (const std::bad_alloc&) {
453- return ENGINE_ENOMEM;
454- }
455-
456- // The item was buffered and will be processed later
457- if (err == ENGINE_TMPFAIL) {
458- ufc.release ();
459- notifyVbucketReady (vbucket);
460- return ENGINE_SUCCESS;
410+ std::unique_ptr<ExtendedMetaData> emd;
411+ if (meta.size () > 0 ) {
412+ emd = std::make_unique<ExtendedMetaData>(meta.data (), meta.size ());
413+ if (emd->getStatus () == ENGINE_EINVAL) {
414+ return ENGINE_EINVAL;
461415 }
462416 }
463417
464- return err;
418+ auto msg =
419+ std::make_unique<MutationConsumerMessage>(item,
420+ opaque,
421+ IncludeValue::Yes,
422+ IncludeXattrs::Yes,
423+ IncludeDeleteTime::No,
424+ key.getEncoding (),
425+ emd.release (),
426+ cb::mcbp::DcpStreamId{});
427+ return lookupStreamAndDispatchMessage (ufc, vbucket, opaque, std::move (msg));
465428}
466429
467430ENGINE_ERROR_CODE DcpConsumer::mutation (uint32_t opaque,
@@ -743,10 +706,6 @@ ENGINE_ERROR_CODE DcpConsumer::snapshotMarker(uint32_t opaque,
743706 lastMessageTime = ep_current_time ();
744707 UpdateFlowControl ufc (*this , SnapshotMarker::baseMsgBytes);
745708
746- if (doDisconnect ()) {
747- return ENGINE_DISCONNECT;
748- }
749-
750709 if (start_seqno > end_seqno) {
751710 logger->warn (
752711 " ({}) Invalid snapshot marker "
@@ -757,31 +716,13 @@ ENGINE_ERROR_CODE DcpConsumer::snapshotMarker(uint32_t opaque,
757716 return ENGINE_EINVAL;
758717 }
759718
760- ENGINE_ERROR_CODE err = ENGINE_KEY_ENOENT;
761- auto stream = findStream (vbucket);
762- if (stream && stream->getOpaque () == opaque && stream->isActive ()) {
763- try {
764- err = stream->messageReceived (
765- std::make_unique<SnapshotMarker>(opaque,
766- vbucket,
767- start_seqno,
768- end_seqno,
769- flags,
770- cb::mcbp::DcpStreamId{}));
771-
772- } catch (const std::bad_alloc&) {
773- return ENGINE_ENOMEM;
774- }
775-
776- // The item was buffered and will be processed later
777- if (err == ENGINE_TMPFAIL) {
778- notifyVbucketReady (vbucket);
779- ufc.release ();
780- return ENGINE_SUCCESS;
781- }
782- }
783-
784- return err;
719+ auto msg = std::make_unique<SnapshotMarker>(opaque,
720+ vbucket,
721+ start_seqno,
722+ end_seqno,
723+ flags,
724+ cb::mcbp::DcpStreamId{});
725+ return lookupStreamAndDispatchMessage (ufc, vbucket, opaque, std::move (msg));
785726}
786727
787728ENGINE_ERROR_CODE DcpConsumer::noop (uint32_t opaque) {
@@ -801,29 +742,10 @@ ENGINE_ERROR_CODE DcpConsumer::setVBucketState(uint32_t opaque,
801742
802743 lastMessageTime = ep_current_time ();
803744 UpdateFlowControl ufc (*this , SetVBucketState::baseMsgBytes);
804- if (doDisconnect ()) {
805- return ENGINE_DISCONNECT;
806- }
807-
808- ENGINE_ERROR_CODE err = ENGINE_KEY_ENOENT;
809- auto stream = findStream (vbucket);
810- if (stream && stream->getOpaque () == opaque && stream->isActive ()) {
811- try {
812- err = stream->messageReceived (std::make_unique<SetVBucketState>(
813- opaque, vbucket, state, cb::mcbp::DcpStreamId{}));
814- } catch (const std::bad_alloc&) {
815- return ENGINE_ENOMEM;
816- }
817-
818- // The item was buffered and will be processed later
819- if (err == ENGINE_TMPFAIL) {
820- ufc.release ();
821- notifyVbucketReady (vbucket);
822- return ENGINE_SUCCESS;
823- }
824- }
825745
826- return err;
746+ auto msg = std::make_unique<SetVBucketState>(
747+ opaque, vbucket, state, cb::mcbp::DcpStreamId{});
748+ return lookupStreamAndDispatchMessage (ufc, vbucket, opaque, std::move (msg));
827749}
828750
829751ENGINE_ERROR_CODE DcpConsumer::step (struct dcp_message_producers * producers) {
@@ -1649,35 +1571,13 @@ ENGINE_ERROR_CODE DcpConsumer::systemEvent(uint32_t opaque,
16491571 cb::const_byte_buffer key,
16501572 cb::const_byte_buffer eventData) {
16511573 lastMessageTime = ep_current_time ();
1574+ UpdateFlowControl ufc (
1575+ *this ,
1576+ SystemEventMessage::baseMsgBytes + key.size () + eventData.size ());
16521577
1653- ENGINE_ERROR_CODE err = ENGINE_KEY_ENOENT;
1654- auto stream = findStream (vbucket);
1655- if (stream && stream->getOpaque () == opaque && stream->isActive ()) {
1656- try {
1657- err = stream->messageReceived (
1658- std::make_unique<SystemEventConsumerMessage>(opaque,
1659- event,
1660- bySeqno,
1661- vbucket,
1662- version,
1663- key,
1664- eventData));
1665- } catch (const std::bad_alloc&) {
1666- return ENGINE_ENOMEM;
1667- }
1668-
1669- // The item was buffered and will be processed later
1670- if (err == ENGINE_TMPFAIL) {
1671- notifyVbucketReady (vbucket);
1672- return ENGINE_SUCCESS;
1673- }
1674- }
1675-
1676- flowControl.incrFreedBytes (SystemEventMessage::baseMsgBytes + key.size () +
1677- eventData.size ());
1678- scheduleNotifyIfNecessary ();
1679-
1680- return err;
1578+ auto msg = std::make_unique<SystemEventConsumerMessage>(
1579+ opaque, event, bySeqno, vbucket, version, key, eventData);
1580+ return lookupStreamAndDispatchMessage (ufc, vbucket, opaque, std::move (msg));
16811581}
16821582
16831583ENGINE_ERROR_CODE DcpConsumer::prepare (
@@ -1732,6 +1632,46 @@ ENGINE_ERROR_CODE DcpConsumer::prepare(
17321632 return processMutationOrPrepare (vbucket, opaque, key, item, {}, msgBytes);
17331633}
17341634
1635+ ENGINE_ERROR_CODE DcpConsumer::lookupStreamAndDispatchMessage (
1636+ UpdateFlowControl& ufc,
1637+ Vbid vbucket,
1638+ uint32_t opaque,
1639+ std::unique_ptr<DcpResponse> msg) {
1640+ if (doDisconnect ()) {
1641+ return ENGINE_DISCONNECT;
1642+ }
1643+
1644+ auto stream = findStream (vbucket);
1645+ if (!stream || stream->getOpaque () != opaque) {
1646+ // No such stream with the given vbucket / opaque - return KEY_ENOENT
1647+ // to indicate that back to peer.
1648+ return ENGINE_KEY_ENOENT;
1649+ }
1650+
1651+ if (!stream->isActive ()) {
1652+ // Stream is not active - also uses KEY_ENOENT to indicate no valid
1653+ // stream.
1654+ return ENGINE_KEY_ENOENT;
1655+ }
1656+
1657+ // Pass the message to the associated stream.
1658+ ENGINE_ERROR_CODE err;
1659+ try {
1660+ err = stream->messageReceived (std::move (msg));
1661+ } catch (const std::bad_alloc&) {
1662+ return ENGINE_ENOMEM;
1663+ }
1664+
1665+ // The item was buffered and will be processed later
1666+ if (err == ENGINE_TMPFAIL) {
1667+ notifyVbucketReady (vbucket);
1668+ ufc.release ();
1669+ return ENGINE_SUCCESS;
1670+ }
1671+
1672+ return err;
1673+ }
1674+
17351675ENGINE_ERROR_CODE DcpConsumer::commit (uint32_t opaque,
17361676 Vbid vbucket,
17371677 const DocKey& key,
@@ -1741,34 +1681,14 @@ ENGINE_ERROR_CODE DcpConsumer::commit(uint32_t opaque,
17411681 const size_t msgBytes = CommitSyncWrite::commitBaseMsgBytes + key.size ();
17421682 UpdateFlowControl ufc (*this , msgBytes);
17431683
1744- if (doDisconnect ()) {
1745- return ENGINE_DISCONNECT;
1746- }
1747-
17481684 if (commit_seqno == 0 ) {
17491685 logger->warn (" ({}) Invalid sequence number(0) for commit!" , vbucket);
17501686 return ENGINE_EINVAL;
17511687 }
17521688
1753- ENGINE_ERROR_CODE err = ENGINE_KEY_ENOENT;
1754- auto stream = findStream (vbucket);
1755- if (stream && stream->getOpaque () == opaque && stream->isActive ()) {
1756- try {
1757- err = stream->messageReceived (std::make_unique<CommitSyncWrite>(
1758- opaque, vbucket, prepare_seqno, commit_seqno, key));
1759- } catch (const std::bad_alloc&) {
1760- return ENGINE_ENOMEM;
1761- }
1762-
1763- // The item was buffered and will be processed later
1764- if (err == ENGINE_TMPFAIL) {
1765- notifyVbucketReady (vbucket);
1766- ufc.release ();
1767- return ENGINE_SUCCESS;
1768- }
1769- }
1770-
1771- return err;
1689+ auto msg = std::make_unique<CommitSyncWrite>(
1690+ opaque, vbucket, prepare_seqno, commit_seqno, key);
1691+ return lookupStreamAndDispatchMessage (ufc, vbucket, opaque, std::move (msg));
17721692}
17731693
17741694ENGINE_ERROR_CODE DcpConsumer::abort (uint32_t opaque,
@@ -1780,34 +1700,14 @@ ENGINE_ERROR_CODE DcpConsumer::abort(uint32_t opaque,
17801700 UpdateFlowControl ufc (*this ,
17811701 AbortSyncWrite::abortBaseMsgBytes + key.size ());
17821702
1783- if (doDisconnect ()) {
1784- return ENGINE_DISCONNECT;
1785- }
1786-
17871703 if (!abortSeqno) {
17881704 logger->warn (" ({}) Invalid abort-seqno (0)" , vbucket);
17891705 return ENGINE_EINVAL;
17901706 }
17911707
1792- ENGINE_ERROR_CODE err = ENGINE_KEY_ENOENT;
1793- auto stream = findStream (vbucket);
1794- if (stream && stream->getOpaque () == opaque && stream->isActive ()) {
1795- try {
1796- err = stream->messageReceived (std::make_unique<AbortSyncWrite>(
1797- opaque, vbucket, key, prepareSeqno, abortSeqno));
1798- } catch (const std::bad_alloc&) {
1799- return ENGINE_ENOMEM;
1800- }
1801-
1802- // The item was buffered and will be processed later
1803- if (err == ENGINE_TMPFAIL) {
1804- notifyVbucketReady (vbucket);
1805- ufc.release ();
1806- return ENGINE_SUCCESS;
1807- }
1808- }
1809-
1810- return err;
1708+ auto msg = std::make_unique<AbortSyncWrite>(
1709+ opaque, vbucket, key, prepareSeqno, abortSeqno);
1710+ return lookupStreamAndDispatchMessage (ufc, vbucket, opaque, std::move (msg));
18111711}
18121712
18131713void DcpConsumer::setDisconnect () {
0 commit comments