@@ -137,72 +137,69 @@ func (r *Router) Route(ctx context.Context, batch service.MessageBatch) error {
137137}
138138
139139// writeWithRetry writes a batch to a table with retry loop for schema evolution.
140+ // On any failure the writer is always closed so the next attempt reloads the table.
141+ // Schema evolution errors get up to maxSchemaEvolutionRetries attempts; any other error is retried once, and only when it occurs on the first attempt.
140142func (r * Router ) writeWithRetry (ctx context.Context , key tableKey , batch service.MessageBatch ) error {
141143 entry := r .getOrCreateEntry (key )
142144
143- for range maxSchemaEvolutionRetries {
145+ for i := range maxSchemaEvolutionRetries {
144146 err := r .doWrite (ctx , key , entry , batch )
145147 if err == nil {
146148 return nil
147149 }
148150
149- // Stale schema: the writer was using an outdated schema.
150- // Recreate the writer to pick up the current schema and retry.
151- var staleErr * StaleSchemaError
152- if errors .As (err , & staleErr ) {
153- entry .mu .Lock ()
154- r .closeWriter (entry )
155- entry .mu .Unlock ()
156- r .logger .Debugf ("Stale schema for %s.%s: %v, recreating writer" , key .namespace , key .table , err )
157- continue
158- }
159-
160- // Check if schema evolution is disabled - fail immediately
161- if ! r .schemaEvoCfg .Enabled {
162- return fmt .Errorf ("writing to %s.%s: %w" , key .namespace , key .table , err )
163- }
151+ // Always close the writer on failure so the next attempt gets a fresh table.
152+ entry .mu .Lock ()
153+ r .closeWriter (entry )
154+ entry .mu .Unlock ()
164155
165- // Handle specific errors with schema evolution
166- if errors .Is (err , catalog .ErrNoSuchNamespace ) {
167- if err := r .createNamespace (ctx , key , entry ); err != nil {
168- return fmt .Errorf ("creating namespace %s: %w" , key .namespace , err )
156+ // When schema evolution is enabled, perform recovery actions for known errors.
157+ if r .schemaEvoCfg .Enabled {
158+ if errors .Is (err , catalog .ErrNoSuchNamespace ) {
159+ if nsErr := r .createNamespace (ctx , key , entry ); nsErr != nil {
160+ return fmt .Errorf ("creating namespace %s: %w" , key .namespace , nsErr )
161+ }
162+ continue
169163 }
170- continue
171- }
172164
173- if errors .Is (err , catalog .ErrNoSuchTable ) {
174- createErr := r .createTable (ctx , key , batch , entry )
175- if createErr != nil {
176- // If table creation failed because namespace doesn't exist, create it first
177- if errors .Is (createErr , catalog .ErrNoSuchNamespace ) {
178- if nsErr := r .createNamespace (ctx , key , entry ); nsErr != nil {
179- return fmt .Errorf ("creating namespace %s: %w" , key .namespace , nsErr )
165+ if errors .Is (err , catalog .ErrNoSuchTable ) {
166+ createErr := r .createTable (ctx , key , batch , entry )
167+ if createErr != nil {
168+ if errors .Is (createErr , catalog .ErrNoSuchNamespace ) {
169+ if nsErr := r .createNamespace (ctx , key , entry ); nsErr != nil {
170+ return fmt .Errorf ("creating namespace %s: %w" , key .namespace , nsErr )
171+ }
172+ } else {
173+ return fmt .Errorf ("creating table %s.%s: %w" , key .namespace , key .table , createErr )
180174 }
181- // Don't return error, continue retry loop to create table
182- } else {
183- return fmt .Errorf ("creating table %s.%s: %w" , key .namespace , key .table , createErr )
184175 }
176+ continue
185177 }
186- continue
187- }
188178
189- var schemaErr * BatchSchemaEvolutionError
190- if errors .As (err , & schemaErr ) {
191- if err := r .evolveSchema (ctx , key , schemaErr , entry ); err != nil {
192- return fmt .Errorf ("evolving schema for %s.%s: %w" , key .namespace , key .table , err )
179+ var schemaErr * BatchSchemaEvolutionError
180+ if errors .As (err , & schemaErr ) {
181+ if evolveErr := r .evolveSchema (ctx , key , schemaErr , entry ); evolveErr != nil {
182+ return fmt .Errorf ("evolving schema for %s.%s: %w" , key .namespace , key .table , evolveErr )
183+ }
184+ continue
193185 }
194- continue
195- }
196186
197- var reqNullErr * shredder.RequiredFieldNullError
198- if errors .As (err , & reqNullErr ) {
199- if err := r .makeColumnOptional (ctx , key , reqNullErr , entry ); err != nil {
200- return fmt .Errorf ("making column optional for %s.%s: %w" , key .namespace , key .table , err )
187+ var reqNullErr * shredder.RequiredFieldNullError
188+ if errors .As (err , & reqNullErr ) {
189+ if optErr := r .makeColumnOptional (ctx , key , reqNullErr , entry ); optErr != nil {
190+ return fmt .Errorf ("making column optional for %s.%s: %w" , key .namespace , key .table , optErr )
191+ }
192+ continue
201193 }
202- continue
203194 }
204195
205- // Unhandled error - fail
196+ // For all other errors (including stale schema, auth errors, or when schema
197+ // evolution is disabled): the writer is already closed. Retry once, but only
198+ // on the first attempt (i == 0), so the fresh writer can recover from transient failures.
199+ if i == 0 {
200+ r .logger .Debugf ("Write failed for %s.%s, retrying with fresh writer: %v" , key .namespace , key .table , err )
201+ continue
202+ }
206203 return fmt .Errorf ("writing to %s.%s: %w" , key .namespace , key .table , err )
207204 }
208205
@@ -499,8 +496,19 @@ func (r *Router) createWriter(ctx context.Context, key tableKey) (*writer, error
499496 return nil , err
500497 }
501498
499+ // reloadTable creates a fresh catalog client, loads the table through it, and closes
500+ // the client on return, allowing the committer to recover from stale metadata or auth errors.
501+ reloadTable := func (ctx context.Context ) (* table.Table , error ) {
502+ rc , err := catalogx .NewCatalogClient (ctx , r .catalogCfg , nsParts )
503+ if err != nil {
504+ return nil , fmt .Errorf ("creating catalog client for table reload: %w" , err )
505+ }
506+ defer rc .Close ()
507+ return rc .LoadTable (ctx , key .table )
508+ }
509+
502510 // Create committer with its own table reference
503- comm , err := NewCommitter (committerTbl , r .commitCfg , r .logger )
511+ comm , err := NewCommitter (committerTbl , r .commitCfg , reloadTable , r .logger )
504512 if err != nil {
505513 return nil , fmt .Errorf ("creating committer: %w" , err )
506514 }
0 commit comments