|
5 | 5 | import time |
6 | 6 | import unittest |
7 | 7 | from dataclasses import replace |
8 | | -from typing import Any, Callable, Dict, Optional, Tuple |
| 8 | +from typing import Any, Callable, Dict, Iterable, Optional, Tuple |
9 | 9 | from unittest.mock import MagicMock |
10 | 10 |
|
11 | 11 | import pyarrow as pa |
|
17 | 17 | from ray._private.ray_constants import ID_SIZE |
18 | 18 | from ray.actor import ActorHandle |
19 | 19 | from ray.data._internal.actor_autoscaler import ActorPoolScalingRequest |
| 20 | +from ray.data._internal.compute import ActorPoolStrategy |
20 | 21 | from ray.data._internal.execution.bundle_queue import FIFOBundleQueue |
21 | 22 | from ray.data._internal.execution.interfaces import ( |
22 | 23 | ExecutionOptions, |
|
25 | 26 | ) |
26 | 27 | from ray.data._internal.execution.interfaces.physical_operator import _ActorPoolInfo |
27 | 28 | from ray.data._internal.execution.interfaces.ref_bundle import RefBundle |
| 29 | +from ray.data._internal.execution.interfaces.task_context import TaskContext |
28 | 30 | from ray.data._internal.execution.operators.actor_pool_map_operator import ( |
29 | 31 | ActorPoolMapOperator, |
30 | 32 | _ActorPool, |
31 | 33 | _ActorTaskSelector, |
32 | 34 | ) |
33 | 35 | from ray.data._internal.execution.operators.input_data_buffer import InputDataBuffer |
| 36 | +from ray.data._internal.execution.operators.map_operator import MapOperator |
34 | 37 | from ray.data._internal.execution.operators.map_transformer import ( |
35 | 38 | BlockMapTransformFn, |
36 | 39 | MapTransformer, |
|
41 | 44 | ) |
42 | 45 | from ray.data._internal.execution.util import make_ref_bundles |
43 | 46 | from ray.data.block import Block, BlockAccessor, BlockMetadata |
| 47 | +from ray.data.context import ( |
| 48 | + DEFAULT_ACTOR_MAX_TASKS_IN_FLIGHT_TO_MAX_CONCURRENCY_FACTOR, |
| 49 | + DataContext, |
| 50 | +) |
| 51 | +from ray.data.tests.util import ( |
| 52 | + create_map_transformer_from_block_fn, |
| 53 | + run_one_op_task, |
| 54 | + run_op_tasks_sync, |
| 55 | +) |
| 56 | +from ray.tests.client_test_utils import create_remote_signal_actor |
44 | 57 | from ray.tests.conftest import * # noqa |
45 | 58 | from ray.types import ObjectRef |
46 | 59 |
|
@@ -1001,6 +1014,165 @@ def __call__(self, batch): |
1001 | 1014 | ).take_all() |
1002 | 1015 |
|
1003 | 1016 |
|
| 1017 | +def test_actor_pool_map_operator_init(ray_start_regular_shared, data_context_override): |
| 1018 | + """Tests that ActorPoolMapOperator runs init_fn on start and surfaces init failures.""" |
| 1019 | + |
| 1020 | + from ray.exceptions import RayActorError |
| 1021 | + |
| 1022 | + # Override so that start() blocks until the actor pool has provisioned at least the minimum number of actors. |
| 1023 | + data_context_override.wait_for_min_actors_s = 60 |
| 1024 | + |
| 1025 | + def _sleep(block_iter: Iterable[Block]) -> Iterable[Block]: |
| 1026 | + time.sleep(999) |
| 1027 | + |
| 1028 | + def _fail(): |
| 1029 | + raise ValueError("init_failed") |
| 1030 | + |
| 1031 | + input_op = InputDataBuffer( |
| 1032 | + DataContext.get_current(), make_ref_bundles([[i] for i in range(10)]) |
| 1033 | + ) |
| 1034 | + compute_strategy = ActorPoolStrategy(min_size=1) |
| 1035 | + |
| 1036 | + op = MapOperator.create( |
| 1037 | + create_map_transformer_from_block_fn(_sleep, init_fn=_fail), |
| 1038 | + input_op=input_op, |
| 1039 | + data_context=DataContext.get_current(), |
| 1040 | + name="TestMapper", |
| 1041 | + compute_strategy=compute_strategy, |
| 1042 | + ) |
| 1043 | + |
| 1044 | + with pytest.raises(RayActorError, match=r"init_failed"): |
| 1045 | + op.start(ExecutionOptions()) |
| 1046 | + |
| 1047 | + |
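For context, the failure mode this test relies on can be reproduced outside the operator: when a Ray actor's constructor raises, the error surfaces as a `RayActorError` once the actor is used. A minimal standalone sketch (the actor class and method names below are illustrative, not part of the test suite):

```python
import ray
from ray.exceptions import RayActorError

@ray.remote
class FailsOnInit:
    def __init__(self):
        # The constructor raising is what makes the actor unusable.
        raise ValueError("init_failed")

    def ping(self):
        return "ok"

actor = FailsOnInit.remote()
try:
    # Any method call on the dead actor raises RayActorError.
    ray.get(actor.ping.remote())
except RayActorError as e:
    print("actor failed to initialize:", e)
```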
| 1048 | +@pytest.mark.parametrize( |
| 1049 | + "max_tasks_in_flight_strategy, max_tasks_in_flight_ctx, max_concurrency, expected_max_tasks_in_flight", |
| 1050 | + [ |
| 1051 | + # Compute strategy takes precedence |
| 1052 | + (3, 5, 4, 3), |
| 1053 | + # DataContext.max_tasks_in_flight_per_actor takes precedence |
| 1054 | + (None, 5, 4, 5), |
| 1055 | + # Max tasks in flight falls back to max_concurrency * DEFAULT_ACTOR_MAX_TASKS_IN_FLIGHT_TO_MAX_CONCURRENCY_FACTOR (see the resolution sketch after this test) |
| 1056 | + ( |
| 1057 | + None, |
| 1058 | + None, |
| 1059 | + 4, |
| 1060 | + 4 * DEFAULT_ACTOR_MAX_TASKS_IN_FLIGHT_TO_MAX_CONCURRENCY_FACTOR, |
| 1061 | + ), |
| 1062 | + ], |
| 1063 | +) |
| 1064 | +def test_actor_pool_map_operator_should_add_input( |
| 1065 | + ray_start_regular_shared, |
| 1066 | + max_tasks_in_flight_strategy, |
| 1067 | + max_tasks_in_flight_ctx, |
| 1068 | + max_concurrency, |
| 1069 | + expected_max_tasks_in_flight, |
| 1070 | + restore_data_context, |
| 1071 | +): |
| 1072 | + """Tests that ActorPoolMapOperator refuses input until an actor has started and caps tasks in flight per actor.""" |
| 1073 | + |
| 1074 | + ctx = DataContext.get_current() |
| 1075 | + ctx.max_tasks_in_flight_per_actor = max_tasks_in_flight_ctx |
| 1076 | + |
| 1077 | + input_op = InputDataBuffer(ctx, make_ref_bundles([[i] for i in range(20)])) |
| 1078 | + |
| 1079 | + compute_strategy = ActorPoolStrategy( |
| 1080 | + size=1, |
| 1081 | + max_tasks_in_flight_per_actor=max_tasks_in_flight_strategy, |
| 1082 | + ) |
| 1083 | + |
| 1084 | + def _failing_transform( |
| 1085 | + block_iter: Iterable[Block], task_context: TaskContext |
| 1086 | + ) -> Iterable[Block]: |
| 1087 | + raise ValueError("expected failure") |
| 1088 | + |
| 1089 | + op = MapOperator.create( |
| 1090 | + create_map_transformer_from_block_fn(_failing_transform), |
| 1091 | + input_op=input_op, |
| 1092 | + data_context=ctx, |
| 1093 | + name="TestMapper", |
| 1094 | + compute_strategy=compute_strategy, |
| 1095 | + ray_remote_args={"max_concurrency": max_concurrency}, |
| 1096 | + ) |
| 1097 | + |
| 1098 | + op.start(ExecutionOptions()) |
| 1099 | + |
| 1100 | + # Cannot add input until actor has started. |
| 1101 | + assert not op.should_add_input() |
| 1102 | + run_op_tasks_sync(op) |
| 1103 | + assert op.should_add_input() |
| 1104 | + |
| 1105 | + # Assert that a single actor can accept up to `expected_max_tasks_in_flight` tasks |
| 1106 | + for _ in range(expected_max_tasks_in_flight): |
| 1107 | + assert op.should_add_input() |
| 1108 | + op.add_input(input_op.get_next(), 0) |
| 1109 | + assert not op.should_add_input() |
| 1110 | + |
| 1111 | + |
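The three parametrized cases above encode a precedence order for the per-actor task cap. A minimal sketch of that assumed resolution logic (the function name and the explicit `factor` argument are illustrative; the actual logic lives inside the operator):

```python
from typing import Optional

def resolve_max_tasks_in_flight(
    strategy_value: Optional[int],
    ctx_value: Optional[int],
    max_concurrency: int,
    factor: int,
) -> int:
    # 1. An explicit value on the ActorPoolStrategy wins.
    if strategy_value is not None:
        return strategy_value
    # 2. Otherwise fall back to DataContext.max_tasks_in_flight_per_actor.
    if ctx_value is not None:
        return ctx_value
    # 3. Otherwise derive the cap from the actor's max_concurrency and the
    #    DEFAULT_ACTOR_MAX_TASKS_IN_FLIGHT_TO_MAX_CONCURRENCY_FACTOR constant.
    return max_concurrency * factor
```

With the parametrized inputs, `resolve_max_tasks_in_flight(3, 5, 4, factor)` returns 3, `resolve_max_tasks_in_flight(None, 5, 4, factor)` returns 5, and `resolve_max_tasks_in_flight(None, None, 4, factor)` returns `4 * factor`, matching `expected_max_tasks_in_flight`.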
| 1112 | +def test_actor_pool_map_operator_num_active_tasks_and_completed(shutdown_only): |
| 1113 | + """Tests ActorPoolMapOperator's num_active_tasks and completed methods.""" |
| 1114 | + num_actors = 2 |
| 1115 | + ray.shutdown() |
| 1116 | + ray.init(num_cpus=num_actors) |
| 1117 | + |
| 1118 | + signal_actor = create_remote_signal_actor(ray).options(num_cpus=0).remote() |
| 1119 | + |
| 1120 | + def _map_transform_fn(block_iter: Iterable[Block], _) -> Iterable[Block]: |
| 1121 | + ray.get(signal_actor.wait.remote()) |
| 1122 | + yield from block_iter |
| 1123 | + |
| 1124 | + input_op = InputDataBuffer( |
| 1125 | + DataContext.get_current(), make_ref_bundles([[i] for i in range(num_actors)]) |
| 1126 | + ) |
| 1127 | + compute_strategy = ActorPoolStrategy(min_size=num_actors, max_size=2 * num_actors) |
| 1128 | + |
| 1129 | + # Create an operator with [num_actors, 2 * num_actors] actors. |
| 1130 | + # Resources are limited to num_actors, so the second half will be pending. |
| 1131 | + op = MapOperator.create( |
| 1132 | + create_map_transformer_from_block_fn(_map_transform_fn), |
| 1133 | + input_op=input_op, |
| 1134 | + data_context=DataContext.get_current(), |
| 1135 | + name="TestMapper", |
| 1136 | + compute_strategy=compute_strategy, |
| 1137 | + ) |
| 1138 | + actor_pool = op._actor_pool |
| 1139 | + |
| 1140 | + # Wait for the op to scale up to the min size. |
| 1141 | + op.start(ExecutionOptions()) |
| 1142 | + run_op_tasks_sync(op) |
| 1143 | + assert actor_pool.num_running_actors() == num_actors |
| 1144 | + assert op.num_active_tasks() == 0 |
| 1145 | + |
| 1146 | + # Scale up to the max size, the second half of the actors will be pending. |
| 1147 | + actor_pool.scale(ActorPoolScalingRequest(delta=num_actors)) |
| 1148 | + assert actor_pool.num_pending_actors() == num_actors |
| 1149 | + # `num_active_tasks` should exclude the metadata tasks for the pending actors. |
| 1150 | + assert op.num_active_tasks() == 0 |
| 1151 | + |
| 1152 | + # Add inputs. |
| 1153 | + for _ in range(num_actors): |
| 1154 | + assert op.should_add_input() |
| 1155 | + op.add_input(input_op.get_next(), 0) |
| 1156 | + # `num_active_tasks` should still only count the data tasks, not the pending actors' metadata tasks. |
| 1157 | + assert op.num_active_tasks() == num_actors |
| 1158 | + assert actor_pool.num_pending_actors() == num_actors |
| 1159 | + |
| 1160 | + # Let the data tasks complete. |
| 1161 | + signal_actor.send.remote() |
| 1162 | + while len(op._data_tasks) > 0: |
| 1163 | + run_one_op_task(op) |
| 1164 | + assert op.num_active_tasks() == 0 |
| 1165 | + assert actor_pool.num_pending_actors() == num_actors |
| 1166 | + |
| 1167 | + # Mark the inputs done and take all outputs. |
| 1168 | + # The operator should be completed, even if there are pending actors. |
| 1169 | + op.all_inputs_done() |
| 1170 | + while op.has_next(): |
| 1171 | + op.get_next() |
| 1172 | + assert actor_pool.num_pending_actors() == num_actors |
| 1173 | + assert op.has_completed() |
| 1174 | + |
| 1175 | + |
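The test above gates its data tasks on a signal actor so the assertions on `num_active_tasks()` and `num_pending_actors()` run against a stable state. A minimal sketch of what `create_remote_signal_actor` is assumed to provide (the class below is illustrative, not Ray's actual test utility):

```python
import asyncio
import ray

@ray.remote(num_cpus=0)
class SignalActor:
    """Tasks calling wait() block until send() has been invoked."""

    def __init__(self):
        self._event = asyncio.Event()

    async def send(self):
        self._event.set()

    async def wait(self):
        await self._event.wait()
```

In the test, each `_map_transform_fn` call blocks on `signal_actor.wait.remote()`, so the data tasks stay active until `signal_actor.send.remote()` fires.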
1004 | 1176 | if __name__ == "__main__": |
1005 | 1177 | import sys |
1006 | 1178 |
|
|