68 changes: 68 additions & 0 deletions python/pyarrow/parquet/core.py
@@ -90,6 +90,69 @@ def _check_filters(filters, check_null_strings=True):
    return filters


def _map_arrow_to_spark_types(datatype: pa.DataType) -> str | None:
    """Map an Arrow DataType to its Spark SQL type name, or None if unmapped."""
    lookup = {
        "NA": "null",
        "BOOL": "boolean",
        "INT8": "byte",
        "INT16": "short",
        **dict.fromkeys(
            ["UINT8", "UINT16", "UINT32", "UINT64", "INT32"], "integer"),
        "INT64": "long",
        **dict.fromkeys(["HALF_FLOAT", "FLOAT"], "float"),
        "DOUBLE": "double",
        "BINARY": "binary",
        "STRING": "string",
        **dict.fromkeys(
            ["DECIMAL" + str(2 ** i) for i in range(5, 9)], "decimal"),
Review comment (Member): and decimal precision
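A minimal sketch of how precision and scale could be carried through for decimals, in the spirit of the comment above; the helper below is hypothetical and not part of this PR:

import pyarrow as pa

def _spark_decimal_name(datatype: pa.DataType) -> str | None:
    # Spark spells decimal types as "decimal(precision,scale)" in its schema JSON.
    if pa.types.is_decimal(datatype):
        return f"decimal({datatype.precision},{datatype.scale})"
    return None

assert _spark_decimal_name(pa.decimal128(10, 2)) == "decimal(10,2)"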

        **dict.fromkeys(
            ["LIST", "LARGE_LIST", "LIST_VIEW", "LARGE_LIST_VIEW", "FIXED_SIZE_LIST"],
            "array",
        ),
        "MAP": "map",
        **dict.fromkeys(["DATE32", "DATE64"], "date"),
        "TIMESTAMP": "timestamp",
Review comment (Member): and timestamp NTZ and LTZ (i.e., timestamp without and with local time zone).
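A sketch of how that split could be expressed, assuming tz-naive Arrow timestamps map to Spark's TIMESTAMP_NTZ and tz-aware ones to the session-local TIMESTAMP; the helper below is hypothetical and not part of this PR:

import pyarrow as pa

def _spark_timestamp_name(datatype: pa.DataType) -> str | None:
    # tz-aware Arrow timestamps correspond to Spark's local-time-zone TIMESTAMP,
    # tz-naive ones to TIMESTAMP_NTZ.
    if pa.types.is_timestamp(datatype):
        return "timestamp" if datatype.tz is not None else "timestamp_ntz"
    return None

assert _spark_timestamp_name(pa.timestamp("us", tz="UTC")) == "timestamp"
assert _spark_timestamp_name(pa.timestamp("us")) == "timestamp_ntz"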

"INTERVAL_MONTH_DAY_NANO": "Calendar Interval", # TODO: Correct this
Review comment (Member): TBH, I think it's too much to handle Spark-specific cases like this in PyArrow. Spark is even adding more types, such as the variant type and more interval types.

    }

    str_value = pa.types.TypesEnum(datatype.id).name

    # Arrow types without a Spark equivalent map to None.
    return lookup.get(str_value)


def _substitute_spark_metadata(schema: pa.Schema) -> dict:
    """Extend Spark's row metadata in ``schema`` to describe every Arrow field."""
    metadata = schema.metadata
    spark_key = b"org.apache.spark.sql.parquet.row.metadata"

    try:
        spark_row_metadata = json.loads(metadata.get(spark_key))
    except (TypeError, json.JSONDecodeError):
        # Spark's row metadata is absent or could not be parsed; leave it as is.
        return metadata

    spark_fields = [field["name"] for field in spark_row_metadata["fields"]]

    for name in schema.names:
        field = schema.field(name)

        if name in spark_fields:
            continue

        # Describe any Arrow field that Spark's metadata does not yet know about.
        spark_row_metadata["fields"].append({
            "name": field.name,
            "type": _map_arrow_to_spark_types(field.type),
Review comment (Member): And it seems like we're missing nested types like struct and array/map.
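A sketch of how nested types could be expanded recursively into Spark's JSON schema objects, with leaf types falling back to the lookup-based helper above; this function is hypothetical and not part of this PR:

import pyarrow as pa

def _spark_json_type(datatype: pa.DataType):
    # Hypothetical recursive mapping; Spark encodes nested types as JSON objects.
    if pa.types.is_struct(datatype):
        fields = [datatype.field(i) for i in range(datatype.num_fields)]
        return {
            "type": "struct",
            "fields": [
                {"name": f.name, "type": _spark_json_type(f.type),
                 "nullable": f.nullable, "metadata": {}}
                for f in fields
            ],
        }
    if pa.types.is_list(datatype) or pa.types.is_large_list(datatype):
        return {
            "type": "array",
            "elementType": _spark_json_type(datatype.value_type),
            "containsNull": datatype.value_field.nullable,
        }
    if pa.types.is_map(datatype):
        return {
            "type": "map",
            "keyType": _spark_json_type(datatype.key_type),
            "valueType": _spark_json_type(datatype.item_type),
            "valueContainsNull": datatype.item_field.nullable,
        }
    # Leaf types fall back to the name-based lookup defined above.
    return _map_arrow_to_spark_types(datatype)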

"nullable": field.nullable,
"metadata": {},
})

metadata[spark_key] = json.dumps(spark_row_metadata).encode("utf-8")

return metadata
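For reference, after json.loads the Spark row metadata handled here is a struct description of the following shape (field names below are made up for illustration):

{
    "type": "struct",
    "fields": [
        {"name": "id", "type": "long", "nullable": True, "metadata": {}},
        {"name": "price", "type": "decimal(10,2)", "nullable": True, "metadata": {}},
    ],
}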


_DNF_filter_doc = """Predicates are expressed using an ``Expression`` or using
the disjunctive normal form (DNF), like ``[[('x', '=', 0), ...], ...]``.
DNF allows arbitrary boolean logical combinations of single column predicates.
@@ -1954,6 +2017,11 @@ def write_table(table, where, row_group_size=None, version='2.6',
    # update it in write_to_dataset and _dataset_parquet.pyx ParquetFileWriteOptions
    row_group_size = kwargs.pop('chunk_size', row_group_size)
    use_int96 = use_deprecated_int96_timestamps

    spark_metadata_key = b"org.apache.spark.sql.parquet.row.metadata"
    if (flavor == "spark" and table.schema.metadata is not None
            and spark_metadata_key in table.schema.metadata):
        # Keep Spark's embedded row metadata in sync with the Arrow schema.
        new_metadata = _substitute_spark_metadata(table.schema)
        table = table.replace_schema_metadata(new_metadata)

    try:
        with ParquetWriter(
                where, table.schema,
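For context, the path this hook targets is a round trip where PyArrow appends a column to a Spark-written table and the embedded Spark row metadata must be updated to match. A hypothetical usage sketch, assuming the flavor handling above lands (file names and the added column are illustrative):

import pyarrow as pa
import pyarrow.parquet as pq

# Read a file that Spark wrote; its schema metadata carries the
# org.apache.spark.sql.parquet.row.metadata key.
table = pq.read_table("spark_written.parquet")

# Append a column that Spark does not know about yet.
table = table.append_column("extra", pa.array(range(table.num_rows)))

# With this change, flavor="spark" rewrites the embedded Spark row metadata
# so the new column is described there as well.
pq.write_table(table, "updated.parquet", flavor="spark")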