@@ -62,6 +62,14 @@ Stream::~Stream() {
6262 clear_UNLOCKED ();
6363}
6464
65+ bool Stream::isBackfilling () const {
66+ return state_.load () == STREAM_BACKFILLING;
67+ }
68+
69+ bool Stream::isInMemory () const {
70+ return state_.load () == STREAM_IN_MEMORY;
71+ }
72+
6573void Stream::clear_UNLOCKED () {
6674 while (!readyQ.empty ()) {
6775 DcpResponse* resp = readyQ.front ();
@@ -325,12 +333,21 @@ void ActiveStream::markDiskSnapshot(uint64_t startSeqno, uint64_t endSeqno) {
325333 lastSentSnapEndSeqno.store (endSeqno, std::memory_order_relaxed);
326334
327335 if (!(flags_ & DCP_ADD_STREAM_FLAG_DISKONLY)) {
328- // Only re-register the cursor if we still need to get memory snapshots
329- CursorRegResult result =
330- vb->checkpointManager .registerCursorBySeqno (
331- name_, chkCursorSeqno,
332- MustSendCheckpointEnd::NO);
333- curChkSeqno = result.first ;
336+ // Only re-register the cursor if we still need to get memory
337+ // snapshots
338+ try {
339+ CursorRegResult result =
340+ vb->checkpointManager .registerCursorBySeqno (
341+ name_, chkCursorSeqno,
342+ MustSendCheckpointEnd::NO);
343+
344+ curChkSeqno = result.first ;
345+ } catch (std::exception& error) {
346+ producer->getLogger ().log (EXTENSION_LOG_WARNING,
347+ " (vb %" PRIu16 " ) Failed to register cursor: %s" ,
348+ vb_, error.what ());
349+ endStream (END_STREAM_STATE);
350+ }
334351 }
335352
336353 lh.unlock ();
@@ -1024,13 +1041,17 @@ void ActiveStream::scheduleBackfill_UNLOCKED(bool reschedule) {
10241041 }
10251042 tryBackfill = true ;
10261043 } else {
1027- CursorRegResult result =
1028- vbucket->checkpointManager .registerCursorBySeqno (
1029- name_,
1030- lastReadSeqno.load (),
1031- MustSendCheckpointEnd::NO);
1032- curChkSeqno = result.first ;
1033- tryBackfill = result.second ;
1044+ try {
1045+ std::tie (curChkSeqno, tryBackfill) =
1046+ vbucket->checkpointManager .registerCursorBySeqno (
1047+ name_, lastReadSeqno.load (),
1048+ MustSendCheckpointEnd::NO);
1049+ } catch (std::exception& error) {
1050+ producer->getLogger ().log (EXTENSION_LOG_WARNING,
1051+ " (vb %" PRIu16 " ) Failed to register "
1052+ " cursor: %s" , vb_, error.what ());
1053+ endStream (END_STREAM_STATE);
1054+ }
10341055
10351056 if (lastReadSeqno.load () > curChkSeqno) {
10361057 throw std::logic_error (" ActiveStream::scheduleBackfill_UNLOCKED: "
@@ -1088,6 +1109,30 @@ void ActiveStream::scheduleBackfill_UNLOCKED(bool reschedule) {
10881109 lastReadSeqno.load (),
10891110 lastSentSeqno.load (), curChkSeqno.load (),
10901111 itemsReady ? " True" : " False" );
1112+
1113+ /* Cursor was dropped, but we will not do backfill.
1114+ * This may happen in a corner case where, the memory usage is high
1115+ * due to other vbuckets and persistence cursor moves ahead of
1116+ * replication cursor to new checkpoint open but does not persist
1117+ * items yet.
1118+ *
1119+ * Because we dropped the cursor but did not do a backfill (and
1120+ * therefore did not re-register a cursor in markDiskSnapshot) we
1121+ * must re-register the cursor here.
1122+ */
1123+ try {
1124+ CursorRegResult result =
1125+ vbucket->checkpointManager .registerCursorBySeqno (
1126+ name_, lastReadSeqno.load (),
1127+ MustSendCheckpointEnd::NO);
1128+
1129+ curChkSeqno = result.first ;
1130+ } catch (std::exception& error) {
1131+ producer->getLogger ().log (EXTENSION_LOG_WARNING,
1132+ " (vb %" PRIu16 " ) Failed to register "
1133+ " cursor: %s" , vb_, error.what ());
1134+ endStream (END_STREAM_STATE);
1135+ }
10911136 }
10921137 if (flags_ & DCP_ADD_STREAM_FLAG_DISKONLY) {
10931138 endStream (END_STREAM_OK);
@@ -1097,17 +1142,15 @@ void ActiveStream::scheduleBackfill_UNLOCKED(bool reschedule) {
10971142 transitionState (STREAM_IN_MEMORY);
10981143 }
10991144 if (reschedule) {
1100- /* Cursor was dropped, but we will not do backfill.
1101- This may happen in a corner case where, the memory
1102- usage is high due to other vbuckets and persistence cursor moves
1103- ahead of replication cursor to new checkpoint open but does not
1104- persist items yet.
1105- Note: (1) We must not notify when we schedule backfill for the
1106- first time because the stream is not yet in producer
1107- conn list of streams
1108- (2) It is not absolutely necessary to notify immediately
1109- as conn manager or an incoming items will cause a
1110- notification eventually, but wouldn't hurt to do so */
1145+ /*
1146+ * It is not absolutely necessary to notify immediately as conn
1147+ * manager or an incoming item will cause a notification eventually,
1148+ * but wouldn't hurt to do so.
1149+ *
1150+ * Note: must not notify when we schedule a backfill for the first
1151+ * time (i.e. when reschedule is false) because the stream is not
1152+ * yet in producer conn list of streams.
1153+ */
11111154 bool inverse = false ;
11121155 if (itemsReady.compare_exchange_strong (inverse, true )) {
11131156 producer->notifyStreamReady (vb_);
0 commit comments