
Commit 1f47ce2

Author: Tom McCormick

Commit message: read is working

1 parent 7fc9705 · commit 1f47ce2

1 file changed: +50 −14 lines

pyiceberg/io/pyarrow.py

```diff
@@ -196,6 +196,8 @@
 ICEBERG_SCHEMA = b"iceberg.schema"
 # The PARQUET: in front means that it is Parquet specific, in this case the field_id
 PYARROW_PARQUET_FIELD_ID_KEY = b"PARQUET:field_id"
+# ORC stores IDs as string metadata
+ORC_FIELD_ID_KEY = b"iceberg.id"
 PYARROW_FIELD_DOC_KEY = b"doc"
 LIST_ELEMENT_NAME = "element"
 MAP_KEY_NAME = "key"
```
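For context on the new constant: PyArrow surfaces these IDs as byte-string entries in a field's metadata map, keyed per source format. A minimal standalone sketch (field names and values are made up for illustration):

```python
import pyarrow as pa

# Parquet files written by Iceberg carry the field ID under b"PARQUET:field_id";
# per the new constant above, ORC files carry it under b"iceberg.id".
parquet_field = pa.field("ts", pa.timestamp("us"), metadata={b"PARQUET:field_id": b"1"})
orc_field = pa.field("ts", pa.timestamp("us"), metadata={b"iceberg.id": b"1"})

print(parquet_field.metadata)  # {b'PARQUET:field_id': b'1'}
print(orc_field.metadata)      # {b'iceberg.id': b'1'}
```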
```diff
@@ -388,14 +390,28 @@ def __init__(self, properties: Properties = EMPTY_DICT):
 
     @staticmethod
     def parse_location(location: str) -> Tuple[str, str, str]:
-        """Return the path without the scheme."""
+        """Return (scheme, netloc, path) for the given location.
+        Uses environment variables DEFAULT_SCHEME and DEFAULT_NETLOC
+        if scheme/netloc are missing.
+        """
         uri = urlparse(location)
-        if not uri.scheme:
-            return "file", uri.netloc, os.path.abspath(location)
-        elif uri.scheme in ("hdfs", "viewfs"):
-            return uri.scheme, uri.netloc, uri.path
+
+        # Load defaults from environment
+        default_scheme = os.getenv("DEFAULT_SCHEME", "file")
+        default_netloc = os.getenv("DEFAULT_NETLOC", "")
+
+        # Apply logic
+        scheme = uri.scheme or default_scheme
+        netloc = uri.netloc or default_netloc
+
+        if scheme in ("hdfs", "viewfs"):
+            return scheme, netloc, uri.path
         else:
-            return uri.scheme, uri.netloc, f"{uri.netloc}{uri.path}"
+            # For non-HDFS URIs, include netloc in the path if present
+            path = uri.path if uri.scheme else os.path.abspath(location)
+            if netloc and not path.startswith(netloc):
+                path = f"{netloc}{path}"
+            return scheme, netloc, path
 
     def _initialize_fs(self, scheme: str, netloc: Optional[str] = None) -> FileSystem:
         """Initialize FileSystem for different scheme."""
```
```diff
@@ -575,7 +591,7 @@ def _initialize_gcs_fs(self) -> FileSystem:
     def _initialize_local_fs(self) -> FileSystem:
         return PyArrowLocalFileSystem()
 
-    def new_input(self, location: str) -> PyArrowFile:
+    def new_input(self, location: str, fs: Optional[FileSystem] = None) -> PyArrowFile:
         """Get a PyArrowFile instance to read bytes from the file at the given location.
 
         Args:
```
```diff
@@ -585,8 +601,11 @@ def new_input(self, location: str) -> PyArrowFile:
             PyArrowFile: A PyArrowFile instance for the given location.
         """
         scheme, netloc, path = self.parse_location(location)
+        logger.warning(f"Scheme: {scheme}, Netloc: {netloc}, Path: {path}")
+        if not fs:
+            fs = self.fs_by_scheme(scheme, netloc)
         return PyArrowFile(
-            fs=self.fs_by_scheme(scheme, netloc),
+            fs=fs,
             location=location,
             path=path,
             buffer_size=int(self.properties.get(BUFFER_SIZE, ONE_MEGABYTE)),
```
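The new optional fs parameter lets a caller that already holds a filesystem handle bypass the per-call scheme resolution. A hedged usage sketch (connection details are illustrative):

```python
from pyarrow.fs import HadoopFileSystem
from pyiceberg.io.pyarrow import PyArrowFileIO

io = PyArrowFileIO()

# Resolve the filesystem once and reuse it across many inputs, instead of
# letting new_input call fs_by_scheme for every file:
hdfs = HadoopFileSystem(host="namenode", port=8020)
for path in ["hdfs://namenode:8020/warehouse/db/t/a.orc",
             "hdfs://namenode:8020/warehouse/db/t/b.orc"]:
    with io.new_input(path, fs=hdfs).open() as f:
        print(path, f.read(4))
```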
```diff
@@ -1022,7 +1041,11 @@ def _combine_positional_deletes(positional_deletes: List[pa.ChunkedArray], start
 def pyarrow_to_schema(
     schema: pa.Schema, name_mapping: Optional[NameMapping] = None, downcast_ns_timestamp_to_us: bool = False
 ) -> Schema:
-    has_ids = visit_pyarrow(schema, _HasIds())
+    logger.warning(f"schema {schema}")
+    hids = _HasIds()
+    logger.warning("hasIds")
+    has_ids = visit_pyarrow(schema, hids)
+    logger.warning(f"has_ids is {has_ids}, name_mapping is {name_mapping}")
 if has_ids:
         return visit_pyarrow(schema, _ConvertToIceberg(downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us))
     elif name_mapping is not None:
```
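For reference, the branch that matters most for ORC reads here is the name-mapping fallback: ORC files written outside Iceberg often lack embedded field IDs, so has_ids comes back False and the IDs must come from a NameMapping. A sketch of that path, assuming pyiceberg's parse_mapping_from_json helper:

```python
import pyarrow as pa
from pyiceberg.io.pyarrow import pyarrow_to_schema
from pyiceberg.table.name_mapping import parse_mapping_from_json

# A schema with no embedded field IDs, as an external writer might produce:
arrow_schema = pa.schema([pa.field("id", pa.int64()), pa.field("name", pa.string())])

# Supply the IDs through a name mapping (JSON shape per the Iceberg spec):
mapping = parse_mapping_from_json(
    '[{"field-id": 1, "names": ["id"]}, {"field-id": 2, "names": ["name"]}]'
)
print(pyarrow_to_schema(arrow_schema, name_mapping=mapping))
```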
```diff
@@ -1179,11 +1202,22 @@ def primitive(self, primitive: pa.DataType) -> T:
 
 
 def _get_field_id(field: pa.Field) -> Optional[int]:
-    return (
-        int(field_id_str.decode())
-        if (field.metadata and (field_id_str := field.metadata.get(PYARROW_PARQUET_FIELD_ID_KEY)))
-        else None
-    )
+    """Return the Iceberg field ID from Parquet or ORC metadata if available."""
+    if not field.metadata:
+        return None
+
+    # Try Parquet field ID first
+    field_id_bytes = field.metadata.get(PYARROW_PARQUET_FIELD_ID_KEY)
+    if field_id_bytes:
+        return int(field_id_bytes.decode())
+
+    # Fallback: try ORC field ID
+    field_id_bytes = field.metadata.get(ORC_FIELD_ID_KEY)
+    if field_id_bytes:
+        return int(field_id_bytes.decode())
+
+    return None
+
 
 
 class _HasIds(PyArrowSchemaVisitor[bool]):
```
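The two lookups share their shape, so a loop over the candidate keys would express the precedence more compactly. A standalone sketch of the same logic (not the module's code, since _get_field_id is private):

```python
from typing import Optional

import pyarrow as pa

def get_field_id(field: pa.Field) -> Optional[int]:
    # Same precedence as above: Parquet key first, then the ORC key.
    if not field.metadata:
        return None
    for key in (b"PARQUET:field_id", b"iceberg.id"):
        if raw := field.metadata.get(key):
            return int(raw.decode())
    return None

print(get_field_id(pa.field("a", pa.int32(), metadata={b"iceberg.id": b"7"})))  # 7
print(get_field_id(pa.field("b", pa.int32())))  # None
```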
```diff
@@ -1434,6 +1468,7 @@ def _task_to_record_batches(
     name_mapping: Optional[NameMapping] = None,
     partition_spec: Optional[PartitionSpec] = None,
 ) -> Iterator[pa.RecordBatch]:
+    logger.warning(f"file format is {task.file.file_format}")
     if task.file.file_format == FileFormat.PARQUET:
         arrow_format = ds.ParquetFileFormat(pre_buffer=True, buffer_size=(ONE_MEGABYTE * 8))
     elif task.file.file_format == FileFormat.ORC:
```
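The body of the ORC branch falls outside this hunk; presumably it builds the ORC dataset format, which pyarrow exposes without tuning knobs comparable to Parquet's pre_buffer/buffer_size. A sketch of that assumption:

```python
import pyarrow.dataset as ds

# Assumption: the elided ORC branch constructs the format like this.
# ds.OrcFileFormat takes no read-ahead options, unlike ds.ParquetFileFormat.
arrow_format = ds.OrcFileFormat()
```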
```diff
@@ -1443,6 +1478,7 @@ def _task_to_record_batches(
     with io.new_input(task.file.file_path).open() as fin:
         fragment = arrow_format.make_fragment(fin)
         physical_schema = fragment.physical_schema
+        logger.warning(f"formats: filepath {task.file.file_path}, fragment {fragment}, physical_schema {physical_schema}")
         # In V1 and V2 table formats, we only support Timestamp 'us' in Iceberg Schema
         # Hence it is reasonable to always cast 'ns' timestamp to 'us' on read.
         # When V3 support is introduced, we will update `downcast_ns_timestamp_to_us` flag based on
```
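To see what physical_schema yields for an ORC file, a standalone round trip (file path is illustrative):

```python
import pyarrow as pa
import pyarrow.dataset as ds
import pyarrow.orc as orc

# Write a small ORC file, then inspect it the way _task_to_record_batches does:
table = pa.table({"id": pa.array([1, 2, 3], pa.int64())})
orc.write_table(table, "/tmp/example.orc")

arrow_format = ds.OrcFileFormat()
with pa.OSFile("/tmp/example.orc", "rb") as fin:
    fragment = arrow_format.make_fragment(fin)
    print(fragment.physical_schema)  # id: int64 (plus any field metadata)
```

An ORC file written by an Iceberg-aware writer would additionally carry the iceberg.id field metadata that _get_field_id now reads.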
