Skip to content

Commit 5e308cc

Browse files
committed
iceberg: refresh on auth errors
1 parent b3a135f commit 5e308cc

File tree

2 files changed

+329
-17
lines changed

2 files changed

+329
-17
lines changed

internal/impl/iceberg/catalogx/catalog.go

Lines changed: 103 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
"crypto/tls"
1414
"errors"
1515
"fmt"
16+
"golang.org/x/sync/semaphore"
1617
"net/url"
1718
"strings"
1819

@@ -24,8 +25,12 @@ import (
2425

2526
// Client wraps the iceberg-go REST catalog client.
2627
type Client struct {
27-
catalog catalog.Catalog
28+
url string
29+
opts []rest.Option
2830
namespace []string
31+
mu *semaphore.Weighted
32+
33+
catalog catalog.Catalog
2934
}
3035

3136
// Config holds the catalog configuration.
@@ -111,27 +116,34 @@ func NewCatalogClient(ctx context.Context, cfg Config, namespace []string) (*Cli
111116
}))
112117
}
113118

119+
c := &Client{
120+
url: cfg.URL,
121+
opts: opts,
122+
namespace: namespace,
123+
catalog: nil,
124+
mu: semaphore.NewWeighted(1),
125+
}
114126
// Create REST catalog
115-
restCatalog, err := rest.NewCatalog(
116-
ctx,
117-
"rest",
118-
cfg.URL,
119-
opts...,
120-
)
121-
if err != nil {
122-
return nil, fmt.Errorf("creating REST catalog: %w", err)
127+
if err := c.refreshCatalog(ctx); err != nil {
128+
return nil, err
123129
}
130+
return c, nil
131+
}
124132

125-
return &Client{
126-
catalog: restCatalog,
127-
namespace: namespace,
128-
}, nil
133+
func isAuthErr(err error) bool {
134+
return errors.Is(err, rest.ErrAuthorizationExpired) || errors.Is(err, rest.ErrForbidden) || errors.Is(err, rest.ErrAuthorizationExpired)
129135
}
130136

131137
// LoadTable loads an existing table from the catalog.
132138
func (c *Client) LoadTable(ctx context.Context, tableName string) (*table.Table, error) {
133139
identifier := toTableIdentifier(c.namespace, tableName)
134140
tbl, err := c.catalog.LoadTable(ctx, identifier)
141+
if isAuthErr(err) {
142+
if err := c.refreshCatalog(ctx); err != nil {
143+
return nil, fmt.Errorf("loading table %s: %w", strings.Join(identifier, "."), err)
144+
}
145+
tbl, err = c.catalog.LoadTable(ctx, identifier)
146+
}
135147
if err != nil {
136148
return nil, fmt.Errorf("loading table %s: %w", strings.Join(identifier, "."), err)
137149
}
@@ -142,6 +154,12 @@ func (c *Client) LoadTable(ctx context.Context, tableName string) (*table.Table,
142154
func (c *Client) CreateTable(ctx context.Context, tableName string, schema *iceberg.Schema, opts ...catalog.CreateTableOpt) (*table.Table, error) {
143155
identifier := toTableIdentifier(c.namespace, tableName)
144156
tbl, err := c.catalog.CreateTable(ctx, identifier, schema, opts...)
157+
if isAuthErr(err) {
158+
if err := c.refreshCatalog(ctx); err != nil {
159+
return nil, fmt.Errorf("creating table %s: %w", strings.Join(identifier, "."), err)
160+
}
161+
tbl, err = c.catalog.CreateTable(ctx, identifier, schema, opts...)
162+
}
145163
if err != nil {
146164
return nil, fmt.Errorf("creating table %s: %w", strings.Join(identifier, "."), err)
147165
}
@@ -158,7 +176,7 @@ func (c *Client) CreateTable(ctx context.Context, tableName string, schema *iceb
158176
// us.AddColumn([]string{"email"}, iceberg.StringType{}, "Email address", false, nil)
159177
// us.AddColumn([]string{"age"}, iceberg.Int32Type{}, "", false, nil)
160178
// })
161-
func (*Client) UpdateSchema(ctx context.Context, tbl *table.Table, fn func(*table.UpdateSchema), opts ...table.UpdateSchemaOption) (*table.Table, error) {
179+
func (c *Client) UpdateSchema(ctx context.Context, tbl *table.Table, fn func(*table.UpdateSchema), opts ...table.UpdateSchemaOption) (*table.Table, error) {
162180
txn := tbl.NewTransaction()
163181
updateSchema := txn.UpdateSchema(
164182
true, // caseSensitive
@@ -171,26 +189,54 @@ func (*Client) UpdateSchema(ctx context.Context, tbl *table.Table, fn func(*tabl
171189

172190
// Commit the schema update to the transaction
173191
if err := updateSchema.Commit(); err != nil {
192+
if isAuthErr(err) {
193+
if refreshErr := c.refreshCatalog(ctx); refreshErr != nil {
194+
return nil, fmt.Errorf("failed to refresh catalog during updating schema txn %w: %v", err, refreshErr)
195+
}
196+
}
174197
return nil, fmt.Errorf("applying schema update: %w", err)
175198
}
176199

177200
// Commit the transaction to persist changes
178-
return txn.Commit(ctx)
201+
table, err := txn.Commit(ctx)
202+
if isAuthErr(err) {
203+
if refreshErr := c.refreshCatalog(ctx); refreshErr != nil {
204+
return nil, fmt.Errorf("failed to refresh catalog during updating schema txn %w: %v", err, refreshErr)
205+
}
206+
}
207+
return table, err
179208
}
180209

181210
// AppendDataFiles commits a batch of data files to the table.
182-
func (*Client) AppendDataFiles(ctx context.Context, tbl *table.Table, dataFiles []string) (*table.Table, error) {
211+
func (c *Client) AppendDataFiles(ctx context.Context, tbl *table.Table, dataFiles []string) (*table.Table, error) {
183212
txn := tbl.NewTransaction()
184213
if err := txn.AddFiles(ctx, dataFiles, nil, true); err != nil {
214+
if isAuthErr(err) {
215+
if refreshErr := c.refreshCatalog(ctx); refreshErr != nil {
216+
return nil, fmt.Errorf("failed to refresh catalog during appending data files %w: %v", err, refreshErr)
217+
}
218+
}
185219
return nil, err
186220
}
187-
return txn.Commit(ctx)
221+
table, err := txn.Commit(ctx)
222+
if err != nil && isAuthErr(err) {
223+
if refreshErr := c.refreshCatalog(ctx); refreshErr != nil {
224+
return nil, fmt.Errorf("failed to refresh catalog during committing data file txn %w: %v", err, refreshErr)
225+
}
226+
}
227+
return table, err
188228
}
189229

190230
// CheckTableExists checks if the table exists in the catalog.
191231
func (c *Client) CheckTableExists(ctx context.Context, tableName string) (bool, error) {
192232
identifier := toTableIdentifier(c.namespace, tableName)
193233
exists, err := c.catalog.CheckTableExists(ctx, identifier)
234+
if isAuthErr(err) {
235+
if err := c.refreshCatalog(ctx); err != nil {
236+
return false, fmt.Errorf("checking table existence %s: %w", strings.Join(identifier, "."), err)
237+
}
238+
exists, err = c.catalog.CheckTableExists(ctx, identifier)
239+
}
194240
if err != nil {
195241
return false, fmt.Errorf("checking table existence %s: %w", strings.Join(identifier, "."), err)
196242
}
@@ -201,6 +247,12 @@ func (c *Client) CheckTableExists(ctx context.Context, tableName string) (bool,
201247
// Returns nil if the namespace already exists (idempotent).
202248
func (c *Client) CreateNamespace(ctx context.Context, props iceberg.Properties) error {
203249
err := c.catalog.CreateNamespace(ctx, c.namespace, props)
250+
if isAuthErr(err) {
251+
if err := c.refreshCatalog(ctx); err != nil {
252+
return fmt.Errorf("creating namespace %s: %w", strings.Join(c.namespace, "."), err)
253+
}
254+
err = c.catalog.CreateNamespace(ctx, c.namespace, props)
255+
}
204256
if err != nil {
205257
// Check if namespace already exists - treat as success
206258
if isNamespaceAlreadyExists(err) {
@@ -214,12 +266,46 @@ func (c *Client) CreateNamespace(ctx context.Context, props iceberg.Properties)
214266
// CheckNamespaceExists checks if the configured namespace exists.
215267
func (c *Client) CheckNamespaceExists(ctx context.Context) (bool, error) {
216268
exists, err := c.catalog.CheckNamespaceExists(ctx, c.namespace)
269+
if isAuthErr(err) {
270+
c.refreshCatalog(ctx)
271+
if err := c.refreshCatalog(ctx); err != nil {
272+
return false, fmt.Errorf("checking namespace existence %s: %w", strings.Join(c.namespace, "."), err)
273+
}
274+
exists, err = c.catalog.CheckNamespaceExists(ctx, c.namespace)
275+
}
217276
if err != nil {
218277
return false, fmt.Errorf("checking namespace existence %s: %w", strings.Join(c.namespace, "."), err)
219278
}
220279
return exists, nil
221280
}
222281

282+
func (c *Client) refreshCatalog(ctx context.Context) error {
283+
if !c.mu.TryAcquire(1) {
284+
// In this case someone else is trying to refresh the catalog,
285+
// let them do it and we can just wait for them to finish without
286+
// too much extra IO
287+
err := c.mu.Acquire(ctx, 1)
288+
if err != nil {
289+
return err
290+
}
291+
c.mu.Release(1)
292+
return nil
293+
}
294+
defer c.mu.Release(1)
295+
// Create REST catalog
296+
restCatalog, err := rest.NewCatalog(
297+
ctx,
298+
"rest",
299+
c.url,
300+
c.opts...,
301+
)
302+
if err != nil {
303+
return fmt.Errorf("creating REST catalog: %w", err)
304+
}
305+
c.catalog = restCatalog
306+
return nil
307+
}
308+
223309
// isNamespaceAlreadyExists checks if the error indicates the namespace already exists.
224310
func isNamespaceAlreadyExists(err error) bool {
225311
return errors.Is(err, catalog.ErrNamespaceAlreadyExists)

0 commit comments

Comments
 (0)