# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

# This program uses Apache Spark to generate example binary Variant data
#
# Requirements:
#   pip install pyarrow
#   pip install pyspark
#
# Last run with Spark 4.0 preview 2:
# https://spark.apache.org/news/spark-4.0.0-preview2.html
import json
import os

import pyarrow.parquet as pq
from pyspark.sql import SparkSession
# Initialize Spark session and create variant data via SQL
spark = SparkSession.builder \
    .appName("PySpark SQL Example") \
    .getOrCreate()
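
# Optional: confirm the running Spark version matches the version noted above
# (SparkSession.version is part of the public PySpark API)
print("Using Spark", spark.version)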

# Recursively clean up the spark-warehouse directory left over from a previous run
if os.path.exists('spark-warehouse'):
    for root, dirs, files in os.walk('spark-warehouse', topdown=False):
        for name in files:
            os.remove(os.path.join(root, name))
        for name in dirs:
            os.rmdir(os.path.join(root, name))

# Create a table with a variant column and insert values of various types into it
#
# This writes data files into spark-warehouse/output
sql = """
CREATE TABLE T (name VARCHAR(2000), variant_col VARIANT);
-------------------------------
-- Primitive type (basic_type=0)
-------------------------------
-- One row with a value from each type listed in
-- https://github.com/apache/parquet-format/blob/master/VariantEncoding.md#encoding-types
--
-- Spark Types: https://spark.apache.org/docs/latest/sql-ref-datatypes.html
-- Note: must use explicit typecasts as Spark returns an error for implicit casts
INSERT INTO T VALUES ('primitive_null', NULL);
INSERT INTO T VALUES ('primitive_boolean_true', true::Variant);
INSERT INTO T VALUES ('primitive_boolean_false', false::Variant);
INSERT INTO T VALUES ('primitive_int8', 42::Byte::Variant);
INSERT INTO T VALUES ('primitive_int16', 1234::Short::Variant);
INSERT INTO T VALUES ('primitive_int32', 123456::Integer::Variant);
INSERT INTO T VALUES ('primitive_int64', 1234567890123456789::Long::Variant); -- must be too large to fit in int32: https://github.com/apache/parquet-testing/issues/82
INSERT INTO T VALUES ('primitive_double', 1234567890.1234::Double::Variant);
INSERT INTO T VALUES ('primitive_decimal4', 12.34::Decimal(8,2)::Variant);
INSERT INTO T VALUES ('primitive_decimal8', 12345678.90::Decimal(12,2)::Variant);
INSERT INTO T VALUES ('primitive_decimal16', 12345678912345678.90::Decimal(30,2)::Variant);
INSERT INTO T VALUES ('primitive_date', '2025-04-16'::Date::Variant);
INSERT INTO T VALUES ('primitive_timestamp', '2025-04-16T12:34:56.78'::Timestamp::Variant);
INSERT INTO T VALUES ('primitive_timestampntz', '2025-04-16T12:34:56.78'::Timestamp_NTZ::Variant);
INSERT INTO T VALUES ('primitive_float', 1234567890.1234::Float::Variant);
INSERT INTO T VALUES ('primitive_binary', X'31337deadbeefcafe'::Variant);
INSERT INTO T VALUES ('primitive_string', 'This string is longer than 64 bytes and therefore does not fit in a short_string and it also includes several non ascii characters such as 🐢, 💖, ♥️, 🎣 and 🤦!!'::Variant);
-- Binary artifacts for 'TimeNTZ' / 'timestamp with timezone (NANOS)' / 'timestamp without time zone (NANOS)' / 'UUID' were generated by the Iceberg test code; see https://github.com/apache/parquet-testing/pull/92 for more detail
-------------------------------
-- Short string (basic_type=1)
-------------------------------
INSERT INTO T VALUES ('short_string', 'Less than 64 bytes (❤️ with utf8)'::Variant);
-------------------------------
-- Object (basic_type=2)
-------------------------------
-- Use parse_json to create Variants, as Spark does not seem to support casting structs to VARIANT.
INSERT INTO T VALUES ('object_empty', parse_json('{}')::Variant);
INSERT INTO T VALUES ('object_primitive', parse_json('{"int_field" : 1, "double_field": 1.23456789, "boolean_true_field": true, "boolean_false_field": false, "string_field": "Apache Parquet", "null_field": null, "timestamp_field": "2025-04-16T12:34:56.78"}')::Variant);
INSERT INTO T VALUES ('object_nested', parse_json('{ "id" : 1, "species" : { "name": "lava monster", "population": 6789}, "observation" : { "time": "12:34:56", "location": "In the Volcano", "value" : { "temperature": 123, "humidity": 456 } } }')::Variant);
-- https://github.com/apache/parquet-testing/issues/77
-- TODO create example variant objects with fields that have non-JSON types (like timestamp, date, etc.)
-- Casting from "STRUCT<...>" to "VARIANT" is not yet supported
-- INSERT INTO T VALUES ('object_primitive', struct(1234.56::Double as double_field, true as boolean_true_field, false as boolean_false_field, '2025-04-16T12:34:56.78'::Timestamp as timestamp_field, 'Apache Parquet' as string_field, null as null_field)::Variant);
-- TODO objects with more than 2**8 distinct fields (that require using more than one byte for field offset); see the sketch below
-- TODO objects with more than 2**16 distinct fields (that require using more than 2 bytes for field offset)
-- TODO objects with more than 2**24 distinct fields (that require using more than 3 bytes for field offset)
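-- A possible sketch for the large-object TODOs above (an untested assumption,
-- not something this script currently does): build the JSON text in Python,
-- e.g. json.dumps({f"field_{i}": i for i in range(300)}), and interpolate it
-- into an INSERT ... parse_json('...') statement like the object rows above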
-------------------------------
-- Array (basic_type=3)
-------------------------------
INSERT INTO T VALUES ('array_empty', parse_json('[]')::Variant);
INSERT INTO T VALUES ('array_primitive', parse_json('[2, 1, 5, 9]')::Variant);
INSERT INTO T VALUES ('array_nested', parse_json('[ { "id": 1, "thing": { "names": ["Contrarian", "Spider"] } }, null, { "id": 2, "type": "if", "names": ["Apple", "Ray", null] } ]')::Variant);
-- https://github.com/apache/parquet-testing/issues/78
-- TODO arrays with more than 2**8 distinct elements (that require using more than one byte for count)
-- TODO arrays where the total length of all values is greater than 2**8, 2**16, and 2**24 bytes (that require using more than one byte for the offsets)
-------------------------------
-- Output the value to a new table that also has the JSON representation of the variant column
-------------------------------
DROP TABLE IF EXISTS output;
CREATE TABLE output AS SELECT name, variant_col, to_json(variant_col) as json_col FROM T;
"""
for statement in sql.split("\n"):
    statement = statement.strip()
    if not statement or statement.startswith("--"):
        continue
    print("Running SQL:", statement)
    spark.sql(statement)
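
# Optional sanity check (commented out): display the generated rows and their
# JSON rendering
# spark.sql("SELECT name, json_col FROM output").show(truncate=False)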
mypath = 'spark-warehouse/output'
parquet_files = [f for f in os.listdir(mypath) if f.endswith('.parquet')]
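# For reference, the physical schema of these files can be inspected with
# pyarrow; the variant column is expected to appear as
# struct<metadata: binary, value: binary> (an observation about the current
# Spark output, not a guarantee):
# print(pq.read_schema(os.path.join(mypath, parquet_files[0])))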
# extract the values from the parquet files
data_dictionary = {}
for parquet_file in parquet_files:
    table = pq.read_table(os.path.join(mypath, parquet_file))
    for row in range(len(table)):
        name = table[0][row]
        # variants are stored as StructArrays with two fields:
        # metadata and value
        variant_col = table[1][row]
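        # Sanity check: confirm the struct has at least the "metadata" and
        # "value" fields described above before indexing into it (an assumption
        # about the current physical layout, checked here rather than taken on faith)
        field_names = {variant_col.type.field(i).name for i in range(variant_col.type.num_fields)}
        assert {"metadata", "value"} <= field_names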
        metadata = variant_col['metadata']
        value = variant_col['value']
        json_value = table[2][row]
        print("Writing metadata for", name)
        # write the metadata, value, and json representation to files
        with open(f"{name}.metadata", "wb") as f:
            buffer = metadata.as_buffer()
            if buffer is not None:
                f.write(buffer)
        with open(f"{name}.value", "wb") as f:
            buffer = value.as_buffer()
            if buffer is not None:
                f.write(buffer)
        # Add the JSON representation to the data dictionary
        name = name.as_py()
        json_value = json_value.as_py()
        if json_value is not None:
            data_dictionary[name] = json.loads(json_value)
        else:
            data_dictionary[name] = None

with open("data_dictionary.json", "w") as f:
    f.write(json.dumps(data_dictionary, sort_keys=True, indent=4))
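
# Shut down the Spark session now that all output has been written
# (SparkSession.stop is part of the public PySpark API)
spark.stop()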

# Note: It is possible to write the output to a single parquet file using a
# command such as:
#   spark.sql("SELECT * FROM output").repartition(1).write.parquet('variant.parquet')
# At the time of writing, the resulting file does not carry the VARIANT logical
# type annotation