Skip to content

Commit 6bb5195

Browse files
authored
chore: non preemptive ProtocolClient destructor (#5927)
* remove preemption from ~ProtocolClient * fix rare race condition last_io_time_ by making it atomic * always dispatch in socket thread for Replica::GetSummary() Signed-off-by: kostas <[email protected]>
1 parent ad0684b commit 6bb5195

File tree

4 files changed

+12
-26
lines changed

4 files changed

+12
-26
lines changed

src/server/cluster/outgoing_slot_migration.cc

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,8 @@ void OutgoingMigration::Finish(const GenericError& error) {
161161
}
162162

163163
bool should_cancel_flows = false;
164+
absl::Cleanup on_exit([this]() { CloseSocket(); });
165+
164166
{
165167
util::fb2::LockGuard lk(state_mu_);
166168
switch (state_) {
@@ -313,6 +315,7 @@ void OutgoingMigration::SyncFb() {
313315
break;
314316
}
315317

318+
CloseSocket();
316319
VLOG(1) << "Exiting outgoing migration fiber for migration " << migration_info_.ToString();
317320
}
318321

src/server/protocol_client.cc

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -119,13 +119,6 @@ ProtocolClient::ProtocolClient(ServerContext context) : server_context_(std::mov
119119
ProtocolClient::~ProtocolClient() {
120120
exec_st_.JoinErrorHandler();
121121

122-
// FIXME: We should close the socket explictly outside of the destructor. This currently
123-
// breaks test_cancel_replication_immediately.
124-
if (sock_) {
125-
std::error_code ec;
126-
sock_->proactor()->Await([this, &ec]() { ec = sock_->Close(); });
127-
LOG_IF(ERROR, ec) << "Error closing socket " << ec;
128-
}
129122
#ifdef DFLY_USE_SSL
130123
if (ssl_ctx_) {
131124
SSL_CTX_free(ssl_ctx_);
@@ -235,6 +228,9 @@ void ProtocolClient::CloseSocket() {
235228
auto ec = sock_->Shutdown(SHUT_RDWR);
236229
LOG_IF(ERROR, ec) << "Could not shutdown socket " << ec;
237230
}
231+
232+
auto ec = sock_->Close(); // Quietly close.
233+
LOG_IF(WARNING, ec) << "Error closing socket " << ec << "/" << ec.message();
238234
});
239235
}
240236
}
@@ -385,11 +381,11 @@ void ProtocolClient::ResetParser(RedisParser::Mode mode) {
385381
}
386382

387383
uint64_t ProtocolClient::LastIoTime() const {
388-
return last_io_time_;
384+
return last_io_time_.load(std::memory_order_relaxed);
389385
}
390386

391387
void ProtocolClient::TouchIoTime() {
392-
last_io_time_ = Proactor()->GetMonotonicTimeNs();
388+
last_io_time_.store(Proactor()->GetMonotonicTimeNs(), std::memory_order_relaxed);
393389
}
394390

395391
} // namespace dfly

src/server/protocol_client.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,7 @@ class ProtocolClient {
132132
std::string last_cmd_;
133133
std::string last_resp_;
134134

135-
uint64_t last_io_time_ = 0; // in ns, monotonic clock.
135+
std::atomic<uint64_t> last_io_time_ = 0; // in ns, monotonic clock.
136136

137137
#ifdef DFLY_USE_SSL
138138

src/server/replica.cc

Lines changed: 3 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,7 @@ GenericError Replica::Start() {
108108
VLOG(1) << "Starting replication " << this;
109109
ProactorBase* mythread = ProactorBase::me();
110110
CHECK(mythread);
111+
DCHECK(proactor_ == mythread);
111112

112113
auto check_connection_error = [this](error_code ec, const char* msg) -> GenericError {
113114
if (!exec_st_.IsRunning()) {
@@ -1212,9 +1213,7 @@ auto Replica::GetSummary() const -> Summary {
12121213
auto f = [this]() {
12131214
auto last_io_time = LastIoTime();
12141215

1215-
// Note: we access LastIoTime from foreigh thread in unsafe manner. However, specifically here
1216-
// it's unlikely to cause a real bug.
1217-
for (const auto& flow : shard_flows_) { // Get last io time from all sub flows.
1216+
for (const auto& flow : shard_flows_) {
12181217
last_io_time = std::max(last_io_time, flow->LastIoTime());
12191218
}
12201219

@@ -1246,19 +1245,7 @@ auto Replica::GetSummary() const -> Summary {
12461245
return res;
12471246
};
12481247

1249-
if (Sock())
1250-
return Proactor()->AwaitBrief(f);
1251-
1252-
/**
1253-
* when this branch happens: there is a very short grace period
1254-
* where Sock() is not initialized, yet the server can
1255-
* receive ROLE/INFO commands. That period happens when launching
1256-
* an instance with '--replicaof' and then immediately
1257-
* sending a command.
1258-
*
1259-
* In that instance, we have to run f() on the current fiber.
1260-
*/
1261-
return f();
1248+
return proactor_->AwaitBrief(f);
12621249
}
12631250

12641251
std::vector<uint64_t> Replica::GetReplicaOffset() const {

0 commit comments

Comments
 (0)