Commit be528ae
Fallback for upsert when arrow cannot compare source rows with target rows (#1878)
<!-- Fixes #1711 -->

## Rationale for this change

Upsert operations in PyIceberg rely on Arrow joins between source and target rows. However, Arrow Acero cannot compare certain complex types, such as `struct`, `list`, and `map`, unless they are part of the join key. When such types exist in non-join columns, the upsert fails with an error like:

```
ArrowInvalid: Data type struct<...> is not supported in join non-key field venue_geo
```

This PR introduces a **fallback mechanism**: when Arrow cannot perform the full join because of unsupported types, we join on the key columns only and compare the remaining (non-key) columns in Python. Non-key complex fields are ignored in the join condition but are still retained in the final upserted data.

---

### Before

```python
# Fails if venue_geo is a non-key struct field
txn.upsert(df, join_cols=["match_id"])
```

> ❌ ArrowInvalid: Data type struct<...> is not supported in join non-key field venue_geo

---

### After

```python
# Falls back to a key-based join and proceeds
txn.upsert(df, join_cols=["match_id"])
```

> ✅ Successfully inserts or updates the record, skipping complex-field comparison during the join

---

## ✅ Are these changes tested?

Yes:

- A test was added to reproduce the failure scenario with complex non-key fields.
- The new behavior is verified by asserting that the upsert completes successfully using the fallback logic.

---

> ℹ️ **Note**
> This change does not affect users who do not include complex types in their schemas. For those who do, it improves resilience while preserving data correctness.

---

## Are there any user-facing changes?

Yes: upserts involving complex non-key columns (like `struct`, `list`, or `map`) no longer fail. They now succeed by skipping unsupported comparisons during the join phase.
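To make the approach concrete, here is a minimal, self-contained PyArrow sketch of the fallback described above: join source and target on the key columns only, carrying row-index markers, then compare the non-key columns in plain Python, where `struct` values compare without issue. The table contents, the `match_id`/`venue_geo` column names, and the `rows_to_update` helper are illustrative only and are not part of this PR.

```python
import pyarrow as pa

# Hypothetical tables: `venue_geo` is a struct column that Acero cannot
# compare as a non-key join field.
target = pa.table({
    "match_id": [1, 2],
    "venue_geo": [{"lat": 1.0, "lon": 2.0}, {"lat": 3.0, "lon": 4.0}],
})
source = pa.table({
    "match_id": [2, 3],
    "venue_geo": [{"lat": 3.0, "lon": 9.9}, {"lat": 5.0, "lon": 6.0}],
})

def rows_to_update(source: pa.Table, target: pa.Table, join_cols: list) -> pa.Table:
    # Join only on the key columns, carrying positional indices as markers.
    src_idx = source.select(join_cols).append_column("__source_index", pa.array(range(len(source))))
    tgt_idx = target.select(join_cols).append_column("__target_index", pa.array(range(len(target))))
    matches = src_idx.join(tgt_idx, keys=join_cols, join_type="inner")

    # Compare the non-key columns row by row in Python.
    non_key_cols = [c for c in source.column_names if c not in join_cols]
    changed = []
    for s, t in zip(matches["__source_index"].to_pylist(), matches["__target_index"].to_pylist()):
        if any(source[c][s].as_py() != target[c][t].as_py() for c in non_key_cols):
            changed.append(s)
    return source.take(changed) if changed else source.schema.empty_table()

# Only match_id == 2 exists in both tables with a changed venue_geo.
print(rows_to_update(source, target, ["match_id"]))
```

The actual implementation in `pyiceberg/table/upsert_util.py` follows the same shape; see the diff below.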
1 parent 237333d commit be528ae

File tree

2 files changed, +206 -22 lines changed


pyiceberg/table/upsert_util.py

Lines changed: 48 additions & 21 deletions
```diff
@@ -62,7 +62,8 @@ def get_rows_to_update(source_table: pa.Table, target_table: pa.Table, join_cols
     """
     all_columns = set(source_table.column_names)
     join_cols_set = set(join_cols)
-    non_key_cols = all_columns - join_cols_set
+
+    non_key_cols = list(all_columns - join_cols_set)
 
     if has_duplicate_rows(target_table, join_cols):
         raise ValueError("Target table has duplicate rows, aborting upsert")
@@ -71,25 +72,51 @@ def get_rows_to_update(source_table: pa.Table, target_table: pa.Table, join_cols
         # When the target table is empty, there is nothing to update :)
         return source_table.schema.empty_table()
 
-    diff_expr = functools.reduce(
-        operator.or_,
-        [
-            pc.or_kleene(
-                pc.not_equal(pc.field(f"{col}-lhs"), pc.field(f"{col}-rhs")),
-                pc.is_null(pc.not_equal(pc.field(f"{col}-lhs"), pc.field(f"{col}-rhs"))),
-            )
-            for col in non_key_cols
-        ],
+    # We need to compare non_key_cols in Python as PyArrow
+    # 1. Cannot do a join when non-join columns have complex types
+    # 2. Cannot compare columns with complex types
+    # See: https://github.com/apache/arrow/issues/35785
+    SOURCE_INDEX_COLUMN_NAME = "__source_index"
+    TARGET_INDEX_COLUMN_NAME = "__target_index"
+
+    if SOURCE_INDEX_COLUMN_NAME in join_cols or TARGET_INDEX_COLUMN_NAME in join_cols:
+        raise ValueError(
+            f"{SOURCE_INDEX_COLUMN_NAME} and {TARGET_INDEX_COLUMN_NAME} are reserved for joining "
+            f"DataFrames, and cannot be used as column names"
+        ) from None
+
+    # Step 1: Prepare source index with join keys and a marker index
+    # Cast to target table schema, so we can do the join
+    # See: https://github.com/apache/arrow/issues/37542
+    source_index = (
+        source_table.cast(target_table.schema)
+        .select(join_cols_set)
+        .append_column(SOURCE_INDEX_COLUMN_NAME, pa.array(range(len(source_table))))
     )
 
-    return (
-        source_table
-        # We already know that the schema is compatible, this is to fix large_ types
-        .cast(target_table.schema)
-        .join(target_table, keys=list(join_cols_set), join_type="inner", left_suffix="-lhs", right_suffix="-rhs")
-        .filter(diff_expr)
-        .drop_columns([f"{col}-rhs" for col in non_key_cols])
-        .rename_columns({f"{col}-lhs" if col not in join_cols else col: col for col in source_table.column_names})
-        # Finally cast to the original schema since it doesn't carry nullability:
-        # https://github.com/apache/arrow/issues/45557
-    ).cast(target_table.schema)
+    # Step 2: Prepare target index with join keys and a marker
+    target_index = target_table.select(join_cols_set).append_column(TARGET_INDEX_COLUMN_NAME, pa.array(range(len(target_table))))
+
+    # Step 3: Perform an inner join to find which rows from source exist in target
+    matching_indices = source_index.join(target_index, keys=list(join_cols_set), join_type="inner")
+
+    # Step 4: Compare all rows using Python
+    to_update_indices = []
+    for source_idx, target_idx in zip(
+        matching_indices[SOURCE_INDEX_COLUMN_NAME].to_pylist(), matching_indices[TARGET_INDEX_COLUMN_NAME].to_pylist()
+    ):
+        source_row = source_table.slice(source_idx, 1)
+        target_row = target_table.slice(target_idx, 1)
+
+        for key in non_key_cols:
+            source_val = source_row.column(key)[0].as_py()
+            target_val = target_row.column(key)[0].as_py()
+            if source_val != target_val:
+                to_update_indices.append(source_idx)
+                break
+
+    # Step 5: Take rows from source table using the indices and cast to target schema
+    if to_update_indices:
+        return source_table.take(to_update_indices)
+    else:
+        return source_table.schema.empty_table()
```
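For reference, a small, hedged sketch of exercising the rewritten helper directly. The example tables and column names are illustrative, and exact behavior may vary by PyIceberg version; only the `get_rows_to_update` signature and module path are taken from the diff above.

```python
import pyarrow as pa

from pyiceberg.table.upsert_util import get_rows_to_update

# Illustrative tables: `id` is the join key, `attrs` is a struct column that
# Arrow cannot compare as a non-key join field.
target = pa.table({"id": [1, 2], "attrs": [{"a": "x"}, {"a": "y"}]})
source = pa.table({"id": [2, 3], "attrs": [{"a": "y-changed"}, {"a": "new"}]})

# id == 2 exists in both tables and differs in a non-key column, so it is the
# only row reported for update; id == 3 is a pure insert handled elsewhere.
rows = get_rows_to_update(source, target, join_cols=["id"])
assert rows["id"].to_pylist() == [2]
```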

tests/table/test_upsert.py

Lines changed: 158 additions & 1 deletion
```diff
@@ -30,7 +30,7 @@
 from pyiceberg.table import UpsertResult
 from pyiceberg.table.snapshots import Operation
 from pyiceberg.table.upsert_util import create_match_filter
-from pyiceberg.types import IntegerType, NestedField, StringType
+from pyiceberg.types import IntegerType, NestedField, StringType, StructType
 from tests.catalog.test_base import InMemoryCatalog, Table
 
 
@@ -511,6 +511,163 @@ def test_upsert_without_identifier_fields(catalog: Catalog) -> None:
     tbl.upsert(df)
 
 
+def test_upsert_with_struct_field_as_non_join_key(catalog: Catalog) -> None:
+    identifier = "default.test_upsert_struct_field_fails"
+    _drop_table(catalog, identifier)
+
+    schema = Schema(
+        NestedField(1, "id", IntegerType(), required=True),
+        NestedField(
+            2,
+            "nested_type",
+            StructType(
+                NestedField(3, "sub1", StringType(), required=True),
+                NestedField(4, "sub2", StringType(), required=True),
+            ),
+            required=False,
+        ),
+        identifier_field_ids=[1],
+    )
+
+    tbl = catalog.create_table(identifier, schema=schema)
+
+    arrow_schema = pa.schema(
+        [
+            pa.field("id", pa.int32(), nullable=False),
+            pa.field(
+                "nested_type",
+                pa.struct(
+                    [
+                        pa.field("sub1", pa.large_string(), nullable=False),
+                        pa.field("sub2", pa.large_string(), nullable=False),
+                    ]
+                ),
+                nullable=True,
+            ),
+        ]
+    )
+
+    initial_data = pa.Table.from_pylist(
+        [
+            {
+                "id": 1,
+                "nested_type": {"sub1": "bla1", "sub2": "bla"},
+            }
+        ],
+        schema=arrow_schema,
+    )
+    tbl.append(initial_data)
+
+    update_data = pa.Table.from_pylist(
+        [
+            {
+                "id": 2,
+                "nested_type": {"sub1": "bla1", "sub2": "bla"},
+            },
+            {
+                "id": 1,
+                "nested_type": {"sub1": "bla1", "sub2": "bla2"},
+            },
+        ],
+        schema=arrow_schema,
+    )
+
+    res = tbl.upsert(update_data, join_cols=["id"])
+
+    expected_updated = 1
+    expected_inserted = 1
+
+    assert_upsert_result(res, expected_updated, expected_inserted)
+
+    update_data = pa.Table.from_pylist(
+        [
+            {
+                "id": 2,
+                "nested_type": {"sub1": "bla1", "sub2": "bla"},
+            },
+            {
+                "id": 1,
+                "nested_type": {"sub1": "bla1", "sub2": "bla2"},
+            },
+        ],
+        schema=arrow_schema,
+    )
+
+    res = tbl.upsert(update_data, join_cols=["id"])
+
+    expected_updated = 0
+    expected_inserted = 0
+
+    assert_upsert_result(res, expected_updated, expected_inserted)
+
+
+def test_upsert_with_struct_field_as_join_key(catalog: Catalog) -> None:
+    identifier = "default.test_upsert_with_struct_field_as_join_key"
+    _drop_table(catalog, identifier)
+
+    schema = Schema(
+        NestedField(1, "id", IntegerType(), required=True),
+        NestedField(
+            2,
+            "nested_type",
+            StructType(
+                NestedField(3, "sub1", StringType(), required=True),
+                NestedField(4, "sub2", StringType(), required=True),
+            ),
+            required=False,
+        ),
+        identifier_field_ids=[1],
+    )
+
+    tbl = catalog.create_table(identifier, schema=schema)
+
+    arrow_schema = pa.schema(
+        [
+            pa.field("id", pa.int32(), nullable=False),
+            pa.field(
+                "nested_type",
+                pa.struct(
+                    [
+                        pa.field("sub1", pa.large_string(), nullable=False),
+                        pa.field("sub2", pa.large_string(), nullable=False),
+                    ]
+                ),
+                nullable=True,
+            ),
+        ]
+    )
+
+    initial_data = pa.Table.from_pylist(
+        [
+            {
+                "id": 1,
+                "nested_type": {"sub1": "bla1", "sub2": "bla"},
+            }
+        ],
+        schema=arrow_schema,
+    )
+    tbl.append(initial_data)
+
+    update_data = pa.Table.from_pylist(
+        [
+            {
+                "id": 2,
+                "nested_type": {"sub1": "bla1", "sub2": "bla"},
+            },
+            {
+                "id": 1,
+                "nested_type": {"sub1": "bla1", "sub2": "bla"},
+            },
+        ],
+        schema=arrow_schema,
+    )
+
+    with pytest.raises(
+        pa.lib.ArrowNotImplementedError, match="Keys of type struct<sub1: large_string not null, sub2: large_string not null>"
+    ):
+        _ = tbl.upsert(update_data, join_cols=["nested_type"])
+
+
 def test_upsert_with_nulls(catalog: Catalog) -> None:
     identifier = "default.test_upsert_with_nulls"
     _drop_table(catalog, identifier)
```
