Skip to content

Commit 300b840

Browse files
authored
Add upsert docs (#1665)
And make the join-cols optional using the identifier fields.
1 parent dd58ac1 commit 300b840

File tree

3 files changed

+130
-2
lines changed

3 files changed

+130
-2
lines changed

mkdocs/docs/api.md

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -474,6 +474,71 @@ lat: [[48.864716],[52.371807],[53.11254],[37.773972]]
474474
long: [[2.349014],[4.896029],[6.0989],[-122.431297]]
475475
```
476476

477+
### Upsert
478+
479+
PyIceberg supports upsert operations, meaning that it is able to merge an Arrow table into an Iceberg table. Rows are considered the same based on the [identifier field](https://iceberg.apache.org/spec/?column-projection#identifier-field-ids). If a row is already in the table, it will update that row. If a row cannot be found, it will insert that new row.
480+
481+
Consider the following table, with some data:
482+
483+
```python
484+
from pyiceberg.schema import Schema
485+
from pyiceberg.types import IntegerType, NestedField, StringType
486+
487+
import pyarrow as pa
488+
489+
schema = Schema(
490+
NestedField(1, "city", StringType(), required=True),
491+
NestedField(2, "inhabitants", IntegerType(), required=True),
492+
# Mark City as the identifier field, also known as the primary-key
493+
identifier_field_ids=[1]
494+
)
495+
496+
tbl = catalog.create_table("default.cities", schema=schema)
497+
498+
arrow_schema = pa.schema(
499+
[
500+
pa.field("city", pa.string(), nullable=False),
501+
pa.field("inhabitants", pa.int32(), nullable=False),
502+
]
503+
)
504+
505+
# Write some data
506+
df = pa.Table.from_pylist(
507+
[
508+
{"city": "Amsterdam", "inhabitants": 921402},
509+
{"city": "San Francisco", "inhabitants": 808988},
510+
{"city": "Drachten", "inhabitants": 45019},
511+
{"city": "Paris", "inhabitants": 2103000},
512+
],
513+
schema=arrow_schema
514+
)
515+
tbl.append(df)
516+
```
517+
518+
Next, we'll upsert an Arrow table into the Iceberg table:
519+
520+
```python
521+
df = pa.Table.from_pylist(
522+
[
523+
    # Will be updated, since the number of inhabitants has changed
524+
{"city": "Drachten", "inhabitants": 45505},
525+
526+
# New row, will be inserted
527+
{"city": "Berlin", "inhabitants": 3432000},
528+
529+
# Ignored, already exists in the table
530+
{"city": "Paris", "inhabitants": 2103000},
531+
],
532+
schema=arrow_schema
533+
)
534+
upd = tbl.upsert(df)
535+
536+
assert upd.rows_updated == 1
537+
assert upd.rows_inserted == 1
538+
```
539+
540+
PyIceberg will automatically detect which rows need to be updated, inserted or can simply be ignored.
541+
477542
## Inspecting tables
478543

479544
To explore the table metadata, tables can be inspected.

pyiceberg/table/__init__.py

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1109,7 +1109,7 @@ def name_mapping(self) -> Optional[NameMapping]:
11091109
def upsert(
11101110
self,
11111111
df: pa.Table,
1112-
join_cols: list[str],
1112+
join_cols: Optional[List[str]] = None,
11131113
when_matched_update_all: bool = True,
11141114
when_not_matched_insert_all: bool = True,
11151115
case_sensitive: bool = True,
@@ -1119,11 +1119,13 @@ def upsert(
11191119
Args:
11201120
11211121
df: The input dataframe to upsert with the table's data.
1122-
join_cols: The columns to join on. These are essentially analogous to primary keys
1122+
join_cols: The columns to join on. If not provided, the table's identifier-field-ids are used.
11231123
when_matched_update_all: Bool indicating to update rows that are matched but require an update due to a value in a non-key column changing
11241124
when_not_matched_insert_all: Bool indicating new rows to be inserted that do not match any existing rows in the table
11251125
case_sensitive: Bool indicating if the match should be case-sensitive
11261126
1127+
To learn more about the identifier-field-ids: https://iceberg.apache.org/spec/#identifier-field-ids
1128+
11271129
Example Use Cases:
11281130
Case 1: Both Parameters = True (Full Upsert)
11291131
Existing row found → Update it
@@ -1148,6 +1150,15 @@ def upsert(
11481150
"""
11491151
from pyiceberg.table import upsert_util
11501152

1153+
if join_cols is None:
1154+
join_cols = []
1155+
for field_id in self.schema().identifier_field_ids:
1156+
col = self.schema().find_column_name(field_id)
1157+
if col is not None:
1158+
join_cols.append(col)
1159+
else:
1160+
raise ValueError(f"Field-ID could not be found: {field_id}")
1161+
11511162
if not when_matched_update_all and not when_not_matched_insert_all:
11521163
raise ValueError("no upsert options selected...exiting")
11531164

tests/table/test_upsert.py

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,16 @@
1616
# under the License.
1717
from pathlib import PosixPath
1818

19+
import pyarrow as pa
1920
import pytest
2021
from datafusion import SessionContext
2122
from pyarrow import Table as pa_table
2223

2324
from pyiceberg.catalog import Catalog
2425
from pyiceberg.exceptions import NoSuchTableError
26+
from pyiceberg.schema import Schema
2527
from pyiceberg.table import UpsertResult
28+
from pyiceberg.types import IntegerType, NestedField, StringType
2629
from tests.catalog.test_base import InMemoryCatalog, Table
2730

2831

@@ -314,3 +317,52 @@ def test_key_cols_misaligned(catalog: Catalog) -> None:
314317

315318
with pytest.raises(Exception, match=r"""Field ".*" does not exist in schema"""):
316319
table.upsert(df=df_src, join_cols=["order_id"])
320+
321+
322+
def test_upsert_with_identifier_fields(catalog: Catalog) -> None:
323+
identifier = "default.test_upsert_with_identifier_fields"
324+
_drop_table(catalog, identifier)
325+
326+
schema = Schema(
327+
NestedField(1, "city", StringType(), required=True),
328+
NestedField(2, "inhabitants", IntegerType(), required=True),
329+
# Mark City as the identifier field, also known as the primary-key
330+
identifier_field_ids=[1],
331+
)
332+
333+
tbl = catalog.create_table(identifier, schema=schema)
334+
335+
arrow_schema = pa.schema(
336+
[
337+
pa.field("city", pa.string(), nullable=False),
338+
pa.field("inhabitants", pa.int32(), nullable=False),
339+
]
340+
)
341+
342+
# Write some data
343+
df = pa.Table.from_pylist(
344+
[
345+
{"city": "Amsterdam", "inhabitants": 921402},
346+
{"city": "San Francisco", "inhabitants": 808988},
347+
{"city": "Drachten", "inhabitants": 45019},
348+
{"city": "Paris", "inhabitants": 2103000},
349+
],
350+
schema=arrow_schema,
351+
)
352+
tbl.append(df)
353+
354+
df = pa.Table.from_pylist(
355+
[
356+
            # Will be updated, since the number of inhabitants has changed
357+
{"city": "Drachten", "inhabitants": 45505},
358+
# New row, will be inserted
359+
{"city": "Berlin", "inhabitants": 3432000},
360+
# Ignored, already exists in the table
361+
{"city": "Paris", "inhabitants": 2103000},
362+
],
363+
schema=arrow_schema,
364+
)
365+
upd = tbl.upsert(df)
366+
367+
assert upd.rows_updated == 1
368+
assert upd.rows_inserted == 1

0 commit comments

Comments
 (0)