1515
1616import dataclasses
1717import functools
18+ import itertools
1819import typing
1920
20- import google .cloud . bigquery as bigquery
21- import sqlglot . expressions as sge
21+ from google .cloud import bigquery
22+ import pyarrow as pa
2223
23- from bigframes .core import expression , nodes , rewrite
24+ from bigframes .core import expression , identifiers , nodes , rewrite
2425from bigframes .core .compile import configs
25- from bigframes .core .compile .sqlglot import sql_gen
26+ import bigframes .core .compile .sqlglot . sqlglot_ir as ir
2627import bigframes .core .ordering as bf_ordering
2728
2829
2930@dataclasses .dataclass (frozen = True )
3031class SQLGlotCompiler :
3132 """Compiles BigFrame nodes into SQL using SQLGlot."""
3233
33- sql_gen = sql_gen .SQLGen ()
34-
3534 def compile (
3635 self ,
3736 node : nodes .BigFrameNode ,
@@ -81,6 +80,7 @@ def _compile_sql(self, request: configs.CompileRequest) -> configs.CompileResult
8180 result_node = typing .cast (
8281 nodes .ResultNode , rewrite .column_pruning (result_node )
8382 )
83+ result_node = _remap_variables (result_node )
8484 sql = self ._compile_result_node (result_node )
8585 return configs .CompileResult (
8686 sql , result_node .schema .to_bigquery (), result_node .order_by
@@ -89,6 +89,8 @@ def _compile_sql(self, request: configs.CompileRequest) -> configs.CompileResult
8989 ordering : typing .Optional [bf_ordering .RowOrdering ] = result_node .order_by
9090 result_node = dataclasses .replace (result_node , order_by = None )
9191 result_node = typing .cast (nodes .ResultNode , rewrite .column_pruning (result_node ))
92+
93+ result_node = _remap_variables (result_node )
9294 sql = self ._compile_result_node (result_node )
9395 # Return the ordering iff no extra columns are needed to define the row order
9496 if ordering is not None :
@@ -103,9 +105,9 @@ def _compile_sql(self, request: configs.CompileRequest) -> configs.CompileResult
103105 )
104106
105107 def _compile_result_node (self , root : nodes .ResultNode ) -> str :
106- sqlglot_expr = compile_node (root .child )
108+ sqlglot_ir = compile_node (root .child )
107109 # TODO: add order_by, limit, and selections to sqlglot_expr
108- return self . sql_gen . sql ( sqlglot_expr )
110+ return sqlglot_ir . sql
109111
110112
111113def _replace_unsupported_ops (node : nodes .BigFrameNode ):
@@ -115,27 +117,52 @@ def _replace_unsupported_ops(node: nodes.BigFrameNode):
115117 return node
116118
117119
def _remap_variables(node: nodes.ResultNode) -> nodes.ResultNode:
    """Remaps `ColumnId`s in the BFET of a `ResultNode` to produce deterministic UIDs.

    Replacement ids are drawn lazily from an unbounded counter and are named
    ``bfcol_0``, ``bfcol_1``, ... in the order `rewrite.remap_variables`
    requests them.
    """
    # Infinite, lazily evaluated stream of fresh ids; the rewrite consumes
    # only as many as it needs.
    fresh_ids = (
        identifiers.ColumnId(name=f"bfcol_{index}") for index in itertools.count()
    )
    # The second element returned by the rewrite is not needed here.
    remapped, _ = rewrite.remap_variables(node, fresh_ids)
    return typing.cast(nodes.ResultNode, remapped)
129+
130+
@functools.lru_cache(maxsize=5000)
def compile_node(node: nodes.BigFrameNode) -> ir.SQLGlotIR:
    """Compiles a node tree into a `SQLGlotIR`. Caches result.

    The tree is folded with ``reduce_up``: each node is handed to the
    `_compile_node` dispatcher together with the results for its children.
    """

    def _compile_with_children(current, compiled_children):
        # Children arrive as one sequence; the dispatcher takes them
        # as separate positional arguments.
        return _compile_node(current, *compiled_children)

    return node.reduce_up(_compile_with_children)
122135
123136
@functools.singledispatch
def _compile_node(
    node: nodes.BigFrameNode, *compiled_children: ir.SQLGlotIR
) -> ir.SQLGlotIR:
    """Defines transformation but isn't cached, always use compile_node instead

    This is the singledispatch fallback, reached only when no implementation
    is registered for the type of ``node``.

    Raises:
        ValueError: always, naming the node that could not be compiled.
    """
    message = f"Can't compile unrecognized node: {node}"
    raise ValueError(message)
130143
131144
@_compile_node.register
def compile_readlocal(node: nodes.ReadLocalNode, *args) -> ir.SQLGlotIR:
    """Compile a local-data read into a `SQLGlotIR` built from the in-memory table.

    The node's pyarrow table is narrowed to the columns listed in the scan
    list, the columns are renamed to their compiled ids, and an optional
    row-offsets column is appended before the data is handed to
    `ir.SQLGlotIR.from_pandas`.

    Args:
        node: The local read node to compile.
        *args: Compiled children passed by the dispatcher; unused here.

    Returns:
        The IR produced by `ir.SQLGlotIR.from_pandas` for the selected data,
        using the node's schema names and dtypes.
    """
    offsets = node.offsets_col.sql if node.offsets_col else None
    schema_names = node.schema.names
    schema_dtypes = node.schema.dtypes

    scan_items = node.scan_list.items
    pa_table = node.local_data_source.data
    pa_table = pa_table.select([item.source_id for item in scan_items])
    # Rename positionally rather than through a {source_id: new_id} mapping:
    # a mapping collapses when the same source column is scanned under more
    # than one id (both copies would receive the same name), and the list
    # form lines up one-to-one with the `select` above.
    pa_table = pa_table.rename_columns([item.id.sql for item in scan_items])

    if offsets:
        # Offsets are the 0-based row positions of the local table.
        pa_table = pa_table.append_column(
            offsets, pa.array(range(pa_table.num_rows), type=pa.int64())
        )

    return ir.SQLGlotIR.from_pandas(pa_table.to_pandas(), schema_names, schema_dtypes)
136163
137164
@_compile_node.register
def compile_selection(node: nodes.SelectionNode, child: ir.SQLGlotIR):
    """Compile a selection node; currently a pass-through.

    The compiled child is returned unchanged and ``node`` is not consulted.
    """
    # TODO: add support for selection
    return child
0 commit comments