Skip to content

Commit 44297eb

Browse files
committed
fix(go): debug stuck ingestion job
1 parent 8e2e298 commit 44297eb

File tree

5 files changed

+63
-37
lines changed

5 files changed

+63
-37
lines changed

go/bulk_ingest.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,7 @@ func (bi *bigqueryBulkIngestImpl) Copy(ctx context.Context, chunk driverbase.Bul
115115
if err != nil {
116116
return errToAdbcErr(adbc.StatusIO, err, "run loader")
117117
}
118-
status, err := safeWaitForJob(ctx, job)
118+
status, err := safeWaitForJob(ctx, bi.logger, job)
119119
if err != nil {
120120
return err
121121
}
@@ -149,12 +149,15 @@ func (bi *bigqueryBulkIngestImpl) CreateTable(ctx context.Context, schema *arrow
149149
if err != nil {
150150
return err
151151
}
152-
js, err := safeWaitForJob(ctx, job)
152+
js, err := safeWaitForJob(ctx, bi.logger, job)
153153
if err != nil {
154+
bi.logger.Debug("failed to create table", "table", bi.options.TableName, "stmt", stmt, "error", err)
154155
return err
155156
} else if err = js.Err(); err != nil {
157+
bi.logger.Debug("failed to create table", "table", bi.options.TableName, "stmt", stmt, "error", err)
156158
return errToAdbcErr(adbc.StatusInternal, err, "create table")
157159
} else if !js.Done() {
160+
bi.logger.Debug("failed to create table", "table", bi.options.TableName, "stmt", stmt, "error", "did not complete")
158161
return adbc.Error{
159162
Code: adbc.StatusInternal,
160163
Msg: "[bq] CREATE TABLE query did not complete",

go/connection.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -326,7 +326,7 @@ func (c *connectionImpl) exec(ctx context.Context, stmt string, config func(*big
326326
if err != nil {
327327
return nil, err
328328
}
329-
status, err := safeWaitForJob(ctx, job)
329+
status, err := safeWaitForJob(ctx, c.Logger, job)
330330
if err != nil {
331331
return nil, err
332332
} else if err := status.Err(); err != nil {

go/record_reader.go

Lines changed: 34 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import (
2727
"context"
2828
"errors"
2929
"log"
30+
"log/slog"
3031
"sync/atomic"
3132

3233
"cloud.google.com/go/bigquery"
@@ -60,43 +61,46 @@ func checkContext(ctx context.Context, maybeErr error) error {
6061
return ctx.Err()
6162
}
6263

63-
func runQuery(ctx context.Context, query *bigquery.Query, executeUpdate bool) (bigquery.ArrowIterator, int64, error) {
64+
func runQuery(ctx context.Context, logger *slog.Logger, query *bigquery.Query, executeUpdate bool) (bigquery.ArrowIterator, int64, error) {
6465
job, err := query.Run(ctx)
6566
if err != nil {
6667
return nil, -1, errToAdbcErr(adbc.StatusInternal, err, "run query")
6768
}
68-
if executeUpdate {
69-
// XXX: Google SDK badness. We can't use Wait here because queries that
70-
// *fail* with a rateLimitExceeded (e.g. too many metadata operations)
71-
// will get the *polling* retried infinitely in Google's SDK (I believe
72-
// the SDK wants to retry "polling for job status" rate limit exceeded but
73-
// doesn't differentiate between them because googleapi.CheckResponse
74-
// appears to put the API error from the response object as an error of
75-
// the API call, from digging around using a debugger. In other words, it
76-
// seems to be confusing "I got an error that my API request was rate
77-
// limited" and "I got an error that my job was rate limited" because
78-
// their internal APIs mix both errors into a single error path.)
79-
js, err := safeWaitForJob(ctx, job)
80-
if err != nil {
81-
return nil, -1, err
82-
}
8369

84-
if err := js.Err(); err != nil {
85-
return nil, -1, errToAdbcErr(adbc.StatusInternal, err, "complete job")
86-
} else if !js.Done() {
87-
return nil, -1, adbc.Error{
88-
Code: adbc.StatusInternal,
89-
Msg: "[bq] Query job did not complete",
90-
}
70+
// XXX: Google SDK badness. We can't use Wait here because queries that
71+
// *fail* with a rateLimitExceeded (e.g. too many metadata operations)
72+
// will get the *polling* retried infinitely in Google's SDK (I believe
73+
// the SDK wants to retry "polling for job status" rate limit exceeded but
74+
// doesn't differentiate between them because googleapi.CheckResponse
75+
// appears to put the API error from the response object as an error of
76+
// the API call, from digging around using a debugger. In other words, it
77+
// seems to be confusing "I got an error that my API request was rate
78+
// limited" and "I got an error that my job was rate limited" because
79+
// their internal APIs mix both errors into a single error path.)
80+
js, err := safeWaitForJob(ctx, logger, job)
81+
if err != nil {
82+
return nil, -1, err
83+
}
84+
85+
if err := js.Err(); err != nil {
86+
return nil, -1, errToAdbcErr(adbc.StatusInternal, err, "complete job")
87+
} else if !js.Done() {
88+
return nil, -1, adbc.Error{
89+
Code: adbc.StatusInternal,
90+
Msg: "[bq] Query job did not complete",
9191
}
92+
}
9293

94+
if executeUpdate {
9395
stats, ok := js.Statistics.Details.(*bigquery.QueryStatistics)
9496
if ok {
9597
return nil, stats.NumDMLAffectedRows, nil
9698
}
9799
return nil, -1, nil
98100
}
99101

102+
// XXX: the Google SDK badness also applies here; it makes a similar
103+
// mistake with the retry, so we wait for the job above.
100104
iter, err := job.Read(ctx)
101105
if err != nil {
102106
return nil, -1, errToAdbcErr(adbc.StatusInternal, err, "read query results")
@@ -163,8 +167,8 @@ func getQueryParameter(values arrow.RecordBatch, row int, parameterMode string)
163167
return parameters, nil
164168
}
165169

166-
func runPlainQuery(ctx context.Context, query *bigquery.Query, alloc memory.Allocator, resultRecordBufferSize int) (bigqueryRdr *reader, totalRows int64, err error) {
167-
arrowIterator, totalRows, err := runQuery(ctx, query, false)
170+
func runPlainQuery(ctx context.Context, logger *slog.Logger, query *bigquery.Query, alloc memory.Allocator, resultRecordBufferSize int) (bigqueryRdr *reader, totalRows int64, err error) {
171+
arrowIterator, totalRows, err := runQuery(ctx, logger, query, false)
168172
if err != nil {
169173
return nil, -1, err
170174
}
@@ -209,7 +213,7 @@ func runPlainQuery(ctx context.Context, query *bigquery.Query, alloc memory.Allo
209213
return bigqueryRdr, totalRows, nil
210214
}
211215

212-
func queryRecordWithSchemaCallback(ctx context.Context, group *errgroup.Group, query *bigquery.Query, rec arrow.RecordBatch, ch chan arrow.RecordBatch, parameterMode string, alloc memory.Allocator, rdrSchema func(schema *arrow.Schema)) (int64, error) {
216+
func queryRecordWithSchemaCallback(ctx context.Context, logger *slog.Logger, group *errgroup.Group, query *bigquery.Query, rec arrow.RecordBatch, ch chan arrow.RecordBatch, parameterMode string, alloc memory.Allocator, rdrSchema func(schema *arrow.Schema)) (int64, error) {
213217
totalRows := int64(-1)
214218
for i := range int(rec.NumRows()) {
215219
parameters, err := getQueryParameter(rec, i, parameterMode)
@@ -220,7 +224,7 @@ func queryRecordWithSchemaCallback(ctx context.Context, group *errgroup.Group, q
220224
query.Parameters = parameters
221225
}
222226

223-
arrowIterator, rows, err := runQuery(ctx, query, false)
227+
arrowIterator, rows, err := runQuery(ctx, logger, query, false)
224228
if err != nil {
225229
return -1, err
226230
}
@@ -245,9 +249,9 @@ func queryRecordWithSchemaCallback(ctx context.Context, group *errgroup.Group, q
245249

246250
// kicks off a goroutine for each endpoint and returns a reader which
247251
// gathers all of the records as they come in.
248-
func newRecordReader(ctx context.Context, query *bigquery.Query, boundParameters array.RecordReader, parameterMode string, alloc memory.Allocator, resultRecordBufferSize, prefetchConcurrency int) (bigqueryRdr *reader, totalRows int64, err error) {
252+
func newRecordReader(ctx context.Context, logger *slog.Logger, query *bigquery.Query, boundParameters array.RecordReader, parameterMode string, alloc memory.Allocator, resultRecordBufferSize, prefetchConcurrency int) (bigqueryRdr *reader, totalRows int64, err error) {
249253
if boundParameters == nil {
250-
return runPlainQuery(ctx, query, alloc, resultRecordBufferSize)
254+
return runPlainQuery(ctx, logger, query, alloc, resultRecordBufferSize)
251255
}
252256
defer boundParameters.Release()
253257

@@ -283,7 +287,7 @@ func newRecordReader(ctx context.Context, query *bigquery.Query, boundParameters
283287
// Each call to Record() on the record reader is allowed to release the previous record
284288
// and since we're doing this sequentially
285289
// we don't need to call rec.Retain() here and call call rec.Release() in queryRecordWithSchemaCallback
286-
batchRows, err := queryRecordWithSchemaCallback(ctx, group, query, rec, ch, parameterMode, alloc, func(schema *arrow.Schema) {
290+
batchRows, err := queryRecordWithSchemaCallback(ctx, logger, group, query, rec, ch, parameterMode, alloc, func(schema *arrow.Schema) {
287291
bigqueryRdr.schema = schema
288292
})
289293
if err != nil {

go/statement.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -346,7 +346,7 @@ func (st *statement) ExecuteQuery(ctx context.Context) (array.RecordReader, int6
346346
}
347347
}
348348

349-
rr, totalRows, err := newRecordReader(ctx, st.query(), st.params, st.parameterMode, st.cnxn.Alloc, st.resultRecordBufferSize, st.prefetchConcurrency)
349+
rr, totalRows, err := newRecordReader(ctx, st.cnxn.Logger, st.query(), st.params, st.parameterMode, st.cnxn.Alloc, st.resultRecordBufferSize, st.prefetchConcurrency)
350350
st.params = nil
351351
return rr, totalRows, err
352352
}
@@ -360,7 +360,7 @@ func (st *statement) ExecuteUpdate(ctx context.Context) (int64, error) {
360360
}
361361

362362
if st.params == nil {
363-
_, totalRows, err := runQuery(ctx, st.query(), true)
363+
_, totalRows, err := runQuery(ctx, st.cnxn.Logger, st.query(), true)
364364
if err != nil {
365365
return -1, err
366366
}
@@ -382,7 +382,7 @@ func (st *statement) ExecuteUpdate(ctx context.Context) (int64, error) {
382382
st.queryConfig.Parameters = parameters
383383
}
384384

385-
_, currentRows, err := runQuery(ctx, st.query(), true)
385+
_, currentRows, err := runQuery(ctx, st.cnxn.Logger, st.query(), true)
386386
if err != nil {
387387
return -1, err
388388
}

go/util.go

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
"errors"
2020
"fmt"
2121
"io"
22+
"log/slog"
2223
"net/http"
2324
"net/url"
2425
"strings"
@@ -27,6 +28,7 @@ import (
2728
"cloud.google.com/go/auth"
2829
"cloud.google.com/go/bigquery"
2930
"github.com/apache/arrow-adbc/go/adbc"
31+
"github.com/googleapis/gax-go/v2"
3032
"github.com/googleapis/gax-go/v2/apierror"
3133
"google.golang.org/api/googleapi"
3234
)
@@ -45,7 +47,13 @@ func quoteIdentifier(ident string) string {
4547
// "I got an error that my API request was rate limited" and "I got an error
4648
// that my job was rate limited" because their internal APIs mix both errors
4749
// into a single error path.)
48-
func safeWaitForJob(ctx context.Context, job *bigquery.Job) (js *bigquery.JobStatus, err error) {
50+
func safeWaitForJob(ctx context.Context, logger *slog.Logger, job *bigquery.Job) (js *bigquery.JobStatus, err error) {
51+
logger.DebugContext(ctx, "waiting for job", "id", job.ID())
52+
backoff := gax.Backoff{
53+
Initial: 50 * time.Millisecond,
54+
Multiplier: 1.3,
55+
Max: 60 * time.Second,
56+
}
4957
for {
5058
js, err = func() (*bigquery.JobStatus, error) {
5159
ctxWithDeadline, cancel := context.WithTimeout(ctx, time.Minute*5)
@@ -65,15 +73,26 @@ func safeWaitForJob(ctx context.Context, job *bigquery.Job) (js *bigquery.JobSta
6573
// and does not put the job's error into the API call's
6674
// error.
6775
if isRetryableError(err) {
76+
duration := backoff.Pause()
77+
logger.DebugContext(ctx, "retry job", "id", job.ID(), "backoff", duration, "error", err)
78+
if err := gax.Sleep(ctx, duration); err != nil {
79+
return nil, err
80+
}
81+
6882
continue
6983
}
84+
logger.DebugContext(ctx, "job failed", "id", job.ID(), "error", err)
7085
return nil, errToAdbcErr(adbc.StatusInternal, err, "poll job status")
7186
}
7287

7388
if js.Err() != nil || js.Done() {
7489
break
7590
}
91+
92+
duration := backoff.Pause()
93+
logger.DebugContext(ctx, "job not complete", "id", job.ID(), "backoff", duration)
7694
}
95+
logger.DebugContext(ctx, "job complete", "id", job.ID())
7796
return
7897
}
7998

0 commit comments

Comments
 (0)