Skip to content

Commit 4520802

Browse files
committed
iceberg: retry failed table operations
Table operations can fail when, for example, auth expires; forcing a table refresh and retrying is a good idea in that case (the Snowflake connector does this too).
1 parent 5e308cc commit 4520802

File tree

2 files changed

+75
-56
lines changed

2 files changed

+75
-56
lines changed

internal/impl/iceberg/committer.go

Lines changed: 20 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -51,18 +51,20 @@ func (e *StaleSchemaError) Error() string {
5151
// committer batches data file commits for a single table.
5252
// Commits are serialized - only one commit at a time per committer.
5353
type committer struct {
54-
table *table.Table
55-
cfg CommitConfig
56-
batcher *asyncroutine.Batcher[CommitInput, struct{}]
57-
logger *service.Logger
54+
table *table.Table
55+
cfg CommitConfig
56+
reloadTable func(ctx context.Context) (*table.Table, error)
57+
batcher *asyncroutine.Batcher[CommitInput, struct{}]
58+
logger *service.Logger
5859
}
5960

6061
// NewCommitter creates a new committer for a specific table.
61-
func NewCommitter(tbl *table.Table, cfg CommitConfig, logger *service.Logger) (*committer, error) {
62+
func NewCommitter(tbl *table.Table, cfg CommitConfig, reloadTable func(ctx context.Context) (*table.Table, error), logger *service.Logger) (*committer, error) {
6263
c := &committer{
63-
table: tbl,
64-
cfg: cfg,
65-
logger: logger,
64+
table: tbl,
65+
cfg: cfg,
66+
reloadTable: reloadTable,
67+
logger: logger,
6668
}
6769

6870
batcher, err := asyncroutine.NewBatcher(100, c.doCommit)
@@ -116,9 +118,18 @@ func (c *committer) doCommit(ctx context.Context, inputs []CommitInput) ([]struc
116118
if errors.Is(err, rest.ErrCommitFailed) {
117119
commitErr = err
118120
c.logger.Warnf("Commit attempt %d/%d failed: %v", attempt, c.cfg.MaxRetries, err)
121+
// Reload table to get fresh metadata before retrying.
122+
if reloaded, reloadErr := c.reloadTable(ctx); reloadErr == nil {
123+
c.table = reloaded
124+
} else {
125+
c.logger.Warnf("Failed to reload table during commit retry: %v", reloadErr)
126+
}
119127
continue
120128
} else if err != nil {
121-
// If we get an error other than a conflict then we should not retry.
129+
// Non-retryable error: reload table so next call uses fresh metadata.
130+
if reloaded, reloadErr := c.reloadTable(ctx); reloadErr == nil {
131+
c.table = reloaded
132+
}
122133
commitErr = err
123134
break
124135
}

internal/impl/iceberg/router.go

Lines changed: 55 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -137,72 +137,69 @@ func (r *Router) Route(ctx context.Context, batch service.MessageBatch) error {
137137
}
138138

139139
// writeWithRetry writes a batch to a table with retry loop for schema evolution.
140+
// On any failure the writer is always closed so the next attempt reloads the table.
141+
// An otherwise unhandled error is retried once if it occurs on the first attempt; schema evolution errors get up to maxSchemaEvolutionRetries attempts.
140142
func (r *Router) writeWithRetry(ctx context.Context, key tableKey, batch service.MessageBatch) error {
141143
entry := r.getOrCreateEntry(key)
142144

143-
for range maxSchemaEvolutionRetries {
145+
for i := range maxSchemaEvolutionRetries {
144146
err := r.doWrite(ctx, key, entry, batch)
145147
if err == nil {
146148
return nil
147149
}
148150

149-
// Stale schema: the writer was using an outdated schema.
150-
// Recreate the writer to pick up the current schema and retry.
151-
var staleErr *StaleSchemaError
152-
if errors.As(err, &staleErr) {
153-
entry.mu.Lock()
154-
r.closeWriter(entry)
155-
entry.mu.Unlock()
156-
r.logger.Debugf("Stale schema for %s.%s: %v, recreating writer", key.namespace, key.table, err)
157-
continue
158-
}
159-
160-
// Check if schema evolution is disabled - fail immediately
161-
if !r.schemaEvoCfg.Enabled {
162-
return fmt.Errorf("writing to %s.%s: %w", key.namespace, key.table, err)
163-
}
151+
// Always close the writer on failure so the next attempt gets a fresh table.
152+
entry.mu.Lock()
153+
r.closeWriter(entry)
154+
entry.mu.Unlock()
164155

165-
// Handle specific errors with schema evolution
166-
if errors.Is(err, catalog.ErrNoSuchNamespace) {
167-
if err := r.createNamespace(ctx, key, entry); err != nil {
168-
return fmt.Errorf("creating namespace %s: %w", key.namespace, err)
156+
// When schema evolution is enabled, perform recovery actions for known errors.
157+
if r.schemaEvoCfg.Enabled {
158+
if errors.Is(err, catalog.ErrNoSuchNamespace) {
159+
if nsErr := r.createNamespace(ctx, key, entry); nsErr != nil {
160+
return fmt.Errorf("creating namespace %s: %w", key.namespace, nsErr)
161+
}
162+
continue
169163
}
170-
continue
171-
}
172164

173-
if errors.Is(err, catalog.ErrNoSuchTable) {
174-
createErr := r.createTable(ctx, key, batch, entry)
175-
if createErr != nil {
176-
// If table creation failed because namespace doesn't exist, create it first
177-
if errors.Is(createErr, catalog.ErrNoSuchNamespace) {
178-
if nsErr := r.createNamespace(ctx, key, entry); nsErr != nil {
179-
return fmt.Errorf("creating namespace %s: %w", key.namespace, nsErr)
165+
if errors.Is(err, catalog.ErrNoSuchTable) {
166+
createErr := r.createTable(ctx, key, batch, entry)
167+
if createErr != nil {
168+
if errors.Is(createErr, catalog.ErrNoSuchNamespace) {
169+
if nsErr := r.createNamespace(ctx, key, entry); nsErr != nil {
170+
return fmt.Errorf("creating namespace %s: %w", key.namespace, nsErr)
171+
}
172+
} else {
173+
return fmt.Errorf("creating table %s.%s: %w", key.namespace, key.table, createErr)
180174
}
181-
// Don't return error, continue retry loop to create table
182-
} else {
183-
return fmt.Errorf("creating table %s.%s: %w", key.namespace, key.table, createErr)
184175
}
176+
continue
185177
}
186-
continue
187-
}
188178

189-
var schemaErr *BatchSchemaEvolutionError
190-
if errors.As(err, &schemaErr) {
191-
if err := r.evolveSchema(ctx, key, schemaErr, entry); err != nil {
192-
return fmt.Errorf("evolving schema for %s.%s: %w", key.namespace, key.table, err)
179+
var schemaErr *BatchSchemaEvolutionError
180+
if errors.As(err, &schemaErr) {
181+
if evolveErr := r.evolveSchema(ctx, key, schemaErr, entry); evolveErr != nil {
182+
return fmt.Errorf("evolving schema for %s.%s: %w", key.namespace, key.table, evolveErr)
183+
}
184+
continue
193185
}
194-
continue
195-
}
196186

197-
var reqNullErr *shredder.RequiredFieldNullError
198-
if errors.As(err, &reqNullErr) {
199-
if err := r.makeColumnOptional(ctx, key, reqNullErr, entry); err != nil {
200-
return fmt.Errorf("making column optional for %s.%s: %w", key.namespace, key.table, err)
187+
var reqNullErr *shredder.RequiredFieldNullError
188+
if errors.As(err, &reqNullErr) {
189+
if optErr := r.makeColumnOptional(ctx, key, reqNullErr, entry); optErr != nil {
190+
return fmt.Errorf("making column optional for %s.%s: %w", key.namespace, key.table, optErr)
191+
}
192+
continue
201193
}
202-
continue
203194
}
204195

205-
// Unhandled error - fail
196+
// For all other errors (including stale schema, auth errors, or when schema
197+
// evolution is disabled): the writer is already closed. Always retry at least
198+
// once so the fresh writer can recover from transient failures.
199+
if i == 0 {
200+
r.logger.Debugf("Write failed for %s.%s, retrying with fresh writer: %v", key.namespace, key.table, err)
201+
continue
202+
}
206203
return fmt.Errorf("writing to %s.%s: %w", key.namespace, key.table, err)
207204
}
208205

@@ -499,8 +496,19 @@ func (r *Router) createWriter(ctx context.Context, key tableKey) (*writer, error
499496
return nil, err
500497
}
501498

499+
// reloadTable creates a fresh catalog client and reloads the table,
500+
// allowing the committer to recover from stale metadata or auth errors.
501+
reloadTable := func(ctx context.Context) (*table.Table, error) {
502+
rc, err := catalogx.NewCatalogClient(ctx, r.catalogCfg, nsParts)
503+
if err != nil {
504+
return nil, fmt.Errorf("creating catalog client for table reload: %w", err)
505+
}
506+
defer rc.Close()
507+
return rc.LoadTable(ctx, key.table)
508+
}
509+
502510
// Create committer with its own table reference
503-
comm, err := NewCommitter(committerTbl, r.commitCfg, r.logger)
511+
comm, err := NewCommitter(committerTbl, r.commitCfg, reloadTable, r.logger)
504512
if err != nil {
505513
return nil, fmt.Errorf("creating committer: %w", err)
506514
}

0 commit comments

Comments
 (0)