Skip to content

Commit 427846f

Browse files
feat: batch predict with BQ
1 parent 65bb552 commit 427846f

File tree

6 files changed

+97
-19
lines changed

6 files changed

+97
-19
lines changed
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
11
google-api-core==2.24.0
2+
google-cloud-bigquery==3.29.0
23
google-cloud-storage==2.19.0
34
pytest==8.2.0
genai/batch_prediction/submit_with_bq.py (new file)

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
# Copyright 2025 Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# https://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
16+
def generate_content(output_uri: str) -> str:
17+
# [START googlegenaisdk_batchpredict_with_bq]
18+
import time
19+
20+
from google import genai
21+
22+
client = genai.Client()
23+
# TODO(developer): Update and un-comment below line
24+
# output_uri = "bq://your-project.your_dataset.your_table"
25+
26+
job = client.batches.create(
27+
model="gemini-1.5-pro-002",
28+
src="bq://storage-samples.generative_ai.batch_requests_for_multimodal_input",
29+
config={
30+
"dest": output_uri
31+
}
32+
)
33+
print(f"Job name: {job.name}")
34+
print(f"Job state: {job.state}")
35+
# Example response:
36+
# Job name: projects/%PROJECT_ID%/locations/us-central1/batchPredictionJobs/9876453210000000000
37+
# Job state: JOB_STATE_PENDING
38+
39+
# See the documentation: https://googleapis.github.io/python-genai/genai.html#genai.types.BatchJob
40+
completed_states = [
41+
"JOB_STATE_SUCCEEDED",
42+
"JOB_STATE_FAILED",
43+
"JOB_STATE_CANCELLED",
44+
"JOB_STATE_PAUSED",
45+
]
46+
while job.state not in completed_states:
47+
time.sleep(30)
48+
job = client.batches.get(name=job.name)
49+
print(f"Job state: {job.state}")
50+
# Example response:
51+
# Job state: JOB_STATE_PENDING
52+
# Job state: JOB_STATE_RUNNING
53+
# Job state: JOB_STATE_RUNNING
54+
# ...
55+
# Job state: JOB_STATE_SUCCEEDED
56+
57+
# [END googlegenaisdk_batchpredict_with_bq]
58+
return job.state
59+
60+
61+
if __name__ == "__main__":
62+
generate_content(
63+
output_uri="bq://your-project.your_dataset.your_table"
64+
)

genai/batch_predict/batch_prediction_with_gcs.py renamed to genai/batch_prediction/submit_with_gcs.py

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -12,11 +12,9 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15-
from google.genai.types import BatchJob
1615

17-
18-
def create_job(output_uri: str) -> BatchJob:
19-
# [START googlegenaisdk_batch_prediction_with_gcs]
16+
def generate_content(output_uri: str) -> str:
17+
# [START googlegenaisdk_batchpredict_with_gcs]
2018
import time
2119

2220
from google import genai
@@ -26,7 +24,7 @@ def create_job(output_uri: str) -> BatchJob:
2624
# output_uri = "gs://your-bucket/your-prefix/..."
2725

2826
job = client.batches.create(
29-
model="gemini-2.0-flash-001",
27+
model="gemini-1.5-pro-002",
3028
src="gs://cloud-samples-data/batch/prompt_for_batch_gemini_predict.jsonl",
3129
config={
3230
"dest": output_uri
@@ -56,11 +54,11 @@ def create_job(output_uri: str) -> BatchJob:
5654
# ...
5755
# Job state: JOB_STATE_SUCCEEDED
5856

59-
# [END googlegenaisdk_batch_prediction_with_gcs]
60-
return job
57+
# [END googlegenaisdk_batchpredict_with_gcs]
58+
return job.state
6159

6260

6361
if __name__ == "__main__":
64-
create_job(
62+
generate_content(
6563
output_uri="gs://your-bucket/your-prefix/..."
6664
)

genai/batch_predict/test_batch_predict.py renamed to genai/batch_prediction/test_batch_predict.py

Lines changed: 25 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -12,27 +12,40 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15-
from datetime import datetime as dt, UTC
15+
from datetime import datetime as dt
1616
import os
1717

18-
from google.cloud import storage
18+
from google.cloud import bigquery, storage
1919
from google.genai.types import JobState
20+
2021
import pytest
2122

22-
import batch_prediction_with_gcs
23+
import submit_with_bq
24+
import submit_with_gcs
2325

2426

2527
os.environ["GOOGLE_GENAI_USE_VERTEXAI"] = "True"
2628
os.environ["GOOGLE_CLOUD_LOCATION"] = "us-central1"
2729
# The project name is included in the CICD pipeline
2830
# os.environ['GOOGLE_CLOUD_PROJECT'] = "add-your-project-name"
31+
BQ_OUTPUT_DATASET = f"{os.environ['GOOGLE_CLOUD_PROJECT']}.gen_ai_batch_prediction"
2932
GCS_OUTPUT_BUCKET = "python-docs-samples-tests"
30-
GCS_OUTPUT_BUCKET = "gemini-batch-prediction-results"
33+
34+
35+
@pytest.fixture(scope="session")
36+
def bq_output_uri():
37+
table_name = f"text_output_{dt.now().strftime('%Y_%m_%d_T%H_%M_%S')}"
38+
table_uri = f"{BQ_OUTPUT_DATASET}.{table_name}"
39+
40+
yield f"bq://{table_uri}"
41+
42+
bq_client = bigquery.Client()
43+
bq_client.delete_table(table_uri, not_found_ok=True)
3144

3245

3346
@pytest.fixture(scope="session")
3447
def gcs_output_uri():
35-
prefix = f"text_output/{dt.now(UTC)}"
48+
prefix = f"text_output/{dt.now()}"
3649

3750
yield f"gs://{GCS_OUTPUT_BUCKET}/{prefix}"
3851

@@ -43,9 +56,11 @@ def gcs_output_uri():
4356
blob.delete()
4457

4558

59+
def test_batch_prediction_with_bq(bq_output_uri) -> None:
60+
response = submit_with_bq.generate_content(output_uri=bq_output_uri)
61+
assert response == JobState.JOB_STATE_SUCCEEDED
62+
63+
4664
def test_batch_prediction_with_gcs(gcs_output_uri) -> None:
47-
job = batch_prediction_with_gcs.create_job(output_uri=gcs_output_uri)
48-
assert job
49-
assert job.state == "JOB_STATE_SUCCEEDED"
50-
assert job.dest.gcs_uri == gcs_output_uri
51-
assert job.state == JobState.JOB_STATE_SUCCEEDED
65+
response = submit_with_gcs.generate_content(output_uri=gcs_output_uri)
66+
assert response == JobState.JOB_STATE_SUCCEEDED

genai/controlled_generation/ctrlgen_with_class_schema.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
def generate_content() -> str:
1717
# [START googlegenaisdk_ctrlgen_with_class_schema]
1818
from google import genai
19-
from pydantic import BaseModel, TypeAdapter
19+
from pydantic import BaseModel
2020

2121
class Recipe(BaseModel):
2222
recipe_name: str

0 commit comments

Comments
 (0)