Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 44 additions & 0 deletions src/server/debug_sync_point.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
// Copyright 205, DragonflyDB authors. All rights reserved.
// See LICENSE for licensing terms.
//

#include <absl/container/flat_hash_set.h>

#include "util/fibers/synchronization.h"

namespace dfly {

#ifdef NDEBUG
#define DEBUG_SYNC(n, f)
#else
#define DEBUG_SYNC(n, f) \
{ \
if (debug_sync_point.Find(n)) \
f(); \
}
#endif

class DebugSyncPoint {
public:
void Add(std::string_view name) {
util::fb2::LockGuard lk(mu_);
sync_points_.emplace(name);
}

void Del(std::string_view name) {
util::fb2::LockGuard lk(mu_);
sync_points_.erase(name);
}

bool Find(std::string_view name) {
return sync_points_.find(name) != sync_points_.end();
}

private:
absl::flat_hash_set<std::string> sync_points_;
util::fb2::Mutex mu_;
};

inline DebugSyncPoint debug_sync_point;

} // namespace dfly
51 changes: 51 additions & 0 deletions src/server/debugcmd.cc
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ extern "C" {
#include "facade/cmd_arg_parser.h"
#include "server/blocking_controller.h"
#include "server/container_utils.h"
#include "server/debug_sync_point.h"
#include "server/engine_shard_set.h"
#include "server/error.h"
#include "server/main_service.h"
Expand All @@ -41,6 +42,7 @@ extern "C" {
#include "server/server_state.h"
#include "server/string_family.h"
#include "server/transaction.h"

using namespace std;

ABSL_DECLARE_FLAG(string, dir);
Expand Down Expand Up @@ -657,6 +659,8 @@ void DebugCmd::Run(CmdArgList args, facade::SinkReplyBuilder* builder) {
" per second.",
"SEGMENTS",
" Prints segment info for the current database.",
"SYNC ADD [sync_name] | DEL [sync_name]",
" Enable or disable debug sync point.",
"HELP",
" Prints this help.",
};
Expand Down Expand Up @@ -741,6 +745,11 @@ void DebugCmd::Run(CmdArgList args, facade::SinkReplyBuilder* builder) {
if (subcmd == "SEGMENTS") {
return Segments(args.subspan(1), builder);
}

if (subcmd == "SYNC") {
return Sync(args, builder);
}

string reply = UnknownSubCmd(subcmd, "DEBUG");
return builder->SendError(reply, kSyntaxErrType);
}
Expand Down Expand Up @@ -984,6 +993,48 @@ void DebugCmd::PopulateRangeFiber(uint64_t from, uint64_t num_of_keys,
});
}

// SYNC arguments format:
// [ADD sync_point_name | DEL sync_point_name]
std::optional<DebugCmd::SyncOptions> DebugCmd::ParseSyncArgs(CmdArgList args,
facade::SinkReplyBuilder* builder) {
CmdArgParser parser(args.subspan(1));
SyncOptions options;

while (parser.HasNext()) {
SyncFlags flag = parser.MapNext("ADD", SYNC_ADD, "DEL", SYNC_DEL);
switch (flag) {
case SYNC_ADD:
case SYNC_DEL:
options.sync_point_name = parser.Next<string_view>();
options.sync_action = flag;
break;
default:
LOG(FATAL) << "Unexpected flag in PopulateArgs. Args: " << args;
break;
}
}
if (parser.HasError()) {
builder->SendError(parser.TakeError().MakeReply());
return nullopt;
}
return options;
}

void DebugCmd::Sync(CmdArgList args, facade::SinkReplyBuilder* builder) {
optional<SyncOptions> options = ParseSyncArgs(args, builder);
if (!options.has_value()) {
return;
}

if (options->sync_action == SYNC_ADD) {
debug_sync_point.Add(options->sync_point_name);
} else {
debug_sync_point.Del(options->sync_point_name);
}

builder->SendOk();
}

void DebugCmd::Exec(facade::SinkReplyBuilder* builder) {
EngineShardSet& ess = *shard_set;
fb2::Mutex mu;
Expand Down
10 changes: 10 additions & 0 deletions src/server/debugcmd.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,12 @@ class DebugCmd {
std::optional<std::pair<uint32_t, uint32_t>> expire_ttl_range;
};

enum SyncFlags { SYNC_ADD, SYNC_DEL };
struct SyncOptions {
std::string sync_point_name;
SyncFlags sync_action;
};

public:
DebugCmd(ServerFamily* owner, cluster::ClusterFamily* cf, ConnectionContext* cntx);

Expand All @@ -43,6 +49,10 @@ class DebugCmd {
facade::SinkReplyBuilder* builder);
void PopulateRangeFiber(uint64_t from, uint64_t count, const PopulateOptions& opts);

static std::optional<SyncOptions> ParseSyncArgs(CmdArgList args,
facade::SinkReplyBuilder* builder);
void Sync(CmdArgList args, facade::SinkReplyBuilder* builder);

void Reload(CmdArgList args, facade::SinkReplyBuilder* builder);
void Replica(CmdArgList args, facade::SinkReplyBuilder* builder);
void Migration(CmdArgList args, facade::SinkReplyBuilder* builder);
Expand Down
4 changes: 4 additions & 0 deletions src/server/string_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include "server/command_registry.h"
#include "server/common.h"
#include "server/conn_context.h"
#include "server/debug_sync_point.h"
#include "server/engine_shard_set.h"
#include "server/error.h"
#include "server/family_utils.h"
Expand Down Expand Up @@ -1178,6 +1179,9 @@ void StringFamily::SetNx(CmdArgList args, const CommandContext& cmnd_cntx) {
void StringFamily::Get(CmdArgList args, const CommandContext& cmnd_cntx) {
auto cb = [key = ArgS(args, 0)](Transaction* tx, EngineShard* es) -> OpResult<StringResult> {
auto it_res = tx->GetDbSlice(es->shard_id()).FindReadOnly(tx->GetDbContext(), key, OBJ_STRING);

DEBUG_SYNC("return_get_key_not_found", [&]() { it_res = OpStatus::KEY_NOTFOUND; })

if (!it_res.ok())
return it_res.status();

Expand Down
25 changes: 25 additions & 0 deletions tests/dragonfly/generic_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -342,3 +342,28 @@ async def test_key_bump_ups(df_factory):
new_slot_id = int(dict(map(lambda s: s.split(":"), debug_key_info.split()))["slot"])
assert new_slot_id + 1 == slot_id
break


@pytest.mark.debug_only
@pytest.mark.asyncio
async def test_debug_sync(df_factory):
df_server = df_factory.create()
df_server.start()
client = df_server.client()

value = "123"

# Should return key
await client.execute_command(f"SET key {value}")
result = await client.get("key")
assert result == value

# Enable debug sync point to return KEY_NOTFOUND
await client.execute_command("DEBUG SYNC ADD return_get_key_not_found")
result = await client.get("key")
assert result == None

# Disable - should return key
await client.execute_command("DEBUG SYNC DEL return_get_key_not_found")
result = await client.get("key")
assert result == value
Loading