Skip to content

Commit b9ddaee

Browse files
authored
feat: SHUTDOWN: fast path for NOW/FORCE; unify SAVE/SAFE; support NOSAVE (#5783)
Fixed: #5444
1 parent 3a5ab12 commit b9ddaee

File tree

4 files changed

+141
-11
lines changed

4 files changed

+141
-11
lines changed

src/facade/dragonfly_listener.cc

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,9 @@ CONFIG_enum(tls_auth_clients, "yes", "", tls_auth_clients_enum, tls_auth_clients
6666

6767
namespace facade {
6868

69+
// See dragonfly_listener.h
70+
std::atomic<bool> g_shutdown_fast{false};
71+
6972
using namespace util;
7073
using util::detail::SafeErrorMessage;
7174

@@ -252,7 +255,12 @@ bool Listener::IsMainInterface() const {
252255
}
253256

254257
void Listener::PreShutdown() {
255-
// Iterate on all connections and allow them to finish their commands for
258+
// If NOW/FORCE requested, expedite shutdown without waiting.
259+
if (g_shutdown_fast.load(std::memory_order_acquire)) {
260+
return;
261+
}
262+
263+
// Otherwise: Iterate on all connections and allow them to finish their commands for
256264
// a short period.
257265
// Executed commands can be visible in snapshots or replicas, but if we close the client
258266
// connections too fast we might not send the acknowledgment for those commands.

src/facade/dragonfly_listener.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
#include <absl/base/internal/spinlock.h>
88
#include <absl/time/time.h>
99

10+
#include <atomic>
1011
#include <memory>
1112
#include <system_error>
1213
#include <utility>
@@ -107,4 +108,9 @@ class DispatchTracker {
107108
bool ignore_blocked_;
108109
};
109110

111+
// Global shutdown tuning flag, controlled by SHUTDOWN options.
112+
// When true, listeners perform expedited shutdown without waiting for
113+
// in-flight dispatches (used by NOW/FORCE).
114+
extern std::atomic<bool> g_shutdown_fast;
115+
110116
} // namespace facade

src/server/server_family.cc

Lines changed: 46 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -3781,24 +3781,60 @@ void ServerFamily::Latency(CmdArgList args, const CommandContext& cmd_cntx) {
37813781
}
37823782

37833783
void ServerFamily::ShutdownCmd(CmdArgList args, const CommandContext& cmd_cntx) {
3784-
if (args.size() > 1) {
3784+
// Supported options (case-insensitive):
3785+
// SAVE | NOSAVE, NOW, FORCE, ABORT, SAFE (Valkey-specific, the same as SAVE in Dragonfly)
3786+
enum ShutBits : uint32_t {
3787+
SB_SAVE = 1u << 0,
3788+
SB_NOSAVE = 1u << 1,
3789+
SB_NOW = 1u << 2,
3790+
SB_FORCE = 1u << 3,
3791+
SB_ABORT = 1u << 4,
3792+
};
3793+
3794+
uint32_t sb = 0;
3795+
3796+
CmdArgParser parser(args);
3797+
while (parser.HasNext()) {
3798+
// Map SAFE to SAVE directly (fallthrough behavior)
3799+
ShutBits opt = parser.MapNext("SAVE", SB_SAVE, "NOSAVE", SB_NOSAVE, "NOW", SB_NOW, "FORCE",
3800+
SB_FORCE, "ABORT", SB_ABORT, "SAFE", SB_SAVE);
3801+
sb |= static_cast<uint32_t>(opt);
3802+
}
3803+
3804+
if (auto perr = parser.TakeError(); perr) {
3805+
return cmd_cntx.rb->SendError(perr.MakeReply());
3806+
}
3807+
3808+
// Conflicting toggles
3809+
if ((sb & SB_SAVE) && (sb & SB_NOSAVE)) {
37853810
cmd_cntx.rb->SendError(kSyntaxErr);
37863811
return;
37873812
}
37883813

3789-
if (args.size() == 1) {
3790-
auto sub_cmd = ArgS(args, 0);
3791-
if (absl::EqualsIgnoreCase(sub_cmd, "SAVE")) {
3792-
} else if (absl::EqualsIgnoreCase(sub_cmd, "NOSAVE")) {
3793-
save_on_shutdown_ = false;
3794-
} else {
3795-
cmd_cntx.rb->SendError(kSyntaxErr);
3796-
return;
3797-
}
3814+
if (sb & SB_ABORT) {
3815+
// We currently do not support aborting an in-progress shutdown sequence.
3816+
cmd_cntx.rb->SendError("SHUTDOWN ABORT is not supported");
3817+
return;
37983818
}
37993819

3820+
// Configure save behavior on shutdown according to options.
3821+
if (sb & SB_FORCE) {
3822+
// FORCE implies no snapshot on shutdown regardless of SAVE/SAFE
3823+
save_on_shutdown_ = false;
3824+
} else if (sb & SB_NOSAVE) {
3825+
save_on_shutdown_ = false;
3826+
} else if (sb & SB_SAVE) {
3827+
save_on_shutdown_ = true;
3828+
}
3829+
3830+
// Wire NOW/FORCE to a single fast-shutdown flag for listeners.
3831+
facade::g_shutdown_fast.store((sb & (SB_NOW | SB_FORCE)) != 0, std::memory_order_seq_cst);
3832+
38003833
CHECK_NOTNULL(acceptor_)->Stop();
38013834
cmd_cntx.rb->SendOk();
3835+
3836+
// Reset flag for any subsequent restarts (mainly for tests).
3837+
facade::g_shutdown_fast.store(false, std::memory_order_seq_cst);
38023838
}
38033839

38043840
void ServerFamily::Dfly(CmdArgList args, const CommandContext& cmd_cntx) {

tests/dragonfly/shutdown_test.py

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,3 +54,83 @@ async def delayed_takeover():
5454
assert acknowleged_value == int(value_from_snapshot)
5555

5656
await client.connection_pool.disconnect()
57+
58+
59+
@dfly_args({"proactor_threads": "2"})
60+
class TestShutdownOptions:
61+
@pytest.mark.asyncio
62+
async def test_shutdown_abort_and_invalid_option(self, df_factory):
63+
df_args = {"dbfilename": "dump", **BASIC_ARGS, "port": 1121}
64+
df_server = df_factory.create(**df_args)
65+
df_server.start()
66+
67+
client = aioredis.Redis(port=df_server.port)
68+
69+
# ABORT should be rejected and server should remain responsive
70+
with pytest.raises(redis.exceptions.ResponseError):
71+
await client.execute_command("SHUTDOWN ABORT")
72+
73+
pong = await client.ping()
74+
assert pong is True
75+
76+
# Invalid option -> syntax error
77+
with pytest.raises(redis.exceptions.ResponseError):
78+
await client.execute_command("SHUTDOWN FOO")
79+
80+
await client.connection_pool.disconnect()
81+
df_server.stop()
82+
83+
@pytest.mark.asyncio
84+
async def test_shutdown_safe_persists_snapshot(self, df_factory, tmp_path):
85+
# Ensure snapshot dir exists and is used
86+
snap_dir = tmp_path
87+
df_args = {"dbfilename": "dump", "dir": str(snap_dir) + "/", "port": 1122}
88+
89+
df_server = df_factory.create(**df_args)
90+
df_server.start()
91+
92+
client = aioredis.Redis(port=df_server.port)
93+
await client.set("safe_key", "safe_value")
94+
95+
# SHUTDOWN SAFE should save synchronously and then stop
96+
try:
97+
await client.execute_command("SHUTDOWN SAFE")
98+
except Exception:
99+
# Connection may be dropped as part of shutdown; this is acceptable
100+
pass
101+
102+
await client.connection_pool.disconnect()
103+
104+
# Restart and verify data persisted
105+
df_server.start()
106+
client = aioredis.Redis(port=df_server.port)
107+
val = await client.get("safe_key")
108+
assert val == b"safe_value"
109+
await client.connection_pool.disconnect()
110+
df_server.stop()
111+
112+
@pytest.mark.asyncio
113+
async def test_shutdown_save_persists_snapshot(self, df_factory, tmp_path):
114+
# SAVE should follow the same synchronous path as SAFE
115+
snap_dir = tmp_path
116+
df_args = {"dbfilename": "dump", "dir": str(snap_dir) + "/", "port": 1123}
117+
118+
df_server = df_factory.create(**df_args)
119+
df_server.start()
120+
121+
client = aioredis.Redis(port=df_server.port)
122+
await client.set("save_key", "save_value")
123+
124+
try:
125+
await client.execute_command("SHUTDOWN SAVE")
126+
except Exception:
127+
pass
128+
129+
await client.connection_pool.disconnect()
130+
131+
df_server.start()
132+
client = aioredis.Redis(port=df_server.port)
133+
val = await client.get("save_key")
134+
assert val == b"save_value"
135+
await client.connection_pool.disconnect()
136+
df_server.stop()

0 commit comments

Comments
 (0)