Skip to content

Commit 7eca669

Browse files
authored
[core] Make NotifyGCSRestart RPC Fault Tolerant (#57945)
## Description > Briefly describe what this PR accomplishes and why it's needed. Making NotifyGCSRestart RPC Fault Tolerant and Idempotent. There were multiple places where we were always returning Status::OK() in the gcs_subscriber making idempotency harder to understand and there was dead code for one of the resubscribes, so did a minor clean up. Added a python integration test to verify retry behavior, left out the cpp test since on the raylet side there's nothing to test since its just making a gcs_client rpc call --------- Signed-off-by: joshlee <[email protected]>
1 parent f4d10b8 commit 7eca669

File tree

14 files changed

+129
-105
lines changed

14 files changed

+129
-105
lines changed

python/ray/tests/test_raylet_fault_tolerance.py

Lines changed: 62 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ def node_is_dead():
9292
# propagated to all the raylets. Since this is inherently racy, we block CancelResourceReserve RPCs
9393
# from ever succeeding to make this test deterministic.
9494
@pytest.fixture
95-
def inject_rpc_failures(monkeypatch, request):
95+
def inject_release_unused_bundles_rpc_failure(monkeypatch, request):
9696
deterministic_failure = request.param
9797
monkeypatch.setenv(
9898
"RAY_testing_rpc_failure",
@@ -102,14 +102,16 @@ def inject_rpc_failures(monkeypatch, request):
102102
)
103103

104104

105-
@pytest.mark.parametrize("inject_rpc_failures", ["request", "response"], indirect=True)
105+
@pytest.mark.parametrize(
106+
"inject_release_unused_bundles_rpc_failure", ["request", "response"], indirect=True
107+
)
106108
@pytest.mark.parametrize(
107109
"ray_start_cluster_head_with_external_redis",
108110
[{"num_cpus": 1}],
109111
indirect=True,
110112
)
111113
def test_release_unused_bundles_idempotent(
112-
inject_rpc_failures,
114+
inject_release_unused_bundles_rpc_failure,
113115
ray_start_cluster_head_with_external_redis,
114116
):
115117
cluster = ray_start_cluster_head_with_external_redis
@@ -141,6 +143,63 @@ def task():
141143
assert result == "success"
142144

143145

146+
@pytest.fixture
147+
def inject_notify_gcs_restart_rpc_failure(monkeypatch, request):
148+
deterministic_failure = request.param
149+
monkeypatch.setenv(
150+
"RAY_testing_rpc_failure",
151+
"NodeManagerService.grpc_client.NotifyGCSRestart=1:"
152+
+ ("100:0" if deterministic_failure == "request" else "0:100"),
153+
)
154+
155+
156+
@pytest.mark.parametrize(
157+
"inject_notify_gcs_restart_rpc_failure", ["request", "response"], indirect=True
158+
)
159+
@pytest.mark.parametrize(
160+
"ray_start_cluster_head_with_external_redis",
161+
[
162+
{
163+
"_system_config": {
164+
# Extending the fallback timeout to focus on death
165+
# notification received from GCS_ACTOR_CHANNEL pubsub
166+
"timeout_ms_task_wait_for_death_info": 10000,
167+
}
168+
}
169+
],
170+
indirect=True,
171+
)
172+
def test_notify_gcs_restart_idempotent(
173+
inject_notify_gcs_restart_rpc_failure,
174+
ray_start_cluster_head_with_external_redis,
175+
):
176+
cluster = ray_start_cluster_head_with_external_redis
177+
178+
@ray.remote(num_cpus=1, max_restarts=0)
179+
class DummyActor:
180+
def get_pid(self):
181+
return psutil.Process().pid
182+
183+
def ping(self):
184+
return "pong"
185+
186+
actor = DummyActor.remote()
187+
ray.get(actor.ping.remote())
188+
actor_pid = ray.get(actor.get_pid.remote())
189+
190+
cluster.head_node.kill_gcs_server()
191+
cluster.head_node.start_gcs_server()
192+
193+
p = psutil.Process(actor_pid)
194+
p.kill()
195+
196+
# If the actor death notification is not received from the GCS pubsub, this will timeout since
197+
# the fallback via wait_for_death_info_tasks in the actor task submitter will never trigger
198+
# since it's set to 10 seconds.
199+
with pytest.raises(ray.exceptions.RayActorError):
200+
ray.get(actor.ping.remote(), timeout=5)
201+
202+
144203
def test_kill_local_actor_rpc_retry_and_idempotency(monkeypatch, shutdown_only):
145204
"""Test that KillLocalActor RPC retries work correctly and guarantee actor death.
146205
Not testing response since the actor is killed either way.

src/mock/ray/gcs_client/accessor.h

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -63,13 +63,13 @@ class MockActorInfoAccessor : public ActorInfoAccessor {
6363
(const TaskSpecification &task_spec,
6464
const rpc::ClientCallback<rpc::CreateActorReply> &callback),
6565
(override));
66-
MOCK_METHOD(Status,
66+
MOCK_METHOD(void,
6767
AsyncSubscribe,
6868
(const ActorID &actor_id,
6969
(const SubscribeCallback<ActorID, rpc::ActorTableData> &subscribe),
7070
const StatusCallback &done),
7171
(override));
72-
MOCK_METHOD(Status, AsyncUnsubscribe, (const ActorID &actor_id), (override));
72+
MOCK_METHOD(void, AsyncUnsubscribe, (const ActorID &actor_id), (override));
7373
MOCK_METHOD(void, AsyncResubscribe, (), (override));
7474
MOCK_METHOD(bool, IsActorUnsubscribed, (const ActorID &actor_id), (override));
7575
};
@@ -91,7 +91,7 @@ class MockJobInfoAccessor : public JobInfoAccessor {
9191
AsyncMarkFinished,
9292
(const JobID &job_id, const StatusCallback &callback),
9393
(override));
94-
MOCK_METHOD(Status,
94+
MOCK_METHOD(void,
9595
AsyncSubscribeAll,
9696
((const SubscribeCallback<JobID, rpc::JobTableData> &subscribe),
9797
const StatusCallback &done),
@@ -191,7 +191,6 @@ class MockNodeResourceInfoAccessor : public NodeResourceInfoAccessor {
191191
AsyncGetAllAvailableResources,
192192
(const MultiItemCallback<rpc::AvailableResources> &callback),
193193
(override));
194-
MOCK_METHOD(void, AsyncResubscribe, (), (override));
195194
MOCK_METHOD(void,
196195
AsyncGetAllResourceUsage,
197196
(const ItemCallback<rpc::ResourceUsageBatchData> &callback),
@@ -231,7 +230,7 @@ namespace gcs {
231230

232231
class MockWorkerInfoAccessor : public WorkerInfoAccessor {
233232
public:
234-
MOCK_METHOD(Status,
233+
MOCK_METHOD(void,
235234
AsyncSubscribeToWorkerFailures,
236235
(const ItemCallback<rpc::WorkerDeltaData> &subscribe,
237236
const StatusCallback &done),

src/ray/core_worker/actor_manager.cc

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -308,7 +308,7 @@ void ActorManager::SubscribeActorState(const ActorID &actor_id) {
308308
this,
309309
std::placeholders::_1,
310310
std::placeholders::_2);
311-
RAY_CHECK_OK(gcs_client_->Actors().AsyncSubscribe(
311+
gcs_client_->Actors().AsyncSubscribe(
312312
actor_id,
313313
actor_notification_callback,
314314
[this, actor_id, cached_actor_name](Status status) {
@@ -323,7 +323,7 @@ void ActorManager::SubscribeActorState(const ActorID &actor_id) {
323323
cached_actor_name_to_ids_.emplace(cached_actor_name, actor_id);
324324
}
325325
}
326-
}));
326+
});
327327
}
328328

329329
void ActorManager::MarkActorKilledOrOutOfScope(

src/ray/core_worker/tests/actor_manager_test.cc

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,15 +37,14 @@ class MockActorInfoAccessor : public gcs::ActorInfoAccessor {
3737

3838
~MockActorInfoAccessor() {}
3939

40-
Status AsyncSubscribe(
40+
void AsyncSubscribe(
4141
const ActorID &actor_id,
4242
const gcs::SubscribeCallback<ActorID, rpc::ActorTableData> &subscribe,
4343
const gcs::StatusCallback &done) {
4444
auto callback_entry = std::make_pair(actor_id, subscribe);
4545
callback_map_.emplace(actor_id, subscribe);
4646
subscribe_finished_callback_map_[actor_id] = done;
4747
actor_subscribed_times_[actor_id]++;
48-
return Status::OK();
4948
}
5049

5150
bool ActorStateNotificationPublished(const ActorID &actor_id,

src/ray/gcs/gcs_node_manager.cc

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -690,7 +690,14 @@ void GcsNodeManager::Initialize(const GcsInitData &gcs_init_data) {
690690
auto remote_address = rpc::RayletClientPool::GenerateRayletAddress(
691691
node_id, node_info.node_manager_address(), node_info.node_manager_port());
692692
auto raylet_client = raylet_client_pool_->GetOrConnectByAddress(remote_address);
693-
raylet_client->NotifyGCSRestart(nullptr);
693+
raylet_client->NotifyGCSRestart(
694+
[](const Status &status, const rpc::NotifyGCSRestartReply &reply) {
695+
if (!status.ok()) {
696+
RAY_LOG(WARNING) << "NotifyGCSRestart failed. This is expected if the "
697+
"target node has died. Status: "
698+
<< status;
699+
}
700+
});
694701
} else if (node_info.state() == rpc::GcsNodeInfo::DEAD) {
695702
dead_nodes_.emplace(node_id, std::make_shared<rpc::GcsNodeInfo>(node_info));
696703
sorted_dead_node_list_.emplace_back(node_id, node_info.end_time_ms());

src/ray/gcs_rpc_client/accessor.cc

Lines changed: 16 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ void JobInfoAccessor::AsyncMarkFinished(const JobID &job_id,
6969
});
7070
}
7171

72-
Status JobInfoAccessor::AsyncSubscribeAll(
72+
void JobInfoAccessor::AsyncSubscribeAll(
7373
const SubscribeCallback<JobID, rpc::JobTableData> &subscribe,
7474
const StatusCallback &done) {
7575
RAY_CHECK(subscribe != nullptr);
@@ -91,9 +91,9 @@ Status JobInfoAccessor::AsyncSubscribeAll(
9191
/*timeout_ms=*/-1);
9292
};
9393
subscribe_operation_ = [this, subscribe](const StatusCallback &done_callback) {
94-
return client_impl_->GetGcsSubscriber().SubscribeAllJobs(subscribe, done_callback);
94+
client_impl_->GetGcsSubscriber().SubscribeAllJobs(subscribe, done_callback);
9595
};
96-
return subscribe_operation_(
96+
subscribe_operation_(
9797
[this, done](const Status &status) { fetch_all_data_operation_(done); });
9898
}
9999

@@ -105,9 +105,9 @@ void JobInfoAccessor::AsyncResubscribe() {
105105
};
106106

107107
if (subscribe_operation_ != nullptr) {
108-
RAY_CHECK_OK(subscribe_operation_([this, fetch_all_done](const Status &) {
108+
subscribe_operation_([this, fetch_all_done](const Status &) {
109109
fetch_all_data_operation_(fetch_all_done);
110-
}));
110+
});
111111
}
112112
}
113113

@@ -391,7 +391,7 @@ void ActorInfoAccessor::AsyncReportActorOutOfScope(
391391
timeout_ms);
392392
}
393393

394-
Status ActorInfoAccessor::AsyncSubscribe(
394+
void ActorInfoAccessor::AsyncSubscribe(
395395
const ActorID &actor_id,
396396
const SubscribeCallback<ActorID, rpc::ActorTableData> &subscribe,
397397
const StatusCallback &done) {
@@ -418,28 +418,27 @@ Status ActorInfoAccessor::AsyncSubscribe(
418418
absl::MutexLock lock(&mutex_);
419419
resubscribe_operations_[actor_id] =
420420
[this, actor_id, subscribe](const StatusCallback &subscribe_done) {
421-
return client_impl_->GetGcsSubscriber().SubscribeActor(
421+
client_impl_->GetGcsSubscriber().SubscribeActor(
422422
actor_id, subscribe, subscribe_done);
423423
};
424424
fetch_data_operations_[actor_id] = fetch_data_operation;
425425
}
426426

427-
return client_impl_->GetGcsSubscriber().SubscribeActor(
427+
client_impl_->GetGcsSubscriber().SubscribeActor(
428428
actor_id, subscribe, [fetch_data_operation, done](const Status &) {
429429
fetch_data_operation(done);
430430
});
431431
}
432432

433-
Status ActorInfoAccessor::AsyncUnsubscribe(const ActorID &actor_id) {
433+
void ActorInfoAccessor::AsyncUnsubscribe(const ActorID &actor_id) {
434434
RAY_LOG(DEBUG).WithField(actor_id).WithField(actor_id.JobId())
435435
<< "Cancelling subscription to an actor";
436-
auto status = client_impl_->GetGcsSubscriber().UnsubscribeActor(actor_id);
436+
client_impl_->GetGcsSubscriber().UnsubscribeActor(actor_id);
437437
absl::MutexLock lock(&mutex_);
438438
resubscribe_operations_.erase(actor_id);
439439
fetch_data_operations_.erase(actor_id);
440440
RAY_LOG(DEBUG).WithField(actor_id).WithField(actor_id.JobId())
441441
<< "Finished cancelling subscription to an actor";
442-
return status;
443442
}
444443

445444
void ActorInfoAccessor::AsyncResubscribe() {
@@ -449,7 +448,7 @@ void ActorInfoAccessor::AsyncResubscribe() {
449448
// server first, then fetch data from the GCS server.
450449
absl::MutexLock lock(&mutex_);
451450
for (auto &[actor_id, resubscribe_op] : resubscribe_operations_) {
452-
RAY_CHECK_OK(resubscribe_op([this, id = actor_id](const Status &status) {
451+
resubscribe_op([this, id = actor_id](const Status &status) {
453452
absl::MutexLock callback_lock(&mutex_);
454453
auto fetch_data_operation = fetch_data_operations_[id];
455454
// `fetch_data_operation` is called in the callback function of subscribe.
@@ -458,7 +457,7 @@ void ActorInfoAccessor::AsyncResubscribe() {
458457
if (fetch_data_operation != nullptr) {
459458
fetch_data_operation(nullptr);
460459
}
461-
}));
460+
});
462461
}
463462
}
464463

@@ -935,16 +934,6 @@ void NodeResourceInfoAccessor::AsyncGetDrainingNodes(
935934
});
936935
}
937936

938-
void NodeResourceInfoAccessor::AsyncResubscribe() {
939-
RAY_LOG(DEBUG) << "Reestablishing subscription for node resource info.";
940-
if (subscribe_resource_operation_ != nullptr) {
941-
RAY_CHECK_OK(subscribe_resource_operation_(nullptr));
942-
}
943-
if (subscribe_batch_resource_usage_operation_ != nullptr) {
944-
RAY_CHECK_OK(subscribe_batch_resource_usage_operation_(nullptr));
945-
}
946-
}
947-
948937
void NodeResourceInfoAccessor::AsyncGetAllResourceUsage(
949938
const ItemCallback<rpc::ResourceUsageBatchData> &callback) {
950939
rpc::GetAllResourceUsageRequest request;
@@ -1009,14 +998,13 @@ void ErrorInfoAccessor::AsyncReportJobError(rpc::ErrorTableData data) {
1009998
WorkerInfoAccessor::WorkerInfoAccessor(GcsClient *client_impl)
1010999
: client_impl_(client_impl) {}
10111000

1012-
Status WorkerInfoAccessor::AsyncSubscribeToWorkerFailures(
1001+
void WorkerInfoAccessor::AsyncSubscribeToWorkerFailures(
10131002
const ItemCallback<rpc::WorkerDeltaData> &subscribe, const StatusCallback &done) {
10141003
RAY_CHECK(subscribe != nullptr);
10151004
subscribe_operation_ = [this, subscribe](const StatusCallback &done_callback) {
1016-
return client_impl_->GetGcsSubscriber().SubscribeAllWorkerFailures(subscribe,
1017-
done_callback);
1005+
client_impl_->GetGcsSubscriber().SubscribeAllWorkerFailures(subscribe, done_callback);
10181006
};
1019-
return subscribe_operation_(done);
1007+
subscribe_operation_(done);
10201008
}
10211009

10221010
void WorkerInfoAccessor::AsyncResubscribe() {
@@ -1025,7 +1013,7 @@ void WorkerInfoAccessor::AsyncResubscribe() {
10251013
RAY_LOG(DEBUG) << "Reestablishing subscription for worker failures.";
10261014
// The pub-sub server has restarted, we need to resubscribe to the pub-sub server.
10271015
if (subscribe_operation_ != nullptr) {
1028-
RAY_CHECK_OK(subscribe_operation_(nullptr));
1016+
subscribe_operation_(nullptr);
10291017
}
10301018
}
10311019

src/ray/gcs_rpc_client/accessor.h

Lines changed: 5 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ namespace gcs {
3838
// RAY_gcs_server_request_timeout_seconds
3939
int64_t GetGcsTimeoutMs();
4040

41-
using SubscribeOperation = std::function<Status(const StatusCallback &done)>;
41+
using SubscribeOperation = std::function<void(const StatusCallback &done)>;
4242
using FetchDataOperation = std::function<void(const StatusCallback &done)>;
4343

4444
class GcsClient;
@@ -170,17 +170,15 @@ class ActorInfoAccessor {
170170
/// \param actor_id The ID of actor to be subscribed to.
171171
/// \param subscribe Callback that will be called each time when the actor is updated.
172172
/// \param done Callback that will be called when subscription is complete.
173-
/// \return Status
174-
virtual Status AsyncSubscribe(
173+
virtual void AsyncSubscribe(
175174
const ActorID &actor_id,
176175
const SubscribeCallback<ActorID, rpc::ActorTableData> &subscribe,
177176
const StatusCallback &done);
178177

179178
/// Cancel subscription to an actor.
180179
///
181180
/// \param actor_id The ID of the actor to be unsubscribed to.
182-
/// \return Status
183-
virtual Status AsyncUnsubscribe(const ActorID &actor_id);
181+
virtual void AsyncUnsubscribe(const ActorID &actor_id);
184182

185183
/// Reestablish subscription.
186184
/// This should be called when GCS server restarts from a failure.
@@ -237,8 +235,7 @@ class JobInfoAccessor {
237235
///
238236
/// \param subscribe Callback that will be called each time when a job updates.
239237
/// \param done Callback that will be called when subscription is complete.
240-
/// \return Status
241-
virtual Status AsyncSubscribeAll(
238+
virtual void AsyncSubscribeAll(
242239
const SubscribeCallback<JobID, rpc::JobTableData> &subscribe,
243240
const StatusCallback &done);
244241

@@ -511,13 +508,6 @@ class NodeResourceInfoAccessor {
511508
virtual void AsyncGetDrainingNodes(
512509
const ItemCallback<std::unordered_map<NodeID, int64_t>> &callback);
513510

514-
/// Reestablish subscription.
515-
/// This should be called when GCS server restarts from a failure.
516-
/// PubSub server restart will cause GCS server restart. In this case, we need to
517-
/// resubscribe from PubSub server, otherwise we only need to fetch data from GCS
518-
/// server.
519-
virtual void AsyncResubscribe();
520-
521511
/// Get newest resource usage of all nodes from GCS asynchronously.
522512
///
523513
/// \param callback Callback that will be called after lookup finishes.
@@ -533,11 +523,6 @@ class NodeResourceInfoAccessor {
533523
rpc::GetAllResourceUsageReply &reply);
534524

535525
private:
536-
/// Save the subscribe operation in this function, so we can call it again when PubSub
537-
/// server restarts from a failure.
538-
SubscribeOperation subscribe_resource_operation_;
539-
SubscribeOperation subscribe_batch_resource_usage_operation_;
540-
541526
GcsClient *client_impl_;
542527

543528
Sequencer<NodeID> sequencer_;
@@ -607,8 +592,7 @@ class WorkerInfoAccessor {
607592
///
608593
/// \param subscribe Callback that will be called each time when a worker failed.
609594
/// \param done Callback that will be called when subscription is complete.
610-
/// \return Status
611-
virtual Status AsyncSubscribeToWorkerFailures(
595+
virtual void AsyncSubscribeToWorkerFailures(
612596
const ItemCallback<rpc::WorkerDeltaData> &subscribe, const StatusCallback &done);
613597

614598
/// Report a worker failure to GCS asynchronously.

0 commit comments

Comments
 (0)