Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions go/bulk_ingest.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ func (bi *bigqueryBulkIngestImpl) Copy(ctx context.Context, chunk driverbase.Bul
if err != nil {
return errToAdbcErr(adbc.StatusIO, err, "run loader")
}
status, err := safeWaitForJob(ctx, job)
status, err := safeWaitForJob(ctx, bi.logger, job)
if err != nil {
return err
}
Expand Down Expand Up @@ -149,12 +149,15 @@ func (bi *bigqueryBulkIngestImpl) CreateTable(ctx context.Context, schema *arrow
if err != nil {
return err
}
js, err := safeWaitForJob(ctx, job)
js, err := safeWaitForJob(ctx, bi.logger, job)
if err != nil {
bi.logger.Debug("failed to create table", "table", bi.options.TableName, "stmt", stmt, "error", err)
return err
} else if err = js.Err(); err != nil {
bi.logger.Debug("failed to create table", "table", bi.options.TableName, "stmt", stmt, "error", err)
return errToAdbcErr(adbc.StatusInternal, err, "create table")
} else if !js.Done() {
bi.logger.Debug("failed to create table", "table", bi.options.TableName, "stmt", stmt, "error", "did not complete")
return adbc.Error{
Code: adbc.StatusInternal,
Msg: "[bq] CREATE TABLE query did not complete",
Expand Down
2 changes: 1 addition & 1 deletion go/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,7 @@ func (c *connectionImpl) exec(ctx context.Context, stmt string, config func(*big
if err != nil {
return nil, err
}
status, err := safeWaitForJob(ctx, job)
status, err := safeWaitForJob(ctx, c.Logger, job)
if err != nil {
return nil, err
} else if err := status.Err(); err != nil {
Expand Down
64 changes: 34 additions & 30 deletions go/record_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"context"
"errors"
"log"
"log/slog"
"sync/atomic"

"cloud.google.com/go/bigquery"
Expand Down Expand Up @@ -60,43 +61,46 @@ func checkContext(ctx context.Context, maybeErr error) error {
return ctx.Err()
}

func runQuery(ctx context.Context, query *bigquery.Query, executeUpdate bool) (bigquery.ArrowIterator, int64, error) {
func runQuery(ctx context.Context, logger *slog.Logger, query *bigquery.Query, executeUpdate bool) (bigquery.ArrowIterator, int64, error) {
job, err := query.Run(ctx)
if err != nil {
return nil, -1, errToAdbcErr(adbc.StatusInternal, err, "run query")
}
if executeUpdate {
// XXX: Google SDK badness. We can't use Wait here because queries that
// *fail* with a rateLimitExceeded (e.g. too many metadata operations)
// will get the *polling* retried infinitely in Google's SDK (I believe
// the SDK wants to retry "polling for job status" rate limit exceeded but
// doesn't differentiate between them because googleapi.CheckResponse
// appears to put the API error from the response object as an error of
// the API call, from digging around using a debugger. In other words, it
// seems to be confusing "I got an error that my API request was rate
// limited" and "I got an error that my job was rate limited" because
// their internal APIs mix both errors into a single error path.)
js, err := safeWaitForJob(ctx, job)
if err != nil {
return nil, -1, err
}

if err := js.Err(); err != nil {
return nil, -1, errToAdbcErr(adbc.StatusInternal, err, "complete job")
} else if !js.Done() {
return nil, -1, adbc.Error{
Code: adbc.StatusInternal,
Msg: "[bq] Query job did not complete",
}
// XXX: Google SDK badness. We can't use Wait here because queries that
// *fail* with a rateLimitExceeded (e.g. too many metadata operations)
// will get the *polling* retried infinitely in Google's SDK (I believe
// the SDK wants to retry "polling for job status" rate limit exceeded but
// doesn't differentiate between them because googleapi.CheckResponse
// appears to put the API error from the response object as an error of
// the API call, from digging around using a debugger. In other words, it
// seems to be confusing "I got an error that my API request was rate
// limited" and "I got an error that my job was rate limited" because
// their internal APIs mix both errors into a single error path.)
js, err := safeWaitForJob(ctx, logger, job)
if err != nil {
return nil, -1, err
}

if err := js.Err(); err != nil {
return nil, -1, errToAdbcErr(adbc.StatusInternal, err, "complete job")
} else if !js.Done() {
return nil, -1, adbc.Error{
Code: adbc.StatusInternal,
Msg: "[bq] Query job did not complete",
}
}

if executeUpdate {
stats, ok := js.Statistics.Details.(*bigquery.QueryStatistics)
if ok {
return nil, stats.NumDMLAffectedRows, nil
}
return nil, -1, nil
}

// XXX: the Google SDK badness also applies here; it makes a similar
// mistake with the retry, so we wait for the job above.
iter, err := job.Read(ctx)
if err != nil {
return nil, -1, errToAdbcErr(adbc.StatusInternal, err, "read query results")
Expand Down Expand Up @@ -163,8 +167,8 @@ func getQueryParameter(values arrow.RecordBatch, row int, parameterMode string)
return parameters, nil
}

func runPlainQuery(ctx context.Context, query *bigquery.Query, alloc memory.Allocator, resultRecordBufferSize int) (bigqueryRdr *reader, totalRows int64, err error) {
arrowIterator, totalRows, err := runQuery(ctx, query, false)
func runPlainQuery(ctx context.Context, logger *slog.Logger, query *bigquery.Query, alloc memory.Allocator, resultRecordBufferSize int) (bigqueryRdr *reader, totalRows int64, err error) {
arrowIterator, totalRows, err := runQuery(ctx, logger, query, false)
if err != nil {
return nil, -1, err
}
Expand Down Expand Up @@ -209,7 +213,7 @@ func runPlainQuery(ctx context.Context, query *bigquery.Query, alloc memory.Allo
return bigqueryRdr, totalRows, nil
}

func queryRecordWithSchemaCallback(ctx context.Context, group *errgroup.Group, query *bigquery.Query, rec arrow.RecordBatch, ch chan arrow.RecordBatch, parameterMode string, alloc memory.Allocator, rdrSchema func(schema *arrow.Schema)) (int64, error) {
func queryRecordWithSchemaCallback(ctx context.Context, logger *slog.Logger, group *errgroup.Group, query *bigquery.Query, rec arrow.RecordBatch, ch chan arrow.RecordBatch, parameterMode string, alloc memory.Allocator, rdrSchema func(schema *arrow.Schema)) (int64, error) {
totalRows := int64(-1)
for i := range int(rec.NumRows()) {
parameters, err := getQueryParameter(rec, i, parameterMode)
Expand All @@ -220,7 +224,7 @@ func queryRecordWithSchemaCallback(ctx context.Context, group *errgroup.Group, q
query.Parameters = parameters
}

arrowIterator, rows, err := runQuery(ctx, query, false)
arrowIterator, rows, err := runQuery(ctx, logger, query, false)
if err != nil {
return -1, err
}
Expand All @@ -245,9 +249,9 @@ func queryRecordWithSchemaCallback(ctx context.Context, group *errgroup.Group, q

// kicks off a goroutine for each endpoint and returns a reader which
// gathers all of the records as they come in.
func newRecordReader(ctx context.Context, query *bigquery.Query, boundParameters array.RecordReader, parameterMode string, alloc memory.Allocator, resultRecordBufferSize, prefetchConcurrency int) (bigqueryRdr *reader, totalRows int64, err error) {
func newRecordReader(ctx context.Context, logger *slog.Logger, query *bigquery.Query, boundParameters array.RecordReader, parameterMode string, alloc memory.Allocator, resultRecordBufferSize, prefetchConcurrency int) (bigqueryRdr *reader, totalRows int64, err error) {
if boundParameters == nil {
return runPlainQuery(ctx, query, alloc, resultRecordBufferSize)
return runPlainQuery(ctx, logger, query, alloc, resultRecordBufferSize)
}
defer boundParameters.Release()

Expand Down Expand Up @@ -283,7 +287,7 @@ func newRecordReader(ctx context.Context, query *bigquery.Query, boundParameters
// Each call to Record() on the record reader is allowed to release the previous record,
// and since we're doing this sequentially,
// we don't need to call rec.Retain() here and call rec.Release() in queryRecordWithSchemaCallback
batchRows, err := queryRecordWithSchemaCallback(ctx, group, query, rec, ch, parameterMode, alloc, func(schema *arrow.Schema) {
batchRows, err := queryRecordWithSchemaCallback(ctx, logger, group, query, rec, ch, parameterMode, alloc, func(schema *arrow.Schema) {
bigqueryRdr.schema = schema
})
if err != nil {
Expand Down
6 changes: 3 additions & 3 deletions go/statement.go
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,7 @@ func (st *statement) ExecuteQuery(ctx context.Context) (array.RecordReader, int6
}
}

rr, totalRows, err := newRecordReader(ctx, st.query(), st.params, st.parameterMode, st.cnxn.Alloc, st.resultRecordBufferSize, st.prefetchConcurrency)
rr, totalRows, err := newRecordReader(ctx, st.cnxn.Logger, st.query(), st.params, st.parameterMode, st.cnxn.Alloc, st.resultRecordBufferSize, st.prefetchConcurrency)
st.params = nil
return rr, totalRows, err
}
Expand All @@ -360,7 +360,7 @@ func (st *statement) ExecuteUpdate(ctx context.Context) (int64, error) {
}

if st.params == nil {
_, totalRows, err := runQuery(ctx, st.query(), true)
_, totalRows, err := runQuery(ctx, st.cnxn.Logger, st.query(), true)
if err != nil {
return -1, err
}
Expand All @@ -382,7 +382,7 @@ func (st *statement) ExecuteUpdate(ctx context.Context) (int64, error) {
st.queryConfig.Parameters = parameters
}

_, currentRows, err := runQuery(ctx, st.query(), true)
_, currentRows, err := runQuery(ctx, st.cnxn.Logger, st.query(), true)
if err != nil {
return -1, err
}
Expand Down
21 changes: 20 additions & 1 deletion go/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"errors"
"fmt"
"io"
"log/slog"
"net/http"
"net/url"
"strings"
Expand All @@ -27,6 +28,7 @@ import (
"cloud.google.com/go/auth"
"cloud.google.com/go/bigquery"
"github.com/apache/arrow-adbc/go/adbc"
"github.com/googleapis/gax-go/v2"
"github.com/googleapis/gax-go/v2/apierror"
"google.golang.org/api/googleapi"
)
Expand All @@ -45,7 +47,13 @@ func quoteIdentifier(ident string) string {
// "I got an error that my API request was rate limited" and "I got an error
// that my job was rate limited" because their internal APIs mix both errors
// into a single error path.)
func safeWaitForJob(ctx context.Context, job *bigquery.Job) (js *bigquery.JobStatus, err error) {
func safeWaitForJob(ctx context.Context, logger *slog.Logger, job *bigquery.Job) (js *bigquery.JobStatus, err error) {
logger.DebugContext(ctx, "waiting for job", "id", job.ID())
backoff := gax.Backoff{
Initial: 50 * time.Millisecond,
Multiplier: 1.3,
Max: 60 * time.Second,
}
for {
js, err = func() (*bigquery.JobStatus, error) {
ctxWithDeadline, cancel := context.WithTimeout(ctx, time.Minute*5)
Expand All @@ -65,15 +73,26 @@ func safeWaitForJob(ctx context.Context, job *bigquery.Job) (js *bigquery.JobSta
// and does not put the job's error into the API call's
// error.
if isRetryableError(err) {
duration := backoff.Pause()
logger.DebugContext(ctx, "retry job", "id", job.ID(), "backoff", duration, "error", err)
if err := gax.Sleep(ctx, duration); err != nil {
return nil, err
}

continue
}
logger.DebugContext(ctx, "job failed", "id", job.ID(), "error", err)
return nil, errToAdbcErr(adbc.StatusInternal, err, "poll job status")
}

if js.Err() != nil || js.Done() {
break
}

duration := backoff.Pause()
logger.DebugContext(ctx, "job not complete", "id", job.ID(), "backoff", duration)
}
logger.DebugContext(ctx, "job complete", "id", job.ID())
return
}

Expand Down
Loading