Commit b6a45ed

fix: sanitize invalid Avro field names in manifest file (#2245)

Closes #2123

Rationale for this change: fixes the sanitization behaviour to match the specification and the Java implementation.

Are these changes tested? Yes - unit and integration tests.

Are there any user-facing changes? Yes - field names will be sanitized to be Avro-compatible if they are not already.

Co-authored-by: Kevin Liu <[email protected]>
1 parent 45f66da commit b6a45ed

File tree

4 files changed: +442 -9 lines changed


pyiceberg/schema.py

Lines changed: 21 additions & 1 deletion

@@ -78,6 +78,9 @@
 
 INITIAL_SCHEMA_ID = 0
 
+FIELD_ID_PROP = "field-id"
+ICEBERG_FIELD_NAME_PROP = "iceberg-field-name"
+
 
 class Schema(IcebergBaseModel):
     """A table Schema.

@@ -1356,6 +1359,21 @@ def primitive(self, primitive: PrimitiveType) -> PrimitiveType:
 
 # Implementation copied from Apache Iceberg repo.
 def make_compatible_name(name: str) -> str:
+    """Make a field name compatible with Avro specification.
+
+    This function sanitizes field names to comply with Avro naming rules:
+    - Names must start with [A-Za-z_]
+    - Subsequent characters must be [A-Za-z0-9_]
+
+    Invalid characters are replaced with _xHHHH where HHHH is the hex code.
+    Names starting with digits get a leading underscore.
+
+    Args:
+        name: The original field name
+
+    Returns:
+        A sanitized name that complies with Avro specification
+    """
     if not _valid_avro_name(name):
         return _sanitize_name(name)
     return name

@@ -1391,7 +1409,9 @@ def _sanitize_name(name: str) -> str:
 
 
 def _sanitize_char(character: str) -> str:
-    return "_" + character if character.isdigit() else "_x" + hex(ord(character))[2:].upper()
+    if character.isdigit():
+        return "_" + character
+    return "_x" + hex(ord(character))[2:].upper()
 
 
 def sanitize_column_names(schema: Schema) -> Schema:
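As a quick illustration of the rules the new docstring spells out, the expected behaviour of make_compatible_name looks roughly like this. The expected values below mirror the comments in the cross-platform integration test further down; this is a sketch for orientation, not part of the diff.

from pyiceberg.schema import make_compatible_name

# Already-valid names pass through unchanged.
assert make_compatible_name("x_") == "x_"

# A leading digit only gets an underscore prefix, not a hex escape.
assert make_compatible_name("9x") == "_9x"

# Other invalid characters become _x followed by the uppercase hex code point:
# ord(".") == 0x2E, ord("#") == 0x23, ord("/") == 0x2F.
assert make_compatible_name("a.b") == "a_x2Eb"
assert make_compatible_name("a#b") == "a_x23b"
assert make_compatible_name("letter/abc") == "letter_x2Fabc"

# Non-ASCII characters use the full Unicode code point (U+1F60E for "😎"),
# which is where the Python escape differs from Java's surrogate-pair escape.
assert make_compatible_name("😎") == "_x1F60E"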

pyiceberg/utils/schema_conversion.py

Lines changed: 21 additions & 8 deletions

@@ -26,7 +26,14 @@
     Union,
 )
 
-from pyiceberg.schema import Schema, SchemaVisitorPerPrimitiveType, visit
+from pyiceberg.schema import (
+    FIELD_ID_PROP,
+    ICEBERG_FIELD_NAME_PROP,
+    Schema,
+    SchemaVisitorPerPrimitiveType,
+    make_compatible_name,
+    visit,
+)
 from pyiceberg.types import (
     BinaryType,
     BooleanType,

@@ -225,13 +232,13 @@ def _convert_field(self, field: Dict[str, Any]) -> NestedField:
         Returns:
             The Iceberg equivalent field.
         """
-        if "field-id" not in field:
-            raise ValueError(f"Cannot convert field, missing field-id: {field}")
+        if FIELD_ID_PROP not in field:
+            raise ValueError(f"Cannot convert field, missing {FIELD_ID_PROP}: {field}")
 
         plain_type, required = self._resolve_union(field["type"])
 
         return NestedField(
-            field_id=field["field-id"],
+            field_id=field[FIELD_ID_PROP],
             name=field["name"],
             field_type=self._convert_schema(plain_type),
             required=required,

@@ -524,12 +531,18 @@ def field(self, field: NestedField, field_result: AvroType) -> AvroType:
         if isinstance(field_result, dict) and field_result.get("type") == "record":
             field_result["name"] = f"r{field.field_id}"
 
+        original_name = field.name
+        sanitized_name = make_compatible_name(original_name)
+
         result = {
-            "name": field.name,
-            "field-id": field.field_id,
+            "name": sanitized_name,
+            FIELD_ID_PROP: field.field_id,
             "type": field_result if field.required else ["null", field_result],
         }
 
+        if original_name != sanitized_name:
+            result[ICEBERG_FIELD_NAME_PROP] = original_name
+
         if field.write_default is not None:
             result["default"] = field.write_default
         elif field.optional:

@@ -564,8 +577,8 @@ def map(self, map_type: MapType, key_result: AvroType, value_result: AvroType) -
                 "type": "record",
                 "name": f"k{self.last_map_key_field_id}_v{self.last_map_value_field_id}",
                 "fields": [
-                    {"name": "key", "type": key_result, "field-id": self.last_map_key_field_id},
-                    {"name": "value", "type": value_result, "field-id": self.last_map_value_field_id},
+                    {"name": "key", "type": key_result, FIELD_ID_PROP: self.last_map_key_field_id},
+                    {"name": "value", "type": value_result, FIELD_ID_PROP: self.last_map_value_field_id},
                 ],
             },
             "logicalType": "map",

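For intuition, the change to field() means a field whose Iceberg name is not Avro-legal is emitted with a sanitized Avro name plus the original name preserved under the new property. Below is a hand-written sketch of the expected entry for NestedField(field_id=1, name="a.b", field_type=StringType(), required=True); it is not captured from a real run, and the "type" value varies with the field's type and required flag.

expected_avro_field = {
    "name": "a_x2Eb",             # sanitized, Avro-legal name
    "field-id": 1,                # FIELD_ID_PROP
    "type": "string",             # required field, so no ["null", ...] union
    "iceberg-field-name": "a.b",  # ICEBERG_FIELD_NAME_PROP, only added when the name changed
}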
tests/integration/test_writes/test_writes.py

Lines changed: 131 additions & 0 deletions

@@ -1201,6 +1201,137 @@ def test_sanitize_character_partitioned(catalog: Catalog) -> None:
     assert len(tbl.scan().to_arrow()) == 22
 
 
+@pytest.mark.integration
+@pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog")])
+def test_sanitize_character_partitioned_avro_bug(catalog: Catalog) -> None:
+    table_name = "default.test_table_partitioned_sanitized_character_avro"
+    try:
+        catalog.drop_table(table_name)
+    except NoSuchTableError:
+        pass
+
+    schema = Schema(
+        NestedField(id=1, name="😎", field_type=StringType(), required=False),
+    )
+
+    partition_spec = PartitionSpec(
+        PartitionField(
+            source_id=1,
+            field_id=1001,
+            transform=IdentityTransform(),
+            name="😎",
+        )
+    )
+
+    tbl = _create_table(
+        session_catalog=catalog,
+        identifier=table_name,
+        schema=schema,
+        partition_spec=partition_spec,
+        data=[
+            pa.Table.from_arrays(
+                [pa.array([str(i) for i in range(22)])], schema=pa.schema([pa.field("😎", pa.string(), nullable=False)])
+            )
+        ],
+    )
+
+    assert len(tbl.scan().to_arrow()) == 22
+
+    # verify that we can read the table with DuckDB
+    import duckdb
+
+    location = tbl.metadata_location
+    duckdb.sql("INSTALL iceberg; LOAD iceberg;")
+    # Configure S3 settings for DuckDB to match the catalog configuration
+    duckdb.sql("SET s3_endpoint='localhost:9000';")
+    duckdb.sql("SET s3_access_key_id='admin';")
+    duckdb.sql("SET s3_secret_access_key='password';")
+    duckdb.sql("SET s3_use_ssl=false;")
+    duckdb.sql("SET s3_url_style='path';")
+    result = duckdb.sql(f"SELECT * FROM iceberg_scan('{location}')").fetchall()
+    assert len(result) == 22
+
+
+@pytest.mark.integration
+@pytest.mark.parametrize("format_version", [1, 2])
+def test_cross_platform_special_character_compatibility(
+    spark: SparkSession, session_catalog: Catalog, format_version: int
+) -> None:
+    """Test cross-platform compatibility with special characters in column names."""
+    identifier = "default.test_cross_platform_special_characters"
+
+    # Test various special characters that need sanitization
+    special_characters = [
+        "😎",  # emoji - Java produces _xD83D_xDE0E, Python produces _x1F60E
+        "a.b",  # dot - both should produce a_x2Eb
+        "a#b",  # hash - both should produce a_x23b
+        "9x",  # starts with digit - both should produce _9x
+        "x_",  # valid - should remain unchanged
+        "letter/abc",  # slash - both should produce letter_x2Fabc
+    ]
+
+    for i, special_char in enumerate(special_characters):
+        table_name = f"{identifier}_{format_version}_{i}"
+        pyiceberg_table_name = f"{identifier}_pyiceberg_{format_version}_{i}"
+
+        try:
+            session_catalog.drop_table(table_name)
+        except Exception:
+            pass
+        try:
+            session_catalog.drop_table(pyiceberg_table_name)
+        except Exception:
+            pass
+
+        try:
+            # Test 1: Spark writes, PyIceberg reads
+            spark_df = spark.createDataFrame([("test_value",)], [special_char])
+            spark_df.writeTo(table_name).using("iceberg").createOrReplace()
+
+            # Read with PyIceberg table scan
+            tbl = session_catalog.load_table(table_name)
+            pyiceberg_df = tbl.scan().to_pandas()
+            assert len(pyiceberg_df) == 1
+            assert special_char in pyiceberg_df.columns
+            assert pyiceberg_df.iloc[0][special_char] == "test_value"
+
+            # Test 2: PyIceberg writes, Spark reads
+            from pyiceberg.schema import Schema
+            from pyiceberg.types import NestedField, StringType
+
+            schema = Schema(NestedField(field_id=1, name=special_char, field_type=StringType(), required=True))
+
+            tbl_pyiceberg = session_catalog.create_table(
+                identifier=pyiceberg_table_name, schema=schema, properties={"format-version": str(format_version)}
+            )
+
+            import pyarrow as pa
+
+            # Create PyArrow schema with required field to match Iceberg schema
+            pa_schema = pa.schema([pa.field(special_char, pa.string(), nullable=False)])
+            data = pa.Table.from_pydict({special_char: ["pyiceberg_value"]}, schema=pa_schema)
+            tbl_pyiceberg.append(data)
+
+            # Read with Spark
+            spark_df_read = spark.table(pyiceberg_table_name)
+            spark_result = spark_df_read.collect()
+
+            # Verify data integrity
+            assert len(spark_result) == 1
+            assert special_char in spark_df_read.columns
+            assert spark_result[0][special_char] == "pyiceberg_value"
+
+        finally:
+            try:
+                session_catalog.drop_table(table_name)
+            except Exception:
+                pass
+            try:
+                session_catalog.drop_table(pyiceberg_table_name)
+            except Exception:
+                pass
+
+
 @pytest.mark.integration
 @pytest.mark.parametrize("format_version", [1, 2])
 def test_table_write_subset_of_schema(session_catalog: Catalog, arrow_table_with_null: pa.Table, format_version: int) -> None:
