Skip to content

Commit 269f4b4

Browse files
authored
chore: adjust StreamingDataFrame impl for the upcoming required APPENDS clause (#1366)
1 parent 65933b6 commit 269f4b4

File tree

1 file changed

+28
-4
lines changed

1 file changed

+28
-4
lines changed

bigframes/streaming/dataframe.py

Lines changed: 28 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424
from google.cloud import bigquery
2525

2626
from bigframes import dataframe
27-
from bigframes.core import log_adapter
27+
from bigframes.core import log_adapter, nodes
2828
import bigframes.exceptions as bfe
2929
import bigframes.session
3030

@@ -54,7 +54,7 @@ def _curate_df_doc(doc: Optional[str]):
5454

5555

5656
class StreamingBase:
57-
sql: str
57+
_appends_sql: str
5858
_session: bigframes.session.Session
5959

6060
def to_bigtable(
@@ -124,7 +124,7 @@ def to_bigtable(
124124
can be examined.
125125
"""
126126
return _to_bigtable(
127-
self.sql,
127+
self._appends_sql,
128128
instance=instance,
129129
table=table,
130130
service_account_email=service_account_email,
@@ -181,7 +181,7 @@ def to_pubsub(
181181
can be examined.
182182
"""
183183
return _to_pubsub(
184-
self.sql,
184+
self._appends_sql,
185185
topic=topic,
186186
service_account_email=service_account_email,
187187
session=self._session,
@@ -218,6 +218,19 @@ def __init__(self, df: dataframe.DataFrame, *, create_key=0):
218218
def _from_table_df(cls, df: dataframe.DataFrame) -> StreamingDataFrame:
219219
return cls(df, create_key=cls._create_key)
220220

221+
@property
222+
def _original_table(self):
223+
def traverse(node: nodes.BigFrameNode):
224+
if isinstance(node, nodes.ReadTableNode):
225+
return f"{node.source.table.project_id}.{node.source.table.dataset_id}.{node.source.table.table_id}"
226+
for child in node.child_nodes:
227+
original_table = traverse(child)
228+
if original_table:
229+
return original_table
230+
return None
231+
232+
return traverse(self._df._block._expr.node)
233+
221234
def __getitem__(self, *args, **kwargs):
222235
return _return_type_wrapper(self._df.__getitem__, StreamingDataFrame)(
223236
*args, **kwargs
@@ -266,6 +279,17 @@ def sql(self):
266279

267280
sql.__doc__ = _curate_df_doc(inspect.getdoc(dataframe.DataFrame.sql))
268281

282+
# Patch for the required APPENDS clause
283+
@property
284+
def _appends_sql(self):
285+
sql_str = self.sql
286+
original_table = self._original_table
287+
assert original_table is not None
288+
289+
appends_clause = f"APPENDS(TABLE `{original_table}`, NULL, NULL)"
290+
sql_str = sql_str.replace(f"`{original_table}`", appends_clause)
291+
return sql_str
292+
269293
@property
270294
def _session(self):
271295
return self._df._session

0 commit comments

Comments
 (0)