Skip to content

Commit f89c551

Browse files
committed
Merge branch 'main' into max_slots
2 parents 48e96f3 + 84fa75b commit f89c551

28 files changed

+2066
-1026
lines changed

CHANGELOG.md

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,20 @@
55
[1]: https://pypi.org/project/google-cloud-bigquery/#history
66

77

8+
## [3.36.0](https://github.com/googleapis/python-bigquery/compare/v3.35.1...v3.36.0) (2025-08-20)
9+
10+
11+
### Features
12+
13+
* Add created/started/ended properties to RowIterator. ([#2260](https://github.com/googleapis/python-bigquery/issues/2260)) ([0a95b24](https://github.com/googleapis/python-bigquery/commit/0a95b24192395cc3ccf801aa9bc318999873a2bf))
14+
* Retry query jobs if `jobBackendError` or `jobInternalError` are encountered ([#2256](https://github.com/googleapis/python-bigquery/issues/2256)) ([3deff1d](https://github.com/googleapis/python-bigquery/commit/3deff1d963980800e8b79fa3aaf5b712d4fd5062))
15+
16+
17+
### Documentation
18+
19+
* Add a TROUBLESHOOTING.md file with tips for logging ([#2262](https://github.com/googleapis/python-bigquery/issues/2262)) ([b684832](https://github.com/googleapis/python-bigquery/commit/b68483227693ea68f6b12eacca2be1803cffb1d1))
20+
* Update README to break infinite redirect loop ([#2254](https://github.com/googleapis/python-bigquery/issues/2254)) ([8f03166](https://github.com/googleapis/python-bigquery/commit/8f031666114a826da2ad965f8ecd4727466cb480))
21+
822
## [3.35.1](https://github.com/googleapis/python-bigquery/compare/v3.35.0...v3.35.1) (2025-07-21)
923

1024

README.rst

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ processing power of Google's infrastructure.
1818
.. |versions| image:: https://img.shields.io/pypi/pyversions/google-cloud-bigquery.svg
1919
:target: https://pypi.org/project/google-cloud-bigquery/
2020
.. _BigQuery: https://cloud.google.com/bigquery/what-is-bigquery
21-
.. _Client Library Documentation: https://googleapis.dev/python/bigquery/latest
21+
.. _Client Library Documentation: https://cloud.google.com/python/docs/reference/bigquery/latest/summary_overview
2222
.. _Product Documentation: https://cloud.google.com/bigquery/docs/reference/v2/
2323

2424
Quick Start

TROUBLESHOOTING.md

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
# Troubleshooting steps
2+
3+
## Enable logging of BQ Storage Read API session creation
4+
5+
It can be helpful to capture the BQ Storage Read API session ID to allow the BigQuery
6+
backend team to debug cases of API instability. The logs that share the session
7+
creation are in a module-specific logger. To enable the logs, refer to the
8+
following code sample:
9+
10+
```python
11+
import logging
12+
import google.cloud.bigquery
13+
14+
# Configure the basic logging to show DEBUG level messages
15+
log_formatter = logging.Formatter(
16+
'%(asctime)s - %(levelname)s - %(message)s'
17+
)
18+
handler = logging.StreamHandler()
19+
handler.setFormatter(log_formatter)
20+
default_logger = logging.getLogger()
21+
default_logger.setLevel(logging.DEBUG)
22+
default_logger.addHandler(handler)
23+
to_dataframe_logger = logging.getLogger("google.cloud.bigquery._pandas_helpers")
24+
to_dataframe_logger.setLevel(logging.DEBUG)
25+
to_dataframe_logger.addHandler(handler)
26+
27+
# Example code that touches the BQ Storage Read API.
28+
bqclient = google.cloud.bigquery.Client()
29+
results = bqclient.query_and_wait("SELECT * FROM `bigquery-public-data.usa_names.usa_1910_2013`")
30+
print(results.to_dataframe().head())
31+
```
32+
33+
In particular, watch for the text "with BQ Storage API session" in the logs
34+
to get the streaming API session ID to share with your support person.

google/cloud/bigquery/_job_helpers.py

Lines changed: 158 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -35,17 +35,22 @@
3535
predicates where it is safe to generate a new query ID.
3636
"""
3737

38+
from __future__ import annotations
39+
3840
import copy
41+
import dataclasses
42+
import datetime
3943
import functools
4044
import uuid
4145
import textwrap
42-
from typing import Any, Dict, Optional, TYPE_CHECKING, Union
46+
from typing import Any, Callable, Dict, Optional, TYPE_CHECKING, Union
4347
import warnings
4448

4549
import google.api_core.exceptions as core_exceptions
4650
from google.api_core import retry as retries
4751

4852
from google.cloud.bigquery import job
53+
import google.cloud.bigquery.job.query
4954
import google.cloud.bigquery.query
5055
from google.cloud.bigquery import table
5156
import google.cloud.bigquery.retry
@@ -116,14 +121,21 @@ def query_jobs_insert(
116121
retry: Optional[retries.Retry],
117122
timeout: Optional[float],
118123
job_retry: Optional[retries.Retry],
124+
*,
125+
callback: Callable = lambda _: None,
119126
) -> job.QueryJob:
120127
"""Initiate a query using jobs.insert.
121128
122129
See: https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/insert
130+
131+
Args:
132+
callback (Callable):
133+
A callback function used by bigframes to report query progress.
123134
"""
124135
job_id_given = job_id is not None
125136
job_id_save = job_id
126137
job_config_save = job_config
138+
query_sent_factory = QuerySentEventFactory()
127139

128140
def do_query():
129141
# Make a copy now, so that original doesn't get changed by the process
@@ -136,6 +148,16 @@ def do_query():
136148

137149
try:
138150
query_job._begin(retry=retry, timeout=timeout)
151+
if job_config is not None and not job_config.dry_run:
152+
callback(
153+
query_sent_factory(
154+
query=query,
155+
billing_project=query_job.project,
156+
location=query_job.location,
157+
job_id=query_job.job_id,
158+
request_id=None,
159+
)
160+
)
139161
except core_exceptions.Conflict as create_exc:
140162
# The thought is if someone is providing their own job IDs and they get
141163
# their job ID generation wrong, this could end up returning results for
@@ -396,6 +418,7 @@ def query_and_wait(
396418
job_retry: Optional[retries.Retry],
397419
page_size: Optional[int] = None,
398420
max_results: Optional[int] = None,
421+
callback: Callable = lambda _: None,
399422
) -> table.RowIterator:
400423
"""Run the query, wait for it to finish, and return the results.
401424
@@ -415,9 +438,8 @@ def query_and_wait(
415438
location (Optional[str]):
416439
Location where to run the job. Must match the location of the
417440
table used in the query as well as the destination table.
418-
project (Optional[str]):
419-
Project ID of the project of where to run the job. Defaults
420-
to the client's project.
441+
project (str):
442+
Project ID of the project of where to run the job.
421443
api_timeout (Optional[float]):
422444
The number of seconds to wait for the underlying HTTP transport
423445
before using ``retry``.
@@ -441,6 +463,8 @@ def query_and_wait(
441463
request. Non-positive values are ignored.
442464
max_results (Optional[int]):
443465
The maximum total number of rows from this request.
466+
callback (Callable):
467+
A callback function used by bigframes to report query progress.
444468
445469
Returns:
446470
google.cloud.bigquery.table.RowIterator:
@@ -479,12 +503,14 @@ def query_and_wait(
479503
retry=retry,
480504
timeout=api_timeout,
481505
job_retry=job_retry,
506+
callback=callback,
482507
),
483508
api_timeout=api_timeout,
484509
wait_timeout=wait_timeout,
485510
retry=retry,
486511
page_size=page_size,
487512
max_results=max_results,
513+
callback=callback,
488514
)
489515

490516
path = _to_query_path(project)
@@ -496,10 +522,24 @@ def query_and_wait(
496522
if client.default_job_creation_mode:
497523
request_body["jobCreationMode"] = client.default_job_creation_mode
498524

525+
query_sent_factory = QuerySentEventFactory()
526+
499527
def do_query():
500-
request_body["requestId"] = make_job_id()
528+
request_id = make_job_id()
529+
request_body["requestId"] = request_id
501530
span_attributes = {"path": path}
502531

532+
if "dryRun" not in request_body:
533+
callback(
534+
query_sent_factory(
535+
query=query,
536+
billing_project=project,
537+
location=location,
538+
job_id=None,
539+
request_id=request_id,
540+
)
541+
)
542+
503543
# For easier testing, handle the retries ourselves.
504544
if retry is not None:
505545
response = retry(client._call_api)(
@@ -542,8 +582,25 @@ def do_query():
542582
retry=retry,
543583
page_size=page_size,
544584
max_results=max_results,
585+
callback=callback,
545586
)
546587

588+
if "dryRun" not in request_body:
589+
callback(
590+
QueryFinishedEvent(
591+
billing_project=project,
592+
location=query_results.location,
593+
query_id=query_results.query_id,
594+
job_id=query_results.job_id,
595+
total_rows=query_results.total_rows,
596+
total_bytes_processed=query_results.total_bytes_processed,
597+
slot_millis=query_results.slot_millis,
598+
destination=None,
599+
created=query_results.created,
600+
started=query_results.started,
601+
ended=query_results.ended,
602+
)
603+
)
547604
return table.RowIterator(
548605
client=client,
549606
api_request=functools.partial(client._call_api, retry, timeout=api_timeout),
@@ -561,6 +618,9 @@ def do_query():
561618
query=query,
562619
total_bytes_processed=query_results.total_bytes_processed,
563620
slot_millis=query_results.slot_millis,
621+
created=query_results.created,
622+
started=query_results.started,
623+
ended=query_results.ended,
564624
)
565625

566626
if job_retry is not None:
@@ -612,19 +672,52 @@ def _wait_or_cancel(
612672
retry: Optional[retries.Retry],
613673
page_size: Optional[int],
614674
max_results: Optional[int],
675+
*,
676+
callback: Callable = lambda _: None,
615677
) -> table.RowIterator:
616678
"""Wait for a job to complete and return the results.
617679
618680
If we can't return the results within the ``wait_timeout``, try to cancel
619681
the job.
620682
"""
621683
try:
622-
return job.result(
684+
if not job.dry_run:
685+
callback(
686+
QueryReceivedEvent(
687+
billing_project=job.project,
688+
location=job.location,
689+
job_id=job.job_id,
690+
statement_type=job.statement_type,
691+
state=job.state,
692+
query_plan=job.query_plan,
693+
created=job.created,
694+
started=job.started,
695+
ended=job.ended,
696+
)
697+
)
698+
query_results = job.result(
623699
page_size=page_size,
624700
max_results=max_results,
625701
retry=retry,
626702
timeout=wait_timeout,
627703
)
704+
if not job.dry_run:
705+
callback(
706+
QueryFinishedEvent(
707+
billing_project=job.project,
708+
location=query_results.location,
709+
query_id=query_results.query_id,
710+
job_id=query_results.job_id,
711+
total_rows=query_results.total_rows,
712+
total_bytes_processed=query_results.total_bytes_processed,
713+
slot_millis=query_results.slot_millis,
714+
destination=job.destination,
715+
created=job.created,
716+
started=job.started,
717+
ended=job.ended,
718+
)
719+
)
720+
return query_results
628721
except Exception:
629722
# Attempt to cancel the job since we can't return the results.
630723
try:
@@ -633,3 +726,62 @@ def _wait_or_cancel(
633726
# Don't eat the original exception if cancel fails.
634727
pass
635728
raise
729+
730+
731+
@dataclasses.dataclass(frozen=True)
class QueryFinishedEvent:
    """Query finished successfully.

    An immutable snapshot of a completed query's identifiers and statistics,
    delivered to the progress ``callback`` (used by bigframes to report
    query progress).
    """

    # Project billed for the query, if known.
    billing_project: Optional[str]
    # Location where the query ran (e.g. "US").
    location: Optional[str]
    # Server-assigned ID of the query (stateless/jobless queries).
    query_id: Optional[str]
    # Job ID, when a job was created for the query.
    job_id: Optional[str]
    # Destination table of the results, when known.
    destination: Optional[table.TableReference]
    # Number of rows in the result set.
    total_rows: Optional[int]
    # Bytes scanned by the query.
    total_bytes_processed: Optional[int]
    # Slot-milliseconds consumed.
    slot_millis: Optional[int]
    # Lifecycle timestamps, when reported by the backend.
    created: Optional[datetime.datetime]
    started: Optional[datetime.datetime]
    ended: Optional[datetime.datetime]
ended: Optional[datetime.datetime]
746+
747+
748+
@dataclasses.dataclass(frozen=True)
class QueryReceivedEvent:
    """Query received and acknowledged by the BigQuery API.

    An immutable snapshot of a query job right after it has been created,
    delivered to the progress ``callback`` before waiting on results.
    """

    # Project billed for the query, if known.
    billing_project: Optional[str]
    # Location where the job runs (e.g. "US").
    location: Optional[str]
    # ID of the created job.
    job_id: Optional[str]
    # Statement type reported by the backend (e.g. "SELECT").
    statement_type: Optional[str]
    # Job state at the time of the event (e.g. "RUNNING").
    state: Optional[str]
    # Query plan stages, when available.
    query_plan: Optional[list[google.cloud.bigquery.job.query.QueryPlanEntry]]
    # Lifecycle timestamps, when reported by the backend.
    created: Optional[datetime.datetime]
    started: Optional[datetime.datetime]
    ended: Optional[datetime.datetime]
761+
762+
763+
@dataclasses.dataclass(frozen=True)
class QuerySentEvent:
    """Query sent to BigQuery.

    An immutable record of the request as submitted, delivered to the
    progress ``callback`` when the query is first sent.
    """

    # SQL text of the query.
    query: str
    # Project billed for the query, if known.
    billing_project: Optional[str]
    # Location where the query runs (e.g. "US").
    location: Optional[str]
    # Job ID for jobs.insert requests; None for jobs.query requests.
    job_id: Optional[str]
    # Request ID for jobs.query requests; None for jobs.insert requests.
    request_id: Optional[str]
772+
773+
774+
class QueryRetryEvent(QuerySentEvent):
    """Query sent another time because the previous attempt failed.

    Carries the same fields as ``QuerySentEvent``; the distinct type lets a
    callback tell a first send apart from a retry.
    """
776+
777+
778+
class QuerySentEventFactory:
    """Creates a QuerySentEvent first, then QueryRetryEvent after that.

    A small stateful factory: the first call reports the initial send, and
    every later call (i.e. a job_retry re-attempt) reports a retry.
    """

    def __init__(self):
        # False until the first event has been produced.
        self._sent = False

    def __call__(self, **kwargs):
        if self._sent:
            return QueryRetryEvent(**kwargs)
        self._sent = True
        return QuerySentEvent(**kwargs)

0 commit comments

Comments
 (0)