Skip to content

Commit b8aa082

Browse files
jameseh96paolococchi
authored andcommitted
MB-38283: Stop PassiveStream repeatedly logging ENOMEM
PassiveStream would previously log each time it tried to processMessage, quickly filling logs. Make it instead log when first backing off, and when it resumes processing. Change-Id: Icc73ca9429a307140882a083b2c435bc489379ed Reviewed-on: http://review.couchbase.org/c/kv_engine/+/142428 Well-Formed: Build Bot <[email protected]> Reviewed-by: Paolo Cocchi <[email protected]> Tested-by: Paolo Cocchi <[email protected]>
1 parent 1dfec49 commit b8aa082

File tree

2 files changed

+70
-14
lines changed

2 files changed

+70
-14
lines changed

engines/ep/src/dcp/stream.cc

Lines changed: 48 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -2506,17 +2506,24 @@ ENGINE_ERROR_CODE PassiveStream::processMutation(MutationResponse* mutation) {
25062506
}
25072507

25082508
if (ret != ENGINE_SUCCESS) {
2509-
log(EXTENSION_LOG_WARNING,
2510-
"vb:%" PRIu16
2511-
" Got error '%s' while trying to process "
2512-
"mutation with seqno:%" PRId64,
2513-
vb_,
2514-
cb::to_string(cb::to_engine_errc(ret)).c_str(),
2515-
mutation->getItem()->getBySeqno());
2509+
// ENOMEM logging is handled by maybeLogMemoryState
2510+
if (ret != ENGINE_ENOMEM) {
2511+
log(EXTENSION_LOG_WARNING,
2512+
"vb:%" PRIu16
2513+
" Got error '%s' while trying to process "
2514+
"mutation with seqno:%" PRId64,
2515+
vb_,
2516+
cb::to_string(cb::to_engine_errc(ret)).c_str(),
2517+
mutation->getItem()->getBySeqno());
2518+
}
25162519
} else {
25172520
handleSnapshotEnd(vb, *mutation->getBySeqno());
25182521
}
25192522

2523+
maybeLogMemoryState(cb::to_engine_errc(ret),
2524+
"mutation",
2525+
mutation->getItem()->getBySeqno());
2526+
25202527
return ret;
25212528
}
25222529

@@ -2588,17 +2595,23 @@ ENGINE_ERROR_CODE PassiveStream::processDeletion(MutationResponse* deletion) {
25882595
}
25892596

25902597
if (ret != ENGINE_SUCCESS) {
2591-
log(EXTENSION_LOG_WARNING,
2592-
"vb:%" PRIu16
2593-
" Got error '%s' while trying to process "
2594-
"deletion with seqno:%" PRId64,
2595-
vb_,
2596-
cb::to_string(cb::to_engine_errc(ret)).c_str(),
2597-
*deletion->getBySeqno());
2598+
// ENOMEM logging is handled by maybeLogMemoryState
2599+
if (ret != ENGINE_ENOMEM) {
2600+
log(EXTENSION_LOG_WARNING,
2601+
"vb:%" PRIu16
2602+
" Got error '%s' while trying to process "
2603+
"deletion with seqno:%" PRId64,
2604+
vb_,
2605+
cb::to_string(cb::to_engine_errc(ret)).c_str(),
2606+
*deletion->getBySeqno());
2607+
}
25982608
} else {
25992609
handleSnapshotEnd(vb, *deletion->getBySeqno());
26002610
}
26012611

2612+
maybeLogMemoryState(
2613+
cb::to_engine_errc(ret), "deletion", *deletion->getBySeqno());
2614+
26022615
return ret;
26032616
}
26042617

@@ -2916,6 +2929,27 @@ void PassiveStream::log(EXTENSION_LOG_LEVEL severity,
29162929
va_end(va);
29172930
}
29182931

2932+
void PassiveStream::maybeLogMemoryState(cb::engine_errc status,
2933+
const std::string& msgType,
2934+
int64_t seqno) {
2935+
bool previousNoMem = isNoMemory.load();
2936+
if (status == cb::engine_errc::no_memory && !previousNoMem) {
2937+
log(EXTENSION_LOG_WARNING,
2938+
"vb:%" PRIu16
2939+
" Got error '%s' while trying to process %s with seqno:%" PRId64,
2940+
vb_,
2941+
cb::to_string(status).c_str(),
2942+
msgType.c_str(),
2943+
seqno);
2944+
isNoMemory.store(true);
2945+
} else if (status == cb::engine_errc::success && previousNoMem) {
2946+
log(EXTENSION_LOG_INFO,
2947+
"vb:%" PRIu16 " PassiveStream resuming after no-memory backoff",
2948+
vb_);
2949+
isNoMemory.store(false);
2950+
}
2951+
}
2952+
29192953
void PassiveStream::notifyStreamReady() {
29202954
auto consumer = consumerPtr.lock();
29212955
if (!consumer) {

engines/ep/src/dcp/stream.h

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -813,6 +813,24 @@ class PassiveStream : public Stream {
813813

814814
void log(EXTENSION_LOG_LEVEL severity, const char* fmt, ...) const override;
815815

816+
/**
817+
* Log when the stream backs off or resumes processing items.
818+
*
819+
* Determines if the stream has backed off or resumed based on
820+
* the passed status. Assumes no_memory indicates the stream will
821+
* backoff, and that success indicates the stream has resumed
822+
* processing.
823+
*
824+
* @param status last error code returned by the engine when processing
825+
* a message
826+
* @param msgType string indicating the type of the last message processed
827+
* ("mutation", "deletion", "expiration", "prepare")
828+
* @param seqno seqno of last message processed
829+
*/
830+
void maybeLogMemoryState(cb::engine_errc status,
831+
const std::string& msgType,
832+
int64_t seqno);
833+
816834
/**
817835
* Notifies the consumer connection that the stream has items ready to be
818836
* pick up.
@@ -887,6 +905,10 @@ class PassiveStream : public Stream {
887905
* messages in the buffer.
888906
*/
889907
std::function<void()> processBufferedMessages_postFront_Hook;
908+
909+
// Flag indicating if the most recent call to processMessage
910+
// backed off due to ENOMEM. Only used for limiting logging
911+
std::atomic<bool> isNoMemory{false};
890912
};
891913

892914
#endif // SRC_DCP_STREAM_H_

0 commit comments

Comments
 (0)