100 changes: 67 additions & 33 deletions paimon-python/pypaimon/read/scanner/full_starting_scanner.py
100644 → 100755
@@ -63,6 +63,8 @@ def __init__(self, table, predicate: Optional[Predicate], limit: Optional[int]):

self.idx_of_this_subtask = None
self.number_of_para_subtasks = None
self.start_row_of_this_subtask = None
self.end_row_of_this_subtask = None

self.only_read_real_buckets = True if options.bucket() == BucketMode.POSTPONE_BUCKET.value else False
self.data_evolution = options.data_evolution_enabled()
@@ -121,37 +123,24 @@ def read_manifest_entries(self, manifest_files: List[ManifestFileMeta]) -> List[
def with_shard(self, idx_of_this_subtask, number_of_para_subtasks) -> 'FullStartingScanner':
if idx_of_this_subtask >= number_of_para_subtasks:
raise Exception("idx_of_this_subtask must be less than number_of_para_subtasks")
if self.start_row_of_this_subtask is not None:
raise Exception("with_shard and with_row_range cannot be used simultaneously")
self.idx_of_this_subtask = idx_of_this_subtask
self.number_of_para_subtasks = number_of_para_subtasks
return self

def _append_only_filter_by_shard(self, partitioned_files: defaultdict) -> (defaultdict, int, int):
"""
Filter file entries by shard. Only keep the files within the range, which means
that only the starting and ending files need to be further divided subsequently
"""
total_row = 0
# Sort by file creation time to ensure consistent sharding
for key, file_entries in partitioned_files.items():
for entry in file_entries:
total_row += entry.file.row_count

# Calculate number of rows this shard should process using balanced distribution
# Distribute remainder evenly among first few shards to avoid last shard overload
base_rows_per_shard = total_row // self.number_of_para_subtasks
remainder = total_row % self.number_of_para_subtasks

# Each of the first 'remainder' shards gets one extra row
if self.idx_of_this_subtask < remainder:
num_row = base_rows_per_shard + 1
start_row = self.idx_of_this_subtask * (base_rows_per_shard + 1)
else:
num_row = base_rows_per_shard
start_row = (remainder * (base_rows_per_shard + 1) +
(self.idx_of_this_subtask - remainder) * base_rows_per_shard)

end_row = start_row + num_row
def with_row_range(self, start_row, end_row) -> 'FullStartingScanner':
Review (Contributor): Can we raise an exception for a primary key table?

Review (Contributor): Can we use with_row_range and with_shard at the same time, or not?

Review (Author): No. It is difficult to guarantee completely consistent behavior for all users when both are used together, so an exception was added to prevent that situation.

if start_row >= end_row:
raise Exception("start_row must be less than end_row")
if self.idx_of_this_subtask is not None:
raise Exception("with_row_range and with_shard cannot be used simultaneously")
self.start_row_of_this_subtask = start_row
self.end_row_of_this_subtask = end_row
return self
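The two sharding entry points are mutually exclusive: setting a row range first makes a later `with_shard` call fail, and vice versa. A minimal usage sketch (the `table` and predicate wiring is assumed, not taken from this PR):

```python
# Sketch only: demonstrates the mutual-exclusion checks above.
scanner = FullStartingScanner(table, predicate=None, limit=None)
scanner.with_row_range(0, 100)   # read rows [0, 100)

try:
    scanner.with_shard(0, 4)     # start_row_of_this_subtask is already set
except Exception as e:
    print(e)  # "with_row_range and with_shard cannot be used simultaneously"
```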

def _append_only_filter_by_row_range(self, partitioned_files: defaultdict,
start_row: int,
end_row: int) -> (defaultdict, int, int):
plan_start_row = 0
plan_end_row = 0
entry_end_row = 0 # end row position of current file in all data
@@ -183,12 +172,16 @@ def _append_only_filter_by_shard(self, partitioned_files: defaultdict) -> (defaultdict, int, int):

return filtered_partitioned_files, plan_start_row, plan_end_row
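For intuition, here is a simplified, standalone sketch of the row-range filtering idea (not the actual implementation, which also tracks partition/bucket keys, blob files, and deletion files): walk the files in order, keep only those whose cumulative row span intersects the half-open range, and remember how far the range reaches into the first and last kept files.

```python
from typing import List, Tuple

def filter_files_by_row_range(row_counts: List[int],
                              start_row: int,
                              end_row: int) -> Tuple[List[int], int, int]:
    """Illustrative only. Returns (kept_indices, skip_in_first, keep_in_last):
    which files overlap [start_row, end_row), how many leading rows of the first
    kept file fall before the range, and how many rows of the last kept file
    are still inside it."""
    kept: List[int] = []
    skip_in_first = keep_in_last = 0
    file_start = 0
    for idx, n in enumerate(row_counts):
        file_end = file_start + n  # this file covers rows [file_start, file_end)
        if file_end > start_row and file_start < end_row:
            if not kept:
                skip_in_first = start_row - file_start
            kept.append(idx)
            keep_in_last = min(end_row, file_end) - file_start
        file_start = file_end
    return kept, skip_in_first, keep_in_last

# Three files of 100 rows each; request rows [150, 250):
# keep files 1 and 2, skip the first 50 rows of file 1, keep 50 rows of file 2.
assert filter_files_by_row_range([100, 100, 100], 150, 250) == ([1, 2], 50, 50)
```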

def _data_evolution_filter_by_shard(self, partitioned_files: defaultdict) -> (defaultdict, int, int):
def _append_only_filter_by_shard(self, partitioned_files: defaultdict) -> (defaultdict, int, int):
"""
Filter file entries by shard. Only keep the files within the range, which means
that only the starting and ending files need to be further divided subsequently
"""
total_row = 0
# Sort by file creation time to ensure consistent sharding
for key, file_entries in partitioned_files.items():
for entry in file_entries:
if not self._is_blob_file(entry.file.file_name):
total_row += entry.file.row_count
total_row += entry.file.row_count

# Calculate number of rows this shard should process using balanced distribution
# Distribute remainder evenly among first few shards to avoid last shard overload
@@ -206,6 +199,11 @@ def _data_evolution_filter_by_shard(self, partitioned_files: defaultdict) -> (defaultdict, int, int):

end_row = start_row + num_row

return self._append_only_filter_by_row_range(partitioned_files, start_row, end_row)
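A tiny worked check of the balanced distribution above, with 10 rows over 3 subtasks: the single remainder row goes to subtask 0, so the shards cover [0, 4), [4, 7), [7, 10). The helper below only restates the arithmetic in the method and is not part of the PR:

```python
def shard_range(idx: int, n_subtasks: int, total_row: int):
    """Restates the balanced-distribution arithmetic above: the first
    `remainder` shards each get one extra row."""
    base, rem = divmod(total_row, n_subtasks)
    if idx < rem:
        start = idx * (base + 1)
        return start, start + base + 1
    start = rem * (base + 1) + (idx - rem) * base
    return start, start + base

assert [shard_range(i, 3, 10) for i in range(3)] == [(0, 4), (4, 7), (7, 10)]
```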

def _data_evolution_filter_by_row_range(self, partitioned_files: defaultdict,
start_row: int,
end_row: int) -> (defaultdict, int, int):
plan_start_row = 0
plan_end_row = 0
entry_end_row = 0 # end row position of current file in all data
@@ -244,6 +242,30 @@ def _data_evolution_filter_by_shard(self, partitioned_files: defaultdict) -> (defaultdict, int, int):

return filtered_partitioned_files, plan_start_row, plan_end_row

def _data_evolution_filter_by_shard(self, partitioned_files: defaultdict) -> (defaultdict, int, int):
total_row = 0
for key, file_entries in partitioned_files.items():
for entry in file_entries:
if not self._is_blob_file(entry.file.file_name):
total_row += entry.file.row_count

# Calculate number of rows this shard should process using balanced distribution
# Distribute remainder evenly among first few shards to avoid last shard overload
base_rows_per_shard = total_row // self.number_of_para_subtasks
remainder = total_row % self.number_of_para_subtasks

# Each of the first 'remainder' shards gets one extra row
if self.idx_of_this_subtask < remainder:
num_row = base_rows_per_shard + 1
start_row = self.idx_of_this_subtask * (base_rows_per_shard + 1)
else:
num_row = base_rows_per_shard
start_row = (remainder * (base_rows_per_shard + 1) +
(self.idx_of_this_subtask - remainder) * base_rows_per_shard)

end_row = start_row + num_row
return self._data_evolution_filter_by_row_range(partitioned_files, start_row, end_row)

def _compute_split_start_end_row(self, splits: List[Split], plan_start_row, plan_end_row):
"""
Find the files that need to be divided for each split
@@ -451,7 +473,13 @@ def _create_append_only_splits(
for entry in file_entries:
partitioned_files[(tuple(entry.partition.values), entry.bucket)].append(entry)

if self.idx_of_this_subtask is not None:
if self.start_row_of_this_subtask is not None:
# shard data range: [plan_start_row, plan_end_row)
partitioned_files, plan_start_row, plan_end_row = \
self._append_only_filter_by_row_range(partitioned_files,
self.start_row_of_this_subtask,
self.end_row_of_this_subtask)
elif self.idx_of_this_subtask is not None:
partitioned_files, plan_start_row, plan_end_row = self._append_only_filter_by_shard(partitioned_files)

def weight_func(f: DataFileMeta) -> int:
Expand All @@ -467,7 +495,7 @@ def weight_func(f: DataFileMeta) -> int:
packed_files: List[List[DataFileMeta]] = self._pack_for_ordered(data_files, weight_func,
self.target_split_size)
splits += self._build_split_from_pack(packed_files, file_entries, False, deletion_files_map)
if self.idx_of_this_subtask is not None:
if self.start_row_of_this_subtask is not None or self.idx_of_this_subtask is not None:
# When files are combined into splits, it is necessary to find the files that need to be divided for each split
self._compute_split_start_end_row(splits, plan_start_row, plan_end_row)
return splits
@@ -555,7 +583,13 @@ def sort_key(manifest_entry: ManifestEntry) -> tuple:
for entry in sorted_entries:
partitioned_files[(tuple(entry.partition.values), entry.bucket)].append(entry)

if self.idx_of_this_subtask is not None:
if self.start_row_of_this_subtask is not None:
Review (Contributor): A logger may be better here.

Review (Author): This is for debugging purposes; I will delete it soon.

# shard data range: [plan_start_row, plan_end_row)
partitioned_files, plan_start_row, plan_end_row = \
self._data_evolution_filter_by_row_range(partitioned_files,
self.start_row_of_this_subtask,
self.end_row_of_this_subtask)
elif self.idx_of_this_subtask is not None:
# shard data range: [plan_start_row, plan_end_row)
partitioned_files, plan_start_row, plan_end_row = self._data_evolution_filter_by_shard(partitioned_files)

@@ -584,7 +618,7 @@ def weight_func(file_list: List[DataFileMeta]) -> int:

splits += self._build_split_from_pack(flatten_packed_files, sorted_entries, False, deletion_files_map)

if self.idx_of_this_subtask is not None:
if self.start_row_of_this_subtask is not None or self.idx_of_this_subtask is not None:
self._compute_split_start_end_row(splits, plan_start_row, plan_end_row)
return splits

8 changes: 8 additions & 0 deletions paimon-python/pypaimon/read/table_scan.py
100644 → 100755
@@ -71,3 +71,11 @@ def _create_starting_scanner(self) -> Optional[StartingScanner]:
def with_shard(self, idx_of_this_subtask, number_of_para_subtasks) -> 'TableScan':
self.starting_scanner.with_shard(idx_of_this_subtask, number_of_para_subtasks)
return self

def with_row_range(self, start_row, end_row) -> 'TableScan':
Review (Contributor): Are we returning exactly this number of rows, or can it be approximate? This needs to be stated clearly in the comments.

"""
Filter file entries by row range. The row_id corresponds to the row position of the
file in all file entries in table scan's partitioned_files.
"""
Review (Contributor, @discivigour, Jan 4, 2026): It might be better to state clearly in the comments whether the range endpoints are inclusive or exclusive.

self.starting_scanner.with_row_range(start_row, end_row)
return self
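A short usage sketch mirroring the test below: the range is half-open, so with five rows written, `with_row_range(2, 4)` returns the rows at positions 2 and 3 (the table setup is assumed):

```python
# Sketch only: half-open range [start_row, end_row) over the scan's file entries.
read_builder = table.new_read_builder()
table_scan = read_builder.new_scan().with_row_range(2, 4)
table_read = read_builder.new_read()
splits = table_scan.plan().splits()
result = table_read.to_arrow(splits)   # 2 rows: positions 2 and 3
```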
62 changes: 62 additions & 0 deletions paimon-python/pypaimon/tests/blob_table_test.py
100644 → 100755
@@ -2373,6 +2373,68 @@ def test_blob_large_data_volume_with_shard(self):
actual = pa.concat_tables([actual1, actual2, actual3]).sort_by('id')
self.assertEqual(actual, expected)

def test_data_blob_writer_with_row_range(self):
"""Test DataBlobWriter with mixed data types in blob column."""

# Create schema with blob column
pa_schema = pa.schema([
('id', pa.int32()),
('type', pa.string()),
('data', pa.large_binary()),
])

schema = Schema.from_pyarrow_schema(
pa_schema,
options={
'row-tracking.enabled': 'true',
'data-evolution.enabled': 'true'
}
)
self.catalog.create_table('test_db.with_row_range_test', schema, False)
table = self.catalog.get_table('test_db.with_row_range_test')

# Use proper table API to create writer
write_builder = table.new_batch_write_builder()
blob_writer = write_builder.new_write()

# Test data with different types of blob content
test_data = pa.Table.from_pydict({
'id': [0, 1, 2, 3, 4],
'type': ['text', 'json', 'binary', 'image', 'pdf'],
'data': [
b'This is text content',
b'{"key": "value", "number": 42}',
b'\x00\x01\x02\x03\xff\xfe\xfd',
b'PNG_IMAGE_DATA_PLACEHOLDER',
b'%PDF-1.4\nPDF_CONTENT_PLACEHOLDER'
]
}, schema=pa_schema)

# Write mixed data
total_rows = 0
for batch in test_data.to_batches():
blob_writer.write_arrow_batch(batch)
total_rows += batch.num_rows

# Test prepare commit
commit_messages = blob_writer.prepare_commit()
# Create commit and commit the data
commit = write_builder.new_commit()
commit.commit(commit_messages)
blob_writer.close()

# Read data back using table API
read_builder = table.new_read_builder()
table_scan = read_builder.new_scan().with_row_range(2, 4)
table_read = read_builder.new_read()
splits = table_scan.plan().splits()
result = table_read.to_arrow(splits)

# Verify the data was read back correctly
self.assertEqual(result.num_rows, 2, "Should have 2 rows")
self.assertEqual(result.num_columns, 3, "Should have 3 columns")
self.assertEqual(result["id"].unique().to_pylist(), [2, 3], "Get incorrect column ID")

def test_data_blob_writer_with_shard(self):
"""Test DataBlobWriter with mixed data types in blob column."""
