write_builder = table.new_batch_write_builder().overwrite()
write_builder = table.new_batch_write_builder().overwrite({'dt': '2024-01-01'})
```

### Write partial columns

When data evolution is enabled, you can write partial columns to the table:

```python
simple_pa_schema = pa.schema([
    ('f0', pa.int8()),
    ('f1', pa.int16()),
])
schema = Schema.from_pyarrow_schema(simple_pa_schema,
                                    options={'row-tracking.enabled': 'true', 'data-evolution.enabled': 'true'})
catalog.create_table('default.test_row_tracking', schema, False)
table = catalog.get_table('default.test_row_tracking')

# write all columns
write_builder = table.new_batch_write_builder()
table_write = write_builder.new_write()
table_commit = write_builder.new_commit()
expect_data = pa.Table.from_pydict({
    'f0': [-1, 2],
    'f1': [-1001, 1002]
}, schema=simple_pa_schema)
table_write.write_arrow(expect_data)
table_commit.commit(table_write.prepare_commit())
table_write.close()
table_commit.close()

# write partial columns
table_write = write_builder.new_write().with_write_type(['f0'])
table_commit = write_builder.new_commit()
data2 = pa.Table.from_pydict({
    'f0': [3, 4],
}, schema=pa.schema([
    ('f0', pa.int8()),
]))
table_write.write_arrow(data2)
cmts = table_write.prepare_commit()

# assign first row id
cmts[0].new_files[0].first_row_id = 0
table_commit.commit(cmts)
table_write.close()
table_commit.close()
```

Paimon data-evolution tables use `first_row_id` to split files. When writing partial columns, you must split the data into multiple parts by rows and assign a `first_row_id` to each file before committing; otherwise reads of the table may fail with a fatal error.

For example, in the following code, `write-1` generates a file with `first_row_id=0` containing 2 rows, and `write-2` generates a file with `first_row_id=2` also containing 2 rows. If we then update column `f0` without splitting the data into multiple parts by rows, the generated file has `first_row_id=0` and contains 4 rows, and reading the table causes a fatal error. A sketch of the correct manual split follows this example.

```python
table = catalog.get_table('default.test_row_tracking')

# write-1
write_builder = table.new_batch_write_builder()
table_write = write_builder.new_write()
table_commit = write_builder.new_commit()
expect_data = pa.Table.from_pydict({
    'f0': [-1, 2],
    'f1': [-1001, 1002]
}, schema=simple_pa_schema)
table_write.write_arrow(expect_data)
table_commit.commit(table_write.prepare_commit())
table_write.close()
table_commit.close()

# write-2
table_write = write_builder.new_write()
table_commit = write_builder.new_commit()
expect_data = pa.Table.from_pydict({
    'f0': [3, 4],
    'f1': [1003, 1004]
}, schema=simple_pa_schema)
table_write.write_arrow(expect_data)
table_commit.commit(table_write.prepare_commit())
table_write.close()
table_commit.close()

# write partial columns
table_write = write_builder.new_write().with_write_type(['f0'])
table_commit = write_builder.new_commit()
data2 = pa.Table.from_pydict({
    'f0': [5, 6, 7, 8],
}, schema=pa.schema([
    ('f0', pa.int8()),
]))
table_write.write_arrow(data2)
cmts = table_write.prepare_commit()
cmts[0].new_files[0].first_row_id = 0
table_commit.commit(cmts)
table_write.close()
table_commit.close()

read_builder = table.new_read_builder()
table_scan = read_builder.new_scan()
table_read = read_builder.new_read()

# a fatal error will be thrown
actual_data = table_read.to_arrow(table_scan.plan().splits())
```
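
For reference, here is a minimal sketch of the correct manual split, assuming the two files produced by `write-1` and `write-2` above (two rows each). It only reuses the APIs shown in this section; the batch layout and loop are illustrative, not a prescribed pattern.

```python
# Split the partial-column update at the original file boundaries, so each
# generated file covers exactly the rows of one existing file.
write_builder = table.new_batch_write_builder()

# (first_row_id of the target file, replacement 'f0' values for its rows)
batches = [
    (0, [5, 6]),  # rows written by write-1
    (2, [7, 8]),  # rows written by write-2
]

for first_row_id, values in batches:
    table_write = write_builder.new_write().with_write_type(['f0'])
    table_commit = write_builder.new_commit()
    batch = pa.Table.from_pydict({'f0': values},
                                  schema=pa.schema([('f0', pa.int8())]))
    table_write.write_arrow(batch)
    cmts = table_write.prepare_commit()
    # each batch yields a single file here; align it with the matching original file
    cmts[0].new_files[0].first_row_id = first_row_id
    table_commit.commit(cmts)
    table_write.close()
    table_commit.close()
```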

### Update columns

Handling file `first_row_id` manually is inconvenient and error-prone. If you prefer not to, call `update_columns_by_row_id()` when creating the `WriteBuilder` and set the write type on the `TableWrite`; you can then write partial columns without handling `first_row_id` yourself. The input data must include the `_ROW_ID` column. The write operation automatically sorts the rows and matches each `_ROW_ID` to its corresponding `first_row_id`, then groups rows with the same `first_row_id` and writes each group to a separate file.

```python
table = catalog.get_table('default.test_row_tracking')

# write-1
# same as above

# write-2
# same as above

# update partial columns
write_builder = table.new_batch_write_builder().update_columns_by_row_id()
table_write = write_builder.new_write().with_write_type(['f0'])
table_commit = write_builder.new_commit()
data2 = pa.Table.from_pydict({
    '_ROW_ID': [0, 1, 2, 3],
    'f0': [5, 6, 7, 8],
}, schema=pa.schema([
    ('_ROW_ID', pa.int64()),
    ('f0', pa.int8()),
]))
table_write.write_arrow(data2)
cmts = table_write.prepare_commit()
table_commit.commit(cmts)
table_write.close()
table_commit.close()

read_builder = table.new_read_builder()
table_scan = read_builder.new_scan()
table_read = read_builder.new_read()
actual_data = table_read.to_arrow(table_scan.plan().splits())
expect_data = pa.Table.from_pydict({
    'f0': [5, 6, 7, 8],
    'f1': [-1001, 1002, 1003, 1004]
}, schema=pa.schema([
    ('f0', pa.int8()),
    ('f1', pa.int16()),
]))
assert actual_data.equals(expect_data)
```

## Batch Read

### Predicate pushdown