daily merge: master → main 2026-01-06#733
Conversation
…59204) Adds a new Buildkite job for building and uploading a base `manylinux2014` image. Planning to build flavors with and without the JDK included, to help reduce image size when the JDK is not needed.
* Building is handled by a `wanda` builder step
* Uploading to `rayproject/manylinux2014:{DATE}.{SHORT_COMMIT}-x86_64` and `rayproject/manylinux2014:{DATE}.{SHORT_COMMIT}-jdk-x86_64` is handled by a separate runner that calls the new `ci/ray_ci/automation/copy_wanda_image.py`
* Runs by default without uploading, for initial testing. Once the `--upload` flag is passed, it uploads to the specified registry.
---------
Signed-off-by: andrew <andrew@anyscale.com>
Co-authored-by: Lonnie Liu <95255098+aslonnie@users.noreply.github.com>
Internal doc for Ray event exporter infra.
Test:
- CI
- https://anyscale-ray--59272.com.readthedocs.build/en/59272/ray-core/internals/ray-event-exporter.html
Signed-off-by: Cuong Nguyen <can@anyscale.com>
…ject#59471)
## Description
This PR fixes a deserialization issue in `register_dataset` where `StatsActor` fails to deserialize `DataContext` parameters containing custom classes, causing dataset stats management to fail and preventing the Ray Data overview from displaying in the dashboard.

**Problem:**
In a shared Ray cluster with multiple sequential Ray Data jobs:
- The first job runs successfully and creates the global `StatsActor`.
- The second job (with different code) runs normally (dataset creation works), but when its `DataContext` (containing custom classes from different modules) is serialized to `StatsActor`, the actor fails to deserialize it.
- `StatsActor` logs errors and the dashboard cannot display the Ray Data overview because `register_dataset` fails silently.

```
ModuleNotFoundError: No module named 'ray_data_utils'
Traceback (most recent call last):
  File "/usr/local/lib/python3.11/dist-packages/ray/_private/serialization.py", line 526, in deserialize_objects
    obj = self._deserialize_object(...)
  File "/usr/local/lib/python3.11/dist-packages/ray/_private/serialization.py", line 363, in _deserialize_object
    return self._deserialize_msgpack_data(...)
  File "/usr/local/lib/python3.11/dist-packages/ray/_private/serialization.py", line 310, in _deserialize_msgpack_data
    python_objects = self._deserialize_pickle5_data(...)
  File "/usr/local/lib/python3.11/dist-packages/ray/_private/serialization.py", line 292, in _deserialize_pickle5_data
    obj = pickle.loads(in_band)
ModuleNotFoundError: No module named 'ray_data_utils'
```

This error occurs because `DataContext` contains objects (e.g., custom classes) that reference modules not available in `StatsActor`'s runtime environment. Similar errors can occur for other custom classes, such as `ModuleNotFoundError: No module named 'test_custom_module'` when custom exception classes are used.

**Reproduction Script:**
To reproduce the issue before the fix, you can use the following script:

```python
#!/usr/bin/env python3
"""Reproduction script for DataContext serialization issue."""
import os
import tempfile

import ray
import ray.data
from ray._private.test_utils import run_string_as_driver


def create_driver_script_with_dependency(working_dir, ray_address):
    """Create custom module and driver script that depends on it."""
    custom_module_path = os.path.join(working_dir, "test_custom_module.py")
    with open(custom_module_path, "w") as f:
        f.write(
            """class CustomRetryException(Exception):
    def __init__(self):
        pass
"""
        )

    driver_script = f"""
import sys
import os
os.chdir(r"{working_dir}")

import ray
import ray.data
from ray.data.context import DataContext

ray.init(
    address="{ray_address}",
    ignore_reinit_error=True,
    runtime_env={{"working_dir": r"{working_dir}"}}
)

import test_custom_module

data_context = DataContext.get_current()
data_context.actor_task_retry_on_errors = [test_custom_module.CustomRetryException]

ds = ray.data.range(10)
ds.take(1)

ray.shutdown()
"""
    return driver_script


if __name__ == "__main__":
    ray.init()

    # Job 1: Create dataset to trigger StatsActor creation
    ds = ray.data.range(10)
    ds.take(1)

    # Job 2: Run job that imports custom exception from module
    working_dir = os.path.abspath(tempfile.mkdtemp())
    ray_address = ray.get_runtime_context().gcs_address
    driver_script = create_driver_script_with_dependency(working_dir, ray_address)
    run_string_as_driver(driver_script)
```

**Root Cause:**
`StatsActor` is a global actor that persists across jobs.
When a job submits a `DataContext` containing custom classes, `cloudpickle` tries to deserialize them, but the custom class's module may not be on `StatsActor`'s `PYTHONPATH`, causing `ModuleNotFoundError`.

**Solution:**
Sanitize `DataContext` before serialization using `sanitize_for_struct()` to convert custom classes to basic types (dicts, lists, strings, numbers), avoiding module dependency issues. Introduced `DataContextMetadata` to wrap the sanitized configuration.

## Related issues
Fixes deserialization failures when `DataContext` contains custom exception classes from project modules.

## Testing
- Added unit test `test_data_context_with_custom_classes_serialization` that:
  - Creates a first job to trigger `StatsActor` creation
  - Submits a second job that imports a custom exception from a module
  - Sets the custom exception in `DataContext.actor_task_retry_on_errors`
  - Verifies that the dataset registration succeeds without `ModuleNotFoundError`
  - Confirms `StatsActor` can retrieve datasets without errors

The test reproduces the real-world scenario where different jobs submit `DataContext` with custom classes to a shared `StatsActor`.

## Additional Context
This fix resolves deserialization issues for all custom classes in `DataContext`, including:
- Custom exception classes in `actor_task_retry_on_errors` (e.g., `test_custom_module.CustomRetryException`)
- Objects with `__module__` pointing to non-existent modules (e.g., `ray_data_utils`)
- Any other custom classes that reference modules not available in `StatsActor`'s runtime environment

By sanitizing `DataContext` before serialization using `sanitize_for_struct()`, all custom classes are converted to dictionary representations, eliminating module dependency issues entirely.

---------
Signed-off-by: dragongu <andrewgu@vip.qq.com>
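For readers unfamiliar with the approach above, here is a minimal, hedged sketch of the sanitization idea: a hypothetical helper (not Ray Data's actual `sanitize_for_struct()`) that reduces a config object to plain builtins before it crosses the process boundary, so the receiving actor never needs to import the modules that defined custom classes.

```python
# Hypothetical sketch only: convert a config value into plain builtins before
# sending it to another process/actor, so deserialization never needs the
# modules that defined custom classes. Not Ray's actual implementation.
def sanitize_for_struct_sketch(value):
    if value is None or isinstance(value, (str, int, float, bool)):
        return value
    if isinstance(value, dict):
        return {str(k): sanitize_for_struct_sketch(v) for k, v in value.items()}
    if isinstance(value, (list, tuple, set)):
        return [sanitize_for_struct_sketch(v) for v in value]
    # Custom classes (e.g. exception types listed in actor_task_retry_on_errors)
    # are reduced to a string description instead of a pickled class reference.
    return f"{getattr(value, '__module__', '?')}.{getattr(value, '__name__', repr(value))}"
```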
# Summary
@justinvyu noticed the following logs:

```
(TrainController pid=95437) [State Transition] RUNNING -> ABORTED.
(TrainController pid=95437) [FailurePolicy] RAISE
(TrainController pid=95437) Source: controller
(TrainController pid=95437) Error count: 1 (max allowed: 0)
(TrainController pid=95437)
(TrainController pid=95437) Traceback (most recent call last):
(TrainController pid=95437)   File "/home/ray/anaconda3/lib/python3.10/site-packages/ray/train/v2/_internal/execution/controller/controller.py", line 433, in _step
(TrainController pid=95437)     worker_group_status: WorkerGroupPollStatus = await self._poll_workers()
(TrainController pid=95437)   File "/home/ray/anaconda3/lib/python3.10/site-packages/ray/util/tracing/tracing_helper.py", line 493, in _resume_span
(TrainController pid=95437)     return await method(self, *_args, **_kwargs)
(TrainController pid=95437)   File "/home/ray/anaconda3/lib/python3.10/site-packages/ray/train/v2/_internal/execution/controller/controller.py", line 283, in _poll_workers
(TrainController pid=95437)     ray.actor.exit_actor()
(TrainController pid=95437)   File "/home/ray/anaconda3/lib/python3.10/site-packages/ray/actor.py", line 2492, in exit_actor
(TrainController pid=95437)     raise AsyncioActorExit()
(TrainController pid=95437) ray.train.ControllerError: Training failed due to controller error:
(TrainController pid=95437)
(TrainController pid=95437) [State Transition] ABORTED -> SHUTTING_DOWN.
```

The problem is that the fallback I implemented in ray-project#58287 didn't work because the `TrainController` caught the `AsyncioActorExit` raised by `ray.actor.exit_actor` and handled it as a `ControllerError`. However, what we actually want is to finish the abort asap by reraising the exception.

# Testing
Unit tests. I didn't add a new unit test for this specifically because the situation it covers happens flakily and would require a lot of contrived mocking to reproduce.

---------
Signed-off-by: Timothy Seah <tseah@anyscale.com>
Co-authored-by: matthewdeng <matthew.j.deng@gmail.com>
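As a point of reference, here is a simplified sketch of the control-flow fix described above, assuming `AsyncioActorExit` is importable from `ray.exceptions`; the function name and error wrapping are illustrative, not the actual `TrainController` code.

```python
# Illustrative sketch: never swallow the actor-exit signal when wrapping
# controller-step errors. Assumes AsyncioActorExit lives in ray.exceptions.
from ray.exceptions import AsyncioActorExit


async def controller_step_sketch(poll_workers):
    try:
        return await poll_workers()
    except AsyncioActorExit:
        # Re-raise so the abort finishes immediately instead of being
        # reported as a controller error.
        raise
    except Exception as e:
        raise RuntimeError(f"Training failed due to controller error: {e}") from e
```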
…59401) Fixes ray-project#59357

## Description
The Pydantic error is not serializable, making it difficult to root-cause the bug. When a Pydantic `ValidationError` containing `ArgsKwargs` objects cannot be serialized, Ray now preserves the original error details, including the `ValidationError` message and full traceback, in the error message.

## Details
Modified `RayTaskError.__init__` to include the full `traceback_str` in the error message when serialization fails, allowing users to see the original `ValidationError` details even when the exception itself cannot be pickled.

## Testing
- Created a test script that verifies original error details are preserved
- Confirmed the wrapped error is serializable
- Verified the original `ValidationError` information survives round-trip serialization

Signed-off-by: mgchoi239 <mg.choi.239@example.com>
Co-authored-by: MG <mg@MGs-MacBook-Air.local>
Co-authored-by: mgchoi239 <mg.choi.239@example.com>
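The general pattern being applied here can be sketched as follows; this is a hedged, self-contained illustration of the fallback (plain `pickle` and a generic `RuntimeError` wrapper), not the actual `RayTaskError.__init__` code.

```python
# Sketch of the fallback idea: if the original exception cannot be pickled
# (e.g. a pydantic ValidationError holding ArgsKwargs), keep its message and
# full traceback inside a plain, serializable error.
import pickle
import traceback


def wrap_unserializable_exception(exc: BaseException) -> BaseException:
    try:
        pickle.dumps(exc)
        return exc  # already serializable, use as-is
    except Exception:
        tb_str = "".join(traceback.format_exception(type(exc), exc, exc.__traceback__))
        return RuntimeError(
            f"Original error ({type(exc).__name__}) could not be serialized; "
            f"details preserved below:\n{tb_str}"
        )
```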
…oject#59378)
## Description
Ray processes don't strictly wait for each other to start. In practice, GCS, raylet, and the dashboard agent all start around the same time, and they rely on retries and timeouts to eventually connect to each other. However, we should avoid unnecessary dependencies during startup when we can.

Before this PR:
- raylet starts the dashboard agent but does not wait for it.
- raylet then registers itself and its metadata to GCS.
- Meanwhile, the dashboard agent queries GCS to check whether this node is the head node. It keeps retrying until raylet has finished registering to GCS.

This is unnecessary and a bit odd: since the raylet is the one launching the dashboard agent, raylet can simply tell the agent whether it is the head node via command-line arguments instead of forcing the agent to poll GCS. This PR avoids that unnecessary dependency and makes the startup sequence more straightforward and robust.

## Additional information
I'm also working on ray-project#59065, where the dashboard agent must report its bound port back to the raylet so that the raylet can then register itself (with the correct port info) to GCS. If the dashboard agent still depended on the raylet registering to GCS first, this would create a deadlock.

Signed-off-by: yicheng <yicheng@anyscale.com>
Co-authored-by: yicheng <yicheng@anyscale.com>
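To illustrate the design choice, here is a minimal sketch of the agent side under the new scheme; the flag name is hypothetical and not the actual dashboard-agent CLI.

```python
# Hypothetical sketch: the launching raylet already knows whether this node is
# the head node, so it passes that fact on the agent's command line instead of
# the agent polling GCS until the raylet has registered. Flag name is made up.
import argparse

parser = argparse.ArgumentParser()
parser.add_argument(
    "--is-head-node",
    action="store_true",
    help="Set by the launching raylet; avoids a GCS round-trip at startup.",
)
args = parser.parse_args([])  # empty argv for the sketch

is_head = args.is_head_node  # previously: retry a GCS query until registration finished
```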
…be fetched for the right node (ray-project#59229) We are adding support for the ability to specify `temp_dir` per node in ray-project#57735. However, this introduces a problem when we attempt to connect to a node with a custom IP and temp dir. Since the temp dir information will be stored on the `node_info`, we will need to fetch the `node_info` of the node we are connecting to from the GCS. However, since we are still initializing Ray, the only information we have access to is the node's IP address. If the node we are connecting to has a custom IP address as well, there will be no way for us to fetch the correct `node_info` needed to set up the temp dir. To resolve this issue, this PR resolves the node's IP by looking at what IP was used to start the raylet (instead of the existing implementation, which depends on reading a file stored in a `temp_dir` that hasn't been initialized yet). This way, we will always be able to retrieve the `node_info` when connecting, even to a node with a custom IP address.
## Related issues
ray-project#57735
---------
Signed-off-by: davik <davik@anyscale.com>
Co-authored-by: davik <davik@anyscale.com>
…nd updated docs (ray-project#59456)
## Description
- Added config validation to ensure `look_back_period_s` is greater than `metrics_interval_s` in `AutoscalingConfig`.
- Updated autoscaling docs to clarify the relationship between delays and metric push intervals.

## Related issues
Fixes ray-project#57714

## Additional information
- Updated documentation around the interaction between the metrics push interval and autoscaling delays: delays are enforced purely via the control loop interval and delay parameters, while the push interval bounds how quickly the autoscaler can see metric changes.

---------
Signed-off-by: Vaishnavi Panchavati <vaishdho10@gmail.com>
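A minimal sketch of the kind of validation described above, using a plain dataclass with the field names from the description rather than Serve's actual `AutoscalingConfig` model:

```python
# Sketch only: enforce look_back_period_s > metrics_interval_s at construction
# time. Field names follow the description above, not the real Serve model.
from dataclasses import dataclass


@dataclass
class AutoscalingConfigSketch:
    metrics_interval_s: float = 10.0
    look_back_period_s: float = 30.0

    def __post_init__(self):
        if self.look_back_period_s <= self.metrics_interval_s:
            raise ValueError(
                f"look_back_period_s ({self.look_back_period_s}) must be greater than "
                f"metrics_interval_s ({self.metrics_interval_s}); otherwise the autoscaler "
                "would average over a window shorter than one metrics push interval."
            )
```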
…ay-project#59455) Signed-off-by: Kourosh Hakhamaneshi <kourosh@anyscale.com>
…ne_benchmark (ray-project#59408) Signed-off-by: Nikhil Ghosh <nikhil@anyscale.com>
Signed-off-by: as-jding <jding@anyscale.com>
## Description
[Serve] Log the deployment config in controller logs.

## Related issues
ray-project#59167

---------
Signed-off-by: JianZhang <keepromise@apache.org>
…arizer (ray-project#56225)
## Why are these changes needed?
This PR introduces deployment-level autoscaling observability in Serve. The controller now emits a single, structured JSON log line (`serve_autoscaling_snapshot`) per autoscaling-enabled deployment each control-loop tick. This avoids recomputation in the controller call sites and provides a stable, machine-parsable surface for tooling and debugging.

#### Changed
- Add `get_observability_snapshot` in `AutoscalingState` and a manager wrapper to generate compact snapshots (replica counts, queued/total requests, metric freshness).
- Add `ServeEventSummarizer` to build payloads, reduce duplicate logs, and summarize recent scaling decisions.

#### Example log (single line)
Logs can be found in controller log files, e.g. `/tmp/ray/session_2025-09-03_21-12-01_095657_13385/logs/serve/controller_13474.log`.
```
serve_autoscaling_snapshot {"ts":"2025-09-04T06:12:11Z","app":"default","deployment":"worker","current_replicas":2,"target_replicas":2,"replicas_allowed":{"min":1,"max":8},"scaling_status":"stable","policy":"default","metrics":{"look_back_period_s":10.0,"queued_requests":0.0,"total_requests":0.0},"metrics_health":"ok","errors":[],"decisions":[{"ts":"2025-09-04T06:12:11Z","from":0,"to":2,"reason":"current=0, proposed=2"},{"ts":"2025-09-04T06:12:11Z","from":2,"to":2,"reason":"current=2, proposed=2"}]}
```

#### Follow-ups
- Expose the same snapshot data via `serve status -v` and CLI/SDK surfaces.
- Aggregate per-app snapshots and external scaler history.

## Related issue number
ray-project#55834

## Checks
- [x] I've signed off every commit (by using the -s flag, i.e., `git commit -s`) in this PR.
- [x] I've run `scripts/format.sh` to lint the changes in this PR.
- [ ] I've included any doc changes needed for https://docs.ray.io/en/master/.
- [ ] I've added any new APIs to the API Reference. For example, if I added a method in Tune, I've added it in `doc/source/tune/api/` under the corresponding `.rst` file.
- [x] I've made sure the tests are passing. Note that there might be a few flaky tests; see the recent failures at https://flakey-tests.ray.io/
- Testing Strategy
  - [x] Unit tests
  - [ ] Release tests
  - [ ] This PR is not tested :(

---------
Signed-off-by: Dongjun Na <kmu5544616@gmail.com>
Co-authored-by: akyang-anyscale <alexyang@anyscale.com>
Co-authored-by: Abrar Sheikh <abrar2002as@gmail.com>
Removing py3.9 depsets; removing py39 references from configs. Signed-off-by: elliot-barn <elliot.barnwell@anyscale.com>
Signed-off-by: root <root@ip-10-0-11-206.ec2.internal> Signed-off-by: srinarayan-srikanthan <srinarayan.srikanthan@intel.com> Co-authored-by: root <root@ip-10-0-11-206.ec2.internal>
no longer building or using those anywhere. Signed-off-by: Lonnie Liu <lonnie@anyscale.com>
Signed-off-by: xiaowen.wxw <wxw403883@alibaba-inc.com> Signed-off-by: wxwmd <44169396+wxwmd@users.noreply.github.com> Co-authored-by: xiaowen.wxw <wxw403883@alibaba-inc.com> Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
missed one in the change. Signed-off-by: Lonnie Liu <lonnie@anyscale.com>
…ay-project#59438)
## Description
The rollout fragment length in `AlgorithmConfig` can be calculated to be zero if `num_learners` is high enough. This PR guarantees that rollout fragments are at least of size 1.
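An illustrative sketch of the clamping idea, assuming a simplified derivation of fragment length from the batch size and runner counts (the real `AlgorithmConfig` computation differs in detail):

```python
# Sketch only: however the fragment length is derived, clamp it to >= 1 so
# integer division with many runners/learners never truncates it to zero.
def compute_rollout_fragment_length(
    train_batch_size: int, num_envs_per_env_runner: int, num_env_runners: int
) -> int:
    raw = train_batch_size // max(1, num_envs_per_env_runner * max(1, num_env_runners))
    return max(1, raw)
```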
…ct#58886) Signed-off-by: dancingactor <s990346@gmail.com> Co-authored-by: Jiajun Yao <jeromeyjj@gmail.com>
…ject#59299) Add documentation for Ray's token authentication feature introduced in v2.52.0 (based on REP: https://github.com/ray-project/enhancements/tree/048270c0750b9fe8aeac858466cb1b539d9b49c2/reps/2025-11-24-ray-token-auth )
Co-authored-by: Andrew Sy Kim andrewsy@google.com
---------
Signed-off-by: Sampan S Nayak <sampansnayak2@gmail.com>
Signed-off-by: sampan <sampan@anyscale.com>
Co-authored-by: Andrew Sy Kim <andrewsy@google.com>
Co-authored-by: sampan <sampan@anyscale.com>
The old log line was getting executed as part of the control loop unconditionally. cc @KeeProMise Signed-off-by: abrar <abrar@anyscale.com>
…9392)
## Description
### [Data] Concurrency cap backpressure tuning

**EWMA_ALPHA**
Update EWMA_ALPHA from 0.2 -> 0.1. This biases the adjusted level more in favor of limiting concurrency by being more sensitive to downstream queuing.

**K_DEV**
Update K_DEV from 2.0 -> 1.0. This biases the stddev term more in favor of limiting concurrency by being more sensitive to downstream queuing.

---------
Signed-off-by: Srinath Krishnamachari <srinath.krishnamachari@anyscale.com>
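For context, here is a hedged sketch of how an EWMA-plus-deviation cap of this kind typically works; the constants match the new values in this PR, but the surrounding logic is illustrative, not Ray Data's actual backpressure policy.

```python
# Illustrative EWMA backpressure sketch (not Ray Data's actual policy).
EWMA_ALPHA = 0.1  # was 0.2: more sensitive to sustained downstream queuing
K_DEV = 1.0       # was 2.0: tighter deviation band, caps concurrency sooner


def update_ewma(prev_mean: float, prev_var: float, sample: float, alpha: float = EWMA_ALPHA):
    """One EWMA step for the mean and variance of the downstream queue size."""
    mean = alpha * sample + (1.0 - alpha) * prev_mean
    var = alpha * (sample - mean) ** 2 + (1.0 - alpha) * prev_var
    return mean, var


def allowed_queueing(mean: float, var: float, k_dev: float = K_DEV) -> float:
    """Queueing level above which the operator's concurrency is held back."""
    return mean + k_dev * (var ** 0.5)
```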
### Description
Completing the Expr (direct) Rounding operations (ceil, floor, round,
trunc)
for example:
```
import ray
from ray.data import from_items
from ray.data.expressions import col
ray.init(include_dashboard=False, ignore_reinit_error=True)
ds = from_items([{"x": 1.1}, {"x": 2.5}, {"x": -3.7}])
for row in ds.iter_rows():
    print(row)
x_expr = col("x")
hasattr(x_expr, "ceil")
ds = ds.with_column("x_ceil", x_expr.ceil())
hasattr(x_expr, "floor")
ds = ds.with_column("x_floor", x_expr.floor())
hasattr(x_expr, "round")
ds = ds.with_column("x_round", x_expr.round())
hasattr(x_expr, "trunc")
ds = ds.with_column("x_trunc", x_expr.trunc())
for row in ds.iter_rows():
    print(row)
```
### Related issues
Related to ray-project#58674
### Additional information
---------
Signed-off-by: yaommen <myanstu@163.com>
Co-authored-by: yanmin <homeryan@didiglobal.com>
…() (ray-project#59102)
## Description
Currently, `write_parquet` has been hard-coded to use `hive` partitioning. This PR allows passing `partitioning_flavor` via `arrow_parquet_args`/`arrow_parquet_args_fn`. The default behaviors differ between Ray Data and pyarrow:
- Ray Data defaults to "hive", which is the case when we do not specify `partitioning_flavor`.
- pyarrow uses `None` to represent dictionary partitioning, so we can pass `partitioning_flavor=None`.

Also, I did not use the `Partitioning` class from `ray.data.read_parquet`, which seems to be overkill (e.g., we exposed `partition_cols` as a top-level arg here). Finally, I have rearranged the docstring a little bit.

## Related issues
NA

---------
Signed-off-by: Kit Lee <7000003+wingkitlee0@users.noreply.github.com>
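A hedged usage sketch based on the description above; the exact keyword plumbing may differ from the final API, and the output paths are illustrative.

```python
# Sketch: choose the parquet partitioning flavor when writing a partitioned dataset.
import ray

ds = ray.data.from_items(
    [{"year": y, "value": v} for y in (2023, 2024) for v in range(3)]
)

# Default behavior (unchanged): hive-style partitioning, e.g. .../year=2023/<file>.parquet
ds.write_parquet("/tmp/out_hive", partition_cols=["year"])

# Per this PR: pass partitioning_flavor through arrow_parquet_args to get
# pyarrow's directory-style partitioning, e.g. .../2023/<file>.parquet
ds.write_parquet("/tmp/out_dir", partition_cols=["year"], partitioning_flavor=None)
```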
β¦ray-project#59412) This PR adds a cap on the total budget allocation by max_resource_usage in ReservationOpResourceAllocator. Previously, the reservation was capped by max_resource_usage, but the total budget (reservation + shared resources) could exceed the max. This could lead to operators being allocated more resources than they can use, while other operators are starved. Changes Cap total allocation by max_resource_usage: - Total allocation = max(total_reserved, op_usage) + op_shared - We now cap op_shared so that total allocation β€ max_resource_usage - Excess shared resources remain in remaining_shared for other operators Redistribute remaining shared resources: - After the allocation loop, any remaining shared resources (from caps) are given to the most downstream uncapped operator - An operator is "uncapped" if its max_resource_usage == ExecutionResources.inf() - If all operators are capped, remaining resources are not redistributed Tests - test_budget_capped_by_max_resource_usage: Tests that capped operators don't receive excess shared resources, and remaining goes to uncapped downstream op - test_budget_capped_by_max_resource_usage_all_capped: Tests that when all operators are capped, remaining shared resources are not redistributed --------- Signed-off-by: Hao Chen <chenh1024@gmail.com>
## Description
This change addresses a long-standing problem where Pandas tensors holding null values couldn't be converted into Arrow ones. More details are captured in ray-project#59445. The following changes are made to address that:
- Fixed `_is_ndarray_variable_shaped_tensor`
- Numpy tensors with `dtype='O'` are cycled through Pyarrow to be converted into proper ndarrays
- Path raveling and formatting are unified between fixed-shape and var-shaped tensors

## Related issues
Addresses ray-project#59445

---------
Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
Signed-off-by: Alexey Kudinkin <alexey.kudinkin@gmail.com>
Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
)
## Description
This PR introduces the following optimizations in the `OpenTelemetryMetricsRecorder` and some of its consumers:
- Use asynchronous instruments wherever available (counter and up-down counter)
- Introduce a batch API to record histogram metrics (to prevent lock contention caused by repeated `set_metric_value()` calls)
- Batch the events-received metric update in aggregator_agent instead of making individual calls

---------
Signed-off-by: sampan <sampan@anyscale.com>
Co-authored-by: sampan <sampan@anyscale.com>
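To illustrate the batching idea (buffer observations, then flush them in one locked pass rather than taking the recorder lock per call), here is a generic, hedged sketch; it is not the actual `OpenTelemetryMetricsRecorder` API.

```python
# Generic sketch of batched histogram recording to reduce lock contention.
import threading
from collections import defaultdict


class HistogramBatcher:
    def __init__(self, record_fn):
        self._record_fn = record_fn          # e.g. wraps a histogram's record() call
        self._lock = threading.Lock()
        self._pending = defaultdict(list)    # metric name -> list of (value, tags)

    def add(self, name, value, tags=None):
        # Cheap append under the lock; no per-call recorder work.
        with self._lock:
            self._pending[name].append((value, tags or {}))

    def flush(self):
        # Swap the buffer out under the lock, then record outside of it.
        with self._lock:
            pending, self._pending = self._pending, defaultdict(list)
        for name, observations in pending.items():
            for value, tags in observations:
                self._record_fn(name, value, tags)
```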
…ct#59812) not used anymore, we always use ray/ray-ml/ray-llm without any remapping now. Signed-off-by: Lonnie Liu <95255098+aslonnie@users.noreply.github.com>
… to fix deserialization of existing datasets (ray-project#59818)
## Description
Context: [Slack](https://anyscaleteam.slack.com/archives/C04FMM4NPQ9/p1767322231131189)

ray-project#59420 moved Ray Data's Arrow tensor extensions from `ray.air.util.tensor_extensions` to `ray.data._internal.tensor_extensions`. That actually broke deserialization of datasets written with the older Ray Data implementation of these extensions, which inherited from `pyarrow.PyExtensionType`:
1. `PyExtensionType` pickles a class-ref into the metadata when writing the data (in that case it's `ray.air.util.tensor_extensions.arrow.ArrowTensorType`, for example).
2. Upon reading the data, it tries to unpickle it and now fails because these classes were moved.

---------
Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
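For background, the general technique for keeping old pickled class references loadable after a module move looks roughly like the sketch below; this is illustrative and not necessarily how this PR implements the fix.

```python
# Sketch of the general technique: keep the old import path resolvable and
# alias it to the relocated class, so metadata pickled by older writers
# (which stored the old dotted path) still unpickles.
import sys
import types


def install_legacy_alias(old_module: str, name: str, new_obj) -> None:
    """Register `new_obj` as `old_module.name`, creating stub parent modules
    as needed so pickle's find_class can import the old path."""
    parts = old_module.split(".")
    for i in range(1, len(parts) + 1):
        mod_name = ".".join(parts[:i])
        if mod_name not in sys.modules:
            sys.modules[mod_name] = types.ModuleType(mod_name)
    setattr(sys.modules[old_module], name, new_obj)
```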
Contains py3.14. The manylinux project now uses a new format for tagging images of `YYYY.MM.DD-<build_number>`, which is why there is no commit hash on this. Signed-off-by: andrew <andrew@anyscale.com>
…oject#59821) Bumping to latest from ray-project#59819 Signed-off-by: andrew <andrew@anyscale.com>
it is always going to be overwritten later now Signed-off-by: Lonnie Liu <95255098+aslonnie@users.noreply.github.com>
python 3.9 is end of life --------- Signed-off-by: Lonnie Liu <lonnie@anyscale.com>
… to fix deserialization of existing datasets (ray-project#59828)
## Description
Follow-up for ray-project#59818:
1. Fixing serde for `ArrowPythonObjectType`
2. Adding missing `__init__.py` files that were causing packages to be omitted at build time

---------
Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
not used anywhere anymore. Minimal install tests use dockerfiles to set up the test environments now. Signed-off-by: Lonnie Liu <95255098+aslonnie@users.noreply.github.com>
and remove python 3.9 related tests Signed-off-by: Lonnie Liu <95255098+aslonnie@users.noreply.github.com>
## Description
When the Ray process unexpectedly terminates and the node ID changes, we
cannot know the previous and current node IDs through events, as we did not
include `node_id` in the base event.
This feature is needed by the history server for the Ray cluster. When
the Ray process unexpectedly terminates, we have to flush the events
generated by the previous node. If the Ray process was restarted quickly,
it is difficult to know which events were generated by the previous node.
This PR adds the `node_id` to the base event, showing where the event is
being emitted.
### Main changes
- `src/ray/protobuf/public/events_base_event.proto`
- Add node id to the base event proto (`RayEvent`)
For GCS:
- `src/ray/gcs/gcs_server_main.cc`
- add `--node_id` as cli args
- `src/ray/observability/` and `src/ray/gcs/` (some files)
- Add `node_id` as arguments and pass to `RayEvent`
For CoreWorker
- `src/ray/core_worker/`
- Passing the `node_id` to the `RayEvent`
Python side
- `python/ray/_private/node.py`
- Passing `node_id` when starting gcs server
## Related issues
Closes ray-project#58879
## Additional information
### Testing process
1. export env var:
- `RAY_enable_core_worker_ray_event_to_aggregator=1`
- `RAY_DASHBOARD_AGGREGATOR_AGENT_EVENTS_EXPORT_ADDR="http://localhost:8000"`
2. `ray start --head --system-config='{"enable_ray_event":true}'`
3. Submit simple job `ray job submit -- python rayjob.py`. E.g.
```py
import ray
@ray.remote
def hello_world():
return "hello world"
ray.init()
print(ray.get(hello_world.remote()))
```
4. Run the event listener (script below) to start listening on the event export
host: `python event_listener.py`
```py
import http.server
import socketserver
import json
import logging
PORT = 8000
class EventReceiver(http.server.SimpleHTTPRequestHandler):
    def do_POST(self):
        content_length = int(self.headers['Content-Length'])
        post_data = self.rfile.read(content_length)
        print(json.loads(post_data.decode('utf-8')))
        self.send_response(200)
        self.send_header('Content-type', 'application/json')
        self.end_headers()
        self.wfile.write(json.dumps({"status": "success", "message": "Event received"}).encode('utf-8'))
if __name__ == "__main__":
    with socketserver.TCPServer(("", PORT), EventReceiver) as httpd:
        print(f"Serving event listener on http://localhost:{PORT}")
        print("Set RAY_DASHBOARD_AGGREGATOR_AGENT_EVENTS_EXPORT_ADDR to this address.")
        httpd.serve_forever()
```
Will get the event as below:
- GCS event:
```json
[
{
"eventId":"A+yrzknbyDjQALBvaimTPcyZWll9Te3Tw+FEnQ==",
"sourceType":"GCS",
"eventType":"DRIVER_JOB_LIFECYCLE_EVENT",
"timestamp":"2025-12-07T10: 54: 12.621560Z",
"severity":"INFO",
"sessionName":"session_2025-12-07_17-33-33_853835_27993",
"driverJobLifecycleEvent":{
"jobId":"BAAAAA==",
"stateTransitions":[
{
"state":"FINISHED",
"timestamp":"2025-12-07T10: 54: 12.621562Z"
}
]
},
"nodeId":"k4hj3FDLYStB38nSSRZQRwOjEV32EoAjQe3KPw==", // <- nodeId set
"message":""
}
]
```
- CoreWorker event:
```json
[
{
"eventId":"TIAp8D4NwN/ne3VhPHQ0QnsBCYSkZmOUWoe6zQ==",
"sourceType":"CORE_WORKER",
"eventType":"TASK_DEFINITION_EVENT",
"timestamp":"2025-12-07T10:54:12.025967Z",
"severity":"INFO",
"sessionName":"session_2025-12-07_17-33-33_853835_27993",
"taskDefinitionEvent":{
"taskId":"yoDzqOi6LlD///////////////8EAAAA",
"taskFunc":{
"pythonFunctionDescriptor":{
"moduleName":"rayjob",
"functionName":"hello_world",
"functionHash":"a37aacc3b7884c2da4aec32db6151d65",
"className":""
}
},
"taskName":"hello_world",
"requiredResources":{
"CPU":1.0
},
"jobId":"BAAAAA==",
"parentTaskId":"//////////////////////////8EAAAA",
"placementGroupId":"////////////////////////",
"serializedRuntimeEnv":"{}",
"taskAttempt":0,
"taskType":"NORMAL_TASK",
"language":"PYTHON",
"refIds":{
}
},
"nodeId":"k4hj3FDLYStB38nSSRZQRwOjEV32EoAjQe3KPw==", // <- nodeId set here
"message":""
}
]
```
---------
Signed-off-by: machichima <nary12321@gmail.com>
Signed-off-by: Future-Outlier <eric901201@gmail.com>
Co-authored-by: Future-Outlier <eric901201@gmail.com>
Co-authored-by: Mengjin Yan <mengjinyan3@gmail.com>
Signed-off-by: win5923 <ken89@kimo.com>
from Python 3.9 to Python 3.10. We are practically already using Python 3.10 everywhere. Signed-off-by: Lonnie Liu <95255098+aslonnie@users.noreply.github.com>
and remove python dependency requirements for python 3.9 or below Signed-off-by: Lonnie Liu <lonnie@anyscale.com>
…ded_decoding… (ray-project#59421) Signed-off-by: Sathyanarayanaa-T <tsathyanarayanaa@gmail.com> Co-authored-by: Jeffrey Wang <jeffreywang@anyscale.com>
…oject#59806) Signed-off-by: Jeffrey Wang <jeffreywang@anyscale.com>
## Description Move gcs_callback_types.h to rpc_callback_types.h ## Related issues Closes ray-project#58597 --------- Signed-off-by: tianyi-ge <tianyig@outlook.com> Co-authored-by: tianyi-ge <tianyig@outlook.com>
…ay-project#59345)
## Description
Using stateful models in Offline RL is an important feature, and the major prerequisites for it are already implemented in RLlib's stack. However, some minor changes are needed to actually train such models in the new stack. This PR implements these minor but important changes at different locations in the code:
1. It introduces the `STATE_OUT` key in the outputs of `BC`'s `forward` function to make the next hidden state available to the connectors and loss function.
2. It adds in the `AddStatesFromEpisodesToBatch` the initial state to the batch for offline data.
3. It adds in `MARWIL` a burn-in for the state that can be controlled via `burnin_len`.
4. It generates sequence sampling in the `OfflinePreLearner` dependent on the `max_seq_len`, `lookback_len` and `burnin_len`.
5. It fixes multiple smaller bugs in `OfflineEnvRunner` when recording from class-referenced environments, and in the `offline_rl_with_image_data.py` and `cartpole_recording.py` examples when loading the `RLModule` from checkpoint.
6. It fixes the use of `explore=True` in evaluation of Offline RL tests and examples.
7. It adds recorded expert data from `StatelessCartPole` to `s3://ray-example-data/rllib/offline-data/statelesscartpole`.
8. It adds a test for learning on a single episode and a single batch from recorded stateful expert data. It also adds a test that uses the initial states for sequences instead of recorded states.
9. It adds a new config parameter `prelearner_use_recorded_module_states` to either use recorded states from the data (`True`) or use the initial state from the `RLModule` (`False`).

## Additional information
The only API change is the introduction of a `burnin_len` in the `MARWIL/BC` config.
>__Note:__ Stateful model training is only possible in `BC` and `MARWIL` so far for Offline RL. For `IQL` and `CQL` these changes have to be initiated through the off-policy algorithms (`DQN/SAC`), and for these all the buffers need to provide sequence sampling, which is implemented right now solely in the `EpisodeReplayBuffer`. Therefore a couple of follow-up PRs need to be produced:
1. Introduce sequence sampling to `PrioritizedEpisodeReplayBuffer`.
2. Introduce sequence sampling to `MultiAgentEpisodeReplayBuffer`.
3. Introduce sequence sampling to `MultiAgentPrioritizedEpisodeReplayBuffer`.
4. Introduce stateful model training in `SAC`.
5. Introduce stateful model training to `IQL/CQL`.

---------
Signed-off-by: simonsays1980 <simon.zehnder@gmail.com>
) Signed-off-by: You-Cheng Lin <mses010108@gmail.com>
…59774)
## Description
Instead of rendering a large JSON blob for operator metrics, render the log in a tabular form for better readability.

---------
Signed-off-by: Goutam <goutam@anyscale.com>
…ing row count (ray-project#59513)
## Description
```python
import time
import ray

def process_single_row(x):
    time.sleep(.3)
    x['id2'] = x['id']
    return x

class PrepareImageUdf:
    def __call__(self, x):
        time.sleep(5)
        return x

class ChatUdf:
    def __call__(self, x):
        time.sleep(5)
        return x

def preprocess(x):
    time.sleep(.3)
    return x

def task2(x):
    time.sleep(.3)
    return x

def task3(x):
    time.sleep(.3)
    return x

def filterfn(x):
    return True

ds = (
    ray.data.range(1024, override_num_blocks=1024)
    .map_batches(task3, compute=ray.data.TaskPoolStrategy(size=1))
    .drop_columns(cols="id")
    .map(process_single_row)
    .filter(filterfn)
    .map(preprocess)
    .map_batches(PrepareImageUdf, zero_copy_batch=True, batch_size=64, compute=ray.data.ActorPoolStrategy(min_size=1, max_size=12))
)
ds.explain()
```
Here's a fun question: what should this return as the optimized physical plan?

Ans:
```python
-------- Physical Plan (Optimized) --------
ActorPoolMapOperator[MapBatches(drop_columns)->Map(process_single_row)->Filter(filterfn)->Map(preprocess)->MapBatches(PrepareImageUdf)]
+- TaskPoolMapOperator[MapBatches(task3)]
+- TaskPoolMapOperator[ReadRange]
+- InputDataBuffer[Input]
```
Cool. It looks like it fused mostly everything from `drop_columns` to `PrepareImageUdf`.

OK, what if I added these lines: what happens now?
```python
ds = (
    ds
    .map_batches(ChatUdf, zero_copy_batch=True, batch_size=64, compute=ray.data.ActorPoolStrategy(min_size=1, max_size=12))
)
ds.explain()
```
Ans:
```python
-------- Physical Plan (Optimized) --------
ActorPoolMapOperator[MapBatches(ChatUdf)]
+- ActorPoolMapOperator[Map(preprocess)->MapBatches(PrepareImageUdf)]
+- TaskPoolMapOperator[MapBatches(drop_columns)->Map(process_single_row)->Filter(filterfn)]
+- TaskPoolMapOperator[MapBatches(task3)]
+- TaskPoolMapOperator[ReadRange]
+- InputDataBuffer[Input]
```
Huh?? Why did `preprocess->PrepareImageUdf` get defused?

The issue is that operator map fusion does not preserve whether or not the row counts can be modified. This PR addresses that.

## Related issues
None

## Additional information
None

---------
Signed-off-by: iamjustinhsu <jhsu@anyscale.com>
Note: The number of changes in this pull request is too large for Gemini Code Assist to generate a review.
This pull request has been automatically marked as stale because it has not had recent activity. You can always ask for help on our discussion forum or Ray's public Slack channel. If you'd like to keep this open, just leave any comment, and the stale label will be removed.
This pull request has been automatically closed because there has been no more activity in the past 14 days. Please feel free to reopen or open a new pull request if you'd still like this to be addressed. Again, you can always ask for help on our discussion forum or Ray's public Slack channel. Thanks again for your contribution!
This Pull Request was created automatically to merge the latest changes from
`master` into the `main` branch.

Created: 2026-01-06
Merge direction: `master` → `main`
Triggered by: Scheduled
Please review and merge if everything looks good.