@@ -270,18 +270,23 @@ class EWB_Engine : public EngineIface, public DcpIface {
270270 }
271271
272272 const bool inject = iter->second .second ->should_inject_error (cmd, err);
273- const bool add_to_pending_io_ops = iter->second .second ->add_to_pending_io_ops ();
274273
275274 if (inject) {
276275 LOG_DEBUG (" EWB_Engine: injecting error:{} for cmd:{}" ,
277276 err,
278277 to_string (cmd));
279278
280- if (err == ENGINE_EWOULDBLOCK && add_to_pending_io_ops) {
281- // The server expects that if EWOULDBLOCK is returned then the
282- // server should be notified in the future when the operation is
283- // ready - so add this op to the pending IO queue.
284- schedule_notification (iter->second .first );
279+ if (err == ENGINE_EWOULDBLOCK) {
280+ const auto add_to_pending_io_ops =
281+ iter->second .second ->add_to_pending_io_ops ();
282+ if (add_to_pending_io_ops) {
283+ // The server expects that if EWOULDBLOCK is returned then
284+ // the server should be notified in the future when the
285+ // operation is ready - so add this op to the pending IO
286+ // queue.
287+ schedule_notification (iter->second .first ,
288+ *add_to_pending_io_ops);
289+ }
285290 }
286291 }
287292
@@ -582,9 +587,19 @@ class EWB_Engine : public EngineIface, public DcpIface {
582587 new_mode = std::make_shared<ErrOnFirst>(injected_error);
583588 break ;
584589
585- case EWBEngineMode::Sequence:
586- new_mode = std::make_shared<ErrSequence>(injected_error, value);
590+ case EWBEngineMode::Sequence: {
591+ std::vector<cb::engine_errc> decoded;
592+ for (unsigned int ii = 0 ;
593+ ii < key.size () / sizeof (cb::engine_errc);
594+ ii++) {
595+ auto status = *reinterpret_cast <const uint32_t *>(
596+ key.data () + (ii * sizeof (cb::engine_errc)));
597+ status = ntohl (status);
598+ decoded.emplace_back (cb::engine_errc (status));
599+ }
600+ new_mode = std::make_shared<ErrSequence>(decoded);
587601 break ;
602+ }
588603
589604 case EWBEngineMode::No_Notify:
590605 new_mode = std::make_shared<ErrOnNoNotify>(injected_error);
@@ -1023,7 +1038,11 @@ class EWB_Engine : public EngineIface, public DcpIface {
10231038 // thread processing pending io ops.
10241039 std::mutex mutex;
10251040 std::condition_variable condvar;
1026- std::queue<const void *> pending_io_ops;
1041+ struct PendingIO {
1042+ const void * cookie;
1043+ ENGINE_ERROR_CODE status;
1044+ };
1045+ std::queue<PendingIO> pending_io_ops;
10271046
10281047 std::atomic<bool > stop_notification_thread;
10291048
@@ -1034,9 +1053,14 @@ class EWB_Engine : public EngineIface, public DcpIface {
10341053 FaultInjectMode (ENGINE_ERROR_CODE injected_error_)
10351054 : injected_error(injected_error_) {}
10361055
1037- virtual bool add_to_pending_io_ops () {
1038- return true ;
1056+ // In the event of injecting an EWOULDBLOCK error, should the connection
1057+ // be added to the pending_io_ops (and subsequently notified)?
1058+ // @returns empty if shouldn't be added, otherwise contains the
1059+ // status code to notify with.
1060+ virtual boost::optional<ENGINE_ERROR_CODE> add_to_pending_io_ops () {
1061+ return ENGINE_SUCCESS;
10391062 }
1063+
10401064 virtual bool should_inject_error (Cmd cmd, ENGINE_ERROR_CODE& err) = 0;
10411065
10421066 virtual std::string to_string () const = 0;
@@ -1130,36 +1154,88 @@ class EWB_Engine : public EngineIface, public DcpIface {
11301154 uint32_t percentage_to_err;
11311155 };
11321156
1157+ /* *
1158+ * Injects a sequence of error codes for each call to should_inject_error().
1159+ * If the end of the given sequence is reached, then throws
1160+ * std::logic_error.
1161+ *
1162+ * cb::mcbp::Status::ReservedUserStart can be used to specify that the
1163+ * no error is injected (the original status code is returned unchanged).
1164+ */
11331165 class ErrSequence : public FaultInjectMode {
11341166 public:
1167+ /* *
1168+ * Construct with a sequence of the specified error, or the 'normal'
1169+ * status code.
1170+ */
11351171 ErrSequence (ENGINE_ERROR_CODE injected_error_, uint32_t sequence_)
1136- : FaultInjectMode(injected_error_),
1172+ : FaultInjectMode(injected_error_) {
1173+ for (int ii = 0 ; ii < 32 ; ii++) {
1174+ if ((sequence_ & (1 << ii)) != 0 ) {
1175+ sequence.push_back (cb::engine_errc (injected_error_));
1176+ } else {
1177+ sequence.push_back (cb::engine_errc (-1 ));
1178+ }
1179+ }
1180+ pos = sequence.begin ();
1181+ }
1182+
1183+ /* *
1184+ * Construct with a specific sequence of (potentially different) status
1185+ * codes encoded as vector of cb::engine_errc elements in the
1186+ * request value.
1187+ */
1188+ ErrSequence (std::vector<cb::engine_errc> sequence_)
1189+ : FaultInjectMode(ENGINE_SUCCESS),
11371190 sequence (sequence_),
1138- pos(0 ) {}
1191+ pos(sequence.begin()) {
1192+ }
11391193
11401194 bool should_inject_error (Cmd cmd, ENGINE_ERROR_CODE& err) {
1141- bool inject = false ;
1142- if (pos < 32 ) {
1143- inject = (sequence & ( 1 << pos)) != 0 ;
1144- pos++ ;
1195+ if (pos == sequence. end ()) {
1196+ throw std::logic_error (
1197+ " ErrSequence::should_inject_error() Reached end of "
1198+ " sequence " ) ;
11451199 }
1146- if (inject) {
1147- err = injected_error;
1200+ bool inject = false ;
1201+ if (*pos != cb::engine_errc (-1 )) {
1202+ inject = true ;
1203+ err = ENGINE_ERROR_CODE (*pos);
11481204 }
1205+ pos++;
11491206 return inject;
11501207 }
11511208
1209+ virtual boost::optional<ENGINE_ERROR_CODE> add_to_pending_io_ops () {
1210+ // If this function has been called, should_inject_error() must
1211+ // have returned true. Return the next status code in the sequnce
1212+ // as the result of the pending IO.
1213+ if (pos == sequence.end ()) {
1214+ throw std::logic_error (
1215+ " ErrSequence::add_to_pending_io_ops() Reached end of "
1216+ " sequence" );
1217+ }
1218+
1219+ return ENGINE_ERROR_CODE (*pos++);
1220+ }
1221+
11521222 std::string to_string () const {
11531223 std::stringstream ss;
1154- ss << " ErrSequence inject_error=" << injected_error
1155- << " sequence=0x" << std::hex << sequence
1156- << " pos=" << pos;
1224+ ss << " ErrSequence sequence=[" ;
1225+ for (const auto & err : sequence) {
1226+ if (err == cb::engine_errc (-1 )) {
1227+ ss << " '<passthrough>'," ;
1228+ } else {
1229+ ss << " '" << err << " '," ;
1230+ }
1231+ }
1232+ ss << " ] pos=" << pos - sequence.begin ();
11571233 return ss.str ();
11581234 }
11591235
11601236 private:
1161- uint32_t sequence;
1162- uint32_t pos;
1237+ std::vector<cb::engine_errc> sequence;
1238+ std::vector<cb::engine_errc>::const_iterator pos;
11631239 };
11641240
11651241 class ErrOnNoNotify : public FaultInjectMode {
@@ -1168,7 +1244,10 @@ class EWB_Engine : public EngineIface, public DcpIface {
11681244 : FaultInjectMode(injected_error_),
11691245 issued_return_error (false ) {}
11701246
1171- bool add_to_pending_io_ops () {return false ;}
1247+ boost::optional<ENGINE_ERROR_CODE> add_to_pending_io_ops () {
1248+ return {};
1249+ }
1250+
11721251 bool should_inject_error (Cmd cmd, ENGINE_ERROR_CODE& err) {
11731252 if (!issued_return_error) {
11741253 issued_return_error = true ;
@@ -1282,8 +1361,7 @@ class EWB_Engine : public EngineIface, public DcpIface {
12821361 suspended_map.erase (iter);
12831362 }
12841363
1285-
1286- schedule_notification (cookie);
1364+ schedule_notification (cookie, ENGINE_SUCCESS);
12871365 return true ;
12881366 }
12891367
@@ -1304,10 +1382,10 @@ class EWB_Engine : public EngineIface, public DcpIface {
13041382 return false ;
13051383 }
13061384
1307- void schedule_notification (const void * cookie) {
1385+ void schedule_notification (const void * cookie, ENGINE_ERROR_CODE status ) {
13081386 {
13091387 std::lock_guard<std::mutex> guard (mutex);
1310- pending_io_ops.push (cookie);
1388+ pending_io_ops.push ({ cookie, status} );
13111389 }
13121390 LOG_DEBUG (" EWB_Engine: connection {} should be resumed for engine {}" ,
13131391 (void *)cookie,
@@ -1840,11 +1918,11 @@ void EWB_Engine::process_notifications() {
18401918 return (pending_io_ops.size () > 0 ) || stop_notification_thread;
18411919 });
18421920 while (!pending_io_ops.empty ()) {
1843- const void * cookie = pending_io_ops.front ();
1921+ const auto op = pending_io_ops.front ();
18441922 pending_io_ops.pop ();
18451923 lk.unlock ();
1846- LOG_DEBUG (" EWB_Engine: notify {}" , cookie);
1847- server->cookie ->notify_io_complete (cookie, ENGINE_SUCCESS );
1924+ LOG_DEBUG (" EWB_Engine: notify {} status:{} " , op. cookie , op. status );
1925+ server->cookie ->notify_io_complete (op. cookie , op. status );
18481926 lk.lock ();
18491927 }
18501928 }
0 commit comments