Skip to content

Commit b626393

Browse files
chore(ray): add instrumentation for ray.get (#14768)
## Description

This PR adds instrumentation for `ray.get` to our Ray integration, as specified in [MLOB-3820](https://datadoghq.atlassian.net/browse/MLOB-3820). This is based on #14587, and is a clean recreation of #14658 with some minor changes.

## Testing

**NOTE (10/3 12PM): All tests were re-run after the latest commit.**

I launched the test agent with `docker compose up -d testagent`, and then, in a `scripts/ddtest` container, I ran `DD_AGENT_PORT=9126 riot -v run --pass-env -p 3.12 ray -- -s -vv` to execute the snapshot tests, which resulted in all tests passing. I committed and pushed the generated snapshot JSON files to this branch.

<img width="1131" height="171" alt="Screenshot 2025-10-03 at 11 28 59 AM" src="https://github.com/user-attachments/assets/791f328e-9adf-463b-8a79-b822426c0ca6" />

Manual test on a local Ray cluster:

Create/activate a Ray venv and pip install dd-trace-py from a local repo checkout with these changes.

```
export DD_SERVICE="imran-ray-get-test-002"
export DD_TRACE_RAY_CORE_API=1
RAY_LOGGING_CONFIG_ENCODING=JSON DD_ENV=dev ray start --head --dashboard-host=127.0.0.1 --tracing-startup-hook=ddtrace.contrib.ray:setup_tracing
```

Submit the [simple_get.py](https://github.com/user-attachments/files/22682834/simple_get.py) test job to the local cluster with:

`ray job submit --submission-id="imran-ray-get-test-002" -- python /Users/imran.hendley/scripts/simple_get.py`

We can then see the `ray.get` span on the job:

<img width="1217" height="933" alt="Screenshot 2025-10-03 at 11 49 28 AM" src="https://github.com/user-attachments/assets/6feb4530-d1f1-43bb-83aa-99d3d424c315" />

The same span in the trace explorer is [here](https://dd.datad0g.com/apm/trace/68dff038000000001c792f1f5460456c?graphType=json&shouldShowLegend=true&spanID=18012011505180865563&timeHint=1759506488231.9692&traceQuery=).

## Risks

None

## Additional Notes

None

[MLOB-3820]: https://datadoghq.atlassian.net/browse/MLOB-3820?atlOrigin=eyJpIjoiNWRkNTljNzYxNjVmNDY3MDlhMDU5Y2ZhYzA5YTRkZjUiLCJwIjoiZ2l0aHViLWNvbS1KU1cifQ
1 parent 6ad4124 commit b626393

File tree

5 files changed

+149
-1
lines changed

5 files changed

+149
-1
lines changed

ddtrace/contrib/internal/ray/constants.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
RAY_WAIT_TIMEOUT = "ray.wait.timeout_s"
4141
RAY_WAIT_NUM_RETURNS = "ray.wait.num_returns"
4242
RAY_WAIT_FETCH_LOCAL = "ray.wait.fetch_local"
43+
RAY_GET_VALUE_SIZE_BYTES = "ray.get.value_size_bytes"
4344
RAY_PUT_VALUE_TYPE = "ray.put.value_type"
4445
RAY_PUT_VALUE_SIZE_BYTES = "ray.put.value_size_bytes"
4546
RAY_METADATA_PREFIX = "ray.job.metadata"

ddtrace/contrib/internal/ray/patch.py

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
from .constants import RAY_ACTOR_METHOD_ARGS
3030
from .constants import RAY_ACTOR_METHOD_KWARGS
3131
from .constants import RAY_ENTRYPOINT
32+
from .constants import RAY_GET_VALUE_SIZE_BYTES
3233
from .constants import RAY_JOB_NAME
3334
from .constants import RAY_JOB_STATUS
3435
from .constants import RAY_JOB_SUBMIT_STATUS
@@ -287,6 +288,33 @@ def traced_actor_method_call(wrapped, instance, args, kwargs):
287288
return wrapped(*args, **kwargs)
288289

289290

291+
def traced_get(wrapped, instance, args, kwargs):
    """
    Trace the calls of ray.get

    When ``config.ray.trace_core_api`` is falsy the wrapped function is called
    directly and no span is created. Otherwise the call runs inside a
    ``ray.get`` span that is tagged with the ``timeout`` keyword argument (when
    one was passed) and with the shallow size of the ``object_refs`` argument.

    :param wrapped: the original ``ray.get`` callable
    :param instance: bound instance supplied by the wrapper machinery (unused)
    :param args: positional arguments of the intercepted call
    :param kwargs: keyword arguments of the intercepted call
    :return: whatever ``wrapped(*args, **kwargs)`` returns
    """
    # Core API tracing is opt-in: without it, behave exactly like the original.
    if not config.ray.trace_core_api:
        return wrapped(*args, **kwargs)

    # With no span currently active, activate the tracing context extracted
    # from the environment before opening the ``ray.get`` span.
    if tracer.current_span() is None:
        env_context = _extract_tracing_context_from_env()
        tracer.context_provider.activate(env_context)

    service_name = RAY_SERVICE_NAME or DEFAULT_JOB_NAME
    parent_context = tracer.context_provider.active()

    with long_running_ray_span(
        "ray.get",
        service=service_name,
        span_type=SpanTypes.RAY,
        child_of=parent_context,
        activate=True,
    ) as get_span:
        get_span.set_tag_str(SPAN_KIND, SpanKind.PRODUCER)

        # Only record the timeout when the caller actually supplied one.
        timeout_s = kwargs.get("timeout")
        if timeout_s is not None:
            get_span.set_tag_str("ray.get.timeout_s", str(timeout_s))

        _inject_ray_span_tags_and_metrics(get_span)

        # NOTE(review): sys.getsizeof is shallow and is applied to the
        # ``object_refs`` argument, not to the values ray.get returns, so the
        # tag reflects the size of the argument container -- confirm this is
        # the intended meaning of "value size".
        object_refs = get_argument_value(args, kwargs, 0, "object_refs")
        refs_size_bytes = sys.getsizeof(object_refs)
        get_span.set_tag_str(RAY_GET_VALUE_SIZE_BYTES, str(refs_size_bytes))

        return wrapped(*args, **kwargs)
316+
317+
290318
def traced_put(wrapped, instance, args, kwargs):
291319
"""
292320
Trace the calls of ray.put
@@ -529,6 +557,7 @@ def _(m):
529557
def _(m):
530558
_w(m.RemoteFunction, "_remote", traced_submit_task)
531559

560+
_w(ray, "get", traced_get)
532561
_w(ray, "wait", traced_wait)
533562
_w(ray, "put", traced_put)
534563

@@ -545,6 +574,7 @@ def unpatch():
545574
_u(ray.actor, "_modify_class")
546575
_u(ray.actor.ActorHandle, "_actor_method_call")
547576

577+
_u(ray, "get")
548578
_u(ray, "wait")
549579
_u(ray, "put")
550580

tests/contrib/ray/test_ray.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -215,6 +215,17 @@ def add_one(x):
215215
assert running == [], f"Expected no running tasks, got {len(running)}"
216216
assert ray.get(done) == [43], f"Expected done to be [43], got {done}"
217217

218+
@pytest.mark.snapshot(token="tests.contrib.ray.test_ray.test_simple_get", ignores=RAY_SNAPSHOT_IGNORES)
def test_simple_get(self):
    """Snapshot test: submit one remote task and fetch its result with ray.get."""
    with override_config("ray", dict(trace_core_api=True)):

        @ray.remote
        def add_one(x):
            return x + 1

        # Build the refs with the same comprehension as before: the traced
        # ray.get tags the span with sys.getsizeof of this argument, so the
        # recorded snapshot value presumably depends on how the list is built.
        object_refs = [add_one.remote(x) for x in range(1)]
        results = ray.get(object_refs)
        assert results == [1], f"Expected [1], got {results}"
228+
218229
@pytest.mark.snapshot(token="tests.contrib.ray.test_ray.test_simple_wait", ignores=RAY_SNAPSHOT_IGNORES)
219230
def test_simple_wait(self):
220231
with override_config("ray", dict(trace_core_api=True)):

tests/contrib/ray/test_ray_patch.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ def assert_module_patched(self, ray):
2121
self.assert_wrapped(ray.dashboard.modules.job.job_manager.JobManager._monitor_job_internal)
2222
self.assert_wrapped(ray.actor._modify_class)
2323
self.assert_wrapped(ray.actor.ActorHandle._actor_method_call)
24+
self.assert_wrapped(ray.get)
2425
self.assert_wrapped(ray.wait)
2526
self.assert_wrapped(ray.put)
2627

@@ -30,14 +31,16 @@ def assert_not_module_patched(self, ray):
3031
self.assert_not_wrapped(ray.dashboard.modules.job.job_manager.JobManager._monitor_job_internal)
3132
self.assert_not_wrapped(ray.actor._modify_class)
3233
self.assert_not_wrapped(ray.actor.ActorHandle._actor_method_call)
33-
self.assert_not_wrapped(ray.put)
34+
self.assert_not_wrapped(ray.get)
3435
self.assert_not_wrapped(ray.wait)
36+
self.assert_not_wrapped(ray.put)
3537

3638
def assert_not_module_double_patched(self, ray):
    """Check that none of the patched Ray entry points has been wrapped twice.

    Each target below is passed, in order, to ``self.assert_not_double_wrapped``.
    """
    check = self.assert_not_double_wrapped

    # Task submission
    check(ray.remote_function.RemoteFunction._remote)

    # Job manager hooks
    job_manager_cls = ray.dashboard.modules.job.job_manager.JobManager
    check(job_manager_cls.submit_job)
    check(job_manager_cls._monitor_job_internal)

    # Actor hooks
    actor_module = ray.actor
    check(actor_module._modify_class)
    check(actor_module.ActorHandle._actor_method_call)

    # Core API functions
    check(ray.get)
    check(ray.wait)
    check(ray.put)
Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
[[
2+
{
3+
"name": "task.submit",
4+
"service": "tests.contrib.ray",
5+
"resource": "tests.contrib.ray.test_ray.add_one.remote",
6+
"trace_id": 0,
7+
"span_id": 1,
8+
"parent_id": 0,
9+
"type": "ray",
10+
"error": 0,
11+
"meta": {
12+
"_dd.hostname": "docker-desktop",
13+
"_dd.p.dm": "-0",
14+
"_dd.p.tid": "68dfeb8100000000",
15+
"component": "ray",
16+
"language": "python",
17+
"ray.hostname": "docker-desktop",
18+
"ray.job_id": "01000000",
19+
"ray.node_id": "142aa12894d2b47a2b01989991cd0cec0340e675b8ba6d22a3ea0292",
20+
"ray.task.submit_status": "success",
21+
"ray.worker_id": "01000000ffffffffffffffffffffffffffffffffffffffffffffffff",
22+
"runtime-id": "81dd04c9904045e9b283e0bdc8fe152f",
23+
"span.kind": "producer"
24+
},
25+
"metrics": {
26+
"_dd.djm.enabled": 1,
27+
"_dd.filter.kept": 1,
28+
"_dd.measured": 1,
29+
"_dd.top_level": 1,
30+
"_dd.tracer_kr": 1.0,
31+
"_sampling_priority_v1": 2,
32+
"process_id": 1289
33+
},
34+
"duration": 2621500,
35+
"start": 1759505281704904470
36+
},
37+
{
38+
"name": "task.execute",
39+
"service": "tests.contrib.ray",
40+
"resource": "tests.contrib.ray.test_ray.TestRayIntegration.test_simple_get.<locals>.add_one",
41+
"trace_id": 0,
42+
"span_id": 2,
43+
"parent_id": 1,
44+
"type": "ray",
45+
"error": 0,
46+
"meta": {
47+
"component": "ray",
48+
"ray.hostname": "docker-desktop",
49+
"ray.job_id": "01000000",
50+
"ray.node_id": "142aa12894d2b47a2b01989991cd0cec0340e675b8ba6d22a3ea0292",
51+
"ray.task.status": "success",
52+
"ray.worker_id": "01000000ffffffffffffffffffffffffffffffffffffffffffffffff",
53+
"runtime-id": "81dd04c9904045e9b283e0bdc8fe152f",
54+
"span.kind": "consumer"
55+
},
56+
"metrics": {
57+
"_dd.djm.enabled": 1,
58+
"_dd.filter.kept": 1,
59+
"_dd.measured": 1,
60+
"_dd.top_level": 1,
61+
"_sampling_priority_v1": 2,
62+
"process_id": 1289
63+
},
64+
"duration": 266417,
65+
"start": 1759505281707023595
66+
}],
67+
[
68+
{
69+
"name": "ray.get",
70+
"service": "unnamed.ray.job",
71+
"resource": "ray.get",
72+
"trace_id": 1,
73+
"span_id": 1,
74+
"parent_id": 0,
75+
"type": "ray",
76+
"error": 0,
77+
"meta": {
78+
"_dd.base_service": "tests.contrib.ray",
79+
"_dd.hostname": "docker-desktop",
80+
"_dd.p.dm": "-0",
81+
"_dd.p.tid": "68dfeb8100000000",
82+
"component": "ray",
83+
"language": "python",
84+
"ray.get.value_size_bytes": "88",
85+
"ray.hostname": "docker-desktop",
86+
"ray.job_id": "01000000",
87+
"ray.node_id": "142aa12894d2b47a2b01989991cd0cec0340e675b8ba6d22a3ea0292",
88+
"ray.worker_id": "01000000ffffffffffffffffffffffffffffffffffffffffffffffff",
89+
"runtime-id": "81dd04c9904045e9b283e0bdc8fe152f",
90+
"span.kind": "producer"
91+
},
92+
"metrics": {
93+
"_dd.djm.enabled": 1,
94+
"_dd.filter.kept": 1,
95+
"_dd.measured": 1,
96+
"_dd.top_level": 1,
97+
"_dd.tracer_kr": 1.0,
98+
"_sampling_priority_v1": 2,
99+
"process_id": 1289
100+
},
101+
"duration": 313000,
102+
"start": 1759505281707677970
103+
}]]

0 commit comments

Comments
 (0)