From 44297eb71a6f246a3f24e404aca8a788896111cd Mon Sep 17 00:00:00 2001
From: David Li
Date: Tue, 30 Dec 2025 14:44:29 +0900
Subject: [PATCH] fix(go): add retry backoff and debug logging when polling BigQuery jobs
---
go/bulk_ingest.go | 7 +++--
go/connection.go | 2 +-
go/record_reader.go | 64 ++++++++++++++++++++++++---------------------
go/statement.go | 6 ++---
go/util.go | 21 ++++++++++++++-
5 files changed, 63 insertions(+), 37 deletions(-)
diff --git a/go/bulk_ingest.go b/go/bulk_ingest.go
index c4b42ee..9778b94 100644
--- a/go/bulk_ingest.go
+++ b/go/bulk_ingest.go
@@ -115,7 +115,7 @@ func (bi *bigqueryBulkIngestImpl) Copy(ctx context.Context, chunk driverbase.Bul
if err != nil {
return errToAdbcErr(adbc.StatusIO, err, "run loader")
}
- status, err := safeWaitForJob(ctx, job)
+ status, err := safeWaitForJob(ctx, bi.logger, job)
if err != nil {
return err
}
@@ -149,12 +149,15 @@ func (bi *bigqueryBulkIngestImpl) CreateTable(ctx context.Context, schema *arrow
if err != nil {
return err
}
- js, err := safeWaitForJob(ctx, job)
+ js, err := safeWaitForJob(ctx, bi.logger, job)
if err != nil {
+ bi.logger.Debug("failed to create table", "table", bi.options.TableName, "stmt", stmt, "error", err)
return err
} else if err = js.Err(); err != nil {
+ bi.logger.Debug("failed to create table", "table", bi.options.TableName, "stmt", stmt, "error", err)
return errToAdbcErr(adbc.StatusInternal, err, "create table")
} else if !js.Done() {
+ bi.logger.Debug("failed to create table", "table", bi.options.TableName, "stmt", stmt, "error", "did not complete")
return adbc.Error{
Code: adbc.StatusInternal,
Msg: "[bq] CREATE TABLE query did not complete",
diff --git a/go/connection.go b/go/connection.go
index 52b96ed..a8636c7 100644
--- a/go/connection.go
+++ b/go/connection.go
@@ -326,7 +326,7 @@ func (c *connectionImpl) exec(ctx context.Context, stmt string, config func(*big
if err != nil {
return nil, err
}
- status, err := safeWaitForJob(ctx, job)
+ status, err := safeWaitForJob(ctx, c.Logger, job)
if err != nil {
return nil, err
} else if err := status.Err(); err != nil {
diff --git a/go/record_reader.go b/go/record_reader.go
index e9476cd..b1fa6d1 100644
--- a/go/record_reader.go
+++ b/go/record_reader.go
@@ -27,6 +27,7 @@ import (
"context"
"errors"
"log"
+ "log/slog"
"sync/atomic"
"cloud.google.com/go/bigquery"
@@ -60,36 +61,37 @@ func checkContext(ctx context.Context, maybeErr error) error {
return ctx.Err()
}
-func runQuery(ctx context.Context, query *bigquery.Query, executeUpdate bool) (bigquery.ArrowIterator, int64, error) {
+func runQuery(ctx context.Context, logger *slog.Logger, query *bigquery.Query, executeUpdate bool) (bigquery.ArrowIterator, int64, error) {
job, err := query.Run(ctx)
if err != nil {
return nil, -1, errToAdbcErr(adbc.StatusInternal, err, "run query")
}
- if executeUpdate {
- // XXX: Google SDK badness. We can't use Wait here because queries that
- // *fail* with a rateLimitExceeded (e.g. too many metadata operations)
- // will get the *polling* retried infinitely in Google's SDK (I believe
- // the SDK wants to retry "polling for job status" rate limit exceeded but
- // doesn't differentiate between them because googleapi.CheckResponse
- // appears to put the API error from the response object as an error of
- // the API call, from digging around using a debugger. In other words, it
- // seems to be confusing "I got an error that my API request was rate
- // limited" and "I got an error that my job was rate limited" because
- // their internal APIs mix both errors into a single error path.)
- js, err := safeWaitForJob(ctx, job)
- if err != nil {
- return nil, -1, err
- }
- if err := js.Err(); err != nil {
- return nil, -1, errToAdbcErr(adbc.StatusInternal, err, "complete job")
- } else if !js.Done() {
- return nil, -1, adbc.Error{
- Code: adbc.StatusInternal,
- Msg: "[bq] Query job did not complete",
- }
+ // XXX: Google SDK badness. We can't use Wait here because queries that
+ // *fail* with a rateLimitExceeded (e.g. too many metadata operations)
+ // will get the *polling* retried infinitely in Google's SDK (I believe
+ // the SDK wants to retry "polling for job status" rate limit exceeded but
+ // doesn't differentiate between them because googleapi.CheckResponse
+ // appears to put the API error from the response object as an error of
+ // the API call, from digging around using a debugger. In other words, it
+ // seems to be confusing "I got an error that my API request was rate
+ // limited" and "I got an error that my job was rate limited" because
+ // their internal APIs mix both errors into a single error path.)
+ js, err := safeWaitForJob(ctx, logger, job)
+ if err != nil {
+ return nil, -1, err
+ }
+
+ if err := js.Err(); err != nil {
+ return nil, -1, errToAdbcErr(adbc.StatusInternal, err, "complete job")
+ } else if !js.Done() {
+ return nil, -1, adbc.Error{
+ Code: adbc.StatusInternal,
+ Msg: "[bq] Query job did not complete",
}
+ }
+ if executeUpdate {
stats, ok := js.Statistics.Details.(*bigquery.QueryStatistics)
if ok {
return nil, stats.NumDMLAffectedRows, nil
@@ -97,6 +99,8 @@ func runQuery(ctx context.Context, query *bigquery.Query, executeUpdate bool) (b
return nil, -1, nil
}
+ // XXX: the Google SDK badness also applies here; it makes a similar
+ // mistake with the retry, so we wait for the job above.
iter, err := job.Read(ctx)
if err != nil {
return nil, -1, errToAdbcErr(adbc.StatusInternal, err, "read query results")
@@ -163,8 +167,8 @@ func getQueryParameter(values arrow.RecordBatch, row int, parameterMode string)
return parameters, nil
}
-func runPlainQuery(ctx context.Context, query *bigquery.Query, alloc memory.Allocator, resultRecordBufferSize int) (bigqueryRdr *reader, totalRows int64, err error) {
- arrowIterator, totalRows, err := runQuery(ctx, query, false)
+func runPlainQuery(ctx context.Context, logger *slog.Logger, query *bigquery.Query, alloc memory.Allocator, resultRecordBufferSize int) (bigqueryRdr *reader, totalRows int64, err error) {
+ arrowIterator, totalRows, err := runQuery(ctx, logger, query, false)
if err != nil {
return nil, -1, err
}
@@ -209,7 +213,7 @@ func runPlainQuery(ctx context.Context, query *bigquery.Query, alloc memory.Allo
return bigqueryRdr, totalRows, nil
}
-func queryRecordWithSchemaCallback(ctx context.Context, group *errgroup.Group, query *bigquery.Query, rec arrow.RecordBatch, ch chan arrow.RecordBatch, parameterMode string, alloc memory.Allocator, rdrSchema func(schema *arrow.Schema)) (int64, error) {
+func queryRecordWithSchemaCallback(ctx context.Context, logger *slog.Logger, group *errgroup.Group, query *bigquery.Query, rec arrow.RecordBatch, ch chan arrow.RecordBatch, parameterMode string, alloc memory.Allocator, rdrSchema func(schema *arrow.Schema)) (int64, error) {
totalRows := int64(-1)
for i := range int(rec.NumRows()) {
parameters, err := getQueryParameter(rec, i, parameterMode)
@@ -220,7 +224,7 @@ func queryRecordWithSchemaCallback(ctx context.Context, group *errgroup.Group, q
query.Parameters = parameters
}
- arrowIterator, rows, err := runQuery(ctx, query, false)
+ arrowIterator, rows, err := runQuery(ctx, logger, query, false)
if err != nil {
return -1, err
}
@@ -245,9 +249,9 @@ func queryRecordWithSchemaCallback(ctx context.Context, group *errgroup.Group, q
// kicks off a goroutine for each endpoint and returns a reader which
// gathers all of the records as they come in.
-func newRecordReader(ctx context.Context, query *bigquery.Query, boundParameters array.RecordReader, parameterMode string, alloc memory.Allocator, resultRecordBufferSize, prefetchConcurrency int) (bigqueryRdr *reader, totalRows int64, err error) {
+func newRecordReader(ctx context.Context, logger *slog.Logger, query *bigquery.Query, boundParameters array.RecordReader, parameterMode string, alloc memory.Allocator, resultRecordBufferSize, prefetchConcurrency int) (bigqueryRdr *reader, totalRows int64, err error) {
if boundParameters == nil {
- return runPlainQuery(ctx, query, alloc, resultRecordBufferSize)
+ return runPlainQuery(ctx, logger, query, alloc, resultRecordBufferSize)
}
defer boundParameters.Release()
@@ -283,7 +287,7 @@ func newRecordReader(ctx context.Context, query *bigquery.Query, boundParameters
// Each call to Record() on the record reader is allowed to release the previous record
// and since we're doing this sequentially
// we don't need to call rec.Retain() here and call call rec.Release() in queryRecordWithSchemaCallback
- batchRows, err := queryRecordWithSchemaCallback(ctx, group, query, rec, ch, parameterMode, alloc, func(schema *arrow.Schema) {
+ batchRows, err := queryRecordWithSchemaCallback(ctx, logger, group, query, rec, ch, parameterMode, alloc, func(schema *arrow.Schema) {
bigqueryRdr.schema = schema
})
if err != nil {
diff --git a/go/statement.go b/go/statement.go
index 8114c74..3de7ab6 100644
--- a/go/statement.go
+++ b/go/statement.go
@@ -346,7 +346,7 @@ func (st *statement) ExecuteQuery(ctx context.Context) (array.RecordReader, int6
}
}
- rr, totalRows, err := newRecordReader(ctx, st.query(), st.params, st.parameterMode, st.cnxn.Alloc, st.resultRecordBufferSize, st.prefetchConcurrency)
+ rr, totalRows, err := newRecordReader(ctx, st.cnxn.Logger, st.query(), st.params, st.parameterMode, st.cnxn.Alloc, st.resultRecordBufferSize, st.prefetchConcurrency)
st.params = nil
return rr, totalRows, err
}
@@ -360,7 +360,7 @@ func (st *statement) ExecuteUpdate(ctx context.Context) (int64, error) {
}
if st.params == nil {
- _, totalRows, err := runQuery(ctx, st.query(), true)
+ _, totalRows, err := runQuery(ctx, st.cnxn.Logger, st.query(), true)
if err != nil {
return -1, err
}
@@ -382,7 +382,7 @@ func (st *statement) ExecuteUpdate(ctx context.Context) (int64, error) {
st.queryConfig.Parameters = parameters
}
- _, currentRows, err := runQuery(ctx, st.query(), true)
+ _, currentRows, err := runQuery(ctx, st.cnxn.Logger, st.query(), true)
if err != nil {
return -1, err
}
diff --git a/go/util.go b/go/util.go
index b2a9cae..1911b8e 100644
--- a/go/util.go
+++ b/go/util.go
@@ -19,6 +19,7 @@ import (
"errors"
"fmt"
"io"
+ "log/slog"
"net/http"
"net/url"
"strings"
@@ -27,6 +28,7 @@ import (
"cloud.google.com/go/auth"
"cloud.google.com/go/bigquery"
"github.com/apache/arrow-adbc/go/adbc"
+ "github.com/googleapis/gax-go/v2"
"github.com/googleapis/gax-go/v2/apierror"
"google.golang.org/api/googleapi"
)
@@ -45,7 +47,13 @@ func quoteIdentifier(ident string) string {
// "I got an error that my API request was rate limited" and "I got an error
// that my job was rate limited" because their internal APIs mix both errors
// into a single error path.)
-func safeWaitForJob(ctx context.Context, job *bigquery.Job) (js *bigquery.JobStatus, err error) {
+func safeWaitForJob(ctx context.Context, logger *slog.Logger, job *bigquery.Job) (js *bigquery.JobStatus, err error) {
+ logger.DebugContext(ctx, "waiting for job", "id", job.ID())
+ backoff := gax.Backoff{
+ Initial: 50 * time.Millisecond,
+ Multiplier: 1.3,
+ Max: 60 * time.Second,
+ }
for {
js, err = func() (*bigquery.JobStatus, error) {
ctxWithDeadline, cancel := context.WithTimeout(ctx, time.Minute*5)
@@ -65,15 +73,30 @@
			// and does not put the job's error into the API call's
			// error.
			if isRetryableError(err) {
+				duration := backoff.Pause()
+				logger.DebugContext(ctx, "retry job", "id", job.ID(), "backoff", duration, "error", err)
+				if err := gax.Sleep(ctx, duration); err != nil {
+					return nil, err
+				}
+
				continue
			}
+			logger.DebugContext(ctx, "job failed", "id", job.ID(), "error", err)
			return nil, errToAdbcErr(adbc.StatusInternal, err, "poll job status")
		}
		if js.Err() != nil || js.Done() {
			break
		}
+
+		duration := backoff.Pause()
+		logger.DebugContext(ctx, "job not complete", "id", job.ID(), "backoff", duration)
+		// Actually sleep; without this the loop hot-polls the job status API.
+		if err := gax.Sleep(ctx, duration); err != nil {
+			return nil, err
+		}
}
+ logger.DebugContext(ctx, "job complete", "id", job.ID())
return
}