@@ -92,80 +92,6 @@ def _get_worker_inputs(
9292 return futures
9393
9494
@ray.remote(num_cpus=0)
def execute_query_stage(
    query_stages: list[QueryStage],
    stage_id: int,
) -> tuple[int, list[ray.ObjectRef]]:
    """
    Execute a query stage on the workers.

    Recursively schedules all child stages first, then fans out one
    ``execute_query_partition`` task per worker partition for this stage.

    Returns the stage ID, and a list of futures for the output partitions of the query stage.
    """
    stage = QueryStage(stage_id, query_stages[stage_id])

    # Kick off every child stage before this one; each remote call yields a
    # future resolving to (child_stage_id, child partition futures).
    pending_children = [
        execute_query_stage.remote(query_stages, child_id)
        for child_id in stage.get_child_stage_ids()
    ]

    # A stage with a single output partition is a reduce stage: it must run as
    # one task that sees every shuffle partition. Otherwise we run one task
    # per input partition.
    concurrency = stage.get_input_partition_count()
    output_partitions_count = stage.get_output_partition_count()
    if output_partitions_count == 1:
        print("Forcing reduce stage concurrency from {} to 1".format(concurrency))
        concurrency = 1

    print(
        "Scheduling query stage #{} with {} input partitions and {} output partitions".format(
            stage.id(), concurrency, output_partitions_count
        )
    )

    # Block until all child stages are scheduled. Each entry is a
    # (stage ID, list of futures) pair; each list is a 2-D array of
    # (input partitions, output partitions).
    child_outputs = ray.get(pending_children)

    def _get_worker_inputs(
        part: int,
    ) -> tuple[list[tuple[int, int, int]], list[ray.ObjectRef]]:
        # Gather the (child stage, input partition, output partition) ids and
        # the matching object refs that worker `part` should consume.
        ids: list[tuple[int, int, int]] = []
        futures: list[ray.ObjectRef] = []
        for child_stage_id, child_futures in child_outputs:
            for i, lst in enumerate(child_futures):
                if isinstance(lst, list):
                    for j, f in enumerate(lst):
                        # If concurrency is 1, pass in all shuffle partitions. Otherwise,
                        # only pass in the partitions that match the current worker partition.
                        if concurrency == 1 or j == part:
                            ids.append((child_stage_id, i, j))
                            futures.append(f)
                elif concurrency == 1 or part == 0:
                    # Non-list child output: a single partition ref.
                    ids.append((child_stage_id, i, 0))
                    futures.append(lst)
        return ids, futures

    # Schedule the actual execution workers.
    plan_bytes = stage.get_execution_plan_bytes()
    stage_futures = []
    opt = {}
    # TODO not sure why we had this but my Ray cluster could not find suitable resource
    # until I commented this out
    # opt["resources"] = {"worker": 1e-3}
    opt["num_returns"] = output_partitions_count
    for part in range(concurrency):
        ids, inputs = _get_worker_inputs(part)
        stage_futures.append(
            execute_query_partition.options(**opt).remote(
                stage_id, plan_bytes, part, ids, *inputs
            )
        )

    return stage_id, stage_futures
168-
16995@ray .remote
17096def execute_query_partition (
17197 stage_id : int ,
0 commit comments