Skip to content

Commit ad7fd2c

Browse files
authored
fix(replication): add socket timeout to REPLTAKEOVER command (#6070)
* fix(replication): add socket timeout to REPLTAKEOVER command * fix: merge conflicts
1 parent 62e9aa1 commit ad7fd2c

File tree

2 files changed

+72
-1
lines changed

2 files changed

+72
-1
lines changed

src/server/replica.cc

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -203,9 +203,24 @@ void Replica::Pause(bool pause) {
203203
std::error_code Replica::TakeOver(std::string_view timeout, bool save_flag) {
204204
VLOG(1) << "Taking over";
205205

206+
// Parse timeout value for socket timeout
207+
int timeout_sec = 0;
208+
if (!absl::SimpleAtoi(timeout, &timeout_sec)) {
209+
return make_error_code(errc::invalid_argument);
210+
}
211+
206212
std::error_code ec;
207213
auto takeOverCmd = absl::StrCat("TAKEOVER ", timeout, (save_flag ? " SAVE" : ""));
208-
Proactor()->Await([this, &ec, cmd = std::move(takeOverCmd)] { ec = SendNextPhaseRequest(cmd); });
214+
Proactor()->Await([this, &ec, cmd = std::move(takeOverCmd), timeout_sec] {
215+
// Set socket timeout to prevent hanging on unresponsive master
216+
// Add buffer time for master processing (timeout + 10 seconds)
217+
auto prev_timeout = Sock()->timeout();
218+
Sock()->set_timeout((timeout_sec + 10) * 1000); // milliseconds
219+
220+
ec = SendNextPhaseRequest(cmd);
221+
222+
Sock()->set_timeout(prev_timeout);
223+
});
209224

210225
// If we successfully taken over, return and let server_family stop the replication.
211226
return ec;

tests/dragonfly/replication_test.py

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
1+
import os
12
import platform
23
import shutil
4+
import signal
35
import tarfile
6+
import time
47
import urllib.request
58
from itertools import chain, repeat
69

@@ -3603,3 +3606,56 @@ async def test_takeover_bug_wrong_replica_checked_in_logs(df_factory):
36033606
assert (
36043607
str(replicas[0].port) not in log
36053608
), f"BUG: Checked initiating replica {replicas[0].port} instead of others"
3609+
3610+
3611+
@pytest.mark.slow
3612+
async def test_takeover_timeout_on_unresponsive_master(df_factory):
3613+
master = df_factory.create(proactor_threads=4)
3614+
replica = df_factory.create(proactor_threads=2)
3615+
df_factory.start_all([master, replica])
3616+
3617+
c_master = master.client()
3618+
c_replica = replica.client()
3619+
3620+
# Setup replication
3621+
await c_replica.execute_command(f"REPLICAOF localhost {master.port}")
3622+
await wait_available_async(c_replica)
3623+
3624+
# Write some data
3625+
for i in range(10):
3626+
await c_master.set(f"key{i}", f"val{i}")
3627+
await asyncio.sleep(0.2)
3628+
3629+
# PAUSE master process (SIGSTOP) - socket stays open but doesn't respondExpand commentComment on line R3629Code has comments. Press enter to view.
3630+
os.kill(master.proc.pid, signal.SIGSTOP)
3631+
logging.info(f"Paused master process {master.proc.pid}")
3632+
3633+
# Try takeover with 5 second timeout
3634+
# BUG: This will hang forever because SendNextPhaseRequest has no timeout
3635+
# FIXED: Should return error within ~15 seconds (5 + buffer)
3636+
start_time = time.time()
3637+
try:
3638+
await asyncio.wait_for(
3639+
c_replica.execute_command("REPLTAKEOVER 5"),
3640+
timeout=20, # Should complete within 20 seconds
3641+
)
3642+
elapsed = time.time() - start_time
3643+
logging.info(f"Takeover completed in {elapsed:.1f}s")
3644+
except asyncio.TimeoutError:
3645+
elapsed = time.time() - start_time
3646+
pytest.fail(
3647+
f"BUG: REPLTAKEOVER hung for {elapsed:.1f}s without timeout. "
3648+
f"SendNextPhaseRequest in replica.cc has no socket timeout."
3649+
)
3650+
except Exception as e:
3651+
# Expected: connection error or timeout error
3652+
elapsed = time.time() - start_time
3653+
logging.info(f"Takeover failed after {elapsed:.1f}s: {e}")
3654+
# Should fail quickly, not hang
3655+
assert elapsed < 20, f"Took too long: {elapsed:.1f}s"
3656+
finally:
3657+
# Resume master so it can be stopped properly
3658+
try:
3659+
os.kill(master.proc.pid, signal.SIGCONT)
3660+
except Exception:
3661+
pass

0 commit comments

Comments
 (0)