Commit be77f0a

[Core] Fix task name inconsistency in RUNNING vs FINISHED metrics (#59893)
## Description

Fix inconsistent task name in metrics between RUNNING and FINISHED states.

When a Ray task is defined with a custom name via `.options(name="custom_name")`, the `ray_tasks` metrics show inconsistent names:

- **RUNNING** state: shows the original function name (e.g., `RemoteFn`)
- **FINISHED/FAILED** state: shows the custom name (e.g., `test`)

**Root cause:** The RUNNING task counter in `CoreWorker` uses `FunctionDescriptor()->CallString()` to get the task name, while finished task events correctly use `TaskSpecification::GetName()`.

**Fix:** Changed both `HandlePushTask` and `ExecuteTask` in `core_worker.cc` to use `task_spec.GetName()` consistently, which properly returns the custom name when set.

## Related issues

None - this PR addresses a newly discovered bug.

## Additional information

**Files changed:**

- `src/ray/core_worker/core_worker.cc` - Use `GetName()` instead of `FunctionDescriptor()->CallString()` for metrics
- `python/ray/tests/test_task_metrics.py` - Added test `test_task_custom_name_metrics` to verify custom names appear correctly in metrics

Signed-off-by: Yuan Jiewei <jieweihh.yuan@gmail.com>
Co-authored-by: Yuan Jiewei <jieweihh.yuan@gmail.com>
1 parent 639c4c5 commit be77f0a
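
The mismatch described in the commit message can be reproduced with a toy model. The sketch below is not Ray code: `ToyTaskSpec`, `call_string`, `get_name`, and `metric_labels` are hypothetical stand-ins. It assumes, as the code comment in this commit states, that `GetName()` returns the custom name when one is set and otherwise falls back to the function descriptor's call string.

```python
from collections import Counter
from dataclasses import dataclass
from typing import Optional


@dataclass
class ToyTaskSpec:
    """Hypothetical stand-in for a task spec; not part of Ray's API."""

    function_name: str
    custom_name: Optional[str] = None

    def call_string(self) -> str:
        # Plays the role of FunctionDescriptor()->CallString(): always the function name.
        return self.function_name

    def get_name(self) -> str:
        # Plays the role of GetName(): custom name if set, else the function name.
        return self.custom_name or self.function_name


def metric_labels(spec: ToyTaskSpec, use_get_name: bool) -> Counter:
    """Record one RUNNING and one FINISHED sample, keyed by (name, state)."""
    running_name = spec.get_name() if use_get_name else spec.call_string()
    labels = Counter()
    labels[(running_name, "RUNNING")] += 1
    # Finished task events already used the task name before the fix.
    labels[(spec.get_name(), "FINISHED")] += 1
    return labels


spec = ToyTaskSpec(function_name="RemoteFn", custom_name="test")
before_fix = metric_labels(spec, use_get_name=False)
after_fix = metric_labels(spec, use_get_name=True)
```

In this toy, `before_fix` labels the RUNNING sample with `RemoteFn` and the FINISHED sample with `test`, while `after_fix` uses `test` for both.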

2 files changed: +62 -5 lines changed

python/ray/tests/test_task_metrics.py

Lines changed: 54 additions & 0 deletions
@@ -109,6 +109,60 @@ def c():
     proc.kill()


+@pytest.mark.skipif(sys.platform == "win32", reason="Flaky on Windows.")
+def test_task_custom_name_metrics(shutdown_only):
+    """Verify that custom task names set via .options(name=...) are used in metrics.
+
+    This tests that RUNNING tasks use the custom name consistently with
+    FINISHED/FAILED tasks. Previously there was a bug where RUNNING metrics used
+    the function name (FunctionDescriptor->CallString()) but FINISHED/FAILED used
+    the custom name (TaskSpec::GetName()).
+    """
+    info = ray.init(num_cpus=2, **METRIC_CONFIG)
+
+    driver = """
+import ray
+import time
+
+ray.init("auto")
+
+@ray.remote
+def my_function():
+    time.sleep(999)
+
+# Submit tasks with custom names
+a = [my_function.options(name="custom_task_name").remote() for _ in range(4)]
+ray.get(a)
+"""
+    proc = run_string_as_driver_nonblocking(driver)
+    timeseries = PrometheusTimeseries()
+
+    # Verify that RUNNING tasks use the custom name, not the function name.
+    # With 2 CPUs, 2 tasks should be running and 2 should be pending.
+    expected = {
+        ("custom_task_name", "RUNNING"): 2.0,
+        ("custom_task_name", "PENDING_NODE_ASSIGNMENT"): 2.0,
+    }
+    wait_for_condition(
+        lambda: tasks_by_name_and_state(info, timeseries) == expected,
+        timeout=20,
+        retry_interval_ms=500,
+    )
+
+    # Verify the original function name is NOT used in metrics
+    breakdown = tasks_by_name_and_state(info, timeseries)
+    assert (
+        "my_function",
+        "RUNNING",
+    ) not in breakdown, "RUNNING tasks should use custom name, not function name"
+    assert (
+        "my_function",
+        "PENDING_NODE_ASSIGNMENT",
+    ) not in breakdown, "PENDING tasks should use custom name, not function name"
+
+    proc.kill()
+
+
 def test_task_job_ids(shutdown_only):
     info = ray.init(num_cpus=2, **METRIC_CONFIG)
     timeseries = PrometheusTimeseries()
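
The new test compares a dictionary keyed by `(name, state)` against the expected counts. As a rough illustration of that shape, the sketch below aggregates metric samples in plain Python. `Sample` and `group_by_name_and_state` are hypothetical and are not the helpers used in `test_task_metrics.py`; the label keys `Name` and `State` and the choice to drop zero-valued samples are likewise assumptions made for the example.

```python
from collections import defaultdict
from typing import Dict, List, NamedTuple, Tuple


class Sample(NamedTuple):
    """Hypothetical metric sample: a label set plus a gauge value."""

    labels: Dict[str, str]
    value: float


def group_by_name_and_state(samples: List[Sample]) -> Dict[Tuple[str, str], float]:
    """Sum sample values per (name, state), skipping samples with no tasks."""
    totals: Dict[Tuple[str, str], float] = defaultdict(float)
    for sample in samples:
        if sample.value > 0:
            key = (sample.labels["Name"], sample.labels["State"])
            totals[key] += sample.value
    return dict(totals)


samples = [
    Sample({"Name": "custom_task_name", "State": "RUNNING"}, 2.0),
    Sample({"Name": "custom_task_name", "State": "PENDING_NODE_ASSIGNMENT"}, 2.0),
    Sample({"Name": "my_function", "State": "RUNNING"}, 0.0),
]
breakdown = group_by_name_and_state(samples)
```

With 2 CPUs and 4 submitted tasks, the expected breakdown has two entries, both under the custom name, and no entry under `my_function`.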

src/ray/core_worker/core_worker.cc

Lines changed: 8 additions & 5 deletions
@@ -2807,8 +2807,11 @@ Status CoreWorker::ExecuteTask(
   // about any IDs that we are still borrowing by the time the task completes.
   std::vector<ObjectID> borrowed_ids;

-  // Extract function name and retry status for metrics reporting.
-  std::string func_name = task_spec.FunctionDescriptor()->CallString();
+  // Extract task name and retry status for metrics reporting.
+  // Use GetName() which returns the custom task name if set via .options(name="..."),
+  // otherwise falls back to the function descriptor's call string. This ensures
+  // consistency with task events reported to the State API / Dashboard.
+  std::string func_name = task_spec.GetName();
   bool is_retry = task_spec.IsRetry();

   ++num_get_pin_args_in_flight_;
@@ -3434,10 +3437,10 @@ void CoreWorker::HandlePushTask(rpc::PushTaskRequest request,
   }

   // Increment the task_queue_length and per function counter.
+  // Use task name which includes custom name from .options(name="...") if set,
+  // ensuring consistency with task events reported to the State API / Dashboard.
   task_queue_length_ += 1;
-  std::string func_name =
-      FunctionDescriptorBuilder::FromProto(request.task_spec().function_descriptor())
-          ->CallString();
+  std::string func_name = request.task_spec().name();
   task_counter_.IncPending(func_name, request.task_spec().attempt_number() > 0);

   // For actor tasks, we just need to post a HandleActorTask instance to the task
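
Both hunks change how `func_name` is derived. `HandlePushTask` passes it to `task_counter_.IncPending`, and the comment in `ExecuteTask` says it is extracted for metrics reporting. One plausible reason to change the two sites together is that a per-name counter only balances when every call site derives the same key. The sketch below illustrates that idea with a toy counter. `ToyTaskCounter` and its methods are hypothetical, and the pending-to-running transition is an assumption about how such a counter might work, not a description of Ray's implementation.

```python
from collections import Counter


class ToyTaskCounter:
    """Hypothetical per-name task counter; a toy, not Ray's TaskCounter."""

    def __init__(self) -> None:
        self.pending: Counter = Counter()
        self.running: Counter = Counter()

    def inc_pending(self, name: str, is_retry: bool) -> None:
        # Called when a task is queued.
        self.pending[(name, is_retry)] += 1

    def move_pending_to_running(self, name: str, is_retry: bool) -> None:
        # Called when a queued task starts executing.
        self.pending[(name, is_retry)] -= 1
        self.running[(name, is_retry)] += 1


# Same key at both call sites: the pending entry drains to zero.
consistent = ToyTaskCounter()
consistent.inc_pending("custom_task_name", False)
consistent.move_pending_to_running("custom_task_name", False)

# Different keys at the two call sites: the pending entry never drains.
mismatched = ToyTaskCounter()
mismatched.inc_pending("my_function", False)
mismatched.move_pending_to_running("custom_task_name", False)
```

In the mismatched case the toy counter is left with a stale pending entry under one name and a negative pending entry under the other, which is the kind of drift that deriving the name the same way at both sites avoids.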
