Skip to content

Commit 97de782

Browse files
authored
[core] Add comments explaining the usage of the ray_syncer_ channels in the Raylet (#58342)
Found it very hard to parse what was happening here, so helping future me (or you!). Also:
- Deleted vestigial `next_resource_seq_no_`.
- Converted from a non-monotonic clock to a monotonically incremented `int64_t` for the version number for commands.
- Added logs when we drop messages with stale versions.

---------

Signed-off-by: Edward Oakes <[email protected]>
1 parent 50ffca4 commit 97de782

File tree

4 files changed

+58
-19
lines changed

4 files changed

+58
-19
lines changed

src/ray/ray_syncer/node_state.cc

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,8 +63,12 @@ bool NodeState::ConsumeSyncMessage(std::shared_ptr<const RaySyncMessage> message
6363
<< (current ? current->version() : -1)
6464
<< " message_version=" << message->version()
6565
<< ", message_from=" << NodeID::FromBinary(message->node_id());
66+
6667
// Check whether newer version of this message has been received.
6768
if (current && current->version() >= message->version()) {
69+
RAY_LOG(INFO) << "Dropping sync message with stale version. latest version: "
70+
<< current->version()
71+
<< ", dropped message version: " << message->version();
6872
return false;
6973
}
7074

src/ray/ray_syncer/ray_syncer_bidi_reactor_base.h

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -64,14 +64,18 @@ class RaySyncerBidiReactorBase : public RaySyncerBidiReactor, public T {
6464
}
6565

6666
auto &node_versions = GetNodeComponentVersions(message->node_id());
67-
if (node_versions[message->message_type()] < message->version()) {
68-
node_versions[message->message_type()] = message->version();
69-
sending_buffer_[std::make_pair(message->node_id(), message->message_type())] =
70-
std::move(message);
71-
StartSend();
72-
return true;
67+
if (node_versions[message->message_type()] >= message->version()) {
68+
RAY_LOG(INFO) << "Dropping sync message with stale version. latest version: "
69+
<< node_versions[message->message_type()]
70+
<< ", dropped message version: " << message->version();
71+
return false;
7372
}
74-
return false;
73+
74+
node_versions[message->message_type()] = message->version();
75+
sending_buffer_[std::make_pair(message->node_id(), message->message_type())] =
76+
std::move(message);
77+
StartSend();
78+
return true;
7579
}
7680

7781
virtual ~RaySyncerBidiReactorBase() = default;

src/ray/raylet/node_manager.cc

Lines changed: 29 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -225,7 +225,6 @@ NodeManager::NodeManager(
225225
cluster_lease_manager_(cluster_lease_manager),
226226
record_metrics_period_ms_(config.record_metrics_period_ms),
227227
placement_group_resource_manager_(placement_group_resource_manager),
228-
next_resource_seq_no_(0),
229228
ray_syncer_(io_service_, self_node_id_.Binary()),
230229
worker_killing_policy_(std::make_shared<GroupByOwnerIdWorkerKillingPolicy>()),
231230
memory_monitor_(std::make_unique<MemoryMonitor>(
@@ -324,16 +323,29 @@ void NodeManager::RegisterGcs() {
324323
auto on_node_change_subscribe_done = [this](Status status) {
325324
RAY_CHECK_OK(status);
326325

327-
// Register resource manager and scheduler
326+
// RESOURCE_VIEW is used to synchronize available resources across Raylets.
327+
//
328+
// LocalResourceManager::CreateSyncMessage will be called periodically to collect
329+
// the local Raylet's usage to broadcast to others (via the GCS). The updates are
330+
// versioned inside of `LocalResourceManager` to avoid unnecessary broadcasts.
331+
//
332+
// NodeManager::ConsumeSyncMessage will be called when a sync message containing
333+
// other Raylets' resource usage is received.
328334
ray_syncer_.Register(
329335
/* message_type */ syncer::MessageType::RESOURCE_VIEW,
330336
/* reporter */ &cluster_resource_scheduler_.GetLocalResourceManager(),
331337
/* receiver */ this,
332338
/* pull_from_reporter_interval_ms */
333339
report_resources_period_ms_);
334340

335-
// Register a commands channel.
336-
// It's only used for GC right now.
341+
// COMMANDS is used only to broadcast a global request to call the Python garbage
342+
// collector on all Raylets when the cluster is under memory pressure.
343+
//
344+
// Periodic collection is disabled, so this command is only broadcasted via
345+
// `OnDemandBroadcasting` (which will call NodeManager::CreateSyncMessage).
346+
//
347+
// NodeManager::ConsumeSyncMessage is called to execute the GC command from other
348+
// Raylets.
337349
ray_syncer_.Register(
338350
/* message_type */ syncer::MessageType::COMMANDS,
339351
/* reporter */ this,
@@ -348,6 +360,9 @@ void NodeManager::RegisterGcs() {
348360
// If plasma store is under high pressure, we should try to schedule a global
349361
// gc.
350362
if (triggered_by_global_gc) {
363+
// Always increment the sync message version number so that all GC commands
364+
// are sent indiscriminately.
365+
gc_command_sync_version_++;
351366
ray_syncer_.OnDemandBroadcasting(syncer::MessageType::COMMANDS);
352367
}
353368
},
@@ -3034,19 +3049,25 @@ void NodeManager::ConsumeSyncMessage(
30343049

30353050
std::optional<syncer::RaySyncMessage> NodeManager::CreateSyncMessage(
30363051
int64_t after_version, syncer::MessageType message_type) const {
3052+
// This method is only called for the COMMANDS channel, as the RESOURCE_VIEW
3053+
// channel goes through the LocalResourceManager.
30373054
RAY_CHECK_EQ(message_type, syncer::MessageType::COMMANDS);
30383055

3056+
// Serialize the COMMANDS message to a byte string to be nested inside the sync message.
3057+
std::string serialized_commands_sync_msg;
30393058
syncer::CommandsSyncMessage commands_sync_message;
30403059
commands_sync_message.set_should_global_gc(true);
30413060
commands_sync_message.set_cluster_full_of_actors_detected(resource_deadlock_warned_ >=
30423061
1);
3062+
RAY_CHECK(commands_sync_message.SerializeToString(&serialized_commands_sync_msg));
3063+
3064+
// Populate the sync message.
30433065
syncer::RaySyncMessage msg;
3044-
msg.set_version(absl::GetCurrentTimeNanos());
3066+
msg.set_version(gc_command_sync_version_);
30453067
msg.set_node_id(self_node_id_.Binary());
30463068
msg.set_message_type(syncer::MessageType::COMMANDS);
3047-
std::string serialized_msg;
3048-
RAY_CHECK(commands_sync_message.SerializeToString(&serialized_msg));
3049-
msg.set_sync_message(std::move(serialized_msg));
3069+
msg.set_sync_message(std::move(serialized_commands_sync_msg));
3070+
30503071
return std::make_optional(std::move(msg));
30513072
}
30523073

src/ray/raylet/node_manager.h

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -202,8 +202,18 @@ class NodeManager : public rpc::NodeManagerServiceHandler,
202202
/// Get the port of the node manager rpc server.
203203
int GetServerPort() const { return node_manager_server_.GetPort(); }
204204

205+
// Consume a RaySyncer sync message from another Raylet.
206+
//
207+
// The two types of messages that are received are:
208+
// - RESOURCE_VIEW: an update of the resources available on another Raylet.
209+
// - COMMANDS: a request to run the Python garbage collector globally across Raylets.
205210
void ConsumeSyncMessage(std::shared_ptr<const syncer::RaySyncMessage> message) override;
206211

212+
// Generate a RaySyncer sync message to be sent to other Raylets.
213+
//
214+
// This is currently only used to generate messages for the COMMANDS channel to request
215+
// other Raylets to call the Python garbage collector, and is only called on demand
216+
// (not periodically polled by the RaySyncer code).
207217
std::optional<syncer::RaySyncMessage> CreateSyncMessage(
208218
int64_t after_version, syncer::MessageType message_type) const override;
209219

@@ -894,13 +904,13 @@ class NodeManager : public rpc::NodeManagerServiceHandler,
894904
/// Manages all bundle-related operations.
895905
PlacementGroupResourceManager &placement_group_resource_manager_;
896906

897-
/// Next resource broadcast seq no. Non-incrementing sequence numbers
898-
/// indicate network issues (dropped/duplicated/ooo packets, etc).
899-
int64_t next_resource_seq_no_;
900-
901907
/// Ray syncer for synchronization
902908
syncer::RaySyncer ray_syncer_;
903909

910+
/// `version` for the RaySyncer COMMANDS channel. Monotonically incremented each time
911+
/// we issue a GC command so that none of the messages are dropped.
912+
int64_t gc_command_sync_version_ = 0;
913+
904914
/// The Policy for selecting the worker to kill when the node runs out of memory.
905915
std::shared_ptr<WorkerKillingPolicy> worker_killing_policy_;
906916

0 commit comments

Comments
 (0)