Skip to content

Commit 4ae67fb

Browse files
authored
[Ray] optimize ray fetcher by query in remote node (#2957)
1 parent 260c315 commit 4ae67fb

File tree

1 file changed

+23
-13
lines changed

1 file changed

+23
-13
lines changed

mars/services/task/execution/ray/fetcher.py

Lines changed: 23 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,10 @@
1515
import asyncio
1616
from collections import namedtuple
1717
from typing import Dict, List
18+
from .....utils import lazy_import
1819
from ..api import Fetcher, register_fetcher_cls
1920

20-
21+
ray = lazy_import("ray")
2122
_FetchInfo = namedtuple("FetchInfo", ["key", "object_ref", "conditions"])
2223

2324

@@ -38,18 +39,27 @@ async def append(self, chunk_key: str, chunk_meta: Dict, conditions: List = None
3839
)
3940

4041
async def get(self):
41-
objects = await asyncio.gather(
42-
*(info.object_ref for info in self._fetch_info_list)
43-
)
4442
if self._no_conditions:
45-
return objects
46-
results = []
47-
for o, fetch_info in zip(objects, self._fetch_info_list):
43+
return await asyncio.gather(
44+
*(info.object_ref for info in self._fetch_info_list)
45+
)
46+
refs = [None] * len(self._fetch_info_list)
47+
for index, fetch_info in enumerate(self._fetch_info_list):
4848
if fetch_info.conditions is None:
49-
results.append(o)
49+
refs[index] = fetch_info.object_ref
5050
else:
51-
try:
52-
results.append(o.iloc[fetch_info.conditions])
53-
except AttributeError:
54-
results.append(o[fetch_info.conditions])
55-
return results
51+
refs[index] = query_object_with_condition.remote(
52+
fetch_info.object_ref, fetch_info.conditions
53+
)
54+
return await asyncio.gather(*refs)
55+
56+
57+
def query_object_with_condition(o, conditions):
58+
try:
59+
return o.iloc[conditions]
60+
except AttributeError:
61+
return o[conditions]
62+
63+
64+
if ray:
65+
query_object_with_condition = ray.remote(query_object_with_condition)

0 commit comments

Comments
 (0)