write_builder = table.new_batch_write_builder().overwrite()
write_builder = table.new_batch_write_builder().overwrite({'dt': '2024-01-01'})
```

### Write partial columns

When data evolution is enabled, you can write partial columns to a table:

```python
simple_pa_schema = pa.schema([
('f0', pa.int8()),
('f1', pa.int16()),
])
schema = Schema.from_pyarrow_schema(simple_pa_schema,
options={'row-tracking.enabled': 'true', 'data-evolution.enabled': 'true'})
catalog.create_table('default.test_row_tracking', schema, False)
table = catalog.get_table('default.test_row_tracking')

# write all columns
write_builder = table.new_batch_write_builder()
table_write = write_builder.new_write()
table_commit = write_builder.new_commit()
expect_data = pa.Table.from_pydict({
'f0': [-1, 2],
'f1': [-1001, 1002]
}, schema=simple_pa_schema)
table_write.write_arrow(expect_data)
table_commit.commit(table_write.prepare_commit())
table_write.close()
table_commit.close()

# write partial columns
table_write = write_builder.new_write().with_write_type(['f0'])
table_commit = write_builder.new_commit()
data2 = pa.Table.from_pydict({
'f0': [3, 4],
}, schema=pa.schema([
('f0', pa.int8()),
]))
table_write.write_arrow(data2)
cmts = table_write.prepare_commit()

# assign the first row id of the new file so it aligns with the existing rows (row ids 0 and 1)
cmts[0].new_files[0].first_row_id = 0
table_commit.commit(cmts)
table_write.close()
table_commit.close()
```
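
To verify the effect of the partial write, you can read the table back: column `f0` is replaced for the two existing rows while `f1` keeps its original values. A minimal sketch, assuming the table state produced by the example above and using the read API described later in this document:

```python
read_builder = table.new_read_builder()
table_scan = read_builder.new_scan()
table_read = read_builder.new_read()

result = table_read.to_arrow(table_scan.plan().splits())
# expected contents: f0 = [3, 4], f1 = [-1001, 1002]
```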

Paimon data-evolution tables use `first_row_id` to split files. When writing partial columns, you should split the data into multiple parts by rows and assign a `first_row_id` to each file before committing; otherwise reading the table may fail with a fatal error.

For example, in the following code, `write-1` generates a file with `first_row_id=0` containing 2 rows, and `write-2` generates a file with `first_row_id=2` also containing 2 rows. If we then update column `f0` without splitting the data into multiple parts by rows, the generated file has `first_row_id=0` and contains 4 rows; reading the table afterwards causes a fatal error.

```python
table = catalog.get_table('default.test_row_tracking')

# write-1
write_builder = table.new_batch_write_builder()
table_write = write_builder.new_write()
table_commit = write_builder.new_commit()
expect_data = pa.Table.from_pydict({
'f0': [-1, 2],
'f1': [-1001, 1002]
}, schema=simple_pa_schema)
table_write.write_arrow(expect_data)
table_commit.commit(table_write.prepare_commit())
table_write.close()
table_commit.close()

# write-2
table_write = write_builder.new_write()
table_commit = write_builder.new_commit()
expect_data = pa.Table.from_pydict({
'f0': [3, 4],
'f1': [1003, 1004]
}, schema=simple_pa_schema)
table_write.write_arrow(expect_data)
table_commit.commit(table_write.prepare_commit())
table_write.close()
table_commit.close()

# write partial columns
table_write = write_builder.new_write().with_write_type(['f0'])
table_commit = write_builder.new_commit()
data2 = pa.Table.from_pydict({
'f0': [5, 6, 7, 8],
}, schema=pa.schema([
('f0', pa.int8()),
]))
table_write.write_arrow(data2)
cmts = table_write.prepare_commit()
cmts[0].new_files[0].first_row_id = 0
table_commit.commit(cmts)
table_write.close()
table_commit.close()

read_builder = table.new_read_builder()
table_scan = read_builder.new_scan()
table_read = read_builder.new_read()

# a fatal error will be thrown
actual_data = table_read.to_arrow(table_scan.plan().splits())
```
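
Instead of the single four-row partial write above, a correct update splits the rows into two partial writes that mirror the files produced by `write-1` and `write-2`, and assigns each new file its matching `first_row_id`. A minimal sketch, reusing only the calls shown above (the exact split depends on the row counts of the existing files):

```python
# rows 0-1: mirrors the 2-row file from write-1, so first_row_id = 0
table_write = write_builder.new_write().with_write_type(['f0'])
table_commit = write_builder.new_commit()
part1 = pa.Table.from_pydict({'f0': [5, 6]}, schema=pa.schema([('f0', pa.int8())]))
table_write.write_arrow(part1)
cmts = table_write.prepare_commit()
cmts[0].new_files[0].first_row_id = 0
table_commit.commit(cmts)
table_write.close()
table_commit.close()

# rows 2-3: mirrors the 2-row file from write-2, so first_row_id = 2
table_write = write_builder.new_write().with_write_type(['f0'])
table_commit = write_builder.new_commit()
part2 = pa.Table.from_pydict({'f0': [7, 8]}, schema=pa.schema([('f0', pa.int8())]))
table_write.write_arrow(part2)
cmts = table_write.prepare_commit()
cmts[0].new_files[0].first_row_id = 2
table_commit.commit(cmts)
table_write.close()
table_commit.close()
```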

### Update columns

Handling file `first_row_id` manually is inconvenient and error-prone. To avoid it, enable `update_columns_by_row_id` when creating the `WriteBuilder` and set the write type on the `TableWrite`; you can then write partial columns without handling `first_row_id` yourself. The input data should include a `_ROW_ID` column. The write operation automatically sorts the rows, matches each `_ROW_ID` to its corresponding `first_row_id`, groups rows with the same `first_row_id`, and writes each group to a separate file.

```python
table = catalog.get_table('default.test_row_tracking')

# write-1
# same as above

# write-2
# same as above

# update partial columns
write_builder = table.new_batch_write_builder().update_columns_by_row_id()
table_write = write_builder.new_write().with_write_type(['f0'])
table_commit = write_builder.new_commit()
data2 = pa.Table.from_pydict({
'_ROW_ID': [0, 1, 2, 3],
'f0': [5, 6, 7, 8],
}, schema=pa.schema([
('_ROW_ID', pa.int64()),
('f0', pa.int8()),
]))
table_write.write_arrow(data2)
cmts = table_write.prepare_commit()
table_commit.commit(cmts)
table_write.close()
table_commit.close()

read_builder = table.new_read_builder()
table_scan = read_builder.new_scan()
table_read = read_builder.new_read()
actual_data = table_read.to_arrow(table_scan.plan().splits())
expect_data = pa.Table.from_pydict({
'f0': [5, 6, 7, 8],
'f1': [-1001, 1002, 1003, 1004]
}, schema=pa.schema([
('f0', pa.int8()),
('f1', pa.int16()),
]))
assert actual_data == expect_data
```

## Batch Read

### Predicate pushdown