diff --git a/go/bulk_ingest_storage_write.go b/go/bulk_ingest_storage_write.go index 621f3d2..14c86c3 100644 --- a/go/bulk_ingest_storage_write.go +++ b/go/bulk_ingest_storage_write.go @@ -76,9 +76,9 @@ func (impl *storageWriteBulkIngestImpl) createWriteStream(ctx context.Context) e var apiError *apierror.APIError if errors.As(err, &apiError) && apiError.GRPCStatus().Code() == codes.NotFound { impl.logger.Debug("retrying create write stream", "table", impl.tableReference, "error", err) - return false, nil + return false, err } - return false, err + return true, err }); err != nil { return errToAdbcErr(adbc.StatusIO, err, "create write stream for %s", impl.tableReference) } @@ -277,7 +277,6 @@ func (impl *storageWriteBulkIngestImpl) Copy(ctx context.Context, chunk driverba }, } if impl.appendStream == nil { - // Include schema on first request if err := impl.createWriteStream(ctx); err != nil { return err } @@ -296,15 +295,23 @@ func (impl *storageWriteBulkIngestImpl) Copy(ctx context.Context, chunk driverba Rows: rows, } - // Send the schema and the first batch to try to flush out an - // issue where BQ doesn't seem to know that the stream - // exists... + // The first time we try to write, BigQuery appears to have an issue + // where it thinks the stream doesn't exist, even though we just + // created it, so retry if needed if err := retry(ctx, "append rows to "+impl.tableReference, func() (bool, error) { err := func() error { - appendStream, err := impl.writeClient.AppendRows(ctx) - if err != nil { - return errToAdbcErr(adbc.StatusIO, err, "begin AppendRows(%s)", impl.tableReference) + var appendStream storagepb.BigQueryWrite_AppendRowsClient + + if impl.appendStream != nil { + appendStream = impl.appendStream + } else { + var err error + appendStream, err = impl.writeClient.AppendRows(ctx) + if err != nil { + return errToAdbcErr(adbc.StatusIO, err, "begin AppendRows(%s)", impl.tableReference) + } } + if err := appendStream.Send(req); err != nil { return errToAdbcErr(adbc.StatusIO, err, "send AppendRows(%s)", impl.tableReference) } @@ -326,8 +333,10 @@ func (impl *storageWriteBulkIngestImpl) Copy(ctx context.Context, chunk driverba } // Only retry on the first request; if we've already created - // the stream, we can't recreate it. Weirdly annoyingly, gRPC - // returns a status.Error which is an internal type + // the stream, we don't want to recreate it. Weirdly + // annoyingly, gRPC returns a status.Error which is an + // internal type, making it hard to determine whether to retry + // TODO(lidavidm): maybe it's OK to recreate appendStream? if impl.appendStream == nil && strings.Contains(err.Error(), "NotFound") { impl.logger.Debug("retrying AppendRows", "table", impl.tableReference, @@ -335,7 +344,7 @@ func (impl *storageWriteBulkIngestImpl) Copy(ctx context.Context, chunk driverba return false, nil } - return false, err + return true, err }); err != nil { return errToAdbcErr(adbc.StatusIO, err, "AppendRows(%s)", impl.tableReference) } diff --git a/go/go.mod b/go/go.mod index c073667..d4c413b 100644 --- a/go/go.mod +++ b/go/go.mod @@ -19,7 +19,7 @@ go 1.26.0 require ( cloud.google.com/go/auth v0.18.2 cloud.google.com/go/bigquery v1.73.1 - github.com/adbc-drivers/driverbase-go/driverbase v0.0.0-20260211074943-935104712593 + github.com/adbc-drivers/driverbase-go/driverbase v0.0.0-20260217234140-addad9015731 github.com/adbc-drivers/driverbase-go/testutil v0.0.0-20251204005500-ef1acb8b4023 github.com/apache/arrow-adbc/go/adbc v1.10.0 github.com/apache/arrow-go/v18 v18.5.1 @@ -28,7 +28,7 @@ require ( github.com/stretchr/testify v1.11.1 golang.org/x/oauth2 v0.35.0 golang.org/x/sync v0.19.0 - google.golang.org/api v0.266.0 + google.golang.org/api v0.267.0 google.golang.org/grpc v1.79.1 ) @@ -50,7 +50,7 @@ require ( github.com/google/flatbuffers v25.12.19+incompatible // indirect github.com/google/s2a-go v0.1.9 // indirect github.com/googleapis/enterprise-certificate-proxy v0.3.12 // indirect - github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.8 // indirect + github.com/grpc-ecosystem/grpc-gateway/v2 v2.28.0 // indirect github.com/klauspost/asmfmt v1.3.2 // indirect github.com/klauspost/compress v1.18.4 // indirect github.com/klauspost/cpuid/v2 v2.3.0 // indirect @@ -72,18 +72,18 @@ require ( go.opentelemetry.io/otel/trace v1.40.0 // indirect go.opentelemetry.io/proto/otlp v1.9.0 // indirect golang.org/x/crypto v0.48.0 // indirect - golang.org/x/exp v0.0.0-20260209203927-2842357ff358 // indirect + golang.org/x/exp v0.0.0-20260212183809-81e46e3db34a // indirect golang.org/x/mod v0.33.0 // indirect golang.org/x/net v0.50.0 // indirect golang.org/x/sys v0.41.0 // indirect - golang.org/x/telemetry v0.0.0-20260209163413-e7419c687ee4 // indirect + golang.org/x/telemetry v0.0.0-20260213145524-e0ab670178e1 // indirect golang.org/x/text v0.34.0 // indirect golang.org/x/time v0.14.0 // indirect golang.org/x/tools v0.42.0 // indirect golang.org/x/xerrors v0.0.0-20240903120638-7835f813f4da // indirect - google.golang.org/genproto v0.0.0-20260209200024-4cfbd4190f57 // indirect - google.golang.org/genproto/googleapis/api v0.0.0-20260209200024-4cfbd4190f57 // indirect - google.golang.org/genproto/googleapis/rpc v0.0.0-20260209200024-4cfbd4190f57 // indirect + google.golang.org/genproto v0.0.0-20260217215200-42d3e9bedb6d // indirect + google.golang.org/genproto/googleapis/api v0.0.0-20260217215200-42d3e9bedb6d // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20260217215200-42d3e9bedb6d // indirect google.golang.org/protobuf v1.36.11 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go/go.sum b/go/go.sum index ae765dd..99f4dd4 100644 --- a/go/go.sum +++ b/go/go.sum @@ -24,8 +24,8 @@ github.com/GoogleCloudPlatform/opentelemetry-operations-go/exporter/metric v0.54 github.com/GoogleCloudPlatform/opentelemetry-operations-go/exporter/metric v0.54.0/go.mod h1:l9rva3ApbBpEJxSNYnwT9N4CDLrWgtq3u8736C5hyJw= github.com/GoogleCloudPlatform/opentelemetry-operations-go/internal/resourcemapping v0.54.0 h1:s0WlVbf9qpvkh1c/uDAPElam0WrL7fHRIidgZJ7UqZI= github.com/GoogleCloudPlatform/opentelemetry-operations-go/internal/resourcemapping v0.54.0/go.mod h1:Mf6O40IAyB9zR/1J8nGDDPirZQQPbYJni8Yisy7NTMc= -github.com/adbc-drivers/driverbase-go/driverbase v0.0.0-20260211074943-935104712593 h1:kqhGuO/CPxPPbNY+14CiRgEohIIh9gJKeNapHwW2L8E= -github.com/adbc-drivers/driverbase-go/driverbase v0.0.0-20260211074943-935104712593/go.mod h1:GbMifodY8nLi/TDF5wNzCjUO0nvDK1w8i5SAGJ8UegQ= +github.com/adbc-drivers/driverbase-go/driverbase v0.0.0-20260217234140-addad9015731 h1:zte+OgJNsJJ5eGA3S1oD9YxnL27FUQU5QMHtORfJaSY= +github.com/adbc-drivers/driverbase-go/driverbase v0.0.0-20260217234140-addad9015731/go.mod h1:GbMifodY8nLi/TDF5wNzCjUO0nvDK1w8i5SAGJ8UegQ= github.com/adbc-drivers/driverbase-go/testutil v0.0.0-20251204005500-ef1acb8b4023 h1:7TIuxwU4EkneKSP6QrWH/WPrVyicU0ST+XufYGjuBFI= github.com/adbc-drivers/driverbase-go/testutil v0.0.0-20251204005500-ef1acb8b4023/go.mod h1:4Wtj2lGJ1tfR/X1GJXy+XC9g8DBg8HiaOETHygGj+Tc= github.com/andybalholm/brotli v1.2.0 h1:ukwgCxwYrmACq68yiUqwIWnGY0cTPox/M94sVwToPjQ= @@ -78,8 +78,8 @@ github.com/googleapis/enterprise-certificate-proxy v0.3.12 h1:Fg+zsqzYEs1Znvmczt github.com/googleapis/enterprise-certificate-proxy v0.3.12/go.mod h1:vqVt9yG9480NtzREnTlmGSBmFrA+bzb0yl0TxoBQXOg= github.com/googleapis/gax-go/v2 v2.17.0 h1:RksgfBpxqff0EZkDWYuz9q/uWsTVz+kf43LsZ1J6SMc= github.com/googleapis/gax-go/v2 v2.17.0/go.mod h1:mzaqghpQp4JDh3HvADwrat+6M3MOIDp5YKHhb9PAgDY= -github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.8 h1:NpbJl/eVbvrGE0MJ6X16X9SAifesl6Fwxg/YmCvubRI= -github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.8/go.mod h1:mi7YA+gCzVem12exXy46ZespvGtX/lZmD/RLnQhVW7U= +github.com/grpc-ecosystem/grpc-gateway/v2 v2.28.0 h1:HWRh5R2+9EifMyIHV7ZV+MIZqgz+PMpZ14Jynv3O2Zs= +github.com/grpc-ecosystem/grpc-gateway/v2 v2.28.0/go.mod h1:JfhWUomR1baixubs02l85lZYYOm7LV6om4ceouMv45c= github.com/klauspost/asmfmt v1.3.2 h1:4Ri7ox3EwapiOjCki+hw14RyKk201CN4rzyCJRFLpK4= github.com/klauspost/asmfmt v1.3.2/go.mod h1:AG8TuvYojzulgDAMCnYn50l/5QV3Bs/tp6j0HLHbNSE= github.com/klauspost/compress v1.18.4 h1:RPhnKRAQ4Fh8zU2FY/6ZFDwTVTxgJ/EMydqSTzE9a2c= @@ -148,8 +148,8 @@ go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= golang.org/x/crypto v0.48.0 h1:/VRzVqiRSggnhY7gNRxPauEQ5Drw9haKdM0jqfcCFts= golang.org/x/crypto v0.48.0/go.mod h1:r0kV5h3qnFPlQnBSrULhlsRfryS2pmewsg+XfMgkVos= -golang.org/x/exp v0.0.0-20260209203927-2842357ff358 h1:kpfSV7uLwKJbFSEgNhWzGSL47NDSF/5pYYQw1V0ub6c= -golang.org/x/exp v0.0.0-20260209203927-2842357ff358/go.mod h1:R3t0oliuryB5eenPWl3rrQxwnNM3WTwnsRZZiXLAAW8= +golang.org/x/exp v0.0.0-20260212183809-81e46e3db34a h1:ovFr6Z0MNmU7nH8VaX5xqw+05ST2uO1exVfZPVqRC5o= +golang.org/x/exp v0.0.0-20260212183809-81e46e3db34a/go.mod h1:K79w1Vqn7PoiZn+TkNpx3BUWUQksGO3JcVX6qIjytmA= golang.org/x/mod v0.33.0 h1:tHFzIWbBifEmbwtGz65eaWyGiGZatSrT9prnU8DbVL8= golang.org/x/mod v0.33.0/go.mod h1:swjeQEj+6r7fODbD2cqrnje9PnziFuw4bmLbBZFrQ5w= golang.org/x/net v0.50.0 h1:ucWh9eiCGyDR3vtzso0WMQinm2Dnt8cFMuQa9K33J60= @@ -160,8 +160,8 @@ golang.org/x/sync v0.19.0 h1:vV+1eWNmZ5geRlYjzm2adRgW2/mcpevXNg50YZtPCE4= golang.org/x/sync v0.19.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI= golang.org/x/sys v0.41.0 h1:Ivj+2Cp/ylzLiEU89QhWblYnOE9zerudt9Ftecq2C6k= golang.org/x/sys v0.41.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks= -golang.org/x/telemetry v0.0.0-20260209163413-e7419c687ee4 h1:bTLqdHv7xrGlFbvf5/TXNxy/iUwwdkjhqQTJDjW7aj0= -golang.org/x/telemetry v0.0.0-20260209163413-e7419c687ee4/go.mod h1:g5NllXBEermZrmR51cJDQxmJUHUOfRAaNyWBM+R+548= +golang.org/x/telemetry v0.0.0-20260213145524-e0ab670178e1 h1:QNaHp8YvpPswfDNxlCmJyeesxbGOgaKf41iT9/QrErY= +golang.org/x/telemetry v0.0.0-20260213145524-e0ab670178e1/go.mod h1:NuITXsA9cTiqnXtVk+/wrBT2Ja4X5hsfGOYRJ6kgYjs= golang.org/x/text v0.34.0 h1:oL/Qq0Kdaqxa1KbNeMKwQq0reLCCaFtqu2eNuSeNHbk= golang.org/x/text v0.34.0/go.mod h1:homfLqTYRFyVYemLBFl5GgL/DWEiH5wcsQ5gSh1yziA= golang.org/x/time v0.14.0 h1:MRx4UaLrDotUKUdCIqzPC48t1Y9hANFKIRpNx+Te8PI= @@ -172,14 +172,14 @@ golang.org/x/xerrors v0.0.0-20240903120638-7835f813f4da h1:noIWHXmPHxILtqtCOPIhS golang.org/x/xerrors v0.0.0-20240903120638-7835f813f4da/go.mod h1:NDW/Ps6MPRej6fsCIbMTohpP40sJ/P/vI1MoTEGwX90= gonum.org/v1/gonum v0.16.0 h1:5+ul4Swaf3ESvrOnidPp4GZbzf0mxVQpDCYUQE7OJfk= gonum.org/v1/gonum v0.16.0/go.mod h1:fef3am4MQ93R2HHpKnLk4/Tbh/s0+wqD5nfa6Pnwy4E= -google.golang.org/api v0.266.0 h1:hco+oNCf9y7DmLeAtHJi/uBAY7n/7XC9mZPxu1ROiyk= -google.golang.org/api v0.266.0/go.mod h1:Jzc0+ZfLnyvXma3UtaTl023TdhZu6OMBP9tJ+0EmFD0= -google.golang.org/genproto v0.0.0-20260209200024-4cfbd4190f57 h1:uZSB/r2MjH9IsqpG2vRNSV1Juteix90oHe8oTcLW9tk= -google.golang.org/genproto v0.0.0-20260209200024-4cfbd4190f57/go.mod h1:nGuPfp0lnDJcJD0J47StV0Skgnw3qMSQhjsLKiejq5Y= -google.golang.org/genproto/googleapis/api v0.0.0-20260209200024-4cfbd4190f57 h1:JLQynH/LBHfCTSbDWl+py8C+Rg/k1OVH3xfcaiANuF0= -google.golang.org/genproto/googleapis/api v0.0.0-20260209200024-4cfbd4190f57/go.mod h1:kSJwQxqmFXeo79zOmbrALdflXQeAYcUbgS7PbpMknCY= -google.golang.org/genproto/googleapis/rpc v0.0.0-20260209200024-4cfbd4190f57 h1:mWPCjDEyshlQYzBpMNHaEof6UX1PmHcaUODUywQ0uac= -google.golang.org/genproto/googleapis/rpc v0.0.0-20260209200024-4cfbd4190f57/go.mod h1:j9x/tPzZkyxcgEFkiKEEGxfvyumM01BEtsW8xzOahRQ= +google.golang.org/api v0.267.0 h1:w+vfWPMPYeRs8qH1aYYsFX68jMls5acWl/jocfLomwE= +google.golang.org/api v0.267.0/go.mod h1:Jzc0+ZfLnyvXma3UtaTl023TdhZu6OMBP9tJ+0EmFD0= +google.golang.org/genproto v0.0.0-20260217215200-42d3e9bedb6d h1:vsOm753cOAMkt76efriTCDKjpCbK18XGHMJHo0JUKhc= +google.golang.org/genproto v0.0.0-20260217215200-42d3e9bedb6d/go.mod h1:0oz9d7g9QLSdv9/lgbIjowW1JoxMbxmBVNe8i6tORJI= +google.golang.org/genproto/googleapis/api v0.0.0-20260217215200-42d3e9bedb6d h1:EocjzKLywydp5uZ5tJ79iP6Q0UjDnyiHkGRWxuPBP8s= +google.golang.org/genproto/googleapis/api v0.0.0-20260217215200-42d3e9bedb6d/go.mod h1:48U2I+QQUYhsFrg2SY6r+nJzeOtjey7j//WBESw+qyQ= +google.golang.org/genproto/googleapis/rpc v0.0.0-20260217215200-42d3e9bedb6d h1:t/LOSXPJ9R0B6fnZNyALBRfZBH0Uy0gT+uR+SJ6syqQ= +google.golang.org/genproto/googleapis/rpc v0.0.0-20260217215200-42d3e9bedb6d/go.mod h1:4Hqkh8ycfw05ld/3BWL7rJOSfebL2Q+DVDeRgYgxUU8= google.golang.org/grpc v1.79.1 h1:zGhSi45ODB9/p3VAawt9a+O/MULLl9dpizzNNpq7flY= google.golang.org/grpc v1.79.1/go.mod h1:KmT0Kjez+0dde/v2j9vzwoAScgEPx/Bw1CYChhHLrHQ= google.golang.org/protobuf v1.36.11 h1:fV6ZwhNocDyBLK0dj+fg8ektcVegBBuEolpbTQyBNVE= diff --git a/go/util.go b/go/util.go index 2cf5fb4..fd9b41f 100644 --- a/go/util.go +++ b/go/util.go @@ -231,14 +231,12 @@ func retryWithBackoff(ctx context.Context, context string, maxAttempts int, back attempt := 0 for { complete, err := f() - if err != nil { + if complete { return err - } else if complete { - return nil } duration := backoff.Pause() - if err := gax.Sleep(ctx, duration); err != nil { + if sleepErr := gax.Sleep(ctx, duration); sleepErr != nil { return err } attempt++ @@ -246,7 +244,7 @@ func retryWithBackoff(ctx context.Context, context string, maxAttempts int, back if attempt >= maxAttempts { return adbc.Error{ Code: adbc.StatusInternal, - Msg: fmt.Sprintf("[bq] could not %s: maximum retry attempts exceeded", context), + Msg: fmt.Sprintf("[bq] could not %s: maximum retry attempts exceeded: %v", context, err), } } } @@ -256,7 +254,7 @@ func retry(ctx context.Context, context string, f func() (bool, error)) error { backoff := gax.Backoff{ Initial: 100 * time.Millisecond, Multiplier: 2.0, - Max: 10 * time.Second, + Max: 15 * time.Second, } - return retryWithBackoff(ctx, context, 15, backoff, f) + return retryWithBackoff(ctx, context, 20, backoff, f) }