
Commit 8aeab49

Pyarrow IO property for configuring large v small types on read (#986)
* PyArrow IO property for configuring large v small types on read
* tests
* adopt feedback
* use property_as_bool
* fix
* docs
* nits
* respect flag on promotion
* lint

Co-authored-by: Sung Yun <[email protected]>
1 parent ba85dd1 commit 8aeab49

File tree

5 files changed: +174 −11 lines changed


mkdocs/docs/configuration.md

Lines changed: 10 additions & 0 deletions
@@ -137,6 +137,16 @@ For the FileIO there are several configuration options available:
 
 <!-- markdown-link-check-enable-->
 
+### PyArrow
+
+<!-- markdown-link-check-disable -->
+
+| Key                             | Example | Description |
+| ------------------------------- | ------- | ----------- |
+| pyarrow.use-large-types-on-read | True    | Use large PyArrow types i.e. [large_string](https://arrow.apache.org/docs/python/generated/pyarrow.large_string.html), [large_binary](https://arrow.apache.org/docs/python/generated/pyarrow.large_binary.html) and [large_list](https://arrow.apache.org/docs/python/generated/pyarrow.large_list.html) field types on table scans. The default value is True. |
+
+<!-- markdown-link-check-enable-->
+
 ## Catalogs
 
 PyIceberg currently has native catalog type support for REST, SQL, Hive, Glue and DynamoDB.
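For context, a hedged usage sketch (not part of this commit): the new key is a FileIO property, so it can be passed like any other catalog property and every table loaded from that catalog will then scan back with the small Arrow types. The catalog name, URI, and table name below are placeholders.

```python
from pyiceberg.catalog import load_catalog

# Placeholder catalog name and URI; the only line relevant to this commit is the
# pyarrow.use-large-types-on-read property, which defaults to True when omitted.
catalog = load_catalog(
    "default",
    **{
        "uri": "http://localhost:8181",
        "pyarrow.use-large-types-on-read": "False",
    },
)

tbl = catalog.load_table("default.my_table")  # hypothetical table
arrow_table = tbl.scan().to_arrow()
print(arrow_table.schema)  # string/binary/list_ instead of large_string/large_binary/large_list
```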

pyiceberg/io/__init__.py

Lines changed: 1 addition & 0 deletions
@@ -80,6 +80,7 @@
 GCS_ENDPOINT = "gcs.endpoint"
 GCS_DEFAULT_LOCATION = "gcs.default-bucket-location"
 GCS_VERSION_AWARE = "gcs.version-aware"
+PYARROW_USE_LARGE_TYPES_ON_READ = "pyarrow.use-large-types-on-read"
 
 
 @runtime_checkable
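A minimal sketch of using the new constant programmatically, assuming a PyArrowFileIO is constructed directly rather than by a catalog; the property value is a string, as in the integration test further below.

```python
from pyiceberg.io import PYARROW_USE_LARGE_TYPES_ON_READ
from pyiceberg.io.pyarrow import PyArrowFileIO

# The scan path added in this commit reads this key from io.properties via
# property_as_bool and falls back to True when it is absent.
io = PyArrowFileIO(properties={PYARROW_USE_LARGE_TYPES_ON_READ: "False"})
print(io.properties[PYARROW_USE_LARGE_TYPES_ON_READ])  # "False"
```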

pyiceberg/io/pyarrow.py

Lines changed: 71 additions & 10 deletions
@@ -95,6 +95,7 @@
     HDFS_KERB_TICKET,
     HDFS_PORT,
     HDFS_USER,
+    PYARROW_USE_LARGE_TYPES_ON_READ,
     S3_ACCESS_KEY_ID,
     S3_CONNECT_TIMEOUT,
     S3_ENDPOINT,
@@ -158,7 +159,7 @@
 from pyiceberg.utils.config import Config
 from pyiceberg.utils.datetime import millis_to_datetime
 from pyiceberg.utils.deprecated import deprecated
-from pyiceberg.utils.properties import get_first_property_value, property_as_int
+from pyiceberg.utils.properties import get_first_property_value, property_as_bool, property_as_int
 from pyiceberg.utils.singleton import Singleton
 from pyiceberg.utils.truncate import truncate_upper_bound_binary_string, truncate_upper_bound_text_string
 
@@ -835,6 +836,10 @@ def _pyarrow_schema_ensure_large_types(schema: pa.Schema) -> pa.Schema:
     return visit_pyarrow(schema, _ConvertToLargeTypes())
 
 
+def _pyarrow_schema_ensure_small_types(schema: pa.Schema) -> pa.Schema:
+    return visit_pyarrow(schema, _ConvertToSmallTypes())
+
+
 @singledispatch
 def visit_pyarrow(obj: Union[pa.DataType, pa.Schema], visitor: PyArrowSchemaVisitor[T]) -> T:
     """Apply a pyarrow schema visitor to any point within a schema.
@@ -876,7 +881,6 @@ def _(obj: Union[pa.ListType, pa.LargeListType, pa.FixedSizeListType], visitor: PyArrowSchemaVisitor[T]) -> T:
     visitor.before_list_element(obj.value_field)
     result = visit_pyarrow(obj.value_type, visitor)
     visitor.after_list_element(obj.value_field)
-
     return visitor.list(obj, result)
 
 
@@ -1145,6 +1149,30 @@ def primitive(self, primitive: pa.DataType) -> pa.DataType:
         return primitive
 
 
+class _ConvertToSmallTypes(PyArrowSchemaVisitor[Union[pa.DataType, pa.Schema]]):
+    def schema(self, schema: pa.Schema, struct_result: pa.StructType) -> pa.Schema:
+        return pa.schema(struct_result)
+
+    def struct(self, struct: pa.StructType, field_results: List[pa.Field]) -> pa.StructType:
+        return pa.struct(field_results)
+
+    def field(self, field: pa.Field, field_result: pa.DataType) -> pa.Field:
+        return field.with_type(field_result)
+
+    def list(self, list_type: pa.ListType, element_result: pa.DataType) -> pa.DataType:
+        return pa.list_(element_result)
+
+    def map(self, map_type: pa.MapType, key_result: pa.DataType, value_result: pa.DataType) -> pa.DataType:
+        return pa.map_(key_result, value_result)
+
+    def primitive(self, primitive: pa.DataType) -> pa.DataType:
+        if primitive == pa.large_string():
+            return pa.string()
+        elif primitive == pa.large_binary():
+            return pa.binary()
+        return primitive
+
+
 class _ConvertToIcebergWithoutIDs(_ConvertToIceberg):
     """
     Converts PyArrowSchema to Iceberg Schema with all -1 ids.
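To make the intent of the new visitor concrete, a small illustrative example (not part of the commit), assuming the helper is importable from pyiceberg.io.pyarrow:

```python
import pyarrow as pa
from pyiceberg.io.pyarrow import _pyarrow_schema_ensure_small_types

large = pa.schema([
    pa.field("name", pa.large_string()),
    pa.field("payload", pa.large_binary()),
    pa.field("tags", pa.large_list(pa.large_string())),
])

# The visitor walks the schema and swaps large_string/large_binary for their small
# counterparts; large_list is rebuilt as list_ by the list() handler.
small = _pyarrow_schema_ensure_small_types(large)
print(small)  # name: string, payload: binary, tags: list<item: string>
```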
@@ -1169,6 +1197,7 @@ def _task_to_record_batches(
     positional_deletes: Optional[List[ChunkedArray]],
     case_sensitive: bool,
     name_mapping: Optional[NameMapping] = None,
+    use_large_types: bool = True,
 ) -> Iterator[pa.RecordBatch]:
     _, _, path = PyArrowFileIO.parse_location(task.file.file_path)
     arrow_format = ds.ParquetFileFormat(pre_buffer=True, buffer_size=(ONE_MEGABYTE * 8))
@@ -1197,7 +1226,9 @@
         # https://github.com/apache/arrow/issues/41884
         # https://github.com/apache/arrow/issues/43183
         # Would be good to remove this later on
-        schema=_pyarrow_schema_ensure_large_types(physical_schema),
+        schema=_pyarrow_schema_ensure_large_types(physical_schema)
+        if use_large_types
+        else (_pyarrow_schema_ensure_small_types(physical_schema)),
         # This will push down the query to Arrow.
         # But in case there are positional deletes, we have to apply them first
         filter=pyarrow_filter if not positional_deletes else None,
@@ -1219,7 +1250,9 @@
                 arrow_table = pa.Table.from_batches([batch])
                 arrow_table = arrow_table.filter(pyarrow_filter)
                 batch = arrow_table.to_batches()[0]
-        yield _to_requested_schema(projected_schema, file_project_schema, batch, downcast_ns_timestamp_to_us=True)
+        yield _to_requested_schema(
+            projected_schema, file_project_schema, batch, downcast_ns_timestamp_to_us=True, use_large_types=use_large_types
+        )
         current_index += len(batch)
 
 
@@ -1232,10 +1265,19 @@ def _task_to_table(
     positional_deletes: Optional[List[ChunkedArray]],
     case_sensitive: bool,
     name_mapping: Optional[NameMapping] = None,
+    use_large_types: bool = True,
 ) -> Optional[pa.Table]:
     batches = list(
         _task_to_record_batches(
-            fs, task, bound_row_filter, projected_schema, projected_field_ids, positional_deletes, case_sensitive, name_mapping
+            fs,
+            task,
+            bound_row_filter,
+            projected_schema,
+            projected_field_ids,
+            positional_deletes,
+            case_sensitive,
+            name_mapping,
+            use_large_types,
         )
     )
 
@@ -1303,6 +1345,8 @@ def project_table(
             # When FsSpec is not installed
             raise ValueError(f"Expected PyArrowFileIO or FsspecFileIO, got: {io}") from e
 
+    use_large_types = property_as_bool(io.properties, PYARROW_USE_LARGE_TYPES_ON_READ, True)
+
     bound_row_filter = bind(table_metadata.schema(), row_filter, case_sensitive=case_sensitive)
 
     projected_field_ids = {
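A quick sketch of how the flag is resolved at scan time; illustrative only, but the call shape (properties mapping, key, default) mirrors the line added above.

```python
from pyiceberg.io import PYARROW_USE_LARGE_TYPES_ON_READ
from pyiceberg.utils.properties import property_as_bool

properties = {"pyarrow.use-large-types-on-read": "False"}

# A missing key falls back to the default (True), keeping today's behaviour; the
# string "False", as set in the integration test below, disables the large types.
assert property_as_bool({}, PYARROW_USE_LARGE_TYPES_ON_READ, True) is True
assert property_as_bool(properties, PYARROW_USE_LARGE_TYPES_ON_READ, True) is False
```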
@@ -1322,6 +1366,7 @@
             deletes_per_file.get(task.file.file_path),
             case_sensitive,
             table_metadata.name_mapping(),
+            use_large_types,
         )
         for task in tasks
     ]
@@ -1394,6 +1439,8 @@ def project_batches(
             # When FsSpec is not installed
             raise ValueError(f"Expected PyArrowFileIO or FsspecFileIO, got: {io}") from e
 
+    use_large_types = property_as_bool(io.properties, PYARROW_USE_LARGE_TYPES_ON_READ, True)
+
     bound_row_filter = bind(table_metadata.schema(), row_filter, case_sensitive=case_sensitive)
 
     projected_field_ids = {
@@ -1414,6 +1461,7 @@
             deletes_per_file.get(task.file.file_path),
             case_sensitive,
             table_metadata.name_mapping(),
+            use_large_types,
         )
         for batch in batches:
             if limit is not None:
@@ -1447,12 +1495,13 @@ def _to_requested_schema(
     batch: pa.RecordBatch,
     downcast_ns_timestamp_to_us: bool = False,
     include_field_ids: bool = False,
+    use_large_types: bool = True,
 ) -> pa.RecordBatch:
     # We could re-use some of these visitors
     struct_array = visit_with_partner(
         requested_schema,
         batch,
-        ArrowProjectionVisitor(file_schema, downcast_ns_timestamp_to_us, include_field_ids),
+        ArrowProjectionVisitor(file_schema, downcast_ns_timestamp_to_us, include_field_ids, use_large_types),
         ArrowAccessor(file_schema),
     )
     return pa.RecordBatch.from_struct_array(struct_array)
@@ -1462,20 +1511,31 @@ class ArrowProjectionVisitor(SchemaWithPartnerVisitor[pa.Array, Optional[pa.Array]]):
     _file_schema: Schema
     _include_field_ids: bool
     _downcast_ns_timestamp_to_us: bool
+    _use_large_types: bool
 
-    def __init__(self, file_schema: Schema, downcast_ns_timestamp_to_us: bool = False, include_field_ids: bool = False) -> None:
+    def __init__(
+        self,
+        file_schema: Schema,
+        downcast_ns_timestamp_to_us: bool = False,
+        include_field_ids: bool = False,
+        use_large_types: bool = True,
+    ) -> None:
         self._file_schema = file_schema
         self._include_field_ids = include_field_ids
         self._downcast_ns_timestamp_to_us = downcast_ns_timestamp_to_us
+        self._use_large_types = use_large_types
 
     def _cast_if_needed(self, field: NestedField, values: pa.Array) -> pa.Array:
         file_field = self._file_schema.find_field(field.field_id)
 
         if field.field_type.is_primitive:
             if field.field_type != file_field.field_type:
-                return values.cast(
-                    schema_to_pyarrow(promote(file_field.field_type, field.field_type), include_field_ids=self._include_field_ids)
+                target_schema = schema_to_pyarrow(
+                    promote(file_field.field_type, field.field_type), include_field_ids=self._include_field_ids
                 )
+                if not self._use_large_types:
+                    target_schema = _pyarrow_schema_ensure_small_types(target_schema)
+                return values.cast(target_schema)
             elif (target_type := schema_to_pyarrow(field.field_type, include_field_ids=self._include_field_ids)) != values.type:
                 if field.field_type == TimestampType():
                     # Downcasting of nanoseconds to microseconds
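The practical effect of the `_cast_if_needed` change, illustrated with plain PyArrow; this is a simplified stand-in for the promoted target schema produced by `schema_to_pyarrow`.

```python
import pyarrow as pa

values = pa.array(["a", "b", "c"], type=pa.string())

# Default (use_large_types=True): the promotion target keeps Arrow's large offsets.
print(values.cast(pa.large_string()).type)  # large_string

# With pyarrow.use-large-types-on-read=False the target schema is first passed
# through _pyarrow_schema_ensure_small_types, so the cast lands on the small type.
print(values.cast(pa.string()).type)  # string
```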
@@ -1547,12 +1607,13 @@ def field(self, field: NestedField, _: Optional[pa.Array], field_array: Optional[pa.Array]) -> Optional[pa.Array]:
 
     def list(self, list_type: ListType, list_array: Optional[pa.Array], value_array: Optional[pa.Array]) -> Optional[pa.Array]:
         if isinstance(list_array, (pa.ListArray, pa.LargeListArray, pa.FixedSizeListArray)) and value_array is not None:
+            list_initializer = pa.large_list if isinstance(list_array, pa.LargeListArray) else pa.list_
             if isinstance(value_array, pa.StructArray):
                 # This can be removed once this has been fixed:
                 # https://github.com/apache/arrow/issues/38809
                 list_array = pa.LargeListArray.from_arrays(list_array.offsets, value_array)
             value_array = self._cast_if_needed(list_type.element_field, value_array)
-            arrow_field = pa.large_list(self._construct_field(list_type.element_field, value_array.type))
+            arrow_field = list_initializer(self._construct_field(list_type.element_field, value_array.type))
             return list_array.cast(arrow_field)
         else:
             return None
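And a small illustration of the `list_initializer` change: projected list columns now keep the offset width of the incoming array instead of always being widened to `large_list`. Again an illustrative sketch, not code from the commit.

```python
import pyarrow as pa

small_list = pa.array([["a", "b"], ["c"]], type=pa.list_(pa.string()))
large_list = small_list.cast(pa.large_list(pa.string()))

for arr in (small_list, large_list):
    # Same selection logic as the new line in ArrowProjectionVisitor.list
    initializer = pa.large_list if isinstance(arr, pa.LargeListArray) else pa.list_
    print(arr.cast(initializer(pa.string())).type)
# -> list<item: string>
# -> large_list<item: string>
```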

tests/integration/test_reads.py

Lines changed: 86 additions & 1 deletion
@@ -40,10 +40,14 @@
     NotEqualTo,
     NotNaN,
 )
-from pyiceberg.io.pyarrow import pyarrow_to_schema
+from pyiceberg.io import PYARROW_USE_LARGE_TYPES_ON_READ
+from pyiceberg.io.pyarrow import (
+    pyarrow_to_schema,
+)
 from pyiceberg.schema import Schema
 from pyiceberg.table import Table
 from pyiceberg.types import (
+    BinaryType,
     BooleanType,
     IntegerType,
     NestedField,
@@ -665,6 +669,87 @@ def another_task() -> None:
     assert table.properties.get("lock") == "xxx"
 
 
+@pytest.mark.integration
+@pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog_hive"), pytest.lazy_fixture("session_catalog")])
+def test_table_scan_default_to_large_types(catalog: Catalog) -> None:
+    identifier = "default.test_table_scan_default_to_large_types"
+    arrow_table = pa.Table.from_arrays(
+        [
+            pa.array(["a", "b", "c"]),
+            pa.array(["a", "b", "c"]),
+            pa.array([b"a", b"b", b"c"]),
+            pa.array([["a", "b"], ["c", "d"], ["e", "f"]]),
+        ],
+        names=["string", "string-to-binary", "binary", "list"],
+    )
+
+    try:
+        catalog.drop_table(identifier)
+    except NoSuchTableError:
+        pass
+
+    tbl = catalog.create_table(
+        identifier,
+        schema=arrow_table.schema,
+    )
+
+    tbl.append(arrow_table)
+
+    with tbl.update_schema() as update_schema:
+        update_schema.update_column("string-to-binary", BinaryType())
+
+    result_table = tbl.scan().to_arrow()
+
+    expected_schema = pa.schema([
+        pa.field("string", pa.large_string()),
+        pa.field("string-to-binary", pa.large_binary()),
+        pa.field("binary", pa.large_binary()),
+        pa.field("list", pa.large_list(pa.large_string())),
+    ])
+    assert result_table.schema.equals(expected_schema)
+
+
+@pytest.mark.integration
+@pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog_hive"), pytest.lazy_fixture("session_catalog")])
+def test_table_scan_override_with_small_types(catalog: Catalog) -> None:
+    identifier = "default.test_table_scan_override_with_small_types"
+    arrow_table = pa.Table.from_arrays(
+        [
+            pa.array(["a", "b", "c"]),
+            pa.array(["a", "b", "c"]),
+            pa.array([b"a", b"b", b"c"]),
+            pa.array([["a", "b"], ["c", "d"], ["e", "f"]]),
+        ],
+        names=["string", "string-to-binary", "binary", "list"],
+    )
+
+    try:
+        catalog.drop_table(identifier)
+    except NoSuchTableError:
+        pass
+
+    tbl = catalog.create_table(
+        identifier,
+        schema=arrow_table.schema,
+    )
+
+    tbl.append(arrow_table)
+
+    with tbl.update_schema() as update_schema:
+        update_schema.update_column("string-to-binary", BinaryType())
+
+    tbl.io.properties[PYARROW_USE_LARGE_TYPES_ON_READ] = "False"
+    result_table = tbl.scan().to_arrow()
+
+    expected_schema = pa.schema([
+        pa.field("string", pa.string()),
+        pa.field("string-to-binary", pa.binary()),
+        pa.field("binary", pa.binary()),
+        pa.field("list", pa.list_(pa.string())),
+    ])
+    assert result_table.schema.equals(expected_schema)
+
+
 @pytest.mark.integration
 @pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog_hive"), pytest.lazy_fixture("session_catalog")])
 def test_empty_scan_ordered_str(catalog: Catalog) -> None:

tests/io/test_pyarrow_visitor.py

Lines changed: 6 additions & 0 deletions
@@ -40,6 +40,7 @@
     _HasIds,
     _NullNaNUnmentionedTermsCollector,
     _pyarrow_schema_ensure_large_types,
+    _pyarrow_schema_ensure_small_types,
     pyarrow_to_schema,
     schema_to_pyarrow,
     visit_pyarrow,
@@ -596,6 +597,11 @@ def test_pyarrow_schema_ensure_large_types(pyarrow_schema_nested_without_ids: pa.Schema) -> None:
     assert _pyarrow_schema_ensure_large_types(pyarrow_schema_nested_without_ids) == expected_schema
 
 
+def test_pyarrow_schema_round_trip_ensure_large_types_and_then_small_types(pyarrow_schema_nested_without_ids: pa.Schema) -> None:
+    schema_with_large_types = _pyarrow_schema_ensure_large_types(pyarrow_schema_nested_without_ids)
+    assert _pyarrow_schema_ensure_small_types(schema_with_large_types) == pyarrow_schema_nested_without_ids
+
+
 @pytest.fixture
 def bound_reference_str() -> BoundReference[Any]:
     return BoundReference(
