Commit 9ed304b

[python] support table scan with row range (#6944)
1 parent 7bfa946 commit 9ed304b
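This change lets a table scan restrict planning to a half-open row range [start_row, end_row) as an alternative to a shard index. A minimal usage sketch, modeled on the test case added in this commit; the catalog handle and table name here are placeholders, not part of the change:

    # Hypothetical setup: catalog and 'test_db.my_table' are placeholders for illustration.
    table = catalog.get_table('test_db.my_table')
    read_builder = table.new_read_builder()

    # Plan only the rows whose global positions fall in [2, 4).
    table_scan = read_builder.new_scan().with_row_range(2, 4)
    splits = table_scan.plan().splits()
    result = read_builder.new_read().to_arrow(splits)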

File tree

3 files changed: +137 / -33 lines changed

paimon-python/pypaimon/read/scanner/full_starting_scanner.py

file mode changed: 100644 → 100755
Lines changed: 67 additions & 33 deletions
@@ -63,6 +63,8 @@ def __init__(self, table, predicate: Optional[Predicate], limit: Optional[int]):
 
         self.idx_of_this_subtask = None
         self.number_of_para_subtasks = None
+        self.start_row_of_this_subtask = None
+        self.end_row_of_this_subtask = None
 
         self.only_read_real_buckets = True if options.bucket() == BucketMode.POSTPONE_BUCKET.value else False
         self.data_evolution = options.data_evolution_enabled()
@@ -121,37 +123,24 @@ def read_manifest_entries(self, manifest_files: List[ManifestFileMeta]) -> List[
     def with_shard(self, idx_of_this_subtask, number_of_para_subtasks) -> 'FullStartingScanner':
         if idx_of_this_subtask >= number_of_para_subtasks:
             raise Exception("idx_of_this_subtask must be less than number_of_para_subtasks")
+        if self.start_row_of_this_subtask is not None:
+            raise Exception("with_shard and with_row_range cannot be used simultaneously")
         self.idx_of_this_subtask = idx_of_this_subtask
         self.number_of_para_subtasks = number_of_para_subtasks
         return self
 
-    def _append_only_filter_by_shard(self, partitioned_files: defaultdict) -> (defaultdict, int, int):
-        """
-        Filter file entries by shard. Only keep the files within the range, which means
-        that only the starting and ending files need to be further divided subsequently
-        """
-        total_row = 0
-        # Sort by file creation time to ensure consistent sharding
-        for key, file_entries in partitioned_files.items():
-            for entry in file_entries:
-                total_row += entry.file.row_count
-
-        # Calculate number of rows this shard should process using balanced distribution
-        # Distribute remainder evenly among first few shards to avoid last shard overload
-        base_rows_per_shard = total_row // self.number_of_para_subtasks
-        remainder = total_row % self.number_of_para_subtasks
-
-        # Each of the first 'remainder' shards gets one extra row
-        if self.idx_of_this_subtask < remainder:
-            num_row = base_rows_per_shard + 1
-            start_row = self.idx_of_this_subtask * (base_rows_per_shard + 1)
-        else:
-            num_row = base_rows_per_shard
-            start_row = (remainder * (base_rows_per_shard + 1) +
-                         (self.idx_of_this_subtask - remainder) * base_rows_per_shard)
-
-        end_row = start_row + num_row
+    def with_row_range(self, start_row, end_row) -> 'FullStartingScanner':
+        if start_row >= end_row:
+            raise Exception("start_row must be less than end_row")
+        if self.idx_of_this_subtask is not None:
+            raise Exception("with_row_range and with_shard cannot be used simultaneously")
+        self.start_row_of_this_subtask = start_row
+        self.end_row_of_this_subtask = end_row
+        return self
 
+    def _append_only_filter_by_row_range(self, partitioned_files: defaultdict,
+                                         start_row: int,
+                                         end_row: int) -> (defaultdict, int, int):
         plan_start_row = 0
         plan_end_row = 0
         entry_end_row = 0  # end row position of current file in all data
@@ -183,12 +172,16 @@ def _append_only_filter_by_shard(self, partitioned_files: defaultdict) -> (defau
 
         return filtered_partitioned_files, plan_start_row, plan_end_row
 
-    def _data_evolution_filter_by_shard(self, partitioned_files: defaultdict) -> (defaultdict, int, int):
+    def _append_only_filter_by_shard(self, partitioned_files: defaultdict) -> (defaultdict, int, int):
+        """
+        Filter file entries by shard. Only keep the files within the range, which means
+        that only the starting and ending files need to be further divided subsequently
+        """
         total_row = 0
+        # Sort by file creation time to ensure consistent sharding
         for key, file_entries in partitioned_files.items():
             for entry in file_entries:
-                if not self._is_blob_file(entry.file.file_name):
-                    total_row += entry.file.row_count
+                total_row += entry.file.row_count
 
         # Calculate number of rows this shard should process using balanced distribution
         # Distribute remainder evenly among first few shards to avoid last shard overload
@@ -206,6 +199,11 @@ def _data_evolution_filter_by_shard(self, partitioned_files: defaultdict) -> (de
 
         end_row = start_row + num_row
 
+        return self._append_only_filter_by_row_range(partitioned_files, start_row, end_row)
+
+    def _data_evolution_filter_by_row_range(self, partitioned_files: defaultdict,
+                                            start_row: int,
+                                            end_row: int) -> (defaultdict, int, int):
         plan_start_row = 0
         plan_end_row = 0
         entry_end_row = 0  # end row position of current file in all data
@@ -244,6 +242,30 @@ def _data_evolution_filter_by_shard(self, partitioned_files: defaultdict) -> (de
 
         return filtered_partitioned_files, plan_start_row, plan_end_row
 
+    def _data_evolution_filter_by_shard(self, partitioned_files: defaultdict) -> (defaultdict, int, int):
+        total_row = 0
+        for key, file_entries in partitioned_files.items():
+            for entry in file_entries:
+                if not self._is_blob_file(entry.file.file_name):
+                    total_row += entry.file.row_count
+
+        # Calculate number of rows this shard should process using balanced distribution
+        # Distribute remainder evenly among first few shards to avoid last shard overload
+        base_rows_per_shard = total_row // self.number_of_para_subtasks
+        remainder = total_row % self.number_of_para_subtasks
+
+        # Each of the first 'remainder' shards gets one extra row
+        if self.idx_of_this_subtask < remainder:
+            num_row = base_rows_per_shard + 1
+            start_row = self.idx_of_this_subtask * (base_rows_per_shard + 1)
+        else:
+            num_row = base_rows_per_shard
+            start_row = (remainder * (base_rows_per_shard + 1) +
+                         (self.idx_of_this_subtask - remainder) * base_rows_per_shard)
+
+        end_row = start_row + num_row
+        return self._data_evolution_filter_by_row_range(partitioned_files, start_row, end_row)
+
     def _compute_split_start_end_row(self, splits: List[Split], plan_start_row, plan_end_row):
         """
         Find files that needs to be divided for each split
@@ -451,7 +473,13 @@ def _create_append_only_splits(
         for entry in file_entries:
             partitioned_files[(tuple(entry.partition.values), entry.bucket)].append(entry)
 
-        if self.idx_of_this_subtask is not None:
+        if self.start_row_of_this_subtask is not None:
+            # shard data range: [plan_start_row, plan_end_row)
+            partitioned_files, plan_start_row, plan_end_row = \
+                self._append_only_filter_by_row_range(partitioned_files,
+                                                      self.start_row_of_this_subtask,
+                                                      self.end_row_of_this_subtask)
+        elif self.idx_of_this_subtask is not None:
             partitioned_files, plan_start_row, plan_end_row = self._append_only_filter_by_shard(partitioned_files)
 
         def weight_func(f: DataFileMeta) -> int:
@@ -467,7 +495,7 @@ def weight_func(f: DataFileMeta) -> int:
         packed_files: List[List[DataFileMeta]] = self._pack_for_ordered(data_files, weight_func,
                                                                         self.target_split_size)
         splits += self._build_split_from_pack(packed_files, file_entries, False, deletion_files_map)
-        if self.idx_of_this_subtask is not None:
+        if self.start_row_of_this_subtask is not None or self.idx_of_this_subtask is not None:
            # When files are combined into splits, it is necessary to find files that needs to be divided for each split
            self._compute_split_start_end_row(splits, plan_start_row, plan_end_row)
        return splits
@@ -555,7 +583,13 @@ def sort_key(manifest_entry: ManifestEntry) -> tuple:
         for entry in sorted_entries:
             partitioned_files[(tuple(entry.partition.values), entry.bucket)].append(entry)
 
-        if self.idx_of_this_subtask is not None:
+        if self.start_row_of_this_subtask is not None:
+            # shard data range: [plan_start_row, plan_end_row)
+            partitioned_files, plan_start_row, plan_end_row = \
+                self._data_evolution_filter_by_row_range(partitioned_files,
+                                                         self.start_row_of_this_subtask,
+                                                         self.end_row_of_this_subtask)
+        elif self.idx_of_this_subtask is not None:
             # shard data range: [plan_start_row, plan_end_row)
             partitioned_files, plan_start_row, plan_end_row = self._data_evolution_filter_by_shard(partitioned_files)
 
@@ -584,7 +618,7 @@ def weight_func(file_list: List[DataFileMeta]) -> int:
 
         splits += self._build_split_from_pack(flatten_packed_files, sorted_entries, False, deletion_files_map)
 
-        if self.idx_of_this_subtask is not None:
+        if self.start_row_of_this_subtask is not None or self.idx_of_this_subtask is not None:
             self._compute_split_start_end_row(splits, plan_start_row, plan_end_row)
         return splits
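After this refactor the shard path is expressed in terms of row ranges: the shard index is converted into [start_row, end_row) with the balanced-distribution arithmetic above and then handed to the row-range filter. A standalone sketch of that arithmetic (illustration only, not the scanner method itself), with a small worked example:

    def shard_to_row_range(idx_of_this_subtask: int, number_of_para_subtasks: int,
                           total_row: int) -> tuple:
        # Mirrors the arithmetic in _append_only_filter_by_shard / _data_evolution_filter_by_shard:
        # the first 'remainder' shards each take one extra row.
        base_rows_per_shard = total_row // number_of_para_subtasks
        remainder = total_row % number_of_para_subtasks
        if idx_of_this_subtask < remainder:
            num_row = base_rows_per_shard + 1
            start_row = idx_of_this_subtask * (base_rows_per_shard + 1)
        else:
            num_row = base_rows_per_shard
            start_row = (remainder * (base_rows_per_shard + 1) +
                         (idx_of_this_subtask - remainder) * base_rows_per_shard)
        return start_row, start_row + num_row

    # With 10 total rows and 3 subtasks the shards cover [0, 4), [4, 7) and [7, 10).
    assert [shard_to_row_range(i, 3, 10) for i in range(3)] == [(0, 4), (4, 7), (7, 10)]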

paimon-python/pypaimon/read/table_scan.py

file mode changed: 100644 → 100755
Lines changed: 8 additions & 0 deletions
@@ -71,3 +71,11 @@ def _create_starting_scanner(self) -> Optional[StartingScanner]:
     def with_shard(self, idx_of_this_subtask, number_of_para_subtasks) -> 'TableScan':
         self.starting_scanner.with_shard(idx_of_this_subtask, number_of_para_subtasks)
         return self
+
+    def with_row_range(self, start_row, end_row) -> 'TableScan':
+        """
+        Filter file entries by row range. The row_id corresponds to the row position of the
+        file in all file entries in table scan's partitioned_files.
+        """
+        self.starting_scanner.with_row_range(start_row, end_row)
+        return self
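As the docstring notes, start_row and end_row are positions over the rows of all file entries gathered by the scan, not offsets inside a single file. A rough illustration of which file entries a range touches, assuming row counts are simply accumulated in entry order (files_touched_by_range is a hypothetical helper, not the scanner's actual filter; in the scanner, boundary files are additionally divided per split later):

    def files_touched_by_range(file_row_counts, start_row, end_row):
        # Keep every file whose global span [entry_start, entry_end) overlaps [start_row, end_row).
        touched, entry_end = [], 0
        for index, row_count in enumerate(file_row_counts):
            entry_start, entry_end = entry_end, entry_end + row_count
            if entry_start < end_row and entry_end > start_row:
                touched.append(index)
        return touched

    # Two files with 3 and 2 rows: with_row_range(2, 4) needs the last row of the first file
    # and the first row of the second, so both files are kept and trimmed later.
    assert files_touched_by_range([3, 2], 2, 4) == [0, 1]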

paimon-python/pypaimon/tests/blob_table_test.py

file mode changed: 100644 → 100755
Lines changed: 62 additions & 0 deletions
@@ -2373,6 +2373,68 @@ def test_blob_large_data_volume_with_shard(self):
         actual = pa.concat_tables([actual1, actual2, actual3]).sort_by('id')
         self.assertEqual(actual, expected)
 
+    def test_data_blob_writer_with_row_range(self):
+        """Test DataBlobWriter with mixed data types in blob column."""
+
+        # Create schema with blob column
+        pa_schema = pa.schema([
+            ('id', pa.int32()),
+            ('type', pa.string()),
+            ('data', pa.large_binary()),
+        ])
+
+        schema = Schema.from_pyarrow_schema(
+            pa_schema,
+            options={
+                'row-tracking.enabled': 'true',
+                'data-evolution.enabled': 'true'
+            }
+        )
+        self.catalog.create_table('test_db.with_row_range_test', schema, False)
+        table = self.catalog.get_table('test_db.with_row_range_test')
+
+        # Use proper table API to create writer
+        write_builder = table.new_batch_write_builder()
+        blob_writer = write_builder.new_write()
+
+        # Test data with different types of blob content
+        test_data = pa.Table.from_pydict({
+            'id': [0, 1, 2, 3, 4],
+            'type': ['text', 'json', 'binary', 'image', 'pdf'],
+            'data': [
+                b'This is text content',
+                b'{"key": "value", "number": 42}',
+                b'\x00\x01\x02\x03\xff\xfe\xfd',
+                b'PNG_IMAGE_DATA_PLACEHOLDER',
+                b'%PDF-1.4\nPDF_CONTENT_PLACEHOLDER'
+            ]
+        }, schema=pa_schema)
+
+        # Write mixed data
+        total_rows = 0
+        for batch in test_data.to_batches():
+            blob_writer.write_arrow_batch(batch)
+            total_rows += batch.num_rows
+
+        # Test prepare commit
+        commit_messages = blob_writer.prepare_commit()
+        # Create commit and commit the data
+        commit = write_builder.new_commit()
+        commit.commit(commit_messages)
+        blob_writer.close()
+
+        # Read data back using table API
+        read_builder = table.new_read_builder()
+        table_scan = read_builder.new_scan().with_row_range(2, 4)
+        table_read = read_builder.new_read()
+        splits = table_scan.plan().splits()
+        result = table_read.to_arrow(splits)
+
+        # Verify the data was read back correctly
+        self.assertEqual(result.num_rows, 2, "Should have 2 rows")
+        self.assertEqual(result.num_columns, 3, "Should have 3 columns")
+        self.assertEqual(result["id"].unique().to_pylist(), [2, 3], "Get incorrect column ID")
+
     def test_data_blob_writer_with_shard(self):
         """Test DataBlobWriter with mixed data types in blob column."""
