Commit 6478ad7

chore: add load tests session for reading large tables (#410)
* chore: add load tests session for reading large tables
* update junit prefix
* xfail for to_pandas_batches
* use smaller table but still beyond query results limit
1 parent d92ced2

6 files changed: 249 additions & 2 deletions

.kokoro/load/common.cfg

Lines changed: 10 additions & 0 deletions

@@ -0,0 +1,10 @@
+# Format: //devtools/kokoro/config/proto/build.proto
+
+# Build logs will be here
+action {
+  define_artifacts {
+    regex: "**/*sponge_log.xml"
+  }
+}
+
+build_file: "python-bigquery-dataframes/.kokoro/build.sh"

.kokoro/load/load.cfg

Lines changed: 17 additions & 0 deletions

@@ -0,0 +1,17 @@
+# Format: //devtools/kokoro/config/proto/build.proto
+
+# Only run this nox session.
+env_vars: {
+  key: "NOX_SESSION"
+  value: "load"
+}
+
+env_vars: {
+  key: "GOOGLE_CLOUD_PROJECT"
+  value: "bigframes-load-testing"
+}
+
+env_vars: {
+  key: "BIGFRAMES_TEST_MODEL_VERTEX_ENDPOINT"
+  value: "https://us-central1-aiplatform.googleapis.com/v1/projects/272725758477/locations/us-central1/endpoints/590545496255234048"
+}

noxfile.py

Lines changed: 11 additions & 0 deletions

@@ -387,6 +387,17 @@ def e2e(session: nox.sessions.Session):
     )


+@nox.session(python=SYSTEM_TEST_PYTHON_VERSIONS[-1])
+def load(session: nox.sessions.Session):
+    """Run the very large tests in system test suite."""
+    run_system(
+        session=session,
+        prefix_name="load",
+        test_folder=os.path.join("tests", "system", "load"),
+        print_duration=True,
+    )
+
+
 @nox.session(python=SYSTEM_TEST_PYTHON_VERSIONS)
 def samples(session):
     """Run the samples test suite."""

scripts/create_load_test_tables.py

Lines changed: 109 additions & 0 deletions

@@ -0,0 +1,109 @@
+# Copyright 2024 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import math
+import os
+import pathlib
+import sys
+
+import google.cloud.bigquery as bigquery
+
+REPO_ROOT = pathlib.Path(__file__).parent.parent
+
+PROJECT_ID = os.getenv("GOOGLE_CLOUD_PROJECT")
+
+if not PROJECT_ID:
+    print(
+        "Please set GOOGLE_CLOUD_PROJECT environment variable before running.",
+        file=sys.stderr,
+    )
+    sys.exit(1)
+
+DATASET_ID = f"{PROJECT_ID}.load_testing"
+TABLE_ID = f"{DATASET_ID}.scalars"
+TABLE_ID_FORMAT = f"{DATASET_ID}.scalars_{{size}}"
+
+KB_BYTES = 1000
+MB_BYTES = 1000 * KB_BYTES
+GB_BYTES = 1000 * MB_BYTES
+TB_BYTES = 1000 * GB_BYTES
+SIZES = (
+    ("1mb", MB_BYTES),
+    ("10mb", 10 * MB_BYTES),
+    ("100mb", 100 * MB_BYTES),
+    ("1gb", GB_BYTES),
+    ("10gb", 10 * GB_BYTES),
+    ("100gb", 100 * GB_BYTES),
+    ("1tb", TB_BYTES),
+)
+SCHEMA_PATH = REPO_ROOT / "tests" / "data" / "scalars_schema.json"
+DATA_PATH = REPO_ROOT / "tests" / "data" / "scalars.jsonl"
+BQCLIENT = bigquery.Client()
+
+
+def create_dataset():
+    dataset = bigquery.Dataset(DATASET_ID)
+    BQCLIENT.create_dataset(dataset, exists_ok=True)
+
+
+def load_scalars_table():
+    schema = BQCLIENT.schema_from_json(SCHEMA_PATH)
+    job_config = bigquery.LoadJobConfig()
+    job_config.schema = schema
+    job_config.write_disposition = bigquery.WriteDisposition.WRITE_TRUNCATE
+    job_config.source_format = bigquery.SourceFormat.NEWLINE_DELIMITED_JSON
+
+    print(f"Creating {TABLE_ID}")
+    with open(DATA_PATH, "rb") as data_file:
+        BQCLIENT.load_table_from_file(
+            data_file,
+            TABLE_ID,
+            job_config=job_config,
+        ).result()
+
+
+def multiply_table(previous_table_id, target_table_id, multiplier):
+    clauses = [f"SELECT * FROM `{previous_table_id}`"] * multiplier
+    query = " UNION ALL ".join(clauses)
+    job_config = bigquery.QueryJobConfig()
+    job_config.destination = target_table_id
+    job_config.write_disposition = bigquery.WriteDisposition.WRITE_TRUNCATE
+    print(f"Creating {target_table_id}, {multiplier} x {previous_table_id}")
+    BQCLIENT.query_and_wait(query, job_config=job_config)
+
+
+def create_tables():
+    base_table = BQCLIENT.get_table(TABLE_ID)
+    previous_bytes = base_table.num_bytes
+    previous_table_id = TABLE_ID
+
+    for table_suffix, target_bytes in SIZES:
+        # Make sure we exceed the desired bytes by adding to the multiplier.
+        multiplier = math.ceil(target_bytes / previous_bytes) + 1
+        target_table_id = TABLE_ID_FORMAT.format(size=table_suffix)
+        multiply_table(previous_table_id, target_table_id, multiplier)
+
+        table = BQCLIENT.get_table(target_table_id)
+        previous_bytes = table.num_bytes
+        previous_table_id = target_table_id


+def main():
+    create_dataset()
+    load_scalars_table()
+    create_tables()
+
+
+if __name__ == "__main__":
+    main()
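create_tables() builds each larger table by UNION ALL-ing the previous one enough times to overshoot the next size target, so the tables compound from the small scalars.jsonl seed up to roughly 1 TB. The sketch below simply replays that arithmetic with an assumed 5 KB seed (the real value is read from num_bytes at runtime) to show how the multipliers shrink as the tables grow.

# Illustration only: replays the multiplier math from create_tables() with an
# assumed seed size; the script itself uses the actual table's num_bytes.
import math

KB_BYTES = 1000
MB_BYTES = 1000 * KB_BYTES
GB_BYTES = 1000 * MB_BYTES
TB_BYTES = 1000 * GB_BYTES

previous_bytes = 5 * KB_BYTES  # assumed size of the seed scalars table
for suffix, target_bytes in (
    ("1mb", MB_BYTES),
    ("10mb", 10 * MB_BYTES),
    ("100mb", 100 * MB_BYTES),
    ("1gb", GB_BYTES),
    ("10gb", 10 * GB_BYTES),
    ("100gb", 100 * GB_BYTES),
    ("1tb", TB_BYTES),
):
    # Same formula as the script: overshoot the target on purpose.
    multiplier = math.ceil(target_bytes / previous_bytes) + 1
    previous_bytes *= multiplier
    print(f"scalars_{suffix}: x{multiplier} -> ~{previous_bytes:,} bytes")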

tests/system/conftest.py

Lines changed: 6 additions & 2 deletions

@@ -104,6 +104,11 @@ def cloudfunctions_client(
     return session.cloudfunctionsclient


+@pytest.fixture(scope="session")
+def project_id(bigquery_client: bigquery.Client) -> str:
+    return bigquery_client.project
+
+
 @pytest.fixture(scope="session")
 def resourcemanager_client(
     session: bigframes.Session,
@@ -159,9 +164,8 @@ def dataset_id_not_created(bigquery_client: bigquery.Client):


 @pytest.fixture(scope="session")
-def dataset_id_permanent(bigquery_client: bigquery.Client) -> str:
+def dataset_id_permanent(bigquery_client: bigquery.Client, project_id: str) -> str:
     """Create a dataset if it doesn't exist."""
-    project_id = bigquery_client.project
     dataset_id = f"{project_id}.{PERMANENT_DATASET}"
     dataset = bigquery.Dataset(dataset_id)
     bigquery_client.create_dataset(dataset, exists_ok=True)
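The new session-scoped project_id fixture lets other fixtures and tests request the BigQuery client's default project by name instead of reaching into bigquery_client directly, as dataset_id_permanent now does. A hypothetical example of a test consuming it (the test name and assertion are illustrative only, not part of this commit):

# Illustrative only: pytest injects session-scoped fixtures by parameter name.
def test_permanent_dataset_uses_default_project(
    project_id: str, dataset_id_permanent: str
):
    assert dataset_id_permanent.startswith(f"{project_id}.")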
tests/system/load/test_large_tables.py

Lines changed: 96 additions & 0 deletions

@@ -0,0 +1,96 @@
+# Copyright 2024 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Load test for query (SQL) inputs with large results sizes."""
+
+import pytest
+
+import bigframes.pandas as bpd
+
+KB_BYTES = 1000
+MB_BYTES = 1000 * KB_BYTES
+GB_BYTES = 1000 * MB_BYTES
+TB_BYTES = 1000 * GB_BYTES
+
+
+@pytest.mark.parametrize(
+    ("sql", "expected_bytes"),
+    (
+        pytest.param(
+            "SELECT * FROM load_testing.scalars_1gb",
+            GB_BYTES,
+            id="1gb",
+        ),
+        pytest.param(
+            "SELECT * FROM load_testing.scalars_10gb",
+            10 * GB_BYTES,
+            id="10gb",
+        ),
+        pytest.param(
+            "SELECT * FROM load_testing.scalars_100gb",
+            100 * GB_BYTES,
+            id="100gb",
+        ),
+        pytest.param(
+            "SELECT * FROM load_testing.scalars_1tb",
+            TB_BYTES,
+            id="1tb",
+        ),
+    ),
+)
+def test_read_gbq_sql_large_results(sql, expected_bytes):
+    df = bpd.read_gbq(sql)
+    assert df.memory_usage().sum() >= expected_bytes
+
+
+def test_df_repr_large_table():
+    df = bpd.read_gbq("load_testing.scalars_100gb")
+    row_count, column_count = df.shape
+    expected = f"[{row_count} rows x {column_count} columns]"
+    actual = repr(df)
+    assert expected in actual
+
+
+def test_series_repr_large_table():
+    df = bpd.read_gbq("load_testing.scalars_1tb")
+    actual = repr(df["string_col"])
+    assert actual is not None
+
+
+def test_index_repr_large_table():
+    df = bpd.read_gbq("load_testing.scalars_1tb")
+    actual = repr(df.index)
+    assert actual is not None
+
+
+# FAILED
+# tests/system/load/test_large_tables.py::test_to_pandas_batches_large_table
+# google.api_core.exceptions.Forbidden: 403 Response too large to return.
+# Consider specifying a destination table in your job...
+@pytest.mark.xfail
+def test_to_pandas_batches_large_table():
+    df = bpd.read_gbq("load_testing.scalars_100gb")
+    expected_row_count, expected_column_count = df.shape
+
+    row_count = 0
+    for df in df.to_pandas_batches():
+        batch_row_count, batch_column_count = df.shape
+        assert batch_column_count == expected_column_count
+        row_count += batch_row_count
+
+        # Attempt to save on memory by manually removing the batch df
+        # from local memory after finishing with processing.
+        del df
+
+    assert row_count == expected_row_count
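test_to_pandas_batches_large_table is marked xfail because reading the 100 GB table through to_pandas_batches currently trips BigQuery's response-size limit (the 403 "Response too large to return" quoted in the comment above). If that stays the only failure mode, the marker could be narrowed as sketched below; the raises/reason values are assumptions taken from that comment, not part of this commit.

# Sketch only: narrows the xfail to the 403 documented above; not in this commit.
import google.api_core.exceptions
import pytest


@pytest.mark.xfail(
    raises=google.api_core.exceptions.Forbidden,
    reason="403 Response too large to return without a destination table.",
)
def test_to_pandas_batches_large_table_narrow_xfail():
    ...  # same body as test_to_pandas_batches_large_table above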
