Skip to content

Commit da21d47

Browse files
lidavidm and claude authored
fix(go): wait for table creation in bulk ingest (#107)
Even though the job completes, BigQuery returns an error when you try to create a write stream. Co-authored-by: Claude <noreply@anthropic.com>
1 parent 7f5a7fa commit da21d47

File tree

1 file changed

+53
-0
lines changed

1 file changed

+53
-0
lines changed

go/bulk_ingest_storage_write.go

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"errors"
2121
"fmt"
2222
"log/slog"
23+
"time"
2324

2425
storage "cloud.google.com/go/bigquery/storage/apiv1"
2526
"cloud.google.com/go/bigquery/storage/apiv1/storagepb"
@@ -29,6 +30,8 @@ import (
2930
"github.com/apache/arrow-go/v18/arrow/array"
3031
"github.com/apache/arrow-go/v18/arrow/ipc"
3132
"github.com/apache/arrow-go/v18/arrow/memory"
33+
"github.com/googleapis/gax-go/v2"
34+
"google.golang.org/api/googleapi"
3235
"google.golang.org/protobuf/types/known/wrapperspb"
3336
)
3437

@@ -164,9 +167,59 @@ func (st *statement) createTableForIngest(ctx context.Context, schema *arrow.Sch
164167
return errToAdbcErr(adbc.StatusInternal, err, "create table")
165168
}
166169

170+
// Poll until BigQuery recognizes the new table
171+
if err := st.waitForTableAvailable(ctx); err != nil {
172+
return err
173+
}
174+
167175
return nil
168176
}
169177

178+
// waitForTableAvailable polls until the newly created table is available in BigQuery.
179+
// BigQuery can have eventual consistency delays after table creation.
180+
func (st *statement) waitForTableAvailable(ctx context.Context) error {
181+
catalog := st.ingest.CatalogName
182+
if catalog == "" {
183+
catalog = st.queryConfig.DefaultProjectID
184+
}
185+
186+
schema := st.ingest.SchemaName
187+
if schema == "" {
188+
schema = st.queryConfig.DefaultDatasetID
189+
}
190+
191+
table := st.cnxn.client.DatasetInProject(catalog, schema).Table(st.ingest.TableName)
192+
backoff := gax.Backoff{
193+
Initial: 100 * time.Millisecond,
194+
Multiplier: 2.0,
195+
Max: 5 * time.Second,
196+
}
197+
198+
for {
199+
_, err := table.Metadata(ctx)
200+
if err == nil {
201+
st.cnxn.Logger.Debug("table is available", "table", st.ingest.TableName)
202+
return nil
203+
}
204+
205+
// Check if it's a 404 (not found) error - this is expected while waiting
206+
if apiErr, ok := err.(*googleapi.Error); ok && apiErr.Code == 404 {
207+
duration := backoff.Pause()
208+
st.cnxn.Logger.Debug("waiting for table to be available",
209+
"table", st.ingest.TableName,
210+
"backoff", duration)
211+
212+
if err := gax.Sleep(ctx, duration); err != nil {
213+
return err
214+
}
215+
continue
216+
}
217+
218+
// For other errors, fail immediately
219+
return errToAdbcErr(adbc.StatusInternal, err, "check table availability")
220+
}
221+
}
222+
170223
// storageWriteStream manages a BigQuery Storage Write API stream.
171224
type storageWriteStream struct {
172225
client *storage.BigQueryWriteClient

0 commit comments

Comments
 (0)