
Commit 4838687

Add bigframes.bigquery.load_data function
Implemented bigframes.bigquery.load_data to execute LOAD DATA SQL statements. Supports partitioning, clustering, schema specification, and other options. Includes logic to generate a temporary destination table name when one is not provided. Added a DDL generation helper in bigframes.core.sql. Exposed the function in the bigframes.bigquery package. Added unit tests in tests/unit/bigquery/test_io.py.
1 parent a634e97 commit 4838687
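
A minimal usage sketch of the new function added in this commit; the Cloud Storage URI below is a hypothetical example, not taken from the change itself:

import bigframes.bigquery as bbq

# Load Parquet files into an auto-created temporary destination table and
# get back a DataFrame over the loaded data.
df = bbq.load_data(
    "gs://example-bucket/events/*.parquet",  # hypothetical URI
    format="PARQUET",
)
df.head()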

File tree

4 files changed, +376 -1 lines changed


bigframes/bigquery/__init__.py

Lines changed: 5 additions & 0 deletions
@@ -43,6 +43,7 @@
     st_regionstats,
     st_simplify,
 )
+from bigframes.bigquery._operations.io import load_data
 from bigframes.bigquery._operations.json import (
     json_extract,
     json_extract_array,
@@ -85,6 +86,8 @@
     st_length,
     st_regionstats,
     st_simplify,
+    # io ops
+    load_data,
     # json ops
     json_extract,
     json_extract_array,
@@ -135,6 +138,8 @@
     "st_length",
     "st_regionstats",
     "st_simplify",
+    # io ops
+    "load_data",
     # json ops
     "json_extract",
     "json_extract_array",
bigframes/bigquery/_operations/io.py

Lines changed: 116 additions & 0 deletions (new file)
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import annotations

import typing
from typing import Any, List, Optional

import google.cloud.bigquery as bigquery

import bigframes.core.sql

if typing.TYPE_CHECKING:
    import bigframes.dataframe as dataframe
    import bigframes.session

_PLACEHOLDER_SCHEMA = [
    bigquery.SchemaField("bf_load_placeholder", "INT64"),
]


def load_data(
    uris: str | List[str],
    format: str,
    destination_table: Optional[str] = None,
    *,
    schema: Optional[List[bigquery.SchemaField]] = None,
    cluster_by: Optional[List[str]] = None,
    partition_by: Optional[str] = None,
    options: Optional[dict[str, Any]] = None,
    load_options: Optional[dict[str, Any]] = None,
    connection: Optional[str] = None,
    hive_partition_columns: Optional[List[bigquery.SchemaField]] = None,
    overwrite: bool = False,
    session: Optional[bigframes.session.Session] = None,
) -> dataframe.DataFrame:
    """
    Loads data from external files into a BigQuery table using the `LOAD DATA` statement.

    Args:
        uris (str | List[str]):
            The fully qualified URIs for the external data locations (e.g., 'gs://bucket/path/file.csv').
        format (str):
            The format of the external data (e.g., 'CSV', 'PARQUET', 'AVRO', 'JSON').
        destination_table (str, optional):
            The name of the destination table. If not specified, a temporary table will be created.
        schema (List[google.cloud.bigquery.SchemaField], optional):
            The schema of the destination table. If not provided, schema auto-detection will be used.
        cluster_by (List[str], optional):
            A list of columns to cluster the table by.
        partition_by (str, optional):
            The partition expression for the table.
        options (dict[str, Any], optional):
            Table options (e.g., {'description': 'my table'}).
        load_options (dict[str, Any], optional):
            Options for loading data (e.g., {'skip_leading_rows': 1}).
        connection (str, optional):
            The connection name to use for reading external data.
        hive_partition_columns (List[google.cloud.bigquery.SchemaField], optional):
            The external partitioning columns. If set to an empty list, partitioning is inferred.
        overwrite (bool, default False):
            If True, overwrites the destination table. If False, appends to it.
        session (bigframes.session.Session, optional):
            The session to use. If not provided, the default session is used.

    Returns:
        bigframes.dataframe.DataFrame: A DataFrame representing the loaded table.
    """
    import bigframes.pandas as bpd

    if session is None:
        session = bpd.get_global_session()

    if isinstance(uris, str):
        uris = [uris]

    if destination_table is None:
        # Create a temporary destination table through the session's storage
        # manager (internal API) so its lifetime is tied to the session.
        table_ref = session._storage_manager.create_temp_table(_PLACEHOLDER_SCHEMA)
        destination_table = f"{table_ref.project}.{table_ref.dataset_id}.{table_ref.table_id}"
        # The placeholder table must be overwritten by the load.
        overwrite = True

    sql = bigframes.core.sql.load_data_ddl(
        destination_table=destination_table,
        uris=uris,
        format=format,
        schema_fields=schema,
        cluster_by=cluster_by,
        partition_by=partition_by,
        table_options=options,
        load_options=load_options,
        connection=connection,
        hive_partition_columns=hive_partition_columns,
        overwrite=overwrite,
    )

    # Execute the LOAD DATA statement.
    session.read_gbq_query(sql)

    # Return a DataFrame pointing to the destination table, read through the
    # same session.
    return session.read_gbq(destination_table)
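
For reference, a fuller call that exercises the documented keyword arguments; the table name, URI, schema, and option values below are hypothetical:

import google.cloud.bigquery as bigquery

import bigframes.bigquery as bbq

# Hypothetical CSV load into an explicit destination table, replacing its contents.
df = bbq.load_data(
    "gs://example-bucket/raw/2024/*.csv",  # hypothetical URI
    format="CSV",
    destination_table="example-project.example_dataset.events",  # hypothetical table
    schema=[
        bigquery.SchemaField("event_ts", "TIMESTAMP"),
        bigquery.SchemaField("user_id", "INT64"),
        bigquery.SchemaField("payload", "STRING"),
    ],
    partition_by="DATE(event_ts)",
    cluster_by=["user_id"],
    options={"description": "events loaded from GCS"},
    load_options={"skip_leading_rows": 1},
    overwrite=True,
)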

bigframes/core/sql/__init__.py

Lines changed: 92 additions & 1 deletion
@@ -21,7 +21,7 @@
 import decimal
 import json
 import math
-from typing import cast, Collection, Iterable, Mapping, Optional, TYPE_CHECKING, Union
+from typing import Any, cast, Collection, Iterable, Mapping, Optional, TYPE_CHECKING, Union
 
 import shapely.geometry.base  # type: ignore
 
@@ -246,3 +246,94 @@ def create_vector_search_sql(
       distance,
     FROM VECTOR_SEARCH({args_str})
     """
+
+
+def _field_type_to_sql(field: bigquery.SchemaField) -> str:
+    if field.field_type in ("RECORD", "STRUCT"):
+        sub_defs = []
+        for sub in field.fields:
+            sub_type = _field_type_to_sql(sub)
+            sub_def = f"{sub.name} {sub_type}"
+            if sub.mode == "REQUIRED":
+                sub_def += " NOT NULL"
+            sub_defs.append(sub_def)
+        type_str = f"STRUCT<{', '.join(sub_defs)}>"
+    else:
+        type_str = field.field_type
+
+    if field.mode == "REPEATED":
+        return f"ARRAY<{type_str}>"
+    return type_str
+
+
+def schema_field_to_sql(field: bigquery.SchemaField) -> str:
+    """Convert a BigQuery SchemaField to a SQL DDL column definition."""
+    type_sql = _field_type_to_sql(field)
+    sql = f"{field.name} {type_sql}"
+    if field.mode == "REQUIRED":
+        sql += " NOT NULL"
+    if field.description:
+        sql += f" OPTIONS(description={simple_literal(field.description)})"
+    return sql
+
+
+def load_data_ddl(
+    destination_table: str,
+    uris: list[str],
+    format: str,
+    *,
+    schema_fields: list[bigquery.SchemaField] | None = None,
+    cluster_by: list[str] | None = None,
+    partition_by: str | None = None,
+    table_options: dict[str, Any] | None = None,
+    load_options: dict[str, Any] | None = None,
+    connection: str | None = None,
+    hive_partition_columns: list[bigquery.SchemaField] | None = None,
+    overwrite: bool = False,
+) -> str:
+    """Construct a LOAD DATA DDL statement."""
+    action = "OVERWRITE" if overwrite else "INTO"
+
+    query = f"LOAD DATA {action} {googlesql.identifier(destination_table)}\n"
+
+    if schema_fields:
+        columns_sql = ",\n".join(schema_field_to_sql(field) for field in schema_fields)
+        query += f"(\n{columns_sql}\n)\n"
+
+    if partition_by:
+        query += f"PARTITION BY {partition_by}\n"
+
+    if cluster_by:
+        query += f"CLUSTER BY {', '.join(cluster_by)}\n"
+
+    if table_options:
+        opts_list = []
+        for k, v in table_options.items():
+            opts_list.append(f"{k}={simple_literal(v)}")
+        query += f"OPTIONS({', '.join(opts_list)})\n"
+
+    files_opts = {}
+    if load_options:
+        files_opts.update(load_options)
+
+    files_opts["uris"] = uris
+    files_opts["format"] = format
+
+    files_opts_list = []
+    for k, v in files_opts.items():
+        files_opts_list.append(f"{k}={simple_literal(v)}")
+
+    query += f"FROM FILES({', '.join(files_opts_list)})\n"
+
+    if hive_partition_columns:
+        cols_sql = ",\n".join(
+            schema_field_to_sql(field) for field in hive_partition_columns
+        )
+        query += f"WITH PARTITION COLUMNS (\n{cols_sql}\n)\n"
+    elif hive_partition_columns is not None:
+        query += "WITH PARTITION COLUMNS\n"
+
+    if connection:
+        query += f"WITH CONNECTION {connection}\n"
+
+    return query
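
As a rough illustration of the statement shape load_data_ddl builds for a simple call; the identifiers below are hypothetical, and the exact literal formatting depends on the simple_literal and googlesql.identifier helpers:

import google.cloud.bigquery as bigquery

import bigframes.core.sql

ddl = bigframes.core.sql.load_data_ddl(
    destination_table="example-project.example_dataset.events",  # hypothetical
    uris=["gs://example-bucket/raw/*.csv"],  # hypothetical
    format="CSV",
    schema_fields=[
        bigquery.SchemaField("event_ts", "TIMESTAMP", mode="REQUIRED"),
        bigquery.SchemaField("tags", "STRING", mode="REPEATED"),
    ],
    partition_by="DATE(event_ts)",
    cluster_by=["event_ts"],
    load_options={"skip_leading_rows": 1},
    overwrite=True,
)
# Roughly:
#   LOAD DATA OVERWRITE `example-project.example_dataset.events`
#   (
#   event_ts TIMESTAMP NOT NULL,
#   tags ARRAY<STRING>
#   )
#   PARTITION BY DATE(event_ts)
#   CLUSTER BY event_ts
#   FROM FILES(skip_leading_rows=1, uris=[...], format='CSV')
print(ddl)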
