diff --git a/src/server/debug_sync_point.h b/src/server/debug_sync_point.h new file mode 100644 index 000000000000..91cdf56b9e45 --- /dev/null +++ b/src/server/debug_sync_point.h @@ -0,0 +1,44 @@ +// Copyright 205, DragonflyDB authors. All rights reserved. +// See LICENSE for licensing terms. +// + +#include + +#include "util/fibers/synchronization.h" + +namespace dfly { + +#ifdef NDEBUG +#define DEBUG_SYNC(n, f) +#else +#define DEBUG_SYNC(n, f) \ + { \ + if (debug_sync_point.Find(n)) \ + f(); \ + } +#endif + +class DebugSyncPoint { + public: + void Add(std::string_view name) { + util::fb2::LockGuard lk(mu_); + sync_points_.emplace(name); + } + + void Del(std::string_view name) { + util::fb2::LockGuard lk(mu_); + sync_points_.erase(name); + } + + bool Find(std::string_view name) { + return sync_points_.find(name) != sync_points_.end(); + } + + private: + absl::flat_hash_set sync_points_; + util::fb2::Mutex mu_; +}; + +inline DebugSyncPoint debug_sync_point; + +} // namespace dfly diff --git a/src/server/debugcmd.cc b/src/server/debugcmd.cc index 8fede1703665..68447b2e36b3 100644 --- a/src/server/debugcmd.cc +++ b/src/server/debugcmd.cc @@ -33,6 +33,7 @@ extern "C" { #include "facade/cmd_arg_parser.h" #include "server/blocking_controller.h" #include "server/container_utils.h" +#include "server/debug_sync_point.h" #include "server/engine_shard_set.h" #include "server/error.h" #include "server/main_service.h" @@ -41,6 +42,7 @@ extern "C" { #include "server/server_state.h" #include "server/string_family.h" #include "server/transaction.h" + using namespace std; ABSL_DECLARE_FLAG(string, dir); @@ -657,6 +659,8 @@ void DebugCmd::Run(CmdArgList args, facade::SinkReplyBuilder* builder) { " per second.", "SEGMENTS", " Prints segment info for the current database.", + "SYNC ADD [sync_name] | DEL [sync_name]", + " Enable or disable debug sync point.", "HELP", " Prints this help.", }; @@ -741,6 +745,11 @@ void DebugCmd::Run(CmdArgList args, facade::SinkReplyBuilder* builder) { if (subcmd == "SEGMENTS") { return Segments(args.subspan(1), builder); } + + if (subcmd == "SYNC") { + return Sync(args, builder); + } + string reply = UnknownSubCmd(subcmd, "DEBUG"); return builder->SendError(reply, kSyntaxErrType); } @@ -984,6 +993,48 @@ void DebugCmd::PopulateRangeFiber(uint64_t from, uint64_t num_of_keys, }); } +// SYNC arguments format: +// [ADD sync_point_name | DEL sync_point_name] +std::optional DebugCmd::ParseSyncArgs(CmdArgList args, + facade::SinkReplyBuilder* builder) { + CmdArgParser parser(args.subspan(1)); + SyncOptions options; + + while (parser.HasNext()) { + SyncFlags flag = parser.MapNext("ADD", SYNC_ADD, "DEL", SYNC_DEL); + switch (flag) { + case SYNC_ADD: + case SYNC_DEL: + options.sync_point_name = parser.Next(); + options.sync_action = flag; + break; + default: + LOG(FATAL) << "Unexpected flag in PopulateArgs. Args: " << args; + break; + } + } + if (parser.HasError()) { + builder->SendError(parser.TakeError().MakeReply()); + return nullopt; + } + return options; +} + +void DebugCmd::Sync(CmdArgList args, facade::SinkReplyBuilder* builder) { + optional options = ParseSyncArgs(args, builder); + if (!options.has_value()) { + return; + } + + if (options->sync_action == SYNC_ADD) { + debug_sync_point.Add(options->sync_point_name); + } else { + debug_sync_point.Del(options->sync_point_name); + } + + builder->SendOk(); +} + void DebugCmd::Exec(facade::SinkReplyBuilder* builder) { EngineShardSet& ess = *shard_set; fb2::Mutex mu; diff --git a/src/server/debugcmd.h b/src/server/debugcmd.h index 141fe3643357..cc63b719ae39 100644 --- a/src/server/debugcmd.h +++ b/src/server/debugcmd.h @@ -30,6 +30,12 @@ class DebugCmd { std::optional> expire_ttl_range; }; + enum SyncFlags { SYNC_ADD, SYNC_DEL }; + struct SyncOptions { + std::string sync_point_name; + SyncFlags sync_action; + }; + public: DebugCmd(ServerFamily* owner, cluster::ClusterFamily* cf, ConnectionContext* cntx); @@ -43,6 +49,10 @@ class DebugCmd { facade::SinkReplyBuilder* builder); void PopulateRangeFiber(uint64_t from, uint64_t count, const PopulateOptions& opts); + static std::optional ParseSyncArgs(CmdArgList args, + facade::SinkReplyBuilder* builder); + void Sync(CmdArgList args, facade::SinkReplyBuilder* builder); + void Reload(CmdArgList args, facade::SinkReplyBuilder* builder); void Replica(CmdArgList args, facade::SinkReplyBuilder* builder); void Migration(CmdArgList args, facade::SinkReplyBuilder* builder); diff --git a/src/server/string_family.cc b/src/server/string_family.cc index cedca8c3d315..e4769df23098 100644 --- a/src/server/string_family.cc +++ b/src/server/string_family.cc @@ -26,6 +26,7 @@ #include "server/command_registry.h" #include "server/common.h" #include "server/conn_context.h" +#include "server/debug_sync_point.h" #include "server/engine_shard_set.h" #include "server/error.h" #include "server/family_utils.h" @@ -1178,6 +1179,9 @@ void StringFamily::SetNx(CmdArgList args, const CommandContext& cmnd_cntx) { void StringFamily::Get(CmdArgList args, const CommandContext& cmnd_cntx) { auto cb = [key = ArgS(args, 0)](Transaction* tx, EngineShard* es) -> OpResult { auto it_res = tx->GetDbSlice(es->shard_id()).FindReadOnly(tx->GetDbContext(), key, OBJ_STRING); + + DEBUG_SYNC("return_get_key_not_found", [&]() { it_res = OpStatus::KEY_NOTFOUND; }) + if (!it_res.ok()) return it_res.status(); diff --git a/tests/dragonfly/generic_test.py b/tests/dragonfly/generic_test.py index e6204dfc9821..a1eb7db757b8 100644 --- a/tests/dragonfly/generic_test.py +++ b/tests/dragonfly/generic_test.py @@ -342,3 +342,28 @@ async def test_key_bump_ups(df_factory): new_slot_id = int(dict(map(lambda s: s.split(":"), debug_key_info.split()))["slot"]) assert new_slot_id + 1 == slot_id break + + +@pytest.mark.debug_only +@pytest.mark.asyncio +async def test_debug_sync(df_factory): + df_server = df_factory.create() + df_server.start() + client = df_server.client() + + value = "123" + + # Should return key + await client.execute_command(f"SET key {value}") + result = await client.get("key") + assert result == value + + # Enable debug sync point to return KEY_NOTFOUND + await client.execute_command("DEBUG SYNC ADD return_get_key_not_found") + result = await client.get("key") + assert result == None + + # Disable - should return key + await client.execute_command("DEBUG SYNC DEL return_get_key_not_found") + result = await client.get("key") + assert result == value