Commit 355359e

Author: Tom McCormick (committed)
Commit message: read is working
Parent: 289f9f6

1 file changed: +50 -14 lines

pyiceberg/io/pyarrow.py (50 additions, 14 deletions)
@@ -197,6 +197,8 @@
 ICEBERG_SCHEMA = b"iceberg.schema"
 # The PARQUET: in front means that it is Parquet specific, in this case the field_id
 PYARROW_PARQUET_FIELD_ID_KEY = b"PARQUET:field_id"
+# ORC stores IDs as string metadata
+ORC_FIELD_ID_KEY = b"iceberg.id"
 PYARROW_FIELD_DOC_KEY = b"doc"
 LIST_ELEMENT_NAME = "element"
 MAP_KEY_NAME = "key"
@@ -389,14 +391,28 @@ def __init__(self, properties: Properties = EMPTY_DICT):
 
     @staticmethod
     def parse_location(location: str) -> Tuple[str, str, str]:
-        """Return the path without the scheme."""
+        """Return (scheme, netloc, path) for the given location.
+        Uses the environment variables DEFAULT_SCHEME and DEFAULT_NETLOC
+        when the location omits a scheme or netloc.
+        """
         uri = urlparse(location)
-        if not uri.scheme:
-            return "file", uri.netloc, os.path.abspath(location)
-        elif uri.scheme in ("hdfs", "viewfs"):
-            return uri.scheme, uri.netloc, uri.path
+
+        # Load defaults from the environment
+        default_scheme = os.getenv("DEFAULT_SCHEME", "file")
+        default_netloc = os.getenv("DEFAULT_NETLOC", "")
+
+        # Fall back to the defaults for whichever parts are missing
+        scheme = uri.scheme or default_scheme
+        netloc = uri.netloc or default_netloc
+
+        if scheme in ("hdfs", "viewfs"):
+            return scheme, netloc, uri.path
         else:
-            return uri.scheme, uri.netloc, f"{uri.netloc}{uri.path}"
+            # For non-HDFS URIs, fold the netloc into the path if present
+            path = uri.path if uri.scheme else os.path.abspath(location)
+            if netloc and not path.startswith(netloc):
+                path = f"{netloc}{path}"
+            return scheme, netloc, path
 
     def _initialize_fs(self, scheme: str, netloc: Optional[str] = None) -> FileSystem:
         """Initialize FileSystem for different scheme."""
@@ -576,7 +592,7 @@ def _initialize_gcs_fs(self) -> FileSystem:
     def _initialize_local_fs(self) -> FileSystem:
         return PyArrowLocalFileSystem()
 
-    def new_input(self, location: str) -> PyArrowFile:
+    def new_input(self, location: str, fs: Optional[FileSystem] = None) -> PyArrowFile:
         """Get a PyArrowFile instance to read bytes from the file at the given location.
 
         Args:
@@ -586,8 +602,11 @@ def new_input(self, location: str) -> PyArrowFile:
             PyArrowFile: A PyArrowFile instance for the given location.
         """
         scheme, netloc, path = self.parse_location(location)
+        logger.warning(f"Scheme: {scheme}, Netloc: {netloc}, Path: {path}")
+        if not fs:
+            fs = self.fs_by_scheme(scheme, netloc)
         return PyArrowFile(
-            fs=self.fs_by_scheme(scheme, netloc),
+            fs=fs,
             location=location,
             path=path,
             buffer_size=int(self.properties.get(BUFFER_SIZE, ONE_MEGABYTE)),
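
Usage sketch for the new optional fs parameter (not part of this commit; the bucket and paths are placeholders): resolve the filesystem once, then reuse it across reads so new_input skips the per-call fs_by_scheme lookup.

    from pyiceberg.io.pyarrow import PyArrowFileIO

    io = PyArrowFileIO()

    # Resolve the filesystem once for this scheme/netloc pair...
    scheme, netloc, _ = io.parse_location("s3://bucket/warehouse/t/f-0.parquet")
    fs = io.fs_by_scheme(scheme, netloc)

    # ...then hand it to every new_input call for files on that filesystem.
    for location in (
        "s3://bucket/warehouse/t/f-0.parquet",
        "s3://bucket/warehouse/t/f-1.parquet",
    ):
        with io.new_input(location, fs=fs).open() as f:
            magic = f.read(4)  # b"PAR1" for a Parquet file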
@@ -1023,7 +1042,11 @@ def _combine_positional_deletes(positional_deletes: List[pa.ChunkedArray], start
 def pyarrow_to_schema(
     schema: pa.Schema, name_mapping: Optional[NameMapping] = None, downcast_ns_timestamp_to_us: bool = False
 ) -> Schema:
-    has_ids = visit_pyarrow(schema, _HasIds())
+    logger.warning(f"schema {schema}")
+    hids = _HasIds()
+    logger.warning("hasIds")
+    has_ids = visit_pyarrow(schema, hids)
+    logger.warning(f"has_ids is {has_ids}, name_mapping is {name_mapping}")
     if has_ids:
         return visit_pyarrow(schema, _ConvertToIceberg(downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us))
     elif name_mapping is not None:
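
For context (a sketch, not part of this commit), the branch the new logging traces: a schema whose fields all carry field IDs makes _HasIds return True, so the schema converts directly without a name mapping.

    import pyarrow as pa
    from pyiceberg.io.pyarrow import pyarrow_to_schema

    arrow_schema = pa.schema([
        pa.field("id", pa.int64(), nullable=False, metadata={b"PARQUET:field_id": b"1"}),
        pa.field("name", pa.string(), metadata={b"PARQUET:field_id": b"2"}),
    ])
    iceberg_schema = pyarrow_to_schema(arrow_schema)  # takes the has_ids branch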
@@ -1180,11 +1203,22 @@ def primitive(self, primitive: pa.DataType) -> T:
 
 
 def _get_field_id(field: pa.Field) -> Optional[int]:
-    return (
-        int(field_id_str.decode())
-        if (field.metadata and (field_id_str := field.metadata.get(PYARROW_PARQUET_FIELD_ID_KEY)))
-        else None
-    )
+    """Return the Iceberg field ID from Parquet or ORC metadata if available."""
+    if not field.metadata:
+        return None
+
+    # Try Parquet field ID first
+    field_id_bytes = field.metadata.get(PYARROW_PARQUET_FIELD_ID_KEY)
+    if field_id_bytes:
+        return int(field_id_bytes.decode())
+
+    # Fallback: try ORC field ID
+    field_id_bytes = field.metadata.get(ORC_FIELD_ID_KEY)
+    if field_id_bytes:
+        return int(field_id_bytes.decode())
+
+    return None
+
 
 
 class _HasIds(PyArrowSchemaVisitor[bool]):
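
A quick check of the three branches (a sketch, not part of this commit; _get_field_id is module-private, imported here only for illustration):

    import pyarrow as pa
    from pyiceberg.io.pyarrow import _get_field_id

    parquet_field = pa.field("id", pa.int64(), metadata={b"PARQUET:field_id": b"1"})
    orc_field = pa.field("id", pa.int64(), metadata={b"iceberg.id": b"1"})
    plain_field = pa.field("id", pa.int64())  # no metadata at all

    assert _get_field_id(parquet_field) == 1  # Parquet key checked first
    assert _get_field_id(orc_field) == 1      # ORC key as fallback
    assert _get_field_id(plain_field) is None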
@@ -1453,6 +1487,7 @@ def _task_to_record_batches(
     name_mapping: Optional[NameMapping] = None,
     partition_spec: Optional[PartitionSpec] = None,
 ) -> Iterator[pa.RecordBatch]:
+    logger.warning(f"file format is {task.file.file_format}")
     if task.file.file_format == FileFormat.PARQUET:
         arrow_format = ds.ParquetFileFormat(pre_buffer=True, buffer_size=(ONE_MEGABYTE * 8))
     elif task.file.file_format == FileFormat.ORC:
@@ -1462,6 +1497,7 @@ def _task_to_record_batches(
     with io.new_input(task.file.file_path).open() as fin:
         fragment = arrow_format.make_fragment(fin)
         physical_schema = fragment.physical_schema
+        logger.warning(f"formats: filepath {task.file.file_path}, fragment {fragment}, physical_schema {physical_schema}")
         # In V1 and V2 table formats, we only support Timestamp 'us' in Iceberg Schema
         # Hence it is reasonable to always cast 'ns' timestamp to 'us' on read.
         # When V3 support is introduced, we will update `downcast_ns_timestamp_to_us` flag based on
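
The ORC branch is cut off at the hunk boundary; for context, a sketch (an assumption, not from this commit) of the same fragment-based read using pyarrow.dataset's ORC format, with a placeholder local file standing in for io.new_input(...).open():

    import pyarrow.dataset as ds

    arrow_format = ds.OrcFileFormat()  # ORC counterpart of ds.ParquetFileFormat
    with open("/tmp/warehouse/t/f.orc", "rb") as fin:
        fragment = arrow_format.make_fragment(fin)
        physical_schema = fragment.physical_schema  # carries iceberg.id metadata if the writer set it
        for batch in fragment.to_batches():
            print(batch.num_rows)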
