diff --git a/paimon-python/pypaimon/read/scanner/full_starting_scanner.py b/paimon-python/pypaimon/read/scanner/full_starting_scanner.py
old mode 100644
new mode 100755
index eebd3b6008d1..3698f9de6fe4
--- a/paimon-python/pypaimon/read/scanner/full_starting_scanner.py
+++ b/paimon-python/pypaimon/read/scanner/full_starting_scanner.py
@@ -63,6 +63,8 @@ def __init__(self, table, predicate: Optional[Predicate], limit: Optional[int]):
 
         self.idx_of_this_subtask = None
         self.number_of_para_subtasks = None
+        self.start_row_of_this_subtask = None
+        self.end_row_of_this_subtask = None
         self.only_read_real_buckets = True if options.bucket() == BucketMode.POSTPONE_BUCKET.value else False
         self.data_evolution = options.data_evolution_enabled()
 
@@ -121,37 +123,24 @@ def read_manifest_entries(self, manifest_files: List[ManifestFileMeta]) -> List[
     def with_shard(self, idx_of_this_subtask, number_of_para_subtasks) -> 'FullStartingScanner':
         if idx_of_this_subtask >= number_of_para_subtasks:
             raise Exception("idx_of_this_subtask must be less than number_of_para_subtasks")
+        if self.start_row_of_this_subtask is not None:
+            raise Exception("with_shard and with_row_range cannot be used simultaneously")
         self.idx_of_this_subtask = idx_of_this_subtask
         self.number_of_para_subtasks = number_of_para_subtasks
         return self
 
-    def _append_only_filter_by_shard(self, partitioned_files: defaultdict) -> (defaultdict, int, int):
-        """
-        Filter file entries by shard. Only keep the files within the range, which means
-        that only the starting and ending files need to be further divided subsequently
-        """
-        total_row = 0
-        # Sort by file creation time to ensure consistent sharding
-        for key, file_entries in partitioned_files.items():
-            for entry in file_entries:
-                total_row += entry.file.row_count
-
-        # Calculate number of rows this shard should process using balanced distribution
-        # Distribute remainder evenly among first few shards to avoid last shard overload
-        base_rows_per_shard = total_row // self.number_of_para_subtasks
-        remainder = total_row % self.number_of_para_subtasks
-
-        # Each of the first 'remainder' shards gets one extra row
-        if self.idx_of_this_subtask < remainder:
-            num_row = base_rows_per_shard + 1
-            start_row = self.idx_of_this_subtask * (base_rows_per_shard + 1)
-        else:
-            num_row = base_rows_per_shard
-            start_row = (remainder * (base_rows_per_shard + 1) +
-                         (self.idx_of_this_subtask - remainder) * base_rows_per_shard)
-
-        end_row = start_row + num_row
+    def with_row_range(self, start_row, end_row) -> 'FullStartingScanner':
+        if start_row >= end_row:
+            raise Exception("start_row must be less than end_row")
+        if self.idx_of_this_subtask is not None:
+            raise Exception("with_row_range and with_shard cannot be used simultaneously")
+        self.start_row_of_this_subtask = start_row
+        self.end_row_of_this_subtask = end_row
+        return self
+    def _append_only_filter_by_row_range(self, partitioned_files: defaultdict,
+                                         start_row: int,
+                                         end_row: int) -> (defaultdict, int, int):
 
         plan_start_row = 0
         plan_end_row = 0
         entry_end_row = 0  # end row position of current file in all data
@@ -183,12 +172,16 @@ def _append_only_filter_by_shard(self, partitioned_files: defaultdict) -> (defau
 
         return filtered_partitioned_files, plan_start_row, plan_end_row
 
-    def _data_evolution_filter_by_shard(self, partitioned_files: defaultdict) -> (defaultdict, int, int):
+    def _append_only_filter_by_shard(self, partitioned_files: defaultdict) -> (defaultdict, int, int):
+        """
+        Filter file entries by shard. Only keep the files within the range, which means
+        that only the starting and ending files need to be divided further
+        """
         total_row = 0
+        # Sort by file creation time to ensure consistent sharding
         for key, file_entries in partitioned_files.items():
             for entry in file_entries:
-                if not self._is_blob_file(entry.file.file_name):
-                    total_row += entry.file.row_count
+                total_row += entry.file.row_count
 
         # Calculate number of rows this shard should process using balanced distribution
         # Distribute remainder evenly among first few shards to avoid last shard overload
@@ -206,6 +199,11 @@ def _data_evolution_filter_by_shard(self, partitioned_files: defaultdict) -> (de
 
         end_row = start_row + num_row
 
+        return self._append_only_filter_by_row_range(partitioned_files, start_row, end_row)
+
+    def _data_evolution_filter_by_row_range(self, partitioned_files: defaultdict,
+                                            start_row: int,
+                                            end_row: int) -> (defaultdict, int, int):
         plan_start_row = 0
         plan_end_row = 0
         entry_end_row = 0  # end row position of current file in all data
@@ -244,6 +242,30 @@ def _data_evolution_filter_by_shard(self, partitioned_files: defaultdict) -> (de
 
         return filtered_partitioned_files, plan_start_row, plan_end_row
 
+    def _data_evolution_filter_by_shard(self, partitioned_files: defaultdict) -> (defaultdict, int, int):
+        total_row = 0
+        for key, file_entries in partitioned_files.items():
+            for entry in file_entries:
+                if not self._is_blob_file(entry.file.file_name):
+                    total_row += entry.file.row_count
+
+        # Calculate number of rows this shard should process using balanced distribution
+        # Distribute remainder evenly among first few shards to avoid last shard overload
+        base_rows_per_shard = total_row // self.number_of_para_subtasks
+        remainder = total_row % self.number_of_para_subtasks
+
+        # Each of the first 'remainder' shards gets one extra row
+        if self.idx_of_this_subtask < remainder:
+            num_row = base_rows_per_shard + 1
+            start_row = self.idx_of_this_subtask * (base_rows_per_shard + 1)
+        else:
+            num_row = base_rows_per_shard
+            start_row = (remainder * (base_rows_per_shard + 1) +
+                         (self.idx_of_this_subtask - remainder) * base_rows_per_shard)
+
+        end_row = start_row + num_row
+        return self._data_evolution_filter_by_row_range(partitioned_files, start_row, end_row)
+
     def _compute_split_start_end_row(self, splits: List[Split], plan_start_row, plan_end_row):
         """
         Find files that needs to be divided for each split
@@ -451,7 +473,13 @@ def _create_append_only_splits(
         for entry in file_entries:
             partitioned_files[(tuple(entry.partition.values), entry.bucket)].append(entry)
 
-        if self.idx_of_this_subtask is not None:
+        if self.start_row_of_this_subtask is not None:
+            # shard data range: [plan_start_row, plan_end_row)
+            partitioned_files, plan_start_row, plan_end_row = \
+                self._append_only_filter_by_row_range(partitioned_files,
+                                                      self.start_row_of_this_subtask,
+                                                      self.end_row_of_this_subtask)
+        elif self.idx_of_this_subtask is not None:
             partitioned_files, plan_start_row, plan_end_row = self._append_only_filter_by_shard(partitioned_files)
 
         def weight_func(f: DataFileMeta) -> int:
@@ -467,7 +495,7 @@ def weight_func(f: DataFileMeta) -> int:
         packed_files: List[List[DataFileMeta]] = self._pack_for_ordered(data_files, weight_func, self.target_split_size)
         splits += self._build_split_from_pack(packed_files, file_entries, False, deletion_files_map)
 
-        if self.idx_of_this_subtask is not None:
+        if self.start_row_of_this_subtask is not None or self.idx_of_this_subtask is not None:
             # When files are combined into splits, it is necessary to find files that needs to be divided for each split
             self._compute_split_start_end_row(splits, plan_start_row, plan_end_row)
         return splits
@@ -555,7 +583,13 @@ def sort_key(manifest_entry: ManifestEntry) -> tuple:
         for entry in sorted_entries:
             partitioned_files[(tuple(entry.partition.values), entry.bucket)].append(entry)
 
-        if self.idx_of_this_subtask is not None:
+        if self.start_row_of_this_subtask is not None:
+            # shard data range: [plan_start_row, plan_end_row)
+            partitioned_files, plan_start_row, plan_end_row = \
+                self._data_evolution_filter_by_row_range(partitioned_files,
+                                                         self.start_row_of_this_subtask,
+                                                         self.end_row_of_this_subtask)
+        elif self.idx_of_this_subtask is not None:
            # shard data range: [plan_start_row, plan_end_row)
            partitioned_files, plan_start_row, plan_end_row = self._data_evolution_filter_by_shard(partitioned_files)
 
@@ -584,7 +618,7 @@ def weight_func(file_list: List[DataFileMeta]) -> int:
 
         splits += self._build_split_from_pack(flatten_packed_files, sorted_entries, False, deletion_files_map)
 
-        if self.idx_of_this_subtask is not None:
+        if self.start_row_of_this_subtask is not None or self.idx_of_this_subtask is not None:
             self._compute_split_start_end_row(splits, plan_start_row, plan_end_row)
         return splits
 
diff --git a/paimon-python/pypaimon/read/table_scan.py b/paimon-python/pypaimon/read/table_scan.py
old mode 100644
new mode 100755
index bf6621c7646a..7f603ed71a7b
--- a/paimon-python/pypaimon/read/table_scan.py
+++ b/paimon-python/pypaimon/read/table_scan.py
@@ -71,3 +71,11 @@ def _create_starting_scanner(self) -> Optional[StartingScanner]:
     def with_shard(self, idx_of_this_subtask, number_of_para_subtasks) -> 'TableScan':
         self.starting_scanner.with_shard(idx_of_this_subtask, number_of_para_subtasks)
         return self
+
+    def with_row_range(self, start_row, end_row) -> 'TableScan':
+        """
+        Filter file entries by row range. start_row and end_row are row positions counted
+        across all file entries in the table scan's partitioned_files.
+        """
+        self.starting_scanner.with_row_range(start_row, end_row)
+        return self
diff --git a/paimon-python/pypaimon/tests/blob_table_test.py b/paimon-python/pypaimon/tests/blob_table_test.py
old mode 100644
new mode 100755
index c689e4f291f6..c17cf2c9d4ec
--- a/paimon-python/pypaimon/tests/blob_table_test.py
+++ b/paimon-python/pypaimon/tests/blob_table_test.py
@@ -2373,6 +2373,68 @@ def test_blob_large_data_volume_with_shard(self):
         actual = pa.concat_tables([actual1, actual2, actual3]).sort_by('id')
         self.assertEqual(actual, expected)
 
+    def test_data_blob_writer_with_row_range(self):
+        """Test reading a blob table back through a scan restricted with with_row_range."""
+
+        # Create schema with blob column
+        pa_schema = pa.schema([
+            ('id', pa.int32()),
+            ('type', pa.string()),
+            ('data', pa.large_binary()),
+        ])
+
+        schema = Schema.from_pyarrow_schema(
+            pa_schema,
+            options={
+                'row-tracking.enabled': 'true',
+                'data-evolution.enabled': 'true'
+            }
+        )
+        self.catalog.create_table('test_db.with_row_range_test', schema, False)
+        table = self.catalog.get_table('test_db.with_row_range_test')
+
+        # Use the table API to create a writer
+        write_builder = table.new_batch_write_builder()
+        blob_writer = write_builder.new_write()
+
+        # Test data with different types of blob content
+        test_data = pa.Table.from_pydict({
+            'id': [0, 1, 2, 3, 4],
+            'type': ['text', 'json', 'binary', 'image', 'pdf'],
+            'data': [
+                b'This is text content',
+                b'{"key": "value", "number": 42}',
+                b'\x00\x01\x02\x03\xff\xfe\xfd',
+                b'PNG_IMAGE_DATA_PLACEHOLDER',
+                b'%PDF-1.4\nPDF_CONTENT_PLACEHOLDER'
+            ]
+        }, schema=pa_schema)
+
+        # Write the mixed data
+        total_rows = 0
+        for batch in test_data.to_batches():
+            blob_writer.write_arrow_batch(batch)
+            total_rows += batch.num_rows
+
+        # Prepare the commit messages
+        commit_messages = blob_writer.prepare_commit()
+        # Create a commit and commit the data
+        commit = write_builder.new_commit()
+        commit.commit(commit_messages)
+        blob_writer.close()
+
+        # Read rows [2, 4) back using the table API
+        read_builder = table.new_read_builder()
+        table_scan = read_builder.new_scan().with_row_range(2, 4)
+        table_read = read_builder.new_read()
+        splits = table_scan.plan().splits()
+        result = table_read.to_arrow(splits)
+
+        # Verify the data was read back correctly
+        self.assertEqual(result.num_rows, 2, "Should have 2 rows")
+        self.assertEqual(result.num_columns, 3, "Should have 3 columns")
+        self.assertEqual(result["id"].unique().to_pylist(), [2, 3], "Should contain ids 2 and 3")
+
     def test_data_blob_writer_with_shard(self):
         """Test DataBlobWriter with mixed data types in blob column."""
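Reviewer note: the sketch below is not part of the patch. It is a minimal illustration of how the new with_row_range API could be driven by a caller that splits a table into explicit row ranges, mirroring the read path used in the test above. The helper row_range_for_subtask, the total_row value, and the table object are assumptions for illustration; the range arithmetic simply restates the balanced distribution used by _append_only_filter_by_shard (the first total_row % n subtasks each take one extra row).

# Hypothetical helper (not in the patch): compute the [start_row, end_row) slice
# for one subtask, using the same balanced distribution as the shard filters.
def row_range_for_subtask(total_row, idx, n):
    base, remainder = divmod(total_row, n)
    if idx < remainder:
        start = idx * (base + 1)
        return start, start + base + 1
    start = remainder * (base + 1) + (idx - remainder) * base
    return start, start + base

# Assumed setup: 'table' was obtained from a catalog as in the test above, and
# the caller knows the total row count (e.g. 5 rows were written).
total_row, num_subtasks = 5, 2
read_builder = table.new_read_builder()
results = []
for idx in range(num_subtasks):
    start_row, end_row = row_range_for_subtask(total_row, idx, num_subtasks)
    # Comparable to new_scan().with_shard(idx, num_subtasks), except the row
    # boundaries are chosen explicitly by the caller.
    splits = read_builder.new_scan().with_row_range(start_row, end_row).plan().splits()
    results.append(read_builder.new_read().to_arrow(splits))
# Concatenating the per-range results should yield all 5 rows exactly once.

Since with_shard and with_row_range raise when combined on the same scanner, each subtask should create its own scan, as shown above.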