Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 21 additions & 12 deletions go/bulk_ingest_storage_write.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,9 @@ func (impl *storageWriteBulkIngestImpl) createWriteStream(ctx context.Context) e
var apiError *apierror.APIError
if errors.As(err, &apiError) && apiError.GRPCStatus().Code() == codes.NotFound {
impl.logger.Debug("retrying create write stream", "table", impl.tableReference, "error", err)
return false, nil
return false, err
}
return false, err
return true, err
}); err != nil {
return errToAdbcErr(adbc.StatusIO, err, "create write stream for %s", impl.tableReference)
}
Expand Down Expand Up @@ -277,7 +277,6 @@ func (impl *storageWriteBulkIngestImpl) Copy(ctx context.Context, chunk driverba
},
}
if impl.appendStream == nil {
// Include schema on first request
if err := impl.createWriteStream(ctx); err != nil {
return err
}
Expand All @@ -296,15 +295,23 @@ func (impl *storageWriteBulkIngestImpl) Copy(ctx context.Context, chunk driverba
Rows: rows,
}

// Send the schema and the first batch to try to flush out an
// issue where BQ doesn't seem to know that the stream
// exists...
// The first time we try to write, BigQuery appears to have an issue
// where it thinks the stream doesn't exist, even though we just
// created it, so retry if needed
if err := retry(ctx, "append rows to "+impl.tableReference, func() (bool, error) {
err := func() error {
appendStream, err := impl.writeClient.AppendRows(ctx)
if err != nil {
return errToAdbcErr(adbc.StatusIO, err, "begin AppendRows(%s)", impl.tableReference)
var appendStream storagepb.BigQueryWrite_AppendRowsClient

if impl.appendStream != nil {
appendStream = impl.appendStream
} else {
var err error
appendStream, err = impl.writeClient.AppendRows(ctx)
if err != nil {
return errToAdbcErr(adbc.StatusIO, err, "begin AppendRows(%s)", impl.tableReference)
}
}

if err := appendStream.Send(req); err != nil {
return errToAdbcErr(adbc.StatusIO, err, "send AppendRows(%s)", impl.tableReference)
}
Expand All @@ -326,16 +333,18 @@ func (impl *storageWriteBulkIngestImpl) Copy(ctx context.Context, chunk driverba
}

// Only retry on the first request; if we've already created
// the stream, we can't recreate it. Weirdly annoyingly, gRPC
// returns a status.Error which is an internal type
// the stream, we don't want to recreate it. Weirdly
// annoyingly, gRPC returns a status.Error which is an
// internal type, making it hard to determine whether to retry
// TODO(lidavidm): maybe it's OK to recreate appendStream?
if impl.appendStream == nil && strings.Contains(err.Error(), "NotFound") {
impl.logger.Debug("retrying AppendRows",
"table", impl.tableReference,
"error", err)

return false, nil
}
return false, err
return true, err
}); err != nil {
return errToAdbcErr(adbc.StatusIO, err, "AppendRows(%s)", impl.tableReference)
}
Expand Down
16 changes: 8 additions & 8 deletions go/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ go 1.26.0
require (
cloud.google.com/go/auth v0.18.2
cloud.google.com/go/bigquery v1.73.1
github.com/adbc-drivers/driverbase-go/driverbase v0.0.0-20260211074943-935104712593
github.com/adbc-drivers/driverbase-go/driverbase v0.0.0-20260217234140-addad9015731
github.com/adbc-drivers/driverbase-go/testutil v0.0.0-20251204005500-ef1acb8b4023
github.com/apache/arrow-adbc/go/adbc v1.10.0
github.com/apache/arrow-go/v18 v18.5.1
Expand All @@ -28,7 +28,7 @@ require (
github.com/stretchr/testify v1.11.1
golang.org/x/oauth2 v0.35.0
golang.org/x/sync v0.19.0
google.golang.org/api v0.266.0
google.golang.org/api v0.267.0
google.golang.org/grpc v1.79.1
)

Expand All @@ -50,7 +50,7 @@ require (
github.com/google/flatbuffers v25.12.19+incompatible // indirect
github.com/google/s2a-go v0.1.9 // indirect
github.com/googleapis/enterprise-certificate-proxy v0.3.12 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.8 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.28.0 // indirect
github.com/klauspost/asmfmt v1.3.2 // indirect
github.com/klauspost/compress v1.18.4 // indirect
github.com/klauspost/cpuid/v2 v2.3.0 // indirect
Expand All @@ -72,18 +72,18 @@ require (
go.opentelemetry.io/otel/trace v1.40.0 // indirect
go.opentelemetry.io/proto/otlp v1.9.0 // indirect
golang.org/x/crypto v0.48.0 // indirect
golang.org/x/exp v0.0.0-20260209203927-2842357ff358 // indirect
golang.org/x/exp v0.0.0-20260212183809-81e46e3db34a // indirect
golang.org/x/mod v0.33.0 // indirect
golang.org/x/net v0.50.0 // indirect
golang.org/x/sys v0.41.0 // indirect
golang.org/x/telemetry v0.0.0-20260209163413-e7419c687ee4 // indirect
golang.org/x/telemetry v0.0.0-20260213145524-e0ab670178e1 // indirect
golang.org/x/text v0.34.0 // indirect
golang.org/x/time v0.14.0 // indirect
golang.org/x/tools v0.42.0 // indirect
golang.org/x/xerrors v0.0.0-20240903120638-7835f813f4da // indirect
google.golang.org/genproto v0.0.0-20260209200024-4cfbd4190f57 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20260209200024-4cfbd4190f57 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20260209200024-4cfbd4190f57 // indirect
google.golang.org/genproto v0.0.0-20260217215200-42d3e9bedb6d // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20260217215200-42d3e9bedb6d // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20260217215200-42d3e9bedb6d // indirect
google.golang.org/protobuf v1.36.11 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
Expand Down
32 changes: 16 additions & 16 deletions go/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ github.com/GoogleCloudPlatform/opentelemetry-operations-go/exporter/metric v0.54
github.com/GoogleCloudPlatform/opentelemetry-operations-go/exporter/metric v0.54.0/go.mod h1:l9rva3ApbBpEJxSNYnwT9N4CDLrWgtq3u8736C5hyJw=
github.com/GoogleCloudPlatform/opentelemetry-operations-go/internal/resourcemapping v0.54.0 h1:s0WlVbf9qpvkh1c/uDAPElam0WrL7fHRIidgZJ7UqZI=
github.com/GoogleCloudPlatform/opentelemetry-operations-go/internal/resourcemapping v0.54.0/go.mod h1:Mf6O40IAyB9zR/1J8nGDDPirZQQPbYJni8Yisy7NTMc=
github.com/adbc-drivers/driverbase-go/driverbase v0.0.0-20260211074943-935104712593 h1:kqhGuO/CPxPPbNY+14CiRgEohIIh9gJKeNapHwW2L8E=
github.com/adbc-drivers/driverbase-go/driverbase v0.0.0-20260211074943-935104712593/go.mod h1:GbMifodY8nLi/TDF5wNzCjUO0nvDK1w8i5SAGJ8UegQ=
github.com/adbc-drivers/driverbase-go/driverbase v0.0.0-20260217234140-addad9015731 h1:zte+OgJNsJJ5eGA3S1oD9YxnL27FUQU5QMHtORfJaSY=
github.com/adbc-drivers/driverbase-go/driverbase v0.0.0-20260217234140-addad9015731/go.mod h1:GbMifodY8nLi/TDF5wNzCjUO0nvDK1w8i5SAGJ8UegQ=
github.com/adbc-drivers/driverbase-go/testutil v0.0.0-20251204005500-ef1acb8b4023 h1:7TIuxwU4EkneKSP6QrWH/WPrVyicU0ST+XufYGjuBFI=
github.com/adbc-drivers/driverbase-go/testutil v0.0.0-20251204005500-ef1acb8b4023/go.mod h1:4Wtj2lGJ1tfR/X1GJXy+XC9g8DBg8HiaOETHygGj+Tc=
github.com/andybalholm/brotli v1.2.0 h1:ukwgCxwYrmACq68yiUqwIWnGY0cTPox/M94sVwToPjQ=
Expand Down Expand Up @@ -78,8 +78,8 @@ github.com/googleapis/enterprise-certificate-proxy v0.3.12 h1:Fg+zsqzYEs1Znvmczt
github.com/googleapis/enterprise-certificate-proxy v0.3.12/go.mod h1:vqVt9yG9480NtzREnTlmGSBmFrA+bzb0yl0TxoBQXOg=
github.com/googleapis/gax-go/v2 v2.17.0 h1:RksgfBpxqff0EZkDWYuz9q/uWsTVz+kf43LsZ1J6SMc=
github.com/googleapis/gax-go/v2 v2.17.0/go.mod h1:mzaqghpQp4JDh3HvADwrat+6M3MOIDp5YKHhb9PAgDY=
github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.8 h1:NpbJl/eVbvrGE0MJ6X16X9SAifesl6Fwxg/YmCvubRI=
github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.8/go.mod h1:mi7YA+gCzVem12exXy46ZespvGtX/lZmD/RLnQhVW7U=
github.com/grpc-ecosystem/grpc-gateway/v2 v2.28.0 h1:HWRh5R2+9EifMyIHV7ZV+MIZqgz+PMpZ14Jynv3O2Zs=
github.com/grpc-ecosystem/grpc-gateway/v2 v2.28.0/go.mod h1:JfhWUomR1baixubs02l85lZYYOm7LV6om4ceouMv45c=
github.com/klauspost/asmfmt v1.3.2 h1:4Ri7ox3EwapiOjCki+hw14RyKk201CN4rzyCJRFLpK4=
github.com/klauspost/asmfmt v1.3.2/go.mod h1:AG8TuvYojzulgDAMCnYn50l/5QV3Bs/tp6j0HLHbNSE=
github.com/klauspost/compress v1.18.4 h1:RPhnKRAQ4Fh8zU2FY/6ZFDwTVTxgJ/EMydqSTzE9a2c=
Expand Down Expand Up @@ -148,8 +148,8 @@ go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
golang.org/x/crypto v0.48.0 h1:/VRzVqiRSggnhY7gNRxPauEQ5Drw9haKdM0jqfcCFts=
golang.org/x/crypto v0.48.0/go.mod h1:r0kV5h3qnFPlQnBSrULhlsRfryS2pmewsg+XfMgkVos=
golang.org/x/exp v0.0.0-20260209203927-2842357ff358 h1:kpfSV7uLwKJbFSEgNhWzGSL47NDSF/5pYYQw1V0ub6c=
golang.org/x/exp v0.0.0-20260209203927-2842357ff358/go.mod h1:R3t0oliuryB5eenPWl3rrQxwnNM3WTwnsRZZiXLAAW8=
golang.org/x/exp v0.0.0-20260212183809-81e46e3db34a h1:ovFr6Z0MNmU7nH8VaX5xqw+05ST2uO1exVfZPVqRC5o=
golang.org/x/exp v0.0.0-20260212183809-81e46e3db34a/go.mod h1:K79w1Vqn7PoiZn+TkNpx3BUWUQksGO3JcVX6qIjytmA=
golang.org/x/mod v0.33.0 h1:tHFzIWbBifEmbwtGz65eaWyGiGZatSrT9prnU8DbVL8=
golang.org/x/mod v0.33.0/go.mod h1:swjeQEj+6r7fODbD2cqrnje9PnziFuw4bmLbBZFrQ5w=
golang.org/x/net v0.50.0 h1:ucWh9eiCGyDR3vtzso0WMQinm2Dnt8cFMuQa9K33J60=
Expand All @@ -160,8 +160,8 @@ golang.org/x/sync v0.19.0 h1:vV+1eWNmZ5geRlYjzm2adRgW2/mcpevXNg50YZtPCE4=
golang.org/x/sync v0.19.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI=
golang.org/x/sys v0.41.0 h1:Ivj+2Cp/ylzLiEU89QhWblYnOE9zerudt9Ftecq2C6k=
golang.org/x/sys v0.41.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks=
golang.org/x/telemetry v0.0.0-20260209163413-e7419c687ee4 h1:bTLqdHv7xrGlFbvf5/TXNxy/iUwwdkjhqQTJDjW7aj0=
golang.org/x/telemetry v0.0.0-20260209163413-e7419c687ee4/go.mod h1:g5NllXBEermZrmR51cJDQxmJUHUOfRAaNyWBM+R+548=
golang.org/x/telemetry v0.0.0-20260213145524-e0ab670178e1 h1:QNaHp8YvpPswfDNxlCmJyeesxbGOgaKf41iT9/QrErY=
golang.org/x/telemetry v0.0.0-20260213145524-e0ab670178e1/go.mod h1:NuITXsA9cTiqnXtVk+/wrBT2Ja4X5hsfGOYRJ6kgYjs=
golang.org/x/text v0.34.0 h1:oL/Qq0Kdaqxa1KbNeMKwQq0reLCCaFtqu2eNuSeNHbk=
golang.org/x/text v0.34.0/go.mod h1:homfLqTYRFyVYemLBFl5GgL/DWEiH5wcsQ5gSh1yziA=
golang.org/x/time v0.14.0 h1:MRx4UaLrDotUKUdCIqzPC48t1Y9hANFKIRpNx+Te8PI=
Expand All @@ -172,14 +172,14 @@ golang.org/x/xerrors v0.0.0-20240903120638-7835f813f4da h1:noIWHXmPHxILtqtCOPIhS
golang.org/x/xerrors v0.0.0-20240903120638-7835f813f4da/go.mod h1:NDW/Ps6MPRej6fsCIbMTohpP40sJ/P/vI1MoTEGwX90=
gonum.org/v1/gonum v0.16.0 h1:5+ul4Swaf3ESvrOnidPp4GZbzf0mxVQpDCYUQE7OJfk=
gonum.org/v1/gonum v0.16.0/go.mod h1:fef3am4MQ93R2HHpKnLk4/Tbh/s0+wqD5nfa6Pnwy4E=
google.golang.org/api v0.266.0 h1:hco+oNCf9y7DmLeAtHJi/uBAY7n/7XC9mZPxu1ROiyk=
google.golang.org/api v0.266.0/go.mod h1:Jzc0+ZfLnyvXma3UtaTl023TdhZu6OMBP9tJ+0EmFD0=
google.golang.org/genproto v0.0.0-20260209200024-4cfbd4190f57 h1:uZSB/r2MjH9IsqpG2vRNSV1Juteix90oHe8oTcLW9tk=
google.golang.org/genproto v0.0.0-20260209200024-4cfbd4190f57/go.mod h1:nGuPfp0lnDJcJD0J47StV0Skgnw3qMSQhjsLKiejq5Y=
google.golang.org/genproto/googleapis/api v0.0.0-20260209200024-4cfbd4190f57 h1:JLQynH/LBHfCTSbDWl+py8C+Rg/k1OVH3xfcaiANuF0=
google.golang.org/genproto/googleapis/api v0.0.0-20260209200024-4cfbd4190f57/go.mod h1:kSJwQxqmFXeo79zOmbrALdflXQeAYcUbgS7PbpMknCY=
google.golang.org/genproto/googleapis/rpc v0.0.0-20260209200024-4cfbd4190f57 h1:mWPCjDEyshlQYzBpMNHaEof6UX1PmHcaUODUywQ0uac=
google.golang.org/genproto/googleapis/rpc v0.0.0-20260209200024-4cfbd4190f57/go.mod h1:j9x/tPzZkyxcgEFkiKEEGxfvyumM01BEtsW8xzOahRQ=
google.golang.org/api v0.267.0 h1:w+vfWPMPYeRs8qH1aYYsFX68jMls5acWl/jocfLomwE=
google.golang.org/api v0.267.0/go.mod h1:Jzc0+ZfLnyvXma3UtaTl023TdhZu6OMBP9tJ+0EmFD0=
google.golang.org/genproto v0.0.0-20260217215200-42d3e9bedb6d h1:vsOm753cOAMkt76efriTCDKjpCbK18XGHMJHo0JUKhc=
google.golang.org/genproto v0.0.0-20260217215200-42d3e9bedb6d/go.mod h1:0oz9d7g9QLSdv9/lgbIjowW1JoxMbxmBVNe8i6tORJI=
google.golang.org/genproto/googleapis/api v0.0.0-20260217215200-42d3e9bedb6d h1:EocjzKLywydp5uZ5tJ79iP6Q0UjDnyiHkGRWxuPBP8s=
google.golang.org/genproto/googleapis/api v0.0.0-20260217215200-42d3e9bedb6d/go.mod h1:48U2I+QQUYhsFrg2SY6r+nJzeOtjey7j//WBESw+qyQ=
google.golang.org/genproto/googleapis/rpc v0.0.0-20260217215200-42d3e9bedb6d h1:t/LOSXPJ9R0B6fnZNyALBRfZBH0Uy0gT+uR+SJ6syqQ=
google.golang.org/genproto/googleapis/rpc v0.0.0-20260217215200-42d3e9bedb6d/go.mod h1:4Hqkh8ycfw05ld/3BWL7rJOSfebL2Q+DVDeRgYgxUU8=
google.golang.org/grpc v1.79.1 h1:zGhSi45ODB9/p3VAawt9a+O/MULLl9dpizzNNpq7flY=
google.golang.org/grpc v1.79.1/go.mod h1:KmT0Kjez+0dde/v2j9vzwoAScgEPx/Bw1CYChhHLrHQ=
google.golang.org/protobuf v1.36.11 h1:fV6ZwhNocDyBLK0dj+fg8ektcVegBBuEolpbTQyBNVE=
Expand Down
12 changes: 5 additions & 7 deletions go/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,22 +231,20 @@ func retryWithBackoff(ctx context.Context, context string, maxAttempts int, back
attempt := 0
for {
complete, err := f()
if err != nil {
if complete {
return err
} else if complete {
return nil
}

duration := backoff.Pause()
if err := gax.Sleep(ctx, duration); err != nil {
if sleepErr := gax.Sleep(ctx, duration); sleepErr != nil {
return err
}
attempt++

if attempt >= maxAttempts {
return adbc.Error{
Code: adbc.StatusInternal,
Msg: fmt.Sprintf("[bq] could not %s: maximum retry attempts exceeded", context),
Msg: fmt.Sprintf("[bq] could not %s: maximum retry attempts exceeded: %v", context, err),
}
}
}
Expand All @@ -256,7 +254,7 @@ func retry(ctx context.Context, context string, f func() (bool, error)) error {
backoff := gax.Backoff{
Initial: 100 * time.Millisecond,
Multiplier: 2.0,
Max: 10 * time.Second,
Max: 15 * time.Second,
}
return retryWithBackoff(ctx, context, 15, backoff, f)
return retryWithBackoff(ctx, context, 20, backoff, f)
}
Loading