|
14 | 14 |
|
15 | 15 | """Private module: Helpers for I/O operations.""" |
16 | 16 |
|
17 | | -from __future__ import annotations |
18 | | - |
19 | 17 | import datetime |
20 | 18 | import textwrap |
21 | 19 | import types |
22 | | -import typing |
23 | 20 | from typing import Dict, Iterable, Union |
24 | 21 | import uuid |
25 | 22 |
|
26 | 23 | import google.cloud.bigquery as bigquery |
27 | 24 |
|
28 | | -if typing.TYPE_CHECKING: |
29 | | - import bigframes.session |
30 | | - |
31 | | - |
32 | 25 | IO_ORDERING_ID = "bqdf_row_nums" |
33 | 26 | TEMP_TABLE_PREFIX = "bqdf{date}_{random_id}" |
34 | 27 |
|
@@ -76,83 +69,43 @@ def create_export_data_statement( |
76 | 69 | ) |
77 | 70 |
|
78 | 71 |
|
79 | | -def random_table(dataset: bigquery.DatasetReference) -> bigquery.TableReference: |
80 | | - """Generate a random table ID with BigQuery DataFrames prefix. |
81 | | -
|
82 | | - Args: |
83 | | - dataset (google.cloud.bigquery.DatasetReference): |
84 | | - The dataset to make the table reference in. Usually the anonymous |
85 | | - dataset for the session. |
86 | | -
|
87 | | - Returns: |
88 | | - google.cloud.bigquery.TableReference: |
89 | | - Fully qualified table ID of a table that doesn't exist. |
90 | | - """ |
91 | | - now = datetime.datetime.now(datetime.timezone.utc) |
92 | | - random_id = uuid.uuid4().hex |
93 | | - table_id = TEMP_TABLE_PREFIX.format( |
94 | | - date=now.strftime("%Y%m%d"), random_id=random_id |
95 | | - ) |
96 | | - return dataset.table(table_id) |
97 | | - |
98 | | - |
99 | | -def table_ref_to_sql(table: bigquery.TableReference) -> str: |
100 | | - """Format a table reference as escaped SQL.""" |
101 | | - return f"`{table.project}`.`{table.dataset_id}`.`{table.table_id}`" |
| 72 | +def create_snapshot_sql( |
| 73 | + table_ref: bigquery.TableReference, current_timestamp: datetime.datetime |
| 74 | +) -> str: |
| 75 | + """Query a table via 'time travel' for consistent reads.""" |
102 | 76 |
|
| 77 | + # If we have a _SESSION table, assume that it's already a copy. Nothing to do here. |
| 78 | + if table_ref.dataset_id.upper() == "_SESSION": |
| 79 | + return f"SELECT * FROM `_SESSION`.`{table_ref.table_id}`" |
103 | 80 |
|
104 | | -def create_table_clone( |
105 | | - source: bigquery.TableReference, |
106 | | - dataset: bigquery.DatasetReference, |
107 | | - expiration: datetime.datetime, |
108 | | - session: bigframes.session.Session, |
109 | | - api_name: str, |
110 | | -) -> bigquery.TableReference: |
111 | | - """Create a table clone for consistent reads.""" |
112 | 81 | # If we have an anonymous query results table, it can't be modified and |
113 | 82 | # there isn't any BigQuery time travel. |
114 | | - if source.dataset_id.startswith("_"): |
115 | | - return source |
116 | | - |
117 | | - fully_qualified_source_id = table_ref_to_sql(source) |
118 | | - destination = random_table(dataset) |
119 | | - fully_qualified_destination_id = table_ref_to_sql(destination) |
| 83 | + if table_ref.dataset_id.startswith("_"): |
| 84 | + return f"SELECT * FROM `{table_ref.project}`.`{table_ref.dataset_id}`.`{table_ref.table_id}`" |
120 | 85 |
|
121 | | - # Include a label so that Dataplex Lineage can identify temporary |
122 | | - # tables that BigQuery DataFrames creates. Googlers: See internal issue |
123 | | - # 296779699. |
124 | | - ddl = textwrap.dedent( |
| 86 | + return textwrap.dedent( |
125 | 87 | f""" |
126 | | - CREATE OR REPLACE TABLE |
127 | | - {fully_qualified_destination_id} |
128 | | - CLONE {fully_qualified_source_id} |
129 | | - OPTIONS( |
130 | | - expiration_timestamp=TIMESTAMP "{expiration.isoformat()}", |
131 | | - labels=[ |
132 | | - ("source", "bigquery-dataframes-temp"), |
133 | | - ("bigframes-api", {repr(api_name)}) |
134 | | - ] |
135 | | - ) |
| 88 | + SELECT * |
| 89 | + FROM `{table_ref.project}`.`{table_ref.dataset_id}`.`{table_ref.table_id}` |
| 90 | + FOR SYSTEM_TIME AS OF TIMESTAMP({repr(current_timestamp.isoformat())}) |
136 | 91 | """ |
137 | 92 | ) |
138 | | - job_config = bigquery.QueryJobConfig() |
139 | | - job_config.labels = { |
140 | | - "source": "bigquery-dataframes-temp", |
141 | | - "bigframes-api": api_name, |
142 | | - } |
143 | | - session._start_query(ddl, job_config=job_config) |
144 | | - return destination |
145 | 93 |
|
146 | 94 |
|
147 | 95 | def create_temp_table( |
148 | 96 | bqclient: bigquery.Client, |
149 | 97 | dataset: bigquery.DatasetReference, |
150 | | - expiration: datetime.datetime, |
| 98 | + expiration: datetime.timedelta, |
151 | 99 | ) -> str: |
152 | 100 | """Create an empty table with an expiration in the desired dataset.""" |
153 | | - table_ref = random_table(dataset) |
| 101 | + now = datetime.datetime.now(datetime.timezone.utc) |
| 102 | + random_id = uuid.uuid4().hex |
| 103 | + table_id = TEMP_TABLE_PREFIX.format( |
| 104 | + date=now.strftime("%Y%m%d"), random_id=random_id |
| 105 | + ) |
| 106 | + table_ref = dataset.table(table_id) |
154 | 107 | destination = bigquery.Table(table_ref) |
155 | | - destination.expires = expiration |
| 108 | + destination.expires = now + expiration |
156 | 109 | bqclient.create_table(destination) |
157 | 110 | return f"{table_ref.project}.{table_ref.dataset_id}.{table_ref.table_id}" |
158 | 111 |
|
|
0 commit comments