Commit 9ab6fee

[python] support table scan with row range

1 parent ed6d3f1

3 files changed: +130 −33 lines
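For context, this commit lets a Python table scan be restricted to an explicit half-open row range [start_row, end_row) of the table's global row order, in addition to the existing with_shard sharding. A minimal usage sketch modeled on the test added below (the catalog and table setup are assumed, not part of this change):

    read_builder = table.new_read_builder()

    # Only plan splits covering rows [2, 4) of the table's global row order.
    table_scan = read_builder.new_scan().with_row_range(2, 4)
    table_read = read_builder.new_read()

    splits = table_scan.plan().splits()
    result = table_read.to_arrow(splits)  # expected to contain rows 2 and 3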

paimon-python/pypaimon/read/scanner/full_starting_scanner.py
(mode changed 100644 → 100755; 64 additions, 33 deletions)
@@ -63,6 +63,8 @@ def __init__(self, table, predicate: Optional[Predicate], limit: Optional[int]):
 
         self.idx_of_this_subtask = None
         self.number_of_para_subtasks = None
+        self.start_row_of_this_subtask = None
+        self.end_row_of_this_subtask = None
 
         self.only_read_real_buckets = True if options.bucket() == BucketMode.POSTPONE_BUCKET.value else False
         self.data_evolution = options.data_evolution_enabled()
@@ -125,33 +127,16 @@ def with_shard(self, idx_of_this_subtask, number_of_para_subtasks) -> 'FullStartingScanner':
         self.number_of_para_subtasks = number_of_para_subtasks
         return self
 
-    def _append_only_filter_by_shard(self, partitioned_files: defaultdict) -> (defaultdict, int, int):
-        """
-        Filter file entries by shard. Only keep the files within the range, which means
-        that only the starting and ending files need to be further divided subsequently
-        """
-        total_row = 0
-        # Sort by file creation time to ensure consistent sharding
-        for key, file_entries in partitioned_files.items():
-            for entry in file_entries:
-                total_row += entry.file.row_count
-
-        # Calculate number of rows this shard should process using balanced distribution
-        # Distribute remainder evenly among first few shards to avoid last shard overload
-        base_rows_per_shard = total_row // self.number_of_para_subtasks
-        remainder = total_row % self.number_of_para_subtasks
-
-        # Each of the first 'remainder' shards gets one extra row
-        if self.idx_of_this_subtask < remainder:
-            num_row = base_rows_per_shard + 1
-            start_row = self.idx_of_this_subtask * (base_rows_per_shard + 1)
-        else:
-            num_row = base_rows_per_shard
-            start_row = (remainder * (base_rows_per_shard + 1) +
-                         (self.idx_of_this_subtask - remainder) * base_rows_per_shard)
-
-        end_row = start_row + num_row
+    def with_row_range(self, start_row, end_row) -> 'FullStartingScanner':
+        if start_row >= end_row:
+            raise Exception("start_row must be less than end_row")
+        self.start_row_of_this_subtask = start_row
+        self.end_row_of_this_subtask = end_row
+        return self
 
+    def _append_only_filter_by_row_range(self, partitioned_files: defaultdict,
+                                         start_row: int,
+                                         end_row: int) -> (defaultdict, int, int):
         plan_start_row = 0
         plan_end_row = 0
         entry_end_row = 0  # end row position of current file in all data
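The body of _append_only_filter_by_row_range (largely the former shard-filter body, which continues unchanged below this hunk) keeps only the file entries whose cumulative row interval overlaps [start_row, end_row), so that only the first and last retained files still need to be divided at row granularity. A simplified, self-contained sketch of that overlap test, not the actual implementation (the helper name and the file objects here are stand-ins):

    def filter_files_by_row_range(files, start_row, end_row):
        # Walk files in planning order, tracking the global row offset of each
        # file's first row; keep files overlapping the half-open range.
        kept = []
        offset = 0
        for f in files:
            file_start, file_end = offset, offset + f.row_count
            if file_end > start_row and file_start < end_row:
                kept.append(f)
            offset = file_end
        return kept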
@@ -183,12 +168,16 @@ def _append_only_filter_by_shard(self, partitioned_files: defaultdict) -> (defaultdict, int, int):
 
         return filtered_partitioned_files, plan_start_row, plan_end_row
 
-    def _data_evolution_filter_by_shard(self, partitioned_files: defaultdict) -> (defaultdict, int, int):
+    def _append_only_filter_by_shard(self, partitioned_files: defaultdict) -> (defaultdict, int, int):
+        """
+        Filter file entries by shard. Only keep the files within the range, which means
+        that only the starting and ending files need to be further divided subsequently
+        """
         total_row = 0
+        # Sort by file creation time to ensure consistent sharding
         for key, file_entries in partitioned_files.items():
             for entry in file_entries:
-                if not self._is_blob_file(entry.file.file_name):
-                    total_row += entry.file.row_count
+                total_row += entry.file.row_count
 
         # Calculate number of rows this shard should process using balanced distribution
         # Distribute remainder evenly among first few shards to avoid last shard overload
@@ -206,6 +195,11 @@ def _data_evolution_filter_by_shard(self, partitioned_files: defaultdict) -> (defaultdict, int, int):
 
         end_row = start_row + num_row
 
+        return self._append_only_filter_by_row_range(partitioned_files, start_row, end_row)
+
+    def _data_evolution_filter_by_row_range(self, partitioned_files: defaultdict,
+                                            start_row: int,
+                                            end_row: int) -> (defaultdict, int, int):
         plan_start_row = 0
         plan_end_row = 0
         entry_end_row = 0  # end row position of current file in all data
@@ -244,6 +238,30 @@ def _data_evolution_filter_by_shard(self, partitioned_files: defaultdict) -> (defaultdict, int, int):
 
         return filtered_partitioned_files, plan_start_row, plan_end_row
 
+    def _data_evolution_filter_by_shard(self, partitioned_files: defaultdict) -> (defaultdict, int, int):
+        total_row = 0
+        for key, file_entries in partitioned_files.items():
+            for entry in file_entries:
+                if not self._is_blob_file(entry.file.file_name):
+                    total_row += entry.file.row_count
+
+        # Calculate number of rows this shard should process using balanced distribution
+        # Distribute remainder evenly among first few shards to avoid last shard overload
+        base_rows_per_shard = total_row // self.number_of_para_subtasks
+        remainder = total_row % self.number_of_para_subtasks
+
+        # Each of the first 'remainder' shards gets one extra row
+        if self.idx_of_this_subtask < remainder:
+            num_row = base_rows_per_shard + 1
+            start_row = self.idx_of_this_subtask * (base_rows_per_shard + 1)
+        else:
+            num_row = base_rows_per_shard
+            start_row = (remainder * (base_rows_per_shard + 1) +
+                         (self.idx_of_this_subtask - remainder) * base_rows_per_shard)
+
+        end_row = start_row + num_row
+        return self._data_evolution_filter_by_row_range(partitioned_files, start_row, end_row)
+
     def _compute_split_start_end_row(self, splits: List[Split], plan_start_row, plan_end_row):
         """
         Find files that needs to be divided for each split
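After this refactoring, _append_only_filter_by_shard and _data_evolution_filter_by_shard only translate (idx_of_this_subtask, number_of_para_subtasks) into a row range using the balanced distribution above and then delegate to the corresponding row-range filter. The mapping itself, extracted as a standalone sketch for illustration (hypothetical helper, not part of the commit):

    def shard_to_row_range(shard_idx, num_shards, total_rows):
        # Distribute rows as evenly as possible; the first `remainder`
        # shards each take one extra row.
        base = total_rows // num_shards
        remainder = total_rows % num_shards
        if shard_idx < remainder:
            start = shard_idx * (base + 1)
            num = base + 1
        else:
            start = remainder * (base + 1) + (shard_idx - remainder) * base
            num = base
        return start, start + num  # half-open range [start, end)

For example, 10 rows across 3 subtasks yields the ranges [0, 4), [4, 7), and [7, 10).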
@@ -451,7 +469,13 @@ def _create_append_only_splits(
         for entry in file_entries:
             partitioned_files[(tuple(entry.partition.values), entry.bucket)].append(entry)
 
-        if self.idx_of_this_subtask is not None:
+        if self.start_row_of_this_subtask is not None:
+            # shard data range: [plan_start_row, plan_end_row)
+            partitioned_files, plan_start_row, plan_end_row = \
+                self._append_only_filter_by_row_range(partitioned_files,
+                                                      self.start_row_of_this_subtask,
+                                                      self.end_row_of_this_subtask)
+        elif self.idx_of_this_subtask is not None:
             partitioned_files, plan_start_row, plan_end_row = self._append_only_filter_by_shard(partitioned_files)
 
         def weight_func(f: DataFileMeta) -> int:
@@ -467,7 +491,7 @@ def weight_func(f: DataFileMeta) -> int:
             packed_files: List[List[DataFileMeta]] = self._pack_for_ordered(data_files, weight_func,
                                                                             self.target_split_size)
             splits += self._build_split_from_pack(packed_files, file_entries, False, deletion_files_map)
-        if self.idx_of_this_subtask is not None:
+        if self.start_row_of_this_subtask is not None or self.idx_of_this_subtask is not None:
            # When files are combined into splits, it is necessary to find files that needs to be divided for each split
             self._compute_split_start_end_row(splits, plan_start_row, plan_end_row)
         return splits
@@ -555,7 +579,14 @@ def sort_key(manifest_entry: ManifestEntry) -> tuple:
         for entry in sorted_entries:
             partitioned_files[(tuple(entry.partition.values), entry.bucket)].append(entry)
 
-        if self.idx_of_this_subtask is not None:
+        print("self.start_row_of_this_subtask:{}".format(self.start_row_of_this_subtask))
+        if self.start_row_of_this_subtask is not None:
+            # shard data range: [plan_start_row, plan_end_row)
+            partitioned_files, plan_start_row, plan_end_row = \
+                self._data_evolution_filter_by_row_range(partitioned_files,
+                                                         self.start_row_of_this_subtask,
+                                                         self.end_row_of_this_subtask)
+        elif self.idx_of_this_subtask is not None:
             # shard data range: [plan_start_row, plan_end_row)
             partitioned_files, plan_start_row, plan_end_row = self._data_evolution_filter_by_shard(partitioned_files)
 
@@ -584,7 +615,7 @@ def weight_func(file_list: List[DataFileMeta]) -> int:
 
         splits += self._build_split_from_pack(flatten_packed_files, sorted_entries, False, deletion_files_map)
 
-        if self.idx_of_this_subtask is not None:
+        if self.start_row_of_this_subtask is not None or self.idx_of_this_subtask is not None:
             self._compute_split_start_end_row(splits, plan_start_row, plan_end_row)
         return splits
 
paimon-python/pypaimon/read/table_scan.py
(mode changed 100644 → 100755; 4 additions, 0 deletions)
@@ -71,3 +71,7 @@ def _create_starting_scanner(self) -> Optional[StartingScanner]:
     def with_shard(self, idx_of_this_subtask, number_of_para_subtasks) -> 'TableScan':
         self.starting_scanner.with_shard(idx_of_this_subtask, number_of_para_subtasks)
         return self
+
+    def with_row_range(self, start_row, end_row) -> 'TableScan':
+        self.starting_scanner.with_row_range(start_row, end_row)
+        return self
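Note on precedence: per the if/elif added in full_starting_scanner.py, an explicit row range is checked before the shard settings when splits are planned. An illustrative sketch of that behavior (assumed from the diff above, not separately documented):

    scan = read_builder.new_scan()
    scan.with_shard(0, 4)           # balanced shard 0 of 4
    scan.with_row_range(100, 200)   # planning uses rows [100, 200); the shard is ignored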

paimon-python/pypaimon/tests/blob_table_test.py
(mode changed 100644 → 100755; 62 additions, 0 deletions)
@@ -2373,6 +2373,68 @@ def test_blob_large_data_volume_with_shard(self):
         actual = pa.concat_tables([actual1, actual2, actual3]).sort_by('id')
         self.assertEqual(actual, expected)
 
+    def test_data_blob_writer_with_row_range(self):
+        """Test DataBlobWriter with mixed data types in blob column."""
+
+        # Create schema with blob column
+        pa_schema = pa.schema([
+            ('id', pa.int32()),
+            ('type', pa.string()),
+            ('data', pa.large_binary()),
+        ])
+
+        schema = Schema.from_pyarrow_schema(
+            pa_schema,
+            options={
+                'row-tracking.enabled': 'true',
+                'data-evolution.enabled': 'true'
+            }
+        )
+        self.catalog.create_table('test_db.with_row_range_test', schema, False)
+        table = self.catalog.get_table('test_db.with_row_range_test')
+
+        # Use proper table API to create writer
+        write_builder = table.new_batch_write_builder()
+        blob_writer = write_builder.new_write()
+
+        # Test data with different types of blob content
+        test_data = pa.Table.from_pydict({
+            'id': [0, 1, 2, 3, 4],
+            'type': ['text', 'json', 'binary', 'image', 'pdf'],
+            'data': [
+                b'This is text content',
+                b'{"key": "value", "number": 42}',
+                b'\x00\x01\x02\x03\xff\xfe\xfd',
+                b'PNG_IMAGE_DATA_PLACEHOLDER',
+                b'%PDF-1.4\nPDF_CONTENT_PLACEHOLDER'
+            ]
+        }, schema=pa_schema)
+
+        # Write mixed data
+        total_rows = 0
+        for batch in test_data.to_batches():
+            blob_writer.write_arrow_batch(batch)
+            total_rows += batch.num_rows
+
+        # Test prepare commit
+        commit_messages = blob_writer.prepare_commit()
+        # Create commit and commit the data
+        commit = write_builder.new_commit()
+        commit.commit(commit_messages)
+        blob_writer.close()
+
+        # Read data back using table API
+        read_builder = table.new_read_builder()
+        table_scan = read_builder.new_scan().with_row_range(2, 4)
+        table_read = read_builder.new_read()
+        splits = table_scan.plan().splits()
+        result = table_read.to_arrow(splits)
+
+        # Verify the data was read back correctly
+        self.assertEqual(result.num_rows, 2, "Should have 2 rows")
+        self.assertEqual(result.num_columns, 3, "Should have 3 columns")
+        self.assertEqual(result["id"].unique().to_pylist(), [2, 3], "Get incorrect column ID")
+
     def test_data_blob_writer_with_shard(self):
         """Test DataBlobWriter with mixed data types in blob column."""
 