Skip to content

Commit 2376990

Browse files
committed
Add _send_small_objects for sender to optimize sending small objects
1 parent 61c0c51 commit 2376990

File tree

3 files changed

+165
-62
lines changed

3 files changed

+165
-62
lines changed

mars/services/storage/handler.py

Lines changed: 14 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -82,22 +82,23 @@ async def __post_create__(self):
8282
if client.level & level:
8383
clients[level] = client
8484

85-
async def _get_data(self, data_info, conditions):
85+
@mo.extensible
86+
async def get_data_by_info(self, data_info: DataInfo, conditions: List = None):
8687
if conditions is None:
87-
res = yield self._clients[data_info.level].get(data_info.object_id)
88+
res = await self._clients[data_info.level].get(data_info.object_id)
8889
else:
8990
try:
90-
res = yield self._clients[data_info.level].get(
91+
res = await self._clients[data_info.level].get(
9192
data_info.object_id, conditions=conditions
9293
)
9394
except NotImplementedError:
94-
data = yield self._clients[data_info.level].get(data_info.object_id)
95+
data = await self._clients[data_info.level].get(data_info.object_id)
9596
try:
9697
sliced_value = data.iloc[tuple(conditions)]
9798
except AttributeError:
9899
sliced_value = data[tuple(conditions)]
99100
res = sliced_value
100-
raise mo.Return(res)
101+
return res
101102

102103
@mo.extensible
103104
async def get(
@@ -111,7 +112,7 @@ async def get(
111112
data_info = await self._data_manager_ref.get_data_info(
112113
session_id, data_key, self._band_name
113114
)
114-
data = yield self._get_data(data_info, conditions)
115+
data = yield self.get_data_by_info(data_info, conditions)
115116
raise mo.Return(data)
116117
except DataNotExist:
117118
if error == "raise":
@@ -143,7 +144,7 @@ async def batch_get(self, args_list, kwargs_list):
143144
if data_info is None:
144145
results.append(None)
145146
else:
146-
result = yield self._get_data(data_info, conditions)
147+
result = yield self.get_data_by_info(data_info, conditions)
147148
results.append(result)
148149
raise mo.Return(results)
149150

@@ -314,12 +315,16 @@ async def batch_delete(self, args_list, kwargs_list):
314315
for level, size in level_sizes.items():
315316
await self._quota_refs[level].release_quota(size)
316317

318+
@mo.extensible
319+
async def open_reader_by_info(self, data_info: DataInfo) -> StorageFileObject:
320+
return await self._clients[data_info.level].open_reader(data_info.object_id)
321+
317322
@mo.extensible
318323
async def open_reader(self, session_id: str, data_key: str) -> StorageFileObject:
319324
data_info = await self._data_manager_ref.get_data_info(
320325
session_id, data_key, self._band_name
321326
)
322-
reader = await self._clients[data_info.level].open_reader(data_info.object_id)
327+
reader = await self.open_reader_by_info(data_info)
323328
return reader
324329

325330
@open_reader.batch
@@ -333,10 +338,7 @@ async def batch_open_readers(self, args_list, kwargs_list):
333338
)
334339
data_infos = await self._data_manager_ref.get_data_info.batch(*get_data_infos)
335340
return await asyncio.gather(
336-
*[
337-
self._clients[data_info.level].open_reader(data_info.object_id)
338-
for data_info in data_infos
339-
]
341+
*[self.open_reader_by_info(data_info) for data_info in data_infos]
340342
)
341343

342344
@mo.extensible

mars/services/storage/tests/test_transfer.py

Lines changed: 59 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -101,50 +101,84 @@ async def test_simple_transfer(create_actors):
101101
storage_handler1 = await mo.actor_ref(
102102
uid=StorageHandlerActor.gen_uid("numa-0"), address=worker_address_1
103103
)
104+
data_manager1 = await mo.actor_ref(
105+
uid=DataManagerActor.default_uid(), address=worker_address_1
106+
)
104107
storage_handler2 = await mo.actor_ref(
105108
uid=StorageHandlerActor.gen_uid("numa-0"), address=worker_address_2
106109
)
110+
data_manager2 = await mo.actor_ref(
111+
uid=DataManagerActor.default_uid(), address=worker_address_2
112+
)
107113

108114
await storage_handler1.put(session_id, "data_key1", data1, StorageLevel.MEMORY)
109115
await storage_handler1.put(session_id, "data_key2", data2, StorageLevel.MEMORY)
110116
await storage_handler2.put(session_id, "data_key3", data2, StorageLevel.MEMORY)
111117

112-
sender_actor = await mo.actor_ref(
118+
# sender_actor1 uses the default block_size
119+
sender_actor1 = await mo.actor_ref(
113120
address=worker_address_1, uid=SenderManagerActor.gen_uid("numa-0")
114121
)
115-
116-
# send data to worker2 from worker1
117-
await sender_actor.send_batch_data(
118-
session_id,
119-
["data_key1"],
120-
worker_address_2,
121-
StorageLevel.MEMORY,
122-
block_size=1000,
122+
# sender_actor2 sets block_size to 0
123+
sender_actor2 = await mo.create_actor(
124+
SenderManagerActor,
125+
"numa-0",
126+
0,
127+
data_manager1,
128+
storage_handler1,
129+
uid=SenderManagerActor.gen_uid("mock"),
130+
address=worker_address_1,
123131
)
124132

125-
await sender_actor.send_batch_data(
126-
session_id,
127-
["data_key2"],
128-
worker_address_2,
129-
StorageLevel.MEMORY,
130-
block_size=1000,
131-
)
133+
for i, sender_actor in enumerate([sender_actor1, sender_actor2]):
134+
# send data to worker2 from worker1
135+
await sender_actor.send_batch_data(
136+
session_id,
137+
["data_key1"],
138+
worker_address_2,
139+
StorageLevel.MEMORY,
140+
block_size=1000,
141+
)
132142

133-
get_data1 = await storage_handler2.get(session_id, "data_key1")
134-
np.testing.assert_array_equal(data1, get_data1)
143+
await sender_actor.send_batch_data(
144+
session_id,
145+
["data_key2"],
146+
worker_address_2,
147+
StorageLevel.MEMORY,
148+
block_size=1000,
149+
)
150+
151+
get_data1 = await storage_handler2.get(session_id, "data_key1")
152+
np.testing.assert_array_equal(data1, get_data1)
135153

136-
get_data2 = await storage_handler2.get(session_id, "data_key2")
137-
pd.testing.assert_frame_equal(data2, get_data2)
154+
get_data2 = await storage_handler2.get(session_id, "data_key2")
155+
pd.testing.assert_frame_equal(data2, get_data2)
156+
await storage_handler2.delete(session_id, "data_key1")
157+
await storage_handler2.delete(session_id, "data_key2")
138158

139159
# send data to worker1 from worker2
140-
sender_actor = await mo.actor_ref(
160+
sender_actor1 = await mo.actor_ref(
141161
address=worker_address_2, uid=SenderManagerActor.gen_uid("numa-0")
142162
)
143-
await sender_actor.send_batch_data(
144-
session_id, ["data_key3"], worker_address_1, StorageLevel.MEMORY
163+
# sender_actor2 sets block_size to 0
164+
sender_actor2 = await mo.create_actor(
165+
SenderManagerActor,
166+
"numa-0",
167+
0,
168+
data_manager2,
169+
storage_handler2,
170+
uid=SenderManagerActor.gen_uid("mock"),
171+
address=worker_address_2,
145172
)
146-
get_data3 = await storage_handler1.get(session_id, "data_key3")
147-
pd.testing.assert_frame_equal(data2, get_data3)
173+
for i, sender_actor in enumerate([sender_actor1, sender_actor2]):
174+
# send data to worker1 from worker2
175+
data_key = f"data_key3"
176+
await sender_actor.send_batch_data(
177+
session_id, [data_key], worker_address_1, StorageLevel.MEMORY
178+
)
179+
get_data3 = await storage_handler1.get(session_id, data_key)
180+
pd.testing.assert_frame_equal(data2, get_data3)
181+
await storage_handler1.delete(session_id, "data_key3")
148182

149183

150184
# test for cancelling happens when writing

mars/services/storage/transfer.py

Lines changed: 92 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
import asyncio
1616
from dataclasses import dataclass
1717
import logging
18-
from typing import Dict, Union, Any, List
18+
from typing import Dict, Union, Any, List, Tuple
1919

2020
from ... import oscar as mo
2121
from ...lib.aio import alru_cache
@@ -28,7 +28,7 @@
2828
)
2929
from ...storage import StorageLevel
3030
from ...utils import dataslots
31-
from .core import DataManagerActor, WrappedStorageFileObject
31+
from .core import DataManagerActor, WrappedStorageFileObject, DataInfo
3232
from .handler import StorageHandlerActor
3333

3434
DEFAULT_TRANSFER_BLOCK_SIZE = 4 * 1024**2
@@ -96,6 +96,7 @@ async def _send_data(
9696
receiver_ref: Union[mo.ActorRef],
9797
session_id: str,
9898
data_keys: List[str],
99+
data_infos: List[DataInfo],
99100
level: StorageLevel,
100101
block_size: int,
101102
):
@@ -129,11 +130,13 @@ async def send(self, buffer, eof_mark, key):
129130

130131
sender = BufferedSender()
131132
open_reader_tasks = []
132-
for data_key in data_keys:
133+
for data_key, info in zip(data_keys, data_infos):
133134
open_reader_tasks.append(
134-
self._storage_handler.open_reader.delay(session_id, data_key)
135+
self._storage_handler.open_reader_by_info.delay(info)
135136
)
136-
readers = await self._storage_handler.open_reader.batch(*open_reader_tasks)
137+
readers = await self._storage_handler.open_reader_by_info.batch(
138+
*open_reader_tasks
139+
)
137140

138141
for data_key, reader in zip(data_keys, readers):
139142
while True:
@@ -152,7 +155,61 @@ async def send(self, buffer, eof_mark, key):
152155
break
153156
await sender.flush()
154157

155-
@mo.extensible
158+
async def _send(
159+
self,
160+
session_id: str,
161+
data_keys: List[Union[str, Tuple]],
162+
data_infos: List[DataInfo],
163+
data_sizes: List[int],
164+
block_size: int,
165+
address: str,
166+
band_name: str,
167+
level: StorageLevel,
168+
):
169+
receiver_ref: Union[
170+
ReceiverManagerActor, mo.ActorRef
171+
] = await self.get_receiver_ref(address, band_name)
172+
is_transferring_list = await receiver_ref.open_writers(
173+
session_id, data_keys, data_sizes, level
174+
)
175+
to_send_keys = []
176+
to_send_infos = []
177+
to_wait_keys = []
178+
for data_key, is_transferring, info in zip(
179+
data_keys, is_transferring_list, data_infos
180+
):
181+
if is_transferring:
182+
to_wait_keys.append(data_key)
183+
else:
184+
to_send_keys.append(data_key)
185+
to_send_infos.append(info)
186+
187+
if to_send_keys:
188+
await self._send_data(
189+
receiver_ref, session_id, to_send_keys, to_send_infos, level, block_size
190+
)
191+
if to_wait_keys:
192+
await receiver_ref.wait_transfer_done(session_id, to_wait_keys)
193+
194+
async def _send_small_objects(
195+
self,
196+
session_id: str,
197+
data_keys: List[Union[str, Tuple]],
198+
data_infos: List[DataInfo],
199+
address: str,
200+
band_name: str,
201+
level: StorageLevel,
202+
):
203+
# simply get all objects and send them all to the receiver
204+
get_tasks = [
205+
self._storage_handler.get_data_by_info.delay(info) for info in data_infos
206+
]
207+
data_list = await self._storage_handler.get_data_by_info.batch(*get_tasks)
208+
receiver_ref: Union[
209+
ReceiverManagerActor, mo.ActorRef
210+
] = await self.get_receiver_ref(address, band_name)
211+
await receiver_ref.put_small_objects(session_id, data_keys, data_list, level)
212+
156213
async def send_batch_data(
157214
self,
158215
session_id: str,
@@ -167,9 +224,6 @@ async def send_batch_data(
167224
"Begin to send data (%s, %s) to %s", session_id, data_keys, address
168225
)
169226
block_size = block_size or self._transfer_block_size
170-
receiver_ref: Union[
171-
ReceiverManagerActor, mo.ActorRef
172-
] = await self.get_receiver_ref(address, band_name)
173227
get_infos = []
174228
pin_tasks = []
175229
for data_key in data_keys:
@@ -198,23 +252,27 @@ async def send_batch_data(
198252
data_sizes = [info.store_size for info in infos]
199253
if level is None:
200254
level = infos[0].level
201-
is_transferring_list = await receiver_ref.open_writers(
202-
session_id, data_keys, data_sizes, level
203-
)
204-
to_send_keys = []
205-
to_wait_keys = []
206-
for data_key, is_transferring in zip(data_keys, is_transferring_list):
207-
if is_transferring:
208-
to_wait_keys.append(data_key)
209-
else:
210-
to_send_keys.append(data_key)
211-
212-
if to_send_keys:
213-
await self._send_data(
214-
receiver_ref, session_id, to_send_keys, level, block_size
255+
total_size = sum(data_sizes)
256+
if total_size > block_size:
257+
logger.debug("Choose block method for sending data of %s bytes", total_size)
258+
await self._send(
259+
session_id,
260+
data_keys,
261+
infos,
262+
data_sizes,
263+
block_size,
264+
address,
265+
band_name,
266+
level,
267+
)
268+
else:
269+
logger.debug(
270+
"Choose send_small_objects method for sending data of %s bytes",
271+
total_size,
272+
)
273+
await self._send_small_objects(
274+
session_id, data_keys, infos, address, band_name, level
215275
)
216-
if to_wait_keys:
217-
await receiver_ref.wait_transfer_done(session_id, to_wait_keys)
218276
unpin_tasks = []
219277
for data_key in data_keys:
220278
unpin_tasks.append(
@@ -268,6 +326,15 @@ def _decref_writing_key(self, session_id: str, data_key: str):
268326
if self._writing_infos[(session_id, data_key)].ref_counts == 0:
269327
del self._writing_infos[(session_id, data_key)]
270328

329+
async def put_small_objects(
330+
self, session_id: str, data_keys: List[str], objects: List, level: StorageLevel
331+
):
332+
tasks = [
333+
self._storage_handler.put.delay(session_id, data_key, obj, level)
334+
for data_key, obj in zip(data_keys, objects)
335+
]
336+
await self._storage_handler.put.batch(*tasks)
337+
271338
async def create_writers(
272339
self,
273340
session_id: str,

0 commit comments

Comments
 (0)