Commit b1bc2ed

Author: denvr (committed)
YT-25975 - Task count for parallel reader
* Changelog entry
  Type: feature
  Component: python-sdk
  Tune task count for parallel reader

commit_hash:4dd27a2d54e4478f7d086723de1beeffa9e855b6
1 parent 1854ca8 commit b1bc2ed

4 files changed (+118 −65 lines)

yt/python/yt/wrapper/default_config.py

Lines changed: 3 additions & 3 deletions
@@ -236,7 +236,7 @@ class DefaultConfigQueryTrackerType(TypedDict, total=False):
 
     class DefaultConfigReadParallelType(TypedDict, total=False):
         max_thread_count: int
-        data_size_per_thread: int
+        data_size_per_thread: Union[int, None]
         enable: bool
 
     read_parallel: DefaultConfigReadParallelType
@@ -844,8 +844,8 @@ def get_dynamic_table_retries() -> DefaultConfigRetriesBackoffType:
     "read_parallel": {
         # Number of threads for reading table.
         "max_thread_count": 10,
-        # Approximate data size per one thread.
-        "data_size_per_thread": 8 * 1024 * 1024,
+        # Approximate data size per one task.
+        "data_size_per_thread": None,
         # Always run read parallel if it is possible.
         "enable": False
     },
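
With this change, data_size_per_thread defaults to None, which tells the reader to derive the task size from the table itself instead of using a fixed 8 MiB. A minimal configuration sketch (not part of the commit; the last line simply restores the old fixed size):

    import yt.wrapper as yt

    # Leave data_size_per_thread at its new default (None): the task size is
    # then derived from the table's chunk_count and replication_factor.
    yt.config["read_parallel"]["enable"] = True
    yt.config["read_parallel"]["max_thread_count"] = 10

    # Or pin the previous behaviour with an explicit size per task:
    yt.config["read_parallel"]["data_size_per_thread"] = 8 * 1024 * 1024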

yt/python/yt/wrapper/file_commands.py

Lines changed: 1 addition & 18 deletions
@@ -9,7 +9,7 @@
 from .cypress_commands import (remove, exists, set_attribute, mkdir, find_free_subpath,
                                create, link, get, set)
 from .default_config import DEFAULT_WRITE_CHUNK_SIZE
-from .parallel_reader import make_read_parallel_request
+from .parallel_reader import make_read_parallel_request, _prepare_ranges_for_parallel_read
 from .parallel_writer import make_parallel_write_request
 from .retries import Retrier, default_chaos_monkey
 from .transaction import Transaction
@@ -90,23 +90,6 @@ def attributes(self):
         return self._attributes
 
 
-def _prepare_ranges_for_parallel_read(offset, length, data_size, data_size_per_thread):
-    offset = get_value(offset, 0)
-    offset = min(offset, data_size)
-
-    length = get_value(length, data_size)
-    length = min(length, data_size - offset)
-
-    result = []
-    while offset < data_size and length > 0:
-        range_size = min(data_size_per_thread, length)
-        result.append({"range" : (offset, range_size)})
-        offset += range_size
-        length -= range_size
-
-    return result
-
-
 def _prepare_params_for_parallel_read(params, range):
     params["offset"], params["length"] = range["range"][0], range["range"][1]
     return params
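
The byte-range splitting used for parallel file reads is unchanged; it only moved into parallel_reader.py and now falls back to DEFAULT_DATA_SIZE_PER_THREAD when the configured value is None. A small sketch of what the (private) helper produces, with invented sizes:

    from yt.wrapper.parallel_reader import _prepare_ranges_for_parallel_read

    # Whole-file read of a 20 MiB file with the default task size (None -> 8 MiB).
    # offset/length may be None; get_value substitutes 0 and the full data size.
    ranges = _prepare_ranges_for_parallel_read(
        offset=None,
        length=None,
        data_size=20 * 1024 * 1024,
        data_size_per_thread=None,
    )
    # Each entry is {"range": (offset, size)}:
    # (0, 8 MiB), (8 MiB, 8 MiB), (16 MiB, 4 MiB)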

yt/python/yt/wrapper/parallel_reader.py

Lines changed: 96 additions & 1 deletion
@@ -1,4 +1,4 @@
-from .common import YtError
+from .common import YtError, require, get_value
 from .config import get_config, get_option
 from .errors import YtChunkUnavailable
 from .format import YtFormatReadError
@@ -11,10 +11,18 @@
 from .thread_pool import ThreadPool
 from .ypath import TablePath
 
+import yt.logger as logger
+
 from yt.common import join_exceptions
 
+import builtins
 import copy
 import threading
+import typing
+
+
+DEFAULT_DATA_SIZE_PER_THREAD = 8 * 1024 * 1024
+DEFAULT_SINGLE_CHUNK_SPLIT = 3
 
 
 class ParallelReadRetrier(Retrier):
@@ -138,3 +146,90 @@ def make_read_parallel_request(command_name, path, ranges, params, prepare_param
         if transaction:
             transaction.abort()
         raise
+
+
+def _prepare_ranges_for_parallel_read(
+    offset: int,
+    length: int,
+    data_size: int,
+    data_size_per_thread: int,
+) -> typing.List[typing.Dict[str, typing.Tuple[int, int]]]:
+    if not data_size_per_thread:
+        data_size_per_thread = DEFAULT_DATA_SIZE_PER_THREAD
+
+    offset = get_value(offset, 0)
+    offset = min(offset, data_size)
+
+    length = get_value(length, data_size)
+    length = min(length, data_size - offset)
+
+    result = []
+    while offset < data_size and length > 0:
+        range_size = min(data_size_per_thread, length)
+        result.append({"range" : (offset, range_size)})
+        offset += range_size
+        length -= range_size
+
+    return result
+
+
+def _slice_row_ranges_for_parallel_read(
+    ranges: typing.List[typing.Mapping],
+    row_count: int,
+    chunk_count: int,
+    data_size: int,
+    replication_factor: int,
+    data_size_per_thread: typing.Union[int, None],
+) -> typing.Tuple[typing.List[typing.Dict[str, typing.Tuple[int, int]]], int]:
+    def _get_ranges(ranges, rows_per_task):
+        result = []
+        for range in ranges:
+            if "exact" in range:
+                require("row_index" in range["exact"], lambda: YtError('Invalid YPath: "row_index" not found'))
+                lower_limit = range["exact"]["row_index"]
+                upper_limit = lower_limit + 1
+            else:
+                if "lower_limit" in range:
+                    require("row_index" in range["lower_limit"], lambda: YtError('Invalid YPath: "row_index" not found'))
+                if "upper_limit" in range:
+                    require("row_index" in range["upper_limit"], lambda: YtError('Invalid YPath: "row_index" not found'))
+
+                lower_limit = 0 if "lower_limit" not in range else range["lower_limit"]["row_index"]
+                upper_limit = row_count if "upper_limit" not in range else range["upper_limit"]["row_index"]
+
+            for start in builtins.range(lower_limit, upper_limit, rows_per_task):
+                end = min(start + rows_per_task, upper_limit)
+                result.append({"range" : (start, end)})
+        return result
+
+    if row_count > 0:
+        row_size = data_size / float(row_count)
+    else:
+        row_size = 1
+
+    if data_size_per_thread:
+        rows_per_task = max(
+            int(data_size_per_thread / row_size),
+            1,
+        )
+        result = _get_ranges(ranges, rows_per_task)
+        logger.debug(f"Parallel read tasks count: {len(result)}, {row_size=}, {rows_per_task=}")
+        return result, rows_per_task * row_size
+    else:
+        if chunk_count > 0 and replication_factor >= 0:
+            data_size_per_thread = int(data_size / (chunk_count * replication_factor))
+        else:
+            data_size_per_thread = data_size
+        rows_per_task_by_table = int(data_size_per_thread / row_size)
+        rows_per_task_by_default = int(DEFAULT_DATA_SIZE_PER_THREAD / row_size)
+        rows_per_task = max(
+            rows_per_task_by_default,
+            rows_per_task_by_table,
+            1,
+        )
+        result = _get_ranges(ranges, rows_per_task)
+        if len(result) == 1 and rows_per_task > DEFAULT_SINGLE_CHUNK_SPLIT:
+            rows_per_task //= DEFAULT_SINGLE_CHUNK_SPLIT
+            result = _get_ranges(ranges, rows_per_task)
+        logger.debug(f"Parallel read tasks count: {len(result)}, {row_size=}, {rows_per_task=} ({rows_per_task_by_default}, {rows_per_task_by_table})")
+        return result, rows_per_task * row_size
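
To make the new sizing concrete, here is a sketch of how _slice_row_ranges_for_parallel_read behaves for a whole-table range when data_size_per_thread is left as None (the table statistics below are invented; the helper is private API):

    from yt.wrapper.parallel_reader import _slice_row_ranges_for_parallel_read

    # 1000 rows, ~4 MiB of data, 2 chunks, replication factor 3.
    tasks, task_data_size = _slice_row_ranges_for_parallel_read(
        ranges=[{"lower_limit": {"row_index": 0}, "upper_limit": {"row_index": 1000}}],
        row_count=1000,
        chunk_count=2,
        data_size=4 * 1024 * 1024,
        replication_factor=3,
        data_size_per_thread=None,
    )
    # With data_size_per_thread=None the target task size is
    # data_size / (chunk_count * replication_factor), but never fewer rows
    # than DEFAULT_DATA_SIZE_PER_THREAD (8 MiB) worth.  Here 8 MiB covers all
    # 1000 rows, so the first pass yields a single task; rows_per_task is then
    # divided by DEFAULT_SINGLE_CHUNK_SPLIT (3) and the range is re-sliced:
    # tasks == [{"range": (0, 666)}, {"range": (666, 1000)}]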

yt/python/yt/wrapper/table_commands.py

Lines changed: 18 additions & 43 deletions
@@ -25,7 +25,7 @@
 from .table_helpers import (_prepare_source_tables, _are_default_empty_table, _prepare_table_writer,
                             _remove_tables, DEFAULT_EMPTY_TABLE, _to_chunk_stream, _prepare_command_format)
 from .file_commands import _get_remote_temp_files_directory, _append_default_path_with_user_level
-from .parallel_reader import make_read_parallel_request
+from .parallel_reader import make_read_parallel_request, _slice_row_ranges_for_parallel_read
 from .schema import _SchemaRuntimeCtx, TableSchema, make_dataclass_from_table_schema
 from .stream import ItemStream, _ChunkStream
 from .ypath import TablePath, YPath, ypath_join
@@ -34,7 +34,6 @@
 import yt.yson as yson
 import yt.logger as logger
 
-import builtins
 from copy import deepcopy
 from datetime import timedelta
 import enum
@@ -449,35 +448,6 @@ def read_blob_table(table, part_index_column_name=None, data_column_name=None,
     return response
 
 
-def _slice_row_ranges_for_parallel_read(ranges, row_count, data_size, data_size_per_thread):
-    result = []
-    if row_count > 0:
-        row_size = data_size / float(row_count)
-    else:
-        row_size = 1
-
-    rows_per_thread = max(int(data_size_per_thread / row_size), 1)
-    for range in ranges:
-        if "exact" in range:
-            require("row_index" in range["exact"], lambda: YtError('Invalid YPath: "row_index" not found'))
-            lower_limit = range["exact"]["row_index"]
-            upper_limit = lower_limit + 1
-        else:
-            if "lower_limit" in range:
-                require("row_index" in range["lower_limit"], lambda: YtError('Invalid YPath: "row_index" not found'))
-            if "upper_limit" in range:
-                require("row_index" in range["upper_limit"], lambda: YtError('Invalid YPath: "row_index" not found'))
-
-            lower_limit = 0 if "lower_limit" not in range else range["lower_limit"]["row_index"]
-            upper_limit = row_count if "upper_limit" not in range else range["upper_limit"]["row_index"]
-
-        for start in builtins.range(lower_limit, upper_limit, rows_per_thread):
-            end = min(start + rows_per_thread, upper_limit)
-            result.append({"range" : (start, end)})
-
-    return result
-
-
 def _prepare_params_for_parallel_read(params, range):
     params["path"].attributes["ranges"] = [{"lower_limit": {"row_index": range["range"][0]},
                                             "upper_limit": {"row_index": range["range"][1]}}]
@@ -808,7 +778,7 @@ def _check_attributes_for_read_table(attributes, table, client):
 def _get_table_attributes(table, client):
     attributes = get(
         table + "/@",
-        attributes=["type", "chunk_count", "compressed_data_size", "dynamic", "row_count", "uncompressed_data_size"],
+        attributes=["type", "chunk_count", "compressed_data_size", "dynamic", "row_count", "replication_factor", "chunk_count", "uncompressed_data_size"],
         client=client)
     return attributes
 
@@ -861,11 +831,14 @@ def read_table(table, format=None, table_reader=None, control_attributes=None, u
             table.attributes["ranges"] = [
                 {"lower_limit": {"row_index": 0},
                  "upper_limit": {"row_index": attributes["row_count"]}}]
-        ranges = _slice_row_ranges_for_parallel_read(
-            table.attributes["ranges"],
-            attributes["row_count"],
-            attributes["uncompressed_data_size"],
-            get_config(client)["read_parallel"]["data_size_per_thread"])
+        ranges, _ = _slice_row_ranges_for_parallel_read(
+            ranges=table.attributes["ranges"],
+            row_count=attributes["row_count"],
+            chunk_count=attributes["chunk_count"],
+            data_size=attributes["uncompressed_data_size"],
+            replication_factor=attributes["replication_factor"],
+            data_size_per_thread=get_config(client)["read_parallel"]["data_size_per_thread"],
+        )
         response_parameters = get_value(response_parameters, {})
         if not ranges:
             response_parameters["start_row_index"] = 0
@@ -1229,12 +1202,14 @@ def _dump_file(table, output_file, output_path, enable_several_files, unordered,
             "upper_limit": {"row_index": attributes["row_count"]},
         }]
 
-    data_size_per_thread = get_config(client)["read_parallel"]["data_size_per_thread"]
-    ranges = _slice_row_ranges_for_parallel_read(
-        table.attributes["ranges"],
-        attributes["row_count"],
-        attributes["uncompressed_data_size"],
-        data_size_per_thread)
+    ranges, data_size_per_thread = _slice_row_ranges_for_parallel_read(
+        ranges=table.attributes["ranges"],
+        row_count=attributes["row_count"],
+        chunk_count=attributes["chunk_count"],
+        data_size=attributes["uncompressed_data_size"],
+        replication_factor=attributes["replication_factor"],
+        data_size_per_thread=get_config(client)["read_parallel"]["data_size_per_thread"],
+    )
 
     range_count = len(ranges)
     result_ranges = []
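
For callers of read_table nothing changes: leaving data_size_per_thread unset now lets the wrapper pick the task count from the table's chunk_count and replication_factor, which _get_table_attributes fetches alongside the other attributes. A minimal usage sketch (cluster address and table path are placeholders):

    import yt.wrapper as yt

    client = yt.YtClient(
        proxy="<cluster>",
        config={
            "read_parallel": {
                "enable": True,
                "max_thread_count": 10,
                # data_size_per_thread is left unset (None), so task sizing is automatic.
            },
        },
    )

    for row in client.read_table("//home/project/table"):
        ...  # process the row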
