
Commit e5e7453

Authored by mccormickt12 and Tom McCormick
Add comprehensive ORC read support to PyArrow I/O (#2432)
Features implemented:
- Record batching and table reading via ArrowScan
- Column projection and row filtering with predicate pushdown
- Positional deletes support (with ORC-specific non-dictionary handling)
- Schema mapping for files without field IDs
- Streaming via Iterator[pa.RecordBatch] for memory efficiency
- Full integration with Iceberg metadata and partitioning

Co-authored-by: Tom McCormick <[email protected]>
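For orientation, the sketch below shows how an ORC-backed Iceberg table is read from the user's side. The catalog name and table identifier are placeholders, and the calls shown (`scan()`, `to_arrow()`, `to_arrow_batch_reader()`) are PyIceberg's standard table-scan entry points rather than anything introduced by this commit; with this change they simply work when the underlying data files are ORC.

```python
# Hypothetical usage sketch: reading an Iceberg table whose data files are ORC.
# "demo" and "default.orc_events" are placeholder names, not part of this commit.
from pyiceberg.catalog import load_catalog

catalog = load_catalog("demo")
table = catalog.load_table("default.orc_events")

# Column projection and row filtering with predicate pushdown
arrow_table = table.scan(
    selected_fields=("id", "name"),
    row_filter="id >= 3",
).to_arrow()

# Streaming via Iterator[pa.RecordBatch] for memory efficiency
for batch in table.scan().to_arrow_batch_reader():
    print(batch.num_rows)
```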
1 parent 067cf05 commit e5e7453

File tree: 6 files changed, +1934 −33 lines

pyiceberg/io/pyarrow.py

Lines changed: 41 additions & 9 deletions
```diff
@@ -201,6 +201,8 @@
 ICEBERG_SCHEMA = b"iceberg.schema"
 # The PARQUET: in front means that it is Parquet specific, in this case the field_id
 PYARROW_PARQUET_FIELD_ID_KEY = b"PARQUET:field_id"
+# ORC field ID key for Iceberg field IDs in ORC metadata
+ORC_FIELD_ID_KEY = b"iceberg.id"
 PYARROW_FIELD_DOC_KEY = b"doc"
 LIST_ELEMENT_NAME = "element"
 MAP_KEY_NAME = "key"
@@ -690,16 +692,20 @@ def schema_to_pyarrow(
     schema: Union[Schema, IcebergType],
     metadata: Dict[bytes, bytes] = EMPTY_DICT,
     include_field_ids: bool = True,
+    file_format: FileFormat = FileFormat.PARQUET,
 ) -> pa.schema:
-    return visit(schema, _ConvertToArrowSchema(metadata, include_field_ids))
+    return visit(schema, _ConvertToArrowSchema(metadata, include_field_ids, file_format))
 
 
 class _ConvertToArrowSchema(SchemaVisitorPerPrimitiveType[pa.DataType]):
     _metadata: Dict[bytes, bytes]
 
-    def __init__(self, metadata: Dict[bytes, bytes] = EMPTY_DICT, include_field_ids: bool = True) -> None:
+    def __init__(
+        self, metadata: Dict[bytes, bytes] = EMPTY_DICT, include_field_ids: bool = True, file_format: Optional[FileFormat] = None
+    ) -> None:
         self._metadata = metadata
         self._include_field_ids = include_field_ids
+        self._file_format = file_format
 
     def schema(self, _: Schema, struct_result: pa.StructType) -> pa.schema:
         return pa.schema(list(struct_result), metadata=self._metadata)
@@ -712,7 +718,12 @@ def field(self, field: NestedField, field_result: pa.DataType) -> pa.Field:
         if field.doc:
             metadata[PYARROW_FIELD_DOC_KEY] = field.doc
         if self._include_field_ids:
-            metadata[PYARROW_PARQUET_FIELD_ID_KEY] = str(field.field_id)
+            # Add field ID based on file format
+            if self._file_format == FileFormat.ORC:
+                metadata[ORC_FIELD_ID_KEY] = str(field.field_id)
+            else:
+                # Default to Parquet for backward compatibility
+                metadata[PYARROW_PARQUET_FIELD_ID_KEY] = str(field.field_id)
 
         return pa.field(
             name=field.name,
@@ -1011,6 +1022,10 @@ def _expression_to_complementary_pyarrow(expr: BooleanExpression) -> pc.Expressi
 def _get_file_format(file_format: FileFormat, **kwargs: Dict[str, Any]) -> ds.FileFormat:
     if file_format == FileFormat.PARQUET:
         return ds.ParquetFileFormat(**kwargs)
+    elif file_format == FileFormat.ORC:
+        # ORC doesn't support pre_buffer and buffer_size parameters
+        orc_kwargs = {k: v for k, v in kwargs.items() if k not in ["pre_buffer", "buffer_size"]}
+        return ds.OrcFileFormat(**orc_kwargs)
     else:
         raise ValueError(f"Unsupported file format: {file_format}")
 
@@ -1027,6 +1042,15 @@ def _read_deletes(io: FileIO, data_file: DataFile) -> Dict[str, pa.ChunkedArray]
             file.as_py(): table.filter(pc.field("file_path") == file).column("pos")
             for file in table.column("file_path").chunks[0].dictionary
         }
+    elif data_file.file_format == FileFormat.ORC:
+        with io.new_input(data_file.file_path).open() as fi:
+            delete_fragment = _get_file_format(data_file.file_format).make_fragment(fi)
+            table = ds.Scanner.from_fragment(fragment=delete_fragment).to_table()
+        # For ORC, file_path columns are not dictionary-encoded, so we use unique() directly
+        return {
+            path.as_py(): table.filter(pc.field("file_path") == path).column("pos")
+            for path in table.column("file_path").unique()
+        }
     elif data_file.file_format == FileFormat.PUFFIN:
         with io.new_input(data_file.file_path).open() as fi:
             payload = fi.read()
@@ -1228,11 +1252,17 @@ def primitive(self, primitive: pa.DataType) -> T:
 
 
 def _get_field_id(field: pa.Field) -> Optional[int]:
-    return (
-        int(field_id_str.decode())
-        if (field.metadata and (field_id_str := field.metadata.get(PYARROW_PARQUET_FIELD_ID_KEY)))
-        else None
-    )
+    """Return the Iceberg field ID from Parquet or ORC metadata if available."""
+    if field.metadata:
+        # Try Parquet field ID first
+        if field_id_bytes := field.metadata.get(PYARROW_PARQUET_FIELD_ID_KEY):
+            return int(field_id_bytes.decode())
+
+        # Fallback: try ORC field ID
+        if field_id_bytes := field.metadata.get(ORC_FIELD_ID_KEY):
+            return int(field_id_bytes.decode())
+
+    return None
 
 
 class _HasIds(PyArrowSchemaVisitor[bool]):
@@ -1495,7 +1525,7 @@ def _task_to_record_batches(
     format_version: TableVersion = TableProperties.DEFAULT_FORMAT_VERSION,
     downcast_ns_timestamp_to_us: Optional[bool] = None,
 ) -> Iterator[pa.RecordBatch]:
-    arrow_format = ds.ParquetFileFormat(pre_buffer=True, buffer_size=(ONE_MEGABYTE * 8))
+    arrow_format = _get_file_format(task.file.file_format, pre_buffer=True, buffer_size=(ONE_MEGABYTE * 8))
     with io.new_input(task.file.file_path).open() as fin:
         fragment = arrow_format.make_fragment(fin)
         physical_schema = fragment.physical_schema
@@ -1845,6 +1875,8 @@ def _construct_field(self, field: NestedField, arrow_type: pa.DataType) -> pa.Fi
         if field.doc:
            metadata[PYARROW_FIELD_DOC_KEY] = field.doc
         if self._include_field_ids:
+            # For projection visitor, we don't know the file format, so default to Parquet
+            # This is used for schema conversion during reads, not writes
             metadata[PYARROW_PARQUET_FIELD_ID_KEY] = str(field.field_id)
 
         return pa.field(
```

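To make the field-ID handling above concrete, here is a small hedged sketch of how `schema_to_pyarrow` with the new `file_format` argument places Iceberg field IDs under `iceberg.id` for ORC versus `PARQUET:field_id` for Parquet, and how `_get_field_id` recovers the ID from either key. It assumes a PyIceberg build that includes this change; the two-column schema is illustrative only.

```python
# Sketch assuming a PyIceberg build containing this commit; the schema is illustrative.
from pyiceberg.io.pyarrow import _get_field_id, schema_to_pyarrow
from pyiceberg.manifest import FileFormat
from pyiceberg.schema import Schema
from pyiceberg.types import IntegerType, NestedField, StringType

schema = Schema(
    NestedField(field_id=1, name="foo", field_type=StringType(), required=False),
    NestedField(field_id=2, name="bar", field_type=IntegerType(), required=True),
)

# ORC schemas carry field IDs under b"iceberg.id"
orc_schema = schema_to_pyarrow(schema, file_format=FileFormat.ORC)
assert orc_schema.field("foo").metadata[b"iceberg.id"] == b"1"

# The default stays Parquet for backward compatibility: b"PARQUET:field_id"
parquet_schema = schema_to_pyarrow(schema)
assert parquet_schema.field("bar").metadata[b"PARQUET:field_id"] == b"2"

# _get_field_id resolves the ID from whichever key is present
assert _get_field_id(orc_schema.field("foo")) == 1
assert _get_field_id(parquet_schema.field("bar")) == 2
```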
pyiceberg/table/__init__.py

Lines changed: 3 additions & 0 deletions
```diff
@@ -211,6 +211,9 @@ class TableProperties:
     WRITE_OBJECT_STORE_PARTITIONED_PATHS_DEFAULT = True
 
     WRITE_DATA_PATH = "write.data.path"
+
+    WRITE_FILE_FORMAT = "write.format.default"
+    WRITE_FILE_FORMAT_DEFAULT = "parquet"
     WRITE_METADATA_PATH = "write.metadata.path"
 
     DELETE_MODE = "write.delete.mode"
```

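The new constants follow the existing `TableProperties` pattern. A minimal sketch of how they might be used to resolve a table's default write format from its properties; the lookup helper here is illustrative and not code from this commit.

```python
# Illustrative helper (not from the commit) using the new TableProperties constants.
from typing import Dict

from pyiceberg.table import TableProperties


def default_write_format(properties: Dict[str, str]) -> str:
    # Falls back to "parquet" when write.format.default is unset
    return properties.get(TableProperties.WRITE_FILE_FORMAT, TableProperties.WRITE_FILE_FORMAT_DEFAULT)


assert default_write_format({"write.format.default": "ORC"}) == "ORC"
assert default_write_format({}) == "parquet"
```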
tests/conftest.py

Lines changed: 44 additions & 0 deletions
```diff
@@ -2413,6 +2413,32 @@ def example_task(data_file: str) -> FileScanTask:
     )
 
 
+@pytest.fixture
+def data_file_orc(table_schema_simple: Schema, tmp_path: str) -> str:
+    import pyarrow as pa
+    import pyarrow.orc as orc
+
+    from pyiceberg.io.pyarrow import schema_to_pyarrow
+
+    table = pa.table(
+        {"foo": ["a", "b", "c"], "bar": [1, 2, 3], "baz": [True, False, None]},
+        schema=schema_to_pyarrow(table_schema_simple),
+    )
+
+    file_path = f"{tmp_path}/0000-data.orc"
+    orc.write_table(table=table, where=file_path)
+    return file_path
+
+
+@pytest.fixture
+def example_task_orc(data_file_orc: str) -> FileScanTask:
+    datafile = DataFile.from_args(file_path=data_file_orc, file_format=FileFormat.ORC, file_size_in_bytes=1925)
+    datafile.spec_id = 0
+    return FileScanTask(
+        data_file=datafile,
+    )
+
+
 @pytest.fixture(scope="session")
 def warehouse(tmp_path_factory: pytest.TempPathFactory) -> Path:
     return tmp_path_factory.mktemp("test_sql")
@@ -2442,6 +2468,24 @@ def table_v2(example_table_metadata_v2: Dict[str, Any]) -> Table:
     )
 
 
+@pytest.fixture
+def table_v2_orc(example_table_metadata_v2: Dict[str, Any]) -> Table:
+    import copy
+
+    metadata_dict = copy.deepcopy(example_table_metadata_v2)
+    if not metadata_dict["properties"]:
+        metadata_dict["properties"] = {}
+    metadata_dict["properties"]["write.format.default"] = "ORC"
+    table_metadata = TableMetadataV2(**metadata_dict)
+    return Table(
+        identifier=("database", "table_orc"),
+        metadata=table_metadata,
+        metadata_location=f"{table_metadata.location}/uuid.metadata.json",
+        io=load_file_io(),
+        catalog=NoopCatalog("NoopCatalog"),
+    )
+
+
 @pytest.fixture
 def table_v2_with_fixed_and_decimal_types(
     table_metadata_v2_with_fixed_and_decimal_types: Dict[str, Any],
```

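As a sanity check on the `data_file_orc` fixture above, the ORC file it produces can be read back directly with `pyarrow.orc`. This hypothetical snippet is not part of the commit; the hard-coded path stands in for the fixture's return value inside a test.

```python
# Hypothetical round-trip check; "0000-data.orc" stands in for the data_file_orc fixture value.
import pyarrow.orc as orc

table = orc.read_table("0000-data.orc")
assert table.column("foo").to_pylist() == ["a", "b", "c"]
assert table.column("bar").to_pylist() == [1, 2, 3]
assert table.column("baz").to_pylist() == [True, False, None]
```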
tests/integration/test_writes/test_writes.py

Lines changed: 73 additions & 0 deletions
```diff
@@ -46,6 +46,7 @@
 from pyiceberg.exceptions import CommitFailedException, NoSuchTableError
 from pyiceberg.expressions import And, EqualTo, GreaterThanOrEqual, In, LessThan, Not
 from pyiceberg.io.pyarrow import UnsupportedPyArrowTypeException, _dataframe_to_data_files
+from pyiceberg.manifest import FileFormat
 from pyiceberg.partitioning import PartitionField, PartitionSpec
 from pyiceberg.schema import Schema
 from pyiceberg.table import TableProperties
@@ -709,6 +710,78 @@ def test_write_parquet_unsupported_properties(
     tbl.append(arrow_table_with_null)
 
 
+@pytest.mark.integration
+@pytest.mark.parametrize("format_version", [1, 2])
+def test_spark_writes_orc_pyiceberg_reads(spark: SparkSession, session_catalog: Catalog, format_version: int) -> None:
+    """Test that ORC files written by Spark can be read by PyIceberg."""
+    identifier = f"default.spark_writes_orc_pyiceberg_reads_v{format_version}"
+
+    # Create test data
+    test_data = [
+        (1, "Alice", 25, True),
+        (2, "Bob", 30, False),
+        (3, "Charlie", 35, True),
+        (4, "David", 28, True),
+        (5, "Eve", 32, False),
+    ]
+
+    # Create Spark DataFrame
+    spark_df = spark.createDataFrame(test_data, ["id", "name", "age", "is_active"])
+
+    # Ensure a clean slate to avoid replacing a v2 table with v1
+    spark.sql(f"DROP TABLE IF EXISTS {identifier}")
+
+    # Create table with Spark using ORC format and desired format-version
+    spark_df.writeTo(identifier).using("iceberg").tableProperty("write.format.default", "orc").tableProperty(
+        "format-version", str(format_version)
+    ).createOrReplace()
+
+    # Write data with ORC format using Spark
+    spark_df.writeTo(identifier).using("iceberg").append()
+
+    # Read with PyIceberg - this is the main focus of our validation
+    tbl = session_catalog.load_table(identifier)
+    pyiceberg_df = tbl.scan().to_pandas()
+
+    # Verify PyIceberg results have the expected number of rows
+    assert len(pyiceberg_df) == 10  # 5 rows from create + 5 rows from append
+
+    # Verify PyIceberg column names
+    assert list(pyiceberg_df.columns) == ["id", "name", "age", "is_active"]
+
+    # Verify PyIceberg data integrity - check the actual data values
+    expected_data = [
+        (1, "Alice", 25, True),
+        (2, "Bob", 30, False),
+        (3, "Charlie", 35, True),
+        (4, "David", 28, True),
+        (5, "Eve", 32, False),
+    ]
+
+    # Verify PyIceberg results contain the expected data (appears twice due to create + append)
+    pyiceberg_data = list(zip(pyiceberg_df["id"], pyiceberg_df["name"], pyiceberg_df["age"], pyiceberg_df["is_active"]))
+    assert pyiceberg_data == expected_data + expected_data  # Data should appear twice
+
+    # Verify PyIceberg data types are correct
+    assert pyiceberg_df["id"].dtype == "int64"
+    assert pyiceberg_df["name"].dtype == "object"  # string
+    assert pyiceberg_df["age"].dtype == "int64"
+    assert pyiceberg_df["is_active"].dtype == "bool"
+
+    # Cross-validate with Spark to ensure consistency (ensure deterministic ordering)
+    spark_result = spark.sql(f"SELECT * FROM {identifier}").toPandas()
+    sort_cols = ["id", "name", "age", "is_active"]
+    spark_result = spark_result.sort_values(by=sort_cols).reset_index(drop=True)
+    pyiceberg_df = pyiceberg_df.sort_values(by=sort_cols).reset_index(drop=True)
+    pandas.testing.assert_frame_equal(spark_result, pyiceberg_df, check_dtype=False)
+
+    # Verify the files are actually ORC format
+    files = list(tbl.scan().plan_files())
+    assert len(files) > 0
+    for file_task in files:
+        assert file_task.file.file_format == FileFormat.ORC
+
+
 @pytest.mark.integration
 def test_invalid_arguments(spark: SparkSession, session_catalog: Catalog, arrow_table_with_null: pa.Table) -> None:
     identifier = "default.arrow_data_files"
```
