From b85ae447e02144ceb76363a078a1f4a7e73ae632 Mon Sep 17 00:00:00 2001 From: zhoulii Date: Mon, 22 Dec 2025 18:32:47 +0800 Subject: [PATCH 1/6] [python] introduce update_columns in python api. --- .../tests/partial_columns_write_test.py | 319 ++++++++++++++++++ .../pypaimon/write/partial_column_write.py | 182 ++++++++++ paimon-python/pypaimon/write/table_write.py | 27 +- 3 files changed, 526 insertions(+), 2 deletions(-) create mode 100644 paimon-python/pypaimon/tests/partial_columns_write_test.py create mode 100644 paimon-python/pypaimon/write/partial_column_write.py diff --git a/paimon-python/pypaimon/tests/partial_columns_write_test.py b/paimon-python/pypaimon/tests/partial_columns_write_test.py new file mode 100644 index 000000000000..c69a53b05304 --- /dev/null +++ b/paimon-python/pypaimon/tests/partial_columns_write_test.py @@ -0,0 +1,319 @@ +""" +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +""" +import os +import shutil +import tempfile +import unittest + +import pyarrow as pa + +from pypaimon import CatalogFactory, Schema + + +class PartialColumnsWriteTest(unittest.TestCase): + @classmethod + def setUpClass(cls): + cls.tempdir = tempfile.mkdtemp() + cls.warehouse = os.path.join(cls.tempdir, 'warehouse') + cls.catalog = CatalogFactory.create({ + 'warehouse': cls.warehouse + }) + cls.catalog.create_database('default', True) + + # Define table schema for testing + cls.pa_schema = pa.schema([ + ('id', pa.int32()), + ('name', pa.string()), + ('age', pa.int32()), + ('city', pa.string()), + ]) + + # Define options for data evolution + cls.table_options = { + 'row-tracking.enabled': 'true', + 'data-evolution.enabled': 'true' + } + + @classmethod + def tearDownClass(cls): + shutil.rmtree(cls.tempdir, ignore_errors=True) + + def _create_table(self): + """Helper method to create a table with initial data.""" + # Generate unique table name for each test + import uuid + table_name = f'test_data_evolution_{uuid.uuid4().hex[:8]}' + schema = Schema.from_pyarrow_schema(self.pa_schema, options=self.table_options) + self.catalog.create_table(f'default.{table_name}', schema, False) + table = self.catalog.get_table(f'default.{table_name}') + + # Write batch-1 + write_builder = table.new_batch_write_builder() + + initial_data = pa.Table.from_pydict({ + 'id': [1, 2], + 'name': ['Alice', 'Bob'], + 'age': [25, 30], + 'city': ['NYC', 'LA'] + }, schema=self.pa_schema) + + table_write = write_builder.new_write() + table_commit = write_builder.new_commit() + table_write.write_arrow(initial_data) + table_commit.commit(table_write.prepare_commit()) + table_write.close() + table_commit.close() + + # Write batch-2 + following_data = pa.Table.from_pydict({ + 'id': [3, 4, 5], + 'name': ['Charlie', 'David', 'Eve'], + 'age': [35, 40, 45], + 'city': ['Chicago', 'Houston', 'Phoenix'] + }, schema=self.pa_schema) + + 
table_write = write_builder.new_write() + table_commit = write_builder.new_commit() + table_write.write_arrow(following_data) + table_commit.commit(table_write.prepare_commit()) + table_write.close() + table_commit.close() + + return table + + def test_update_existing_column(self): + """Test updating an existing column using data evolution.""" + # Create table with initial data + table = self._create_table() + + # Create data evolution writer using BatchTableWrite + write_builder = table.new_batch_write_builder() + batch_write = write_builder.new_write() + + # Prepare update data (sorted by row_id) + update_data = pa.Table.from_pydict({ + '_ROW_ID': [1, 0, 2, 3, 4], + 'age': [31, 26, 36, 39, 42] + }) + + # Update the age column + commit_messages = batch_write.update_columns(update_data, ['age']) + + # Commit the changes + table_commit = write_builder.new_commit() + table_commit.commit(commit_messages) + table_commit.close() + batch_write.close() + + # Verify the updated data + read_builder = table.new_read_builder() + table_read = read_builder.new_read() + splits = read_builder.new_scan().plan().splits() + result = table_read.to_arrow(splits) + + # Check that ages were updated for rows 0-2 + ages = result['age'].to_pylist() + expected_ages = [26, 31, 36, 39, 42] + self.assertEqual(ages, expected_ages) + + def test_update_multiple_columns(self): + """Test updating multiple columns at once.""" + # Create table with initial data + table = self._create_table() + + # Create data evolution writer using BatchTableWrite + write_builder = table.new_batch_write_builder() + batch_write = write_builder.new_write() + + # Prepare update data (sorted by row_id) + update_data = pa.Table.from_pydict({ + '_ROW_ID': [1, 0, 2, 3, 4], + 'age': [31, 26, 36, 39, 42], + 'city': ['Los Angeles', 'New York', 'Chicago', 'Phoenix', 'Houston'] + }) + + # Update multiple columns + commit_messages = batch_write.update_columns(update_data, ['age', 'city']) + + # Commit the changes + table_commit = write_builder.new_commit() + table_commit.commit(commit_messages) + table_commit.close() + batch_write.close() + + # Verify the updated data + read_builder = table.new_read_builder() + table_read = read_builder.new_read() + splits = read_builder.new_scan().plan().splits() + result = table_read.to_arrow(splits) + + # Check that both age and city were updated for rows 0-2 + ages = result['age'].to_pylist() + cities = result['city'].to_pylist() + + expected_ages = [26, 31, 36, 39, 42] + expected_cities = ['New York', 'Los Angeles', 'Chicago', 'Phoenix', 'Houston'] + + self.assertEqual(ages, expected_ages) + self.assertEqual(cities, expected_cities) + + def test_nonexistent_column(self): + """Test that updating a non-existent column raises an error.""" + table = self._create_table() + + # Create data evolution writer using BatchTableWrite + write_builder = table.new_batch_write_builder() + batch_write = write_builder.new_write() + + # Try to update a non-existent column + update_data = pa.Table.from_pydict({ + '_ROW_ID': [0, 1, 2, 3, 4], + 'nonexistent_column': [100, 200, 300, 400, 500] + }) + + # Should raise ValueError + with self.assertRaises(ValueError) as context: + batch_write.update_columns(update_data, ['nonexistent_column']) + + self.assertIn('not found in table schema', str(context.exception)) + batch_write.close() + + def test_missing_row_id_column(self): + """Test that missing row_id column raises an error.""" + table = self._create_table() + + # Create data evolution writer using BatchTableWrite + write_builder = 
table.new_batch_write_builder() + batch_write = write_builder.new_write() + + # Prepare update data without row_id column + update_data = pa.Table.from_pydict({ + 'age': [26, 27, 28, 29, 30] + }) + + # Should raise ValueError + with self.assertRaises(ValueError) as context: + batch_write.update_columns(update_data, ['age']) + + self.assertIn("Input data must contain _ROW_ID column", str(context.exception)) + batch_write.close() + + def test_partitioned_table_update(self): + """Test updating columns in a partitioned table.""" + # Create partitioned table + schema = Schema.from_pyarrow_schema(self.pa_schema, partition_keys=['city'], options=self.table_options) + self.catalog.create_table('default.test_partitioned_evolution', schema, False) + table = self.catalog.get_table('default.test_partitioned_evolution') + + # Write initial data + write_builder = table.new_batch_write_builder() + table_write = write_builder.new_write() + table_commit = write_builder.new_commit() + + initial_data = pa.Table.from_pydict({ + 'id': [1, 2, 3, 4, 5], + 'name': ['Alice', 'Bob', 'Charlie', 'David', 'Eve'], + 'age': [25, 30, 35, 40, 45], + 'city': ['NYC', 'NYC', 'LA', 'LA', 'Chicago'] + }, schema=self.pa_schema) + + table_write.write_arrow(initial_data) + table_commit.commit(table_write.prepare_commit()) + table_write.close() + table_commit.close() + + # Create data evolution writer using BatchTableWrite + write_builder = table.new_batch_write_builder() + batch_write = write_builder.new_write() + + # Update ages + update_data = pa.Table.from_pydict({ + '_ROW_ID': [1, 0, 2, 3, 4], + 'age': [31, 26, 36, 41, 46] + }) + + commit_messages = batch_write.update_columns(update_data, ['age']) + + # Commit the changes + table_commit = write_builder.new_commit() + table_commit.commit(commit_messages) + table_commit.close() + batch_write.close() + + # Verify the updated data + read_builder = table.new_read_builder() + table_read = read_builder.new_read() + splits = read_builder.new_scan().plan().splits() + result = table_read.to_arrow(splits) + + # Check ages were updated + ages = result['age'].to_pylist() + expected_ages = [26, 31, 36, 41, 46] + self.assertEqual(ages, expected_ages) + + def test_multiple_calls(self): + """Test multiple calls to update_columns, each updating a single column.""" + # Create table with initial data + table = self._create_table() + + # First update: Update age column + write_builder = table.new_batch_write_builder() + batch_write = write_builder.new_write() + + update_age_data = pa.Table.from_pydict({ + '_ROW_ID': [1, 0, 2, 3, 4], + 'age': [31, 26, 36, 41, 46] + }) + + commit_messages = batch_write.update_columns(update_age_data, ['age']) + table_commit = write_builder.new_commit() + table_commit.commit(commit_messages) + table_commit.close() + + # Second update: Update city column + update_city_data = pa.Table.from_pydict({ + '_ROW_ID': [1, 0, 2, 3, 4], + 'city': ['Los Angeles', 'New York', 'Chicago', 'Phoenix', 'Houston'] + }) + + commit_messages = batch_write.update_columns(update_city_data, ['city']) + table_commit = write_builder.new_commit() + table_commit.commit(commit_messages) + table_commit.close() + + # Close the batch write + batch_write.close() + + # Verify both columns were updated correctly + read_builder = table.new_read_builder() + table_read = read_builder.new_read() + splits = read_builder.new_scan().plan().splits() + result = table_read.to_arrow(splits) + + ages = result['age'].to_pylist() + cities = result['city'].to_pylist() + + expected_ages = [26, 31, 36, 41, 46] + 
expected_cities = ['New York', 'Los Angeles', 'Chicago', 'Phoenix', 'Houston'] + + self.assertEqual(ages, expected_ages, "Age column was not updated correctly") + self.assertEqual(cities, expected_cities, "City column was not updated correctly") + + +if __name__ == '__main__': + unittest.main() diff --git a/paimon-python/pypaimon/write/partial_column_write.py b/paimon-python/pypaimon/write/partial_column_write.py new file mode 100644 index 000000000000..8407a669db03 --- /dev/null +++ b/paimon-python/pypaimon/write/partial_column_write.py @@ -0,0 +1,182 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ +import bisect +from typing import Dict, List, Optional + +import pyarrow as pa +import pyarrow.compute as pc + +from pypaimon.snapshot.snapshot import BATCH_COMMIT_IDENTIFIER +from pypaimon.table.row.generic_row import GenericRow +from pypaimon.table.special_fields import SpecialFields +from pypaimon.write.file_store_write import FileStoreWrite + + +class PartialColumnWrite: + """ + Table write for partial column updates (data evolution). + + This writer is designed for adding/updating specific columns in existing tables. + Input data should contain _ROW_ID column. + """ + + FIRST_ROW_ID_COLUMN = '_FIRST_ROW_ID' + + def __init__(self, table, commit_user: str): + from pypaimon.table.file_store_table import FileStoreTable + + self.table: FileStoreTable = table + self.commit_user = commit_user + + # Load existing first_row_ids and build partition map + self.first_row_ids, self.first_row_id_to_partition_map = self._load_existing_files_info() + + # Collect commit messages + self.commit_messages = [] + + def _load_existing_files_info(self): + """Load existing first_row_ids and build partition map for efficient lookup.""" + first_row_ids = [] + first_row_id_to_partition_map: Dict[int, GenericRow] = {} + + read_builder = self.table.new_read_builder() + scan = read_builder.new_scan() + splits = scan.plan().splits() + + for split in splits: + for file in split.files: + if file.first_row_id is not None: + first_row_ids.append(file.first_row_id) + first_row_id_to_partition_map[file.first_row_id] = split.partition + + return sorted(list(set(first_row_ids))), first_row_id_to_partition_map + + def update_columns(self, data: pa.Table, column_names: List[str]) -> List: + """ + Add or update columns in the table. 
+ + Args: + data: Input data containing row_id and columns to update + column_names: Names of columns to update (excluding row_id) + + Returns: + List of commit messages + """ + # Validate input data has row_id column + if SpecialFields.ROW_ID.name not in data.column_names: + raise ValueError(f"Input data must contain {SpecialFields.ROW_ID.name} column") + + # Validate all update columns exist in the schema + for col_name in column_names: + if col_name not in self.table.field_names: + raise ValueError(f"Column {col_name} not found in table schema") + + # Sort data by _ROW_ID column + sorted_data = data.sort_by([(SpecialFields.ROW_ID.name, "ascending")]) + + # Calculate first_row_id for each row + data_with_first_row_id = self._calculate_first_row_id(sorted_data) + + # Group by first_row_id and write each group + self._write_by_first_row_id(data_with_first_row_id, column_names) + + return self.commit_messages + + def _calculate_first_row_id(self, data: pa.Table) -> pa.Table: + """Calculate _first_row_id for each row based on _ROW_ID.""" + row_ids = data[SpecialFields.ROW_ID.name].to_pylist() + + # Calculate first_row_id for each row_id + first_row_id_values = [] + for row_id in row_ids: + first_row_id = self._floor_binary_search(self.first_row_ids, row_id) + first_row_id_values.append(first_row_id) + + # Add first_row_id column to the table + first_row_id_array = pa.array(first_row_id_values, type=pa.int64()) + return data.append_column(self.FIRST_ROW_ID_COLUMN, first_row_id_array) + + def _floor_binary_search(self, sorted_seq: List[int], value: int) -> int: + """Binary search to find the floor value in sorted sequence.""" + if not sorted_seq: + raise ValueError("The input sorted sequence is empty.") + + idx = bisect.bisect_right(sorted_seq, value) - 1 + if idx < 0: + raise ValueError(f"Value {value} is less than the first element in the sorted sequence.") + + return sorted_seq[idx] + + def _write_by_first_row_id(self, data: pa.Table, column_names: List[str]): + """Write data grouped by first_row_id.""" + # Extract unique first_row_id values + first_row_id_array = data[self.FIRST_ROW_ID_COLUMN] + unique_first_row_ids = pc.unique(first_row_id_array).to_pylist() + + for first_row_id in unique_first_row_ids: + # Filter rows for this first_row_id + mask = pc.equal(first_row_id_array, first_row_id) + group_data = data.filter(mask) + + # Get partition for this first_row_id + partition = self._find_partition_by_first_row_id(first_row_id) + + if partition is None: + raise ValueError(f"No existing file found for first_row_id {first_row_id}") + + # Write this group + self._write_group(partition, first_row_id, group_data, column_names) + + def _find_partition_by_first_row_id(self, first_row_id: int) -> Optional[GenericRow]: + """Find the partition for a given first_row_id using pre-built partition map.""" + return self.first_row_id_to_partition_map.get(first_row_id) + + def _write_group(self, partition: GenericRow, first_row_id: int, + data: pa.Table, column_names: List[str]): + """Write a group of data with the same first_row_id.""" + + # Create a file store write for this partition + file_store_write = FileStoreWrite(self.table, self.commit_user) + + # Set write columns to only update specific columns + # Note: _ROW_ID is metadata column, not part of schema + write_cols = column_names + file_store_write.write_cols = write_cols + + # Convert partition to tuple for hashing + partition_tuple = tuple(partition.values) + + # Write data - convert Table to RecordBatch + data_to_write = 
data.select(write_cols) + for batch in data_to_write.to_batches(): + file_store_write.write(partition_tuple, 0, batch) + + # Prepare commit and assign first_row_id + commit_messages = file_store_write.prepare_commit(BATCH_COMMIT_IDENTIFIER) + + # Assign first_row_id to the new files + for msg in commit_messages: + for file in msg.new_files: + # Assign the same first_row_id as the original file + file.first_row_id = first_row_id + file.write_cols = write_cols + + self.commit_messages.extend(commit_messages) + + # Close the writer + file_store_write.close() diff --git a/paimon-python/pypaimon/write/table_write.py b/paimon-python/pypaimon/write/table_write.py index 0ac73356a31d..55a12a106a12 100644 --- a/paimon-python/pypaimon/write/table_write.py +++ b/paimon-python/pypaimon/write/table_write.py @@ -16,7 +16,7 @@ # limitations under the License. ################################################################################ from collections import defaultdict -from typing import List +from typing import List, Optional import pyarrow as pa @@ -24,6 +24,7 @@ from pypaimon.snapshot.snapshot import BATCH_COMMIT_IDENTIFIER from pypaimon.write.commit_message import CommitMessage from pypaimon.write.file_store_write import FileStoreWrite +from pypaimon.write.partial_column_write import PartialColumnWrite class TableWrite: @@ -83,12 +84,34 @@ class BatchTableWrite(TableWrite): def __init__(self, table, commit_user): super().__init__(table, commit_user) self.batch_committed = False + self._partial_column_write: Optional[PartialColumnWrite] = None + + def update_columns(self, data: pa.Table, column_names: List[str]) -> List[CommitMessage]: + """ + Add or update columns in the table. + + Args: + data: Input data containing row_id and columns to update + column_names: Names of columns to update (excluding row_id) + + Returns: + List of commit messages + """ + + if self._partial_column_write is None: + self._partial_column_write = PartialColumnWrite(self.table, self.commit_user) + + return self._partial_column_write.update_columns(data, column_names) def prepare_commit(self) -> List[CommitMessage]: if self.batch_committed: raise RuntimeError("BatchTableWrite only supports one-time committing.") self.batch_committed = True - return self.file_store_write.prepare_commit(BATCH_COMMIT_IDENTIFIER) + + if self._partial_column_write is not None: + return self._partial_column_write.commit_messages + else: + return self.file_store_write.prepare_commit(BATCH_COMMIT_IDENTIFIER) class StreamTableWrite(TableWrite): From 8e96e2d16bc60db180c9b7b076ab29d7bb6111f8 Mon Sep 17 00:00:00 2001 From: zhoulii Date: Tue, 23 Dec 2025 11:38:07 +0800 Subject: [PATCH 2/6] [python] introduce update_columns in python api. 
--- docs/content/program-api/python-api.md | 169 +++++++++++++++++++++++++
 1 file changed, 169 insertions(+)

diff --git a/docs/content/program-api/python-api.md b/docs/content/program-api/python-api.md
index c2c6a2fd3010..a1d0d2dbb9ca 100644
--- a/docs/content/program-api/python-api.md
+++ b/docs/content/program-api/python-api.md
@@ -213,6 +213,175 @@ write_builder = table.new_batch_write_builder().overwrite()
 write_builder = table.new_batch_write_builder().overwrite({'dt': '2024-01-01'})
 ```
 
+### Write partial columns
+
+When data evolution is enabled, you can write partial columns to a table:
+
+```python
+simple_pa_schema = pa.schema([
+    ('f0', pa.int8()),
+    ('f1', pa.int16()),
+])
+schema = Schema.from_pyarrow_schema(simple_pa_schema,
+                                    options={'row-tracking.enabled': 'true', 'data-evolution.enabled': 'true'})
+catalog.create_table('default.test_row_tracking', schema, False)
+table = catalog.get_table('default.test_row_tracking')
+
+# write all columns
+write_builder = table.new_batch_write_builder()
+table_write = write_builder.new_write()
+table_commit = write_builder.new_commit()
+expect_data = pa.Table.from_pydict({
+    'f0': [-1, 2],
+    'f1': [-1001, 1002]
+}, schema=simple_pa_schema)
+table_write.write_arrow(expect_data)
+table_commit.commit(table_write.prepare_commit())
+table_write.close()
+table_commit.close()
+
+# write partial columns
+table_write = write_builder.new_write().with_write_type(['f0'])
+table_commit = write_builder.new_commit()
+data2 = pa.Table.from_pydict({
+    'f0': [3, 4],
+}, schema=pa.schema([
+    ('f0', pa.int8()),
+]))
+table_write.write_arrow(data2)
+cmts = table_write.prepare_commit()
+
+# assign first row id
+cmts[0].new_files[0].first_row_id = 0
+table_commit.commit(cmts)
+table_write.close()
+table_commit.close()
+```
+
+Paimon data-evolution tables use `first_row_id` to split files. When writing partial columns,
+you should split the data into multiple parts by rows and assign a `first_row_id` to each file before committing,
+otherwise table reads may fail with a fatal error.
+
+For example, in the following code, `write-1` generates a file with `first_row_id=0` that contains 2 rows,
+and `write-2` generates a file with `first_row_id=2` that also contains 2 rows. If we then update column `f0`
+without splitting the data into multiple parts by rows, the generated file has `first_row_id=0` and contains 4 rows,
+and reading this table will fail with a fatal error.
+ +```python +table = catalog.get_table('default.test_row_tracking') + +# write-1 +write_builder = table.new_batch_write_builder() +table_write = write_builder.new_write() +table_commit = write_builder.new_commit() +expect_data = pa.Table.from_pydict({ + 'f0': [-1, 2], + 'f1': [-1001, 1002] +}, schema=simple_pa_schema) +table_write.write_arrow(expect_data) +table_commit.commit(table_write.prepare_commit()) +table_write.close() +table_commit.close() + +# write-2 +table_write = write_builder.new_write() +table_commit = write_builder.new_commit() +expect_data = pa.Table.from_pydict({ + 'f0': [3, 4], + 'f1': [1003, 1004] +}, schema=simple_pa_schema) +table_write.write_arrow(expect_data) +table_commit.commit(table_write.prepare_commit()) +table_write.close() +table_commit.close() + +# write partial columns +table_write = write_builder.new_write().with_write_type(['f0']) +table_commit = write_builder.new_commit() +data2 = pa.Table.from_pydict({ + 'f0': [5, 6, 7, 8], +}, schema=pa.schema([ + ('f0', pa.int8()), +])) +table_write.write_arrow(data2) +cmts = table_write.prepare_commit() +cmts[0].new_files[0].first_row_id = 0 +table_commit.commit(cmts) +table_write.close() +table_commit.close() + +read_builder = table.new_read_builder() +table_scan = read_builder.new_scan() +table_read = read_builder.new_read() + +# a fatal error will be thrown +actual_data = table_read.to_arrow(table_scan.plan().splits()) +``` + +### Update columns + +Handle file `first_row_id` manually is inconvenient and error-prone, you can use `update_columns` to write partial columns without handling file `first_row_id`. +The input data should include the `_ROW_ID` column, `update_columns` will automatically sorts and matches each `_ROW_ID` to +its corresponding `first_row_id`, then groups rows with the same `first_row_id` and writes them to a separate file. 
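+
+The following standalone sketch is illustrative only (the helper `floor_first_row_id` and the sample
+row ids are not part of the pypaimon API): each `_ROW_ID` is matched to the largest existing
+`first_row_id` that is not greater than it, and rows that share the same match are grouped and
+written together as one new file carrying that `first_row_id`.
+
+```python
+import bisect
+from collections import defaultdict
+
+# first_row_id of every existing data file, e.g. two files of 2 rows each
+existing_first_row_ids = [0, 2]
+
+def floor_first_row_id(row_id):
+    # floor lookup: the largest first_row_id that is <= row_id
+    idx = bisect.bisect_right(existing_first_row_ids, row_id) - 1
+    if idx < 0:
+        raise ValueError(f"row id {row_id} is smaller than any known first_row_id")
+    return existing_first_row_ids[idx]
+
+# rows 0 and 1 belong to the file starting at row 0, rows 2 and 3 to the file starting at row 2
+groups = defaultdict(list)
+for row_id in [0, 1, 2, 3]:
+    groups[floor_first_row_id(row_id)].append(row_id)
+
+print(dict(groups))  # {0: [0, 1], 2: [2, 3]}
+```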
+
+```python
+table = catalog.get_table('default.test_row_tracking')
+
+# write 1
+write_builder = table.new_batch_write_builder()
+table_write = write_builder.new_write()
+table_commit = write_builder.new_commit()
+expect_data = pa.Table.from_pydict({
+    'f0': [-1, 2],
+    'f1': [-1001, 1002]
+}, schema=simple_pa_schema)
+table_write.write_arrow(expect_data)
+table_commit.commit(table_write.prepare_commit())
+table_write.close()
+table_commit.close()
+
+# write 2
+table_write = write_builder.new_write()
+table_commit = write_builder.new_commit()
+expect_data = pa.Table.from_pydict({
+    'f0': [3, 4],
+    'f1': [1003, 1004]
+}, schema=simple_pa_schema)
+table_write.write_arrow(expect_data)
+table_commit.commit(table_write.prepare_commit())
+table_write.close()
+table_commit.close()
+
+# update partial columns
+table_write = write_builder.new_write()
+table_commit = write_builder.new_commit()
+data2 = pa.Table.from_pydict({
+    '_ROW_ID': [0, 1, 2, 3],
+    'f0': [5, 6, 7, 8],
+}, schema=pa.schema([
+    ('_ROW_ID', pa.int64()),
+    ('f0', pa.int8()),
+]))
+table_write.update_columns(data2, ['f0'])
+cmts = table_write.prepare_commit()
+table_commit.commit(cmts)
+table_write.close()
+table_commit.close()
+
+read_builder = table.new_read_builder()
+table_scan = read_builder.new_scan()
+table_read = read_builder.new_read()
+actual_data = table_read.to_arrow(table_scan.plan().splits())
+expect_data = pa.Table.from_pydict({
+    'f0': [5, 6, 7, 8],
+    'f1': [-1001, 1002, 1003, 1004]
+}, schema=pa.schema([
+    ('f0', pa.int8()),
+    ('f1', pa.int16()),
+]))
+self.assertEqual(actual_data, expect_data)
+```
+
 ## Batch Read
 
 ### Predicate pushdown

From 88878ad9fdf142dd897a847159bb220ff352a923 Mon Sep 17 00:00:00 2001
From: zhoulii
Date: Tue, 23 Dec 2025 16:15:51 +0800
Subject: [PATCH 3/6] [python] introduce update_columns in python api.

---
 docs/content/program-api/python-api.md        | 37 +++++------------
 .../tests/partial_columns_write_test.py       | 40 +++++++++++++------
 .../pypaimon/write/partial_column_write.py    |  5 +++
 paimon-python/pypaimon/write/table_write.py   | 35 +++++++++-------
 paimon-python/pypaimon/write/write_builder.py | 13 ++++--
 5 files changed, 72 insertions(+), 58 deletions(-)

diff --git a/docs/content/program-api/python-api.md b/docs/content/program-api/python-api.md
index a1d0d2dbb9ca..12c15c8f6d36 100644
--- a/docs/content/program-api/python-api.md
+++ b/docs/content/program-api/python-api.md
@@ -320,40 +320,23 @@ actual_data = table_read.to_arrow(table_scan.plan().splits())
 
 ### Update columns
 
-Handle file `first_row_id` manually is inconvenient and error-prone, you can use `update_columns` to write partial columns without handling file `first_row_id`.
-The input data should include the `_ROW_ID` column, `update_columns` will automatically sorts and matches each `_ROW_ID` to
+Handling file `first_row_id` manually is inconvenient and error-prone. If you don't want to do this, you can enable `update_columns_by_row_id`
+when creating the `WriteBuilder` and set the write type on the `TableWrite`; then you can write partial columns without handling file `first_row_id`.
+The input data should include the `_ROW_ID` column; the write operation automatically sorts and matches each `_ROW_ID` to
 its corresponding `first_row_id`, then groups rows with the same `first_row_id` and writes them to a separate file.
```python table = catalog.get_table('default.test_row_tracking') -# write 1 -write_builder = table.new_batch_write_builder() -table_write = write_builder.new_write() -table_commit = write_builder.new_commit() -expect_data = pa.Table.from_pydict({ - 'f0': [-1, 2], - 'f1': [-1001, 1002] -}, schema=simple_pa_schema) -table_write.write_arrow(expect_data) -table_commit.commit(table_write.prepare_commit()) -table_write.close() -table_commit.close() +# write-1 +# same as above -# write 2 -table_write = write_builder.new_write() -table_commit = write_builder.new_commit() -expect_data = pa.Table.from_pydict({ - 'f0': [3, 4], - 'f1': [1003, 1004] -}, schema=simple_pa_schema) -table_write.write_arrow(expect_data) -table_commit.commit(table_write.prepare_commit()) -table_write.close() -table_commit.close() +# write-2 +# same as above # update partial columns -table_write = write_builder.new_write() +write_builder = table.new_batch_write_builder().update_columns_by_row_id() +table_write = write_builder.new_write().with_write_type(['f0']) table_commit = write_builder.new_commit() data2 = pa.Table.from_pydict({ '_ROW_ID': [0, 1, 2, 3], @@ -362,7 +345,7 @@ data2 = pa.Table.from_pydict({ ('_ROW_ID', pa.int64()), ('f0', pa.int8()), ])) -table_write.update_columns(data2, ['f0']) +table_write.write_arrow(data2) cmts = table_write.prepare_commit() table_commit.commit(cmts) table_write.close() diff --git a/paimon-python/pypaimon/tests/partial_columns_write_test.py b/paimon-python/pypaimon/tests/partial_columns_write_test.py index c69a53b05304..f6636ffad783 100644 --- a/paimon-python/pypaimon/tests/partial_columns_write_test.py +++ b/paimon-python/pypaimon/tests/partial_columns_write_test.py @@ -112,7 +112,10 @@ def test_update_existing_column(self): }) # Update the age column - commit_messages = batch_write.update_columns(update_data, ['age']) + write_builder = table.new_batch_write_builder().update_columns_by_row_id() + batch_write = write_builder.new_write().with_write_type(['age']) + batch_write.write_arrow(update_data) + commit_messages = batch_write.prepare_commit() # Commit the changes table_commit = write_builder.new_commit() @@ -148,7 +151,10 @@ def test_update_multiple_columns(self): }) # Update multiple columns - commit_messages = batch_write.update_columns(update_data, ['age', 'city']) + write_builder = table.new_batch_write_builder().update_columns_by_row_id() + batch_write = write_builder.new_write().with_write_type(['age', 'city']) + batch_write.write_arrow(update_data) + commit_messages = batch_write.prepare_commit() # Commit the changes table_commit = write_builder.new_commit() @@ -188,9 +194,11 @@ def test_nonexistent_column(self): # Should raise ValueError with self.assertRaises(ValueError) as context: - batch_write.update_columns(update_data, ['nonexistent_column']) + write_builder = table.new_batch_write_builder().update_columns_by_row_id() + batch_write = write_builder.new_write().with_write_type(['nonexistent_column']) + batch_write.write_arrow(update_data) - self.assertIn('not found in table schema', str(context.exception)) + self.assertIn('not in table schema', str(context.exception)) batch_write.close() def test_missing_row_id_column(self): @@ -208,7 +216,9 @@ def test_missing_row_id_column(self): # Should raise ValueError with self.assertRaises(ValueError) as context: - batch_write.update_columns(update_data, ['age']) + write_builder = table.new_batch_write_builder().update_columns_by_row_id() + batch_write = write_builder.new_write().with_write_type(['age']) + 
batch_write.write_arrow(update_data) self.assertIn("Input data must contain _ROW_ID column", str(context.exception)) batch_write.close() @@ -238,8 +248,8 @@ def test_partitioned_table_update(self): table_commit.close() # Create data evolution writer using BatchTableWrite - write_builder = table.new_batch_write_builder() - batch_write = write_builder.new_write() + write_builder = table.new_batch_write_builder().update_columns_by_row_id() + batch_write = write_builder.new_write().with_write_type(['age']) # Update ages update_data = pa.Table.from_pydict({ @@ -247,7 +257,8 @@ def test_partitioned_table_update(self): 'age': [31, 26, 36, 41, 46] }) - commit_messages = batch_write.update_columns(update_data, ['age']) + batch_write.write_arrow(update_data) + commit_messages = batch_write.prepare_commit() # Commit the changes table_commit = write_builder.new_commit() @@ -272,15 +283,16 @@ def test_multiple_calls(self): table = self._create_table() # First update: Update age column - write_builder = table.new_batch_write_builder() - batch_write = write_builder.new_write() + write_builder = table.new_batch_write_builder().update_columns_by_row_id() + batch_write = write_builder.new_write().with_write_type(['age']) update_age_data = pa.Table.from_pydict({ '_ROW_ID': [1, 0, 2, 3, 4], 'age': [31, 26, 36, 41, 46] }) - commit_messages = batch_write.update_columns(update_age_data, ['age']) + batch_write.write_arrow(update_age_data) + commit_messages = batch_write.prepare_commit() table_commit = write_builder.new_commit() table_commit.commit(commit_messages) table_commit.close() @@ -290,8 +302,10 @@ def test_multiple_calls(self): '_ROW_ID': [1, 0, 2, 3, 4], 'city': ['Los Angeles', 'New York', 'Chicago', 'Phoenix', 'Houston'] }) - - commit_messages = batch_write.update_columns(update_city_data, ['city']) + write_builder = table.new_batch_write_builder().update_columns_by_row_id() + batch_write = write_builder.new_write().with_write_type(['city']) + batch_write.write_arrow(update_city_data) + commit_messages = batch_write.prepare_commit() table_commit = write_builder.new_commit() table_commit.commit(commit_messages) table_commit.close() diff --git a/paimon-python/pypaimon/write/partial_column_write.py b/paimon-python/pypaimon/write/partial_column_write.py index 8407a669db03..23f08d9dfe0e 100644 --- a/paimon-python/pypaimon/write/partial_column_write.py +++ b/paimon-python/pypaimon/write/partial_column_write.py @@ -77,6 +77,11 @@ def update_columns(self, data: pa.Table, column_names: List[str]) -> List: Returns: List of commit messages """ + + # Validate column_names is not empty + if not column_names: + raise ValueError("column_names cannot be empty") + # Validate input data has row_id column if SpecialFields.ROW_ID.name not in data.column_names: raise ValueError(f"Input data must contain {SpecialFields.ROW_ID.name} column") diff --git a/paimon-python/pypaimon/write/table_write.py b/paimon-python/pypaimon/write/table_write.py index 55a12a106a12..b3b4daa81fcb 100644 --- a/paimon-python/pypaimon/write/table_write.py +++ b/paimon-python/pypaimon/write/table_write.py @@ -81,27 +81,32 @@ def _validate_pyarrow_schema(self, data_schema: pa.Schema): class BatchTableWrite(TableWrite): - def __init__(self, table, commit_user): + def __init__(self, table, commit_user, update_columns_by_row_id=False): super().__init__(table, commit_user) self.batch_committed = False self._partial_column_write: Optional[PartialColumnWrite] = None + if update_columns_by_row_id: + self._partial_column_write = PartialColumnWrite(self.table, 
self.commit_user) - def update_columns(self, data: pa.Table, column_names: List[str]) -> List[CommitMessage]: - """ - Add or update columns in the table. - - Args: - data: Input data containing row_id and columns to update - column_names: Names of columns to update (excluding row_id) - - Returns: - List of commit messages - """ + def write_arrow(self, table: pa.Table): + if self._partial_column_write is not None: + self._partial_column_write.update_columns(table, self.file_store_write.write_cols) + else: + super().write_arrow(table) - if self._partial_column_write is None: - self._partial_column_write = PartialColumnWrite(self.table, self.commit_user) + def write_arrow_batch(self, data: pa.RecordBatch): + if self._partial_column_write is not None: + table = pa.Table.from_batches([data]) + self._partial_column_write.update_columns(table, self.file_store_write.write_cols) + else: + super().write_arrow_batch(data) - return self._partial_column_write.update_columns(data, column_names) + def write_pandas(self, dataframe): + if self._partial_column_write is not None: + table = pa.Table.from_pandas(dataframe) + self._partial_column_write.update_columns(table, self.file_store_write.write_cols) + else: + super().write_pandas(dataframe) def prepare_commit(self) -> List[CommitMessage]: if self.batch_committed: diff --git a/paimon-python/pypaimon/write/write_builder.py b/paimon-python/pypaimon/write/write_builder.py index 7b96a8c2a2b5..c83c11e7462a 100644 --- a/paimon-python/pypaimon/write/write_builder.py +++ b/paimon-python/pypaimon/write/write_builder.py @@ -20,8 +20,10 @@ from abc import ABC from typing import Optional -from pypaimon.write.table_commit import BatchTableCommit, StreamTableCommit, TableCommit -from pypaimon.write.table_write import BatchTableWrite, StreamTableWrite, TableWrite +from pypaimon.write.table_commit import (BatchTableCommit, StreamTableCommit, + TableCommit) +from pypaimon.write.table_write import (BatchTableWrite, StreamTableWrite, + TableWrite) class WriteBuilder(ABC): @@ -31,6 +33,7 @@ def __init__(self, table): self.table: FileStoreTable = table self.commit_user = self._create_commit_user() self.static_partition = None + self._update_columns_by_row_id = False def overwrite(self, static_partition: Optional[dict] = None): self.static_partition = static_partition if static_partition is not None else {} @@ -49,11 +52,15 @@ def _create_commit_user(self): else: return str(uuid.uuid4()) + def update_columns_by_row_id(self): + self._update_columns_by_row_id = True + return self + class BatchWriteBuilder(WriteBuilder): def new_write(self) -> BatchTableWrite: - return BatchTableWrite(self.table, self.commit_user) + return BatchTableWrite(self.table, self.commit_user, self._update_columns_by_row_id) def new_commit(self) -> BatchTableCommit: commit = BatchTableCommit(self.table, self.commit_user, self.static_partition) From 4e2008461a6ebb382964266fd2a75937bd3517c7 Mon Sep 17 00:00:00 2001 From: zhoulii Date: Tue, 23 Dec 2025 17:15:50 +0800 Subject: [PATCH 4/6] [python] introduce update_columns in python api. 
--- paimon-python/pypaimon/write/table_write.py | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/paimon-python/pypaimon/write/table_write.py b/paimon-python/pypaimon/write/table_write.py index b3b4daa81fcb..8bc2c023eed6 100644 --- a/paimon-python/pypaimon/write/table_write.py +++ b/paimon-python/pypaimon/write/table_write.py @@ -90,23 +90,20 @@ def __init__(self, table, commit_user, update_columns_by_row_id=False): def write_arrow(self, table: pa.Table): if self._partial_column_write is not None: - self._partial_column_write.update_columns(table, self.file_store_write.write_cols) - else: - super().write_arrow(table) + return self._partial_column_write.update_columns(table, self.file_store_write.write_cols) + super().write_arrow(table) def write_arrow_batch(self, data: pa.RecordBatch): if self._partial_column_write is not None: table = pa.Table.from_batches([data]) - self._partial_column_write.update_columns(table, self.file_store_write.write_cols) - else: - super().write_arrow_batch(data) + return self._partial_column_write.update_columns(table, self.file_store_write.write_cols) + super().write_arrow_batch(data) def write_pandas(self, dataframe): if self._partial_column_write is not None: table = pa.Table.from_pandas(dataframe) - self._partial_column_write.update_columns(table, self.file_store_write.write_cols) - else: - super().write_pandas(dataframe) + return self._partial_column_write.update_columns(table, self.file_store_write.write_cols) + super().write_pandas(dataframe) def prepare_commit(self) -> List[CommitMessage]: if self.batch_committed: From 1a61811a67a8e2a5758a2a1b7666400ff974b2f1 Mon Sep 17 00:00:00 2001 From: zhoulii Date: Tue, 23 Dec 2025 19:23:45 +0800 Subject: [PATCH 5/6] [python] introduce update_columns in python api. 
--- .../tests/partial_columns_write_test.py | 43 +++++++++++++++++++ .../pypaimon/write/partial_column_write.py | 36 +++++++++++++--- 2 files changed, 74 insertions(+), 5 deletions(-) diff --git a/paimon-python/pypaimon/tests/partial_columns_write_test.py b/paimon-python/pypaimon/tests/partial_columns_write_test.py index f6636ffad783..6f3b0b75f6d4 100644 --- a/paimon-python/pypaimon/tests/partial_columns_write_test.py +++ b/paimon-python/pypaimon/tests/partial_columns_write_test.py @@ -328,6 +328,49 @@ def test_multiple_calls(self): self.assertEqual(ages, expected_ages, "Age column was not updated correctly") self.assertEqual(cities, expected_cities, "City column was not updated correctly") + def test_wrong_total_row_count(self): + """Test that wrong total row count raises an error.""" + # Create table with initial data + table = self._create_table() + + # Create data evolution writer using BatchTableWrite + write_builder = table.new_batch_write_builder().update_columns_by_row_id() + batch_write = write_builder.new_write().with_write_type(['age']) + + # Prepare update data with wrong row count (only 3 rows instead of 5) + update_data = pa.Table.from_pydict({ + '_ROW_ID': [0, 1, 2], + 'age': [26, 31, 36] + }) + + # Should raise ValueError for total row count mismatch + with self.assertRaises(ValueError) as context: + batch_write.write_arrow(update_data) + + self.assertIn("does not match table total row count", str(context.exception)) + batch_write.close() + + def test_wrong_first_row_id_row_count(self): + """Test that wrong row count for a first_row_id raises an error.""" + # Create table with initial data + table = self._create_table() + + # Create data evolution writer using BatchTableWrite + write_builder = table.new_batch_write_builder().update_columns_by_row_id() + batch_write = write_builder.new_write().with_write_type(['age']) + + # Prepare update data with duplicate row_id (violates monotonically increasing) + update_data = pa.Table.from_pydict({ + '_ROW_ID': [0, 1, 1, 4, 5], + 'age': [26, 31, 36, 37, 38] + }) + + # Should raise ValueError for row ID validation + with self.assertRaises(ValueError) as context: + batch_write.write_arrow(update_data) + + self.assertIn("Row IDs are not monotonically increasing", str(context.exception)) + batch_write.close() if __name__ == '__main__': unittest.main() diff --git a/paimon-python/pypaimon/write/partial_column_write.py b/paimon-python/pypaimon/write/partial_column_write.py index 23f08d9dfe0e..f1dac5a10c1c 100644 --- a/paimon-python/pypaimon/write/partial_column_write.py +++ b/paimon-python/pypaimon/write/partial_column_write.py @@ -44,7 +44,10 @@ def __init__(self, table, commit_user: str): self.commit_user = commit_user # Load existing first_row_ids and build partition map - self.first_row_ids, self.first_row_id_to_partition_map = self._load_existing_files_info() + (self.first_row_ids, + self.first_row_id_to_partition_map, + self.first_row_id_to_row_count_map, + self.total_row_count) = self._load_existing_files_info() # Collect commit messages self.commit_messages = [] @@ -53,6 +56,7 @@ def _load_existing_files_info(self): """Load existing first_row_ids and build partition map for efficient lookup.""" first_row_ids = [] first_row_id_to_partition_map: Dict[int, GenericRow] = {} + first_row_id_to_row_count_map: Dict[int, int] = {} read_builder = self.table.new_read_builder() scan = read_builder.new_scan() @@ -60,11 +64,16 @@ def _load_existing_files_info(self): for split in splits: for file in split.files: - if file.first_row_id is not None: 
- first_row_ids.append(file.first_row_id) - first_row_id_to_partition_map[file.first_row_id] = split.partition + if file.first_row_id is not None and not file.file_name.endswith('.blob'): + first_row_id = file.first_row_id + first_row_ids.append(first_row_id) + first_row_id_to_partition_map[first_row_id] = split.partition + first_row_id_to_row_count_map[first_row_id] = file.row_count - return sorted(list(set(first_row_ids))), first_row_id_to_partition_map + total_row_count = sum(first_row_id_to_row_count_map.values()) + + return sorted(list(set(first_row_ids)) + ), first_row_id_to_partition_map, first_row_id_to_row_count_map, total_row_count def update_columns(self, data: pa.Table, column_names: List[str]) -> List: """ @@ -91,6 +100,11 @@ def update_columns(self, data: pa.Table, column_names: List[str]) -> List: if col_name not in self.table.field_names: raise ValueError(f"Column {col_name} not found in table schema") + # Validate data row count matches total row count + if data.num_rows != self.total_row_count: + raise ValueError( + f"Input data row count ({data.num_rows}) does not match table total row count ({self.total_row_count})") + # Sort data by _ROW_ID column sorted_data = data.sort_by([(SpecialFields.ROW_ID.name, "ascending")]) @@ -106,6 +120,12 @@ def _calculate_first_row_id(self, data: pa.Table) -> pa.Table: """Calculate _first_row_id for each row based on _ROW_ID.""" row_ids = data[SpecialFields.ROW_ID.name].to_pylist() + # Validate that row_ids are monotonically increasing starting from 0 + expected_row_ids = list(range(len(row_ids))) + if row_ids != expected_row_ids: + raise ValueError(f"Row IDs are not monotonically increasing starting from 0. " + f"Expected: {expected_row_ids}") + # Calculate first_row_id for each row_id first_row_id_values = [] for row_id in row_ids: @@ -155,6 +175,12 @@ def _write_group(self, partition: GenericRow, first_row_id: int, data: pa.Table, column_names: List[str]): """Write a group of data with the same first_row_id.""" + # Validate data row count matches the first_row_id's row count + expected_row_count = self.first_row_id_to_row_count_map.get(first_row_id, 0) + if data.num_rows != expected_row_count: + raise ValueError( + f"Data row count ({data.num_rows}) does not match expected row count ({expected_row_count}) for first_row_id {first_row_id}") + # Create a file store write for this partition file_store_write = FileStoreWrite(self.table, self.commit_user) From 6ae9bff5f0b2e3d3c617b13107dc4bb5ce1e4fa2 Mon Sep 17 00:00:00 2001 From: zhoulii Date: Tue, 23 Dec 2025 19:47:39 +0800 Subject: [PATCH 6/6] fix style. 
--- paimon-python/pypaimon/write/partial_column_write.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/paimon-python/pypaimon/write/partial_column_write.py b/paimon-python/pypaimon/write/partial_column_write.py index f1dac5a10c1c..c60464f30686 100644 --- a/paimon-python/pypaimon/write/partial_column_write.py +++ b/paimon-python/pypaimon/write/partial_column_write.py @@ -179,7 +179,8 @@ def _write_group(self, partition: GenericRow, first_row_id: int, expected_row_count = self.first_row_id_to_row_count_map.get(first_row_id, 0) if data.num_rows != expected_row_count: raise ValueError( - f"Data row count ({data.num_rows}) does not match expected row count ({expected_row_count}) for first_row_id {first_row_id}") + f"Data row count ({data.num_rows}) does not match expected row count ({expected_row_count}) " + f"for first_row_id {first_row_id}") # Create a file store write for this partition file_store_write = FileStoreWrite(self.table, self.commit_user)
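
For reference, the complete flow introduced by this series, as exercised by the documentation and tests above, is condensed below into a single sketch. The warehouse path and table name are illustrative; note that an update must cover every existing row exactly once (the sorted `_ROW_ID` values must form 0..N-1), which is the validation added in PATCH 5/6.

```python
import pyarrow as pa

from pypaimon import CatalogFactory, Schema

# illustrative local warehouse and table name
catalog = CatalogFactory.create({'warehouse': '/tmp/paimon-warehouse'})
catalog.create_database('default', True)

pa_schema = pa.schema([('f0', pa.int8()), ('f1', pa.int16())])
schema = Schema.from_pyarrow_schema(
    pa_schema,
    options={'row-tracking.enabled': 'true', 'data-evolution.enabled': 'true'})
catalog.create_table('default.demo_update_columns', schema, False)
table = catalog.get_table('default.demo_update_columns')

# 1. write full rows
write_builder = table.new_batch_write_builder()
table_write = write_builder.new_write()
table_commit = write_builder.new_commit()
table_write.write_arrow(pa.Table.from_pydict(
    {'f0': [1, 2], 'f1': [1001, 1002]}, schema=pa_schema))
table_commit.commit(table_write.prepare_commit())
table_write.close()
table_commit.close()

# 2. update only column 'f0' by row id; first_row_id handling is automatic
write_builder = table.new_batch_write_builder().update_columns_by_row_id()
table_write = write_builder.new_write().with_write_type(['f0'])
table_commit = write_builder.new_commit()
update = pa.Table.from_pydict(
    {'_ROW_ID': [0, 1], 'f0': [5, 6]},
    schema=pa.schema([('_ROW_ID', pa.int64()), ('f0', pa.int8())]))
table_write.write_arrow(update)
table_commit.commit(table_write.prepare_commit())
table_write.close()
table_commit.close()

# 3. read back: f0 is updated, f1 keeps its original values
read_builder = table.new_read_builder()
splits = read_builder.new_scan().plan().splits()
print(read_builder.new_read().to_arrow(splits))
```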