Commit bc942cd

[python] Refactor update by row id to TableUpdate

1 parent 6fe570b

6 files changed: +110 -202 lines changed

docs/content/program-api/python-api.md

Lines changed: 13 additions & 115 deletions
````diff
@@ -213,9 +213,12 @@ write_builder = table.new_batch_write_builder().overwrite()
 write_builder = table.new_batch_write_builder().overwrite({'dt': '2024-01-01'})
 ```

-### Write partial columns
+### Update columns
+
+You can use `TableUpdate.update_by_arrow_with_row_id` to update columns of a data evolution table.

-when enable data-evolution, you can write partial columns to table:
+The input data must include the `_ROW_ID` column. The update operation automatically sorts the input, matches each `_ROW_ID` to
+its corresponding `first_row_id`, then groups rows with the same `first_row_id` and writes each group to a separate file.

 ```python
 simple_pa_schema = pa.schema([
@@ -240,129 +243,24 @@ table_commit.commit(table_write.prepare_commit())
 table_write.close()
 table_commit.close()

-# write partial columns
-table_write = write_builder.new_write().with_write_type(['f0'])
-table_commit = write_builder.new_commit()
-data2 = pa.Table.from_pydict({
-    'f0': [3, 4],
-}, schema=pa.schema([
-    ('f0', pa.int8()),
-]))
-table_write.write_arrow(data2)
-cmts = table_write.prepare_commit()
-
-# assign first row id
-cmts[0].new_files[0].first_row_id = 0
-table_commit.commit(cmts)
-table_write.close()
-table_commit.close()
-```
-
-Paimon data-evolution table use `first_row_id` to split files, when write partial columns,
-you should split data into multiple parts by rows, and assign `first_row_id` for each file before commit
-, or it may cause some fatal error during table reads.
-
-For example, in the following code, `write-1` will generate a file with `first_row_id=0` which contains 2 rows,
-and `write-2` will generate a file with `first_row_id=2` which also contains 2 rows. Then, if we update column `f0` and
-do not split data into multiple parts by rows, the generated file would have `first_row_id=0` and contains 4 rows, when reading
-this table, it will cause a fatal error.
-
-```python
-table = catalog.get_table('default.test_row_tracking')
-
-# write-1
-write_builder = table.new_batch_write_builder()
-table_write = write_builder.new_write()
-table_commit = write_builder.new_commit()
-expect_data = pa.Table.from_pydict({
-    'f0': [-1, 2],
-    'f1': [-1001, 1002]
-}, schema=simple_pa_schema)
-table_write.write_arrow(expect_data)
-table_commit.commit(table_write.prepare_commit())
-table_write.close()
-table_commit.close()
-
-# write-2
-table_write = write_builder.new_write()
-table_commit = write_builder.new_commit()
-expect_data = pa.Table.from_pydict({
-    'f0': [3, 4],
-    'f1': [1003, 1004]
-}, schema=simple_pa_schema)
-table_write.write_arrow(expect_data)
-table_commit.commit(table_write.prepare_commit())
-table_write.close()
-table_commit.close()
-
-# write partial columns
-table_write = write_builder.new_write().with_write_type(['f0'])
-table_commit = write_builder.new_commit()
-data2 = pa.Table.from_pydict({
-    'f0': [5, 6, 7, 8],
-}, schema=pa.schema([
-    ('f0', pa.int8()),
-]))
-table_write.write_arrow(data2)
-cmts = table_write.prepare_commit()
-cmts[0].new_files[0].first_row_id = 0
-table_commit.commit(cmts)
-table_write.close()
-table_commit.close()
-
-read_builder = table.new_read_builder()
-table_scan = read_builder.new_scan()
-table_read = read_builder.new_read()
-
-# a fatal error will be thrown
-actual_data = table_read.to_arrow(table_scan.plan().splits())
-```
-
-### Update columns
-
-Handle file `first_row_id` manually is inconvenient and error-prone. If you don't want to do this, you can enable `update_columns_by_row_id`
-when create `WriteBuilder` and set write type for `TableWrite`, then you can write partial columns without handling file `first_row_id`.
-The input data should include the `_ROW_ID` column, writing operation will automatically sort and match each `_ROW_ID` to
-its corresponding `first_row_id`, then groups rows with the same `first_row_id` and writes them to a separate file.
-
-```python
-table = catalog.get_table('default.test_row_tracking')
-
-# write-1
-# same as above
-
-# write-2
-# same as above
-
 # update partial columns
-write_builder = table.new_batch_write_builder().update_columns_by_row_id()
-table_write = write_builder.new_write().with_write_type(['f0'])
+write_builder = table.new_batch_write_builder()
+table_update = write_builder.new_update().with_update_type(['f0'])
 table_commit = write_builder.new_commit()
 data2 = pa.Table.from_pydict({
-    '_ROW_ID': [0, 1, 2, 3],
-    'f0': [5, 6, 7, 8],
+    '_ROW_ID': [0, 1],
+    'f0': [5, 6],
 }, schema=pa.schema([
     ('_ROW_ID', pa.int64()),
     ('f0', pa.int8()),
 ]))
-table_write.write_arrow(data2)
-cmts = table_write.prepare_commit()
+cmts = table_update.update_by_arrow_with_row_id(data2)
 table_commit.commit(cmts)
-table_write.close()
 table_commit.close()

-read_builder = table.new_read_builder()
-table_scan = read_builder.new_scan()
-table_read = read_builder.new_read()
-actual_data = table_read.to_arrow(table_scan.plan().splits())
-expect_data = pa.Table.from_pydict({
-    'f0': [5, 6, 7, 8],
-    'f1': [-1001, 1002, 1003, 1004]
-}, schema=pa.schema([
-    ('f0', pa.int8()),
-    ('f1', pa.int16()),
-]))
-self.assertEqual(actual_data, expect_data)
+# content should be:
+# 'f0': [5, 6],
+# 'f1': [-1001, 1002]
 ```

 ## Batch Read
````
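As a side note, the `_ROW_ID` to `first_row_id` grouping the new docs describe can be pictured with a small standalone sketch. This is illustrative only, not pypaimon's internal implementation; `group_rows_by_first_row_id` and its arguments are hypothetical names, with each data file identified by the `first_row_id` it starts at:

```python
import bisect

def group_rows_by_first_row_id(row_ids, first_row_ids):
    """Group sorted _ROW_ID values by the file range each one falls into.

    first_row_ids holds the starting row id of every existing file, sorted
    ascending; returns {first_row_id: [_ROW_IDs updated in that file]}.
    """
    groups = {}
    for row_id in sorted(row_ids):  # the update sorts the input rows first
        # The owning file is the one with the greatest first_row_id <= row_id.
        idx = bisect.bisect_right(first_row_ids, row_id) - 1
        if idx < 0:
            raise ValueError(f"_ROW_ID {row_id} precedes all file ranges")
        groups.setdefault(first_row_ids[idx], []).append(row_id)
    return groups

# Two existing files holding rows 0-1 and 2-3: each group becomes a separate
# output file, so an update never straddles a first_row_id boundary.
print(group_rows_by_first_row_id([3, 0, 2, 1], [0, 2]))
# -> {0: [0, 1], 2: [2, 3]}
```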

paimon-python/pypaimon/tests/partial_columns_write_test.py renamed to paimon-python/pypaimon/tests/table_update_test.py

Lines changed: 33 additions & 50 deletions
````diff
@@ -25,7 +25,7 @@
 from pypaimon import CatalogFactory, Schema


-class PartialColumnsWriteTest(unittest.TestCase):
+class TableUpdateTest(unittest.TestCase):
     @classmethod
     def setUpClass(cls):
         cls.tempdir = tempfile.mkdtemp()
@@ -101,7 +101,7 @@ def test_update_existing_column(self):
         # Create table with initial data
         table = self._create_table()

-        # Create data evolution writer using BatchTableWrite
+        # Create data evolution table update
         write_builder = table.new_batch_write_builder()
         batch_write = write_builder.new_write()

@@ -112,10 +112,9 @@ def test_update_existing_column(self):
         })

         # Update the age column
-        write_builder = table.new_batch_write_builder().update_columns_by_row_id()
-        batch_write = write_builder.new_write().with_write_type(['age'])
-        batch_write.write_arrow(update_data)
-        commit_messages = batch_write.prepare_commit()
+        write_builder = table.new_batch_write_builder()
+        table_update = write_builder.new_update().with_update_type(['age'])
+        commit_messages = table_update.update_by_arrow_with_row_id(update_data)

         # Commit the changes
         table_commit = write_builder.new_commit()
@@ -139,7 +138,7 @@ def test_update_multiple_columns(self):
         # Create table with initial data
         table = self._create_table()

-        # Create data evolution writer using BatchTableWrite
+        # Create data evolution table update
         write_builder = table.new_batch_write_builder()
         batch_write = write_builder.new_write()

@@ -151,10 +150,9 @@ def test_update_multiple_columns(self):
         })

         # Update multiple columns
-        write_builder = table.new_batch_write_builder().update_columns_by_row_id()
-        batch_write = write_builder.new_write().with_write_type(['age', 'city'])
-        batch_write.write_arrow(update_data)
-        commit_messages = batch_write.prepare_commit()
+        write_builder = table.new_batch_write_builder()
+        table_update = write_builder.new_update().with_update_type(['age', 'city'])
+        commit_messages = table_update.update_by_arrow_with_row_id(update_data)

         # Commit the changes
         table_commit = write_builder.new_commit()
@@ -182,10 +180,6 @@ def test_nonexistent_column(self):
         """Test that updating a non-existent column raises an error."""
         table = self._create_table()

-        # Create data evolution writer using BatchTableWrite
-        write_builder = table.new_batch_write_builder()
-        batch_write = write_builder.new_write()
-
         # Try to update a non-existent column
         update_data = pa.Table.from_pydict({
             '_ROW_ID': [0, 1, 2, 3, 4],
@@ -194,18 +188,17 @@ def test_nonexistent_column(self):

         # Should raise ValueError
         with self.assertRaises(ValueError) as context:
-            write_builder = table.new_batch_write_builder().update_columns_by_row_id()
-            batch_write = write_builder.new_write().with_write_type(['nonexistent_column'])
-            batch_write.write_arrow(update_data)
+            write_builder = table.new_batch_write_builder()
+            table_update = write_builder.new_update().with_update_type(['nonexistent_column'])
+            table_update.update_by_arrow_with_row_id(update_data)

         self.assertIn('not in table schema', str(context.exception))
-        batch_write.close()

     def test_missing_row_id_column(self):
         """Test that missing row_id column raises an error."""
         table = self._create_table()

-        # Create data evolution writer using BatchTableWrite
+        # Create data evolution table update
         write_builder = table.new_batch_write_builder()
         batch_write = write_builder.new_write()

@@ -216,9 +209,9 @@ def test_missing_row_id_column(self):

         # Should raise ValueError
         with self.assertRaises(ValueError) as context:
-            write_builder = table.new_batch_write_builder().update_columns_by_row_id()
-            batch_write = write_builder.new_write().with_write_type(['age'])
-            batch_write.write_arrow(update_data)
+            write_builder = table.new_batch_write_builder()
+            table_update = write_builder.new_update().with_update_type(['age'])
+            table_update.update_by_arrow_with_row_id(update_data)

         self.assertIn("Input data must contain _ROW_ID column", str(context.exception))
         batch_write.close()
@@ -247,24 +240,22 @@ def test_partitioned_table_update(self):
         table_write.close()
         table_commit.close()

-        # Create data evolution writer using BatchTableWrite
-        write_builder = table.new_batch_write_builder().update_columns_by_row_id()
-        batch_write = write_builder.new_write().with_write_type(['age'])
+        # Create data evolution table update
+        write_builder = table.new_batch_write_builder()
+        table_update = write_builder.new_update().with_update_type(['age'])

         # Update ages
         update_data = pa.Table.from_pydict({
             '_ROW_ID': [1, 0, 2, 3, 4],
             'age': [31, 26, 36, 41, 46]
         })

-        batch_write.write_arrow(update_data)
-        commit_messages = batch_write.prepare_commit()
+        commit_messages = table_update.update_by_arrow_with_row_id(update_data)

         # Commit the changes
         table_commit = write_builder.new_commit()
         table_commit.commit(commit_messages)
         table_commit.close()
-        batch_write.close()

         # Verify the updated data
         read_builder = table.new_read_builder()
@@ -283,16 +274,15 @@ def test_multiple_calls(self):
         table = self._create_table()

         # First update: Update age column
-        write_builder = table.new_batch_write_builder().update_columns_by_row_id()
-        batch_write = write_builder.new_write().with_write_type(['age'])
+        write_builder = table.new_batch_write_builder()
+        table_update = write_builder.new_update().with_update_type(['age'])

         update_age_data = pa.Table.from_pydict({
             '_ROW_ID': [1, 0, 2, 3, 4],
             'age': [31, 26, 36, 41, 46]
         })

-        batch_write.write_arrow(update_age_data)
-        commit_messages = batch_write.prepare_commit()
+        commit_messages = table_update.update_by_arrow_with_row_id(update_age_data)
         table_commit = write_builder.new_commit()
         table_commit.commit(commit_messages)
         table_commit.close()
@@ -302,17 +292,12 @@ def test_multiple_calls(self):
             '_ROW_ID': [1, 0, 2, 3, 4],
             'city': ['Los Angeles', 'New York', 'Chicago', 'Phoenix', 'Houston']
         })
-        write_builder = table.new_batch_write_builder().update_columns_by_row_id()
-        batch_write = write_builder.new_write().with_write_type(['city'])
-        batch_write.write_arrow(update_city_data)
-        commit_messages = batch_write.prepare_commit()
+        table_update.with_update_type(['city'])
+        commit_messages = table_update.update_by_arrow_with_row_id(update_city_data)
         table_commit = write_builder.new_commit()
         table_commit.commit(commit_messages)
         table_commit.close()

-        # Close the batch write
-        batch_write.close()
-
         # Verify both columns were updated correctly
         read_builder = table.new_read_builder()
         table_read = read_builder.new_read()
@@ -333,9 +318,9 @@ def test_wrong_total_row_count(self):
         # Create table with initial data
         table = self._create_table()

-        # Create data evolution writer using BatchTableWrite
-        write_builder = table.new_batch_write_builder().update_columns_by_row_id()
-        batch_write = write_builder.new_write().with_write_type(['age'])
+        # Create data evolution table update
+        write_builder = table.new_batch_write_builder()
+        table_update = write_builder.new_update().with_update_type(['age'])

         # Prepare update data with wrong row count (only 3 rows instead of 5)
         update_data = pa.Table.from_pydict({
@@ -345,19 +330,18 @@ def test_wrong_total_row_count(self):

         # Should raise ValueError for total row count mismatch
         with self.assertRaises(ValueError) as context:
-            batch_write.write_arrow(update_data)
+            table_update.update_by_arrow_with_row_id(update_data)

         self.assertIn("does not match table total row count", str(context.exception))
-        batch_write.close()

     def test_wrong_first_row_id_row_count(self):
         """Test that wrong row count for a first_row_id raises an error."""
         # Create table with initial data
         table = self._create_table()

-        # Create data evolution writer using BatchTableWrite
-        write_builder = table.new_batch_write_builder().update_columns_by_row_id()
-        batch_write = write_builder.new_write().with_write_type(['age'])
+        # Create data evolution table update
+        write_builder = table.new_batch_write_builder()
+        table_update = write_builder.new_update().with_update_type(['age'])

         # Prepare update data with duplicate row_id (violates monotonically increasing)
         update_data = pa.Table.from_pydict({
@@ -367,10 +351,9 @@ def test_wrong_first_row_id_row_count(self):

         # Should raise ValueError for row ID validation
         with self.assertRaises(ValueError) as context:
-            batch_write.write_arrow(update_data)
+            table_update.update_by_arrow_with_row_id(update_data)

         self.assertIn("Row IDs are not monotonically increasing", str(context.exception))
-        batch_write.close()

 if __name__ == '__main__':
     unittest.main()
````
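Taken together, the tests reduce to the condensed usage sketch below. It assumes the filesystem catalog setup from pypaimon's other examples (`CatalogFactory.create` with a `warehouse` option, here a placeholder path) and a pre-existing data evolution table; the tests' `_create_table` helper is not part of this diff.

```python
import pyarrow as pa

from pypaimon import CatalogFactory

# Placeholder warehouse path; 'default.test_row_tracking' is assumed to
# already exist as a data evolution table with row tracking enabled.
catalog = CatalogFactory.create({'warehouse': '/tmp/warehouse'})
table = catalog.get_table('default.test_row_tracking')

write_builder = table.new_batch_write_builder()
table_update = write_builder.new_update().with_update_type(['f0'])

# The input must carry _ROW_ID alongside the columns being updated.
data = pa.Table.from_pydict({
    '_ROW_ID': [0, 1],
    'f0': [5, 6],
}, schema=pa.schema([
    ('_ROW_ID', pa.int64()),
    ('f0', pa.int8()),
]))

# Sorts by _ROW_ID, groups rows by first_row_id, returns commit messages.
commit_messages = table_update.update_by_arrow_with_row_id(data)
table_commit = write_builder.new_commit()
table_commit.commit(commit_messages)
table_commit.close()
```

Note that validation happens inside `update_by_arrow_with_row_id` itself, before any commit message is produced: a missing `_ROW_ID` column, an unknown update column, a total row count that does not match the table, or non-monotonic row ids each raise `ValueError`, as the tests above assert.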
