Skip to content

Commit 7306b73

Browse files
dataframe reader stuff
1 parent 7a22fd7 commit 7306b73

File tree

1 file changed

+88
-3
lines changed

1 file changed

+88
-3
lines changed

tests/ast/decoder.py

Lines changed: 88 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -188,6 +188,59 @@ def decode_col_exprs(self, expr: proto.Expr) -> List[Column]:
188188
col_list = [self.decode_expr(arg) for arg in expr]
189189
return col_list
190190

191+
def decode_dataframe_reader_expr(self, df_reader_expr: proto.SpDataframeReader):
192+
"""
193+
Decode a dataframe reader expression to get the dataframe.
194+
195+
Parameters
196+
----------
197+
df_reader_expr : proto.SpDataframeReader
198+
The expression to decode.
199+
200+
"""
201+
match df_reader_expr.WhichOneof("variant"):
202+
case "sp_dataframe_reader_init":
203+
return self.session.read
204+
case "sp_dataframe_reader_option":
205+
reader = self.decode_dataframe_reader_expr(
206+
df_reader_expr.sp_dataframe_reader_option.reader
207+
)
208+
key = df_reader_expr.sp_dataframe_reader_option.key
209+
value = self.decode_expr(
210+
df_reader_expr.sp_dataframe_reader_option.value
211+
)
212+
return reader.option(key, value)
213+
case "sp_dataframe_reader_options":
214+
reader = self.decode_dataframe_reader_expr(
215+
df_reader_expr.sp_dataframe_reader_options.reader
216+
)
217+
configs = self.decode_dsl_map_expr(
218+
df_reader_expr.sp_dataframe_reader_options.configs
219+
)
220+
return reader.options(configs)
221+
case "sp_dataframe_reader_schema":
222+
reader = self.decode_dataframe_reader_expr(
223+
df_reader_expr.sp_dataframe_reader_schema.reader
224+
)
225+
schema = self.decode_struct_type_expr(
226+
df_reader_expr.sp_dataframe_reader_schema.schema
227+
)
228+
return reader.schema(schema)
229+
case "sp_dataframe_reader_with_metadata":
230+
reader = self.decode_dataframe_reader_expr(
231+
df_reader_expr.sp_dataframe_reader_with_metadata.reader
232+
)
233+
metadata_columns = [
234+
self.decode_expr(arg)
235+
for arg in df_reader_expr.sp_dataframe_reader_with_metadata.metadata_columns.args
236+
]
237+
return reader.with_metadata(*metadata_columns)
238+
case _:
239+
raise ValueError(
240+
"Unknown dataframe reader type: %s"
241+
% df_reader_expr.WhichOneof("variant")
242+
)
243+
191244
def decode_dsl_map_expr(self, map_expr: Iterable) -> dict:
192245
"""
193246
Given a map expression, return the result as a Python dictionary.
@@ -465,9 +518,8 @@ def decode_data_type_expr(
465518
return ShortType()
466519
case "sp_string_type":
467520
length = (
468-
data_type_expr.sp_string_type.length
521+
data_type_expr.sp_string_type.length.value
469522
if data_type_expr.sp_string_type.HasField("length")
470-
and isinstance(data_type_expr.sp_string_type.length, int)
471523
else None
472524
)
473525
return StringType(length)
@@ -487,7 +539,10 @@ def decode_data_type_expr(
487539
for field in data_type_expr.sp_struct_type.fields.list:
488540
column_identifier = field.column_identifier.name
489541
data_type = self.decode_data_type_expr(field.data_type)
490-
fields.append(StructField(column_identifier, data_type))
542+
nullable = field.nullable
543+
fields.append(
544+
StructField(column_identifier, data_type, nullable)
545+
)
491546
else:
492547
fields = None
493548
structured = data_type_expr.sp_struct_type.structured
@@ -2512,6 +2567,36 @@ def decode_expr(self, expr: proto.Expr, **kwargs) -> Any:
25122567
iceberg_config=iceberg_config,
25132568
)
25142569

2570+
case "sp_read_avro":
2571+
path = expr.sp_read_avro.path
2572+
reader = self.decode_dataframe_reader_expr(expr.sp_read_avro.reader)
2573+
return reader.avro(path)
2574+
2575+
case "sp_read_csv":
2576+
path = expr.sp_read_csv.path
2577+
reader = self.decode_dataframe_reader_expr(expr.sp_read_csv.reader)
2578+
return reader.csv(path)
2579+
2580+
case "sp_read_json":
2581+
path = expr.sp_read_json.path
2582+
reader = self.decode_dataframe_reader_expr(expr.sp_read_json.reader)
2583+
return reader.json(path)
2584+
2585+
case "sp_read_orc":
2586+
path = expr.sp_read_orc.path
2587+
reader = self.decode_dataframe_reader_expr(expr.sp_read_orc.reader)
2588+
return reader.orc(path)
2589+
2590+
case "sp_read_parquet":
2591+
path = expr.sp_read_parquet.path
2592+
reader = self.decode_dataframe_reader_expr(expr.sp_read_parquet.reader)
2593+
return reader.parquet(path)
2594+
2595+
case "sp_read_xml":
2596+
path = expr.sp_read_xml.path
2597+
reader = self.decode_dataframe_reader_expr(expr.sp_read_xml.reader)
2598+
return reader.xml(path)
2599+
25152600
case "sp_dataframe_write":
25162601
df = self.decode_expr(expr.sp_dataframe_write.df)
25172602
res = df.write

0 commit comments

Comments
 (0)