Skip to content

Commit 430d2cb

Browse files
committed
iceberg: refresh on auth errors
1 parent b3a135f commit 430d2cb

File tree

2 files changed

+338
-22
lines changed

2 files changed

+338
-22
lines changed

internal/impl/iceberg/catalogx/catalog.go

Lines changed: 112 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,9 @@ import (
1515
"fmt"
1616
"net/url"
1717
"strings"
18+
"sync/atomic"
19+
20+
"golang.org/x/sync/semaphore"
1821

1922
"github.com/apache/iceberg-go"
2023
"github.com/apache/iceberg-go/catalog"
@@ -24,8 +27,12 @@ import (
2427

2528
// Client wraps the iceberg-go REST catalog client.
2629
type Client struct {
27-
catalog catalog.Catalog
30+
url string
31+
opts []rest.Option
2832
namespace []string
33+
mu *semaphore.Weighted
34+
35+
catalog atomic.Pointer[rest.Catalog]
2936
}
3037

3138
// Config holds the catalog configuration.
@@ -111,27 +118,33 @@ func NewCatalogClient(ctx context.Context, cfg Config, namespace []string) (*Cli
111118
}))
112119
}
113120

121+
c := &Client{
122+
url: cfg.URL,
123+
opts: opts,
124+
namespace: namespace,
125+
mu: semaphore.NewWeighted(1),
126+
}
114127
// Create REST catalog
115-
restCatalog, err := rest.NewCatalog(
116-
ctx,
117-
"rest",
118-
cfg.URL,
119-
opts...,
120-
)
121-
if err != nil {
122-
return nil, fmt.Errorf("creating REST catalog: %w", err)
128+
if err := c.refreshCatalog(ctx); err != nil {
129+
return nil, err
123130
}
131+
return c, nil
132+
}
124133

125-
return &Client{
126-
catalog: restCatalog,
127-
namespace: namespace,
128-
}, nil
134+
func isAuthErr(err error) bool {
135+
return errors.Is(err, rest.ErrAuthorizationExpired) || errors.Is(err, rest.ErrForbidden) || errors.Is(err, rest.ErrUnauthorized)
129136
}
130137

131138
// LoadTable loads an existing table from the catalog.
132139
func (c *Client) LoadTable(ctx context.Context, tableName string) (*table.Table, error) {
133140
identifier := toTableIdentifier(c.namespace, tableName)
134-
tbl, err := c.catalog.LoadTable(ctx, identifier)
141+
tbl, err := c.loadCatalog().LoadTable(ctx, identifier)
142+
if isAuthErr(err) {
143+
if err := c.refreshCatalog(ctx); err != nil {
144+
return nil, fmt.Errorf("loading table %s: %w", strings.Join(identifier, "."), err)
145+
}
146+
tbl, err = c.loadCatalog().LoadTable(ctx, identifier)
147+
}
135148
if err != nil {
136149
return nil, fmt.Errorf("loading table %s: %w", strings.Join(identifier, "."), err)
137150
}
@@ -141,7 +154,13 @@ func (c *Client) LoadTable(ctx context.Context, tableName string) (*table.Table,
141154
// CreateTable creates a new table with the given schema and optional create options.
142155
func (c *Client) CreateTable(ctx context.Context, tableName string, schema *iceberg.Schema, opts ...catalog.CreateTableOpt) (*table.Table, error) {
143156
identifier := toTableIdentifier(c.namespace, tableName)
144-
tbl, err := c.catalog.CreateTable(ctx, identifier, schema, opts...)
157+
tbl, err := c.loadCatalog().CreateTable(ctx, identifier, schema, opts...)
158+
if isAuthErr(err) {
159+
if err := c.refreshCatalog(ctx); err != nil {
160+
return nil, fmt.Errorf("creating table %s: %w", strings.Join(identifier, "."), err)
161+
}
162+
tbl, err = c.loadCatalog().CreateTable(ctx, identifier, schema, opts...)
163+
}
145164
if err != nil {
146165
return nil, fmt.Errorf("creating table %s: %w", strings.Join(identifier, "."), err)
147166
}
@@ -158,7 +177,7 @@ func (c *Client) CreateTable(ctx context.Context, tableName string, schema *iceb
158177
// us.AddColumn([]string{"email"}, iceberg.StringType{}, "Email address", false, nil)
159178
// us.AddColumn([]string{"age"}, iceberg.Int32Type{}, "", false, nil)
160179
// })
161-
func (*Client) UpdateSchema(ctx context.Context, tbl *table.Table, fn func(*table.UpdateSchema), opts ...table.UpdateSchemaOption) (*table.Table, error) {
180+
func (c *Client) UpdateSchema(ctx context.Context, tbl *table.Table, fn func(*table.UpdateSchema), opts ...table.UpdateSchemaOption) (*table.Table, error) {
162181
txn := tbl.NewTransaction()
163182
updateSchema := txn.UpdateSchema(
164183
true, // caseSensitive
@@ -171,26 +190,54 @@ func (*Client) UpdateSchema(ctx context.Context, tbl *table.Table, fn func(*tabl
171190

172191
// Commit the schema update to the transaction
173192
if err := updateSchema.Commit(); err != nil {
193+
if isAuthErr(err) {
194+
if refreshErr := c.refreshCatalog(ctx); refreshErr != nil {
195+
return nil, fmt.Errorf("failed to refresh catalog during updating schema txn %w: %v", err, refreshErr)
196+
}
197+
}
174198
return nil, fmt.Errorf("applying schema update: %w", err)
175199
}
176200

177201
// Commit the transaction to persist changes
178-
return txn.Commit(ctx)
202+
table, err := txn.Commit(ctx)
203+
if isAuthErr(err) {
204+
if refreshErr := c.refreshCatalog(ctx); refreshErr != nil {
205+
return nil, fmt.Errorf("failed to refresh catalog during updating schema txn %w: %v", err, refreshErr)
206+
}
207+
}
208+
return table, err
179209
}
180210

181211
// AppendDataFiles commits a batch of data files to the table.
182-
func (*Client) AppendDataFiles(ctx context.Context, tbl *table.Table, dataFiles []string) (*table.Table, error) {
212+
func (c *Client) AppendDataFiles(ctx context.Context, tbl *table.Table, dataFiles []string) (*table.Table, error) {
183213
txn := tbl.NewTransaction()
184214
if err := txn.AddFiles(ctx, dataFiles, nil, true); err != nil {
215+
if isAuthErr(err) {
216+
if refreshErr := c.refreshCatalog(ctx); refreshErr != nil {
217+
return nil, fmt.Errorf("failed to refresh catalog during appending data files %w: %v", err, refreshErr)
218+
}
219+
}
185220
return nil, err
186221
}
187-
return txn.Commit(ctx)
222+
table, err := txn.Commit(ctx)
223+
if err != nil && isAuthErr(err) {
224+
if refreshErr := c.refreshCatalog(ctx); refreshErr != nil {
225+
return nil, fmt.Errorf("failed to refresh catalog during committing data file txn %w: %v", err, refreshErr)
226+
}
227+
}
228+
return table, err
188229
}
189230

190231
// CheckTableExists checks if the table exists in the catalog.
191232
func (c *Client) CheckTableExists(ctx context.Context, tableName string) (bool, error) {
192233
identifier := toTableIdentifier(c.namespace, tableName)
193-
exists, err := c.catalog.CheckTableExists(ctx, identifier)
234+
exists, err := c.loadCatalog().CheckTableExists(ctx, identifier)
235+
if isAuthErr(err) {
236+
if err := c.refreshCatalog(ctx); err != nil {
237+
return false, fmt.Errorf("checking table existence %s: %w", strings.Join(identifier, "."), err)
238+
}
239+
exists, err = c.loadCatalog().CheckTableExists(ctx, identifier)
240+
}
194241
if err != nil {
195242
return false, fmt.Errorf("checking table existence %s: %w", strings.Join(identifier, "."), err)
196243
}
@@ -200,7 +247,13 @@ func (c *Client) CheckTableExists(ctx context.Context, tableName string) (bool,
200247
// CreateNamespace creates the configured namespace with the given properties.
201248
// Returns nil if the namespace already exists (idempotent).
202249
func (c *Client) CreateNamespace(ctx context.Context, props iceberg.Properties) error {
203-
err := c.catalog.CreateNamespace(ctx, c.namespace, props)
250+
err := c.loadCatalog().CreateNamespace(ctx, c.namespace, props)
251+
if isAuthErr(err) {
252+
if err := c.refreshCatalog(ctx); err != nil {
253+
return fmt.Errorf("creating namespace %s: %w", strings.Join(c.namespace, "."), err)
254+
}
255+
err = c.loadCatalog().CreateNamespace(ctx, c.namespace, props)
256+
}
204257
if err != nil {
205258
// Check if namespace already exists - treat as success
206259
if isNamespaceAlreadyExists(err) {
@@ -213,13 +266,50 @@ func (c *Client) CreateNamespace(ctx context.Context, props iceberg.Properties)
213266

214267
// CheckNamespaceExists checks if the configured namespace exists.
215268
func (c *Client) CheckNamespaceExists(ctx context.Context) (bool, error) {
216-
exists, err := c.catalog.CheckNamespaceExists(ctx, c.namespace)
269+
exists, err := c.loadCatalog().CheckNamespaceExists(ctx, c.namespace)
270+
if isAuthErr(err) {
271+
if err := c.refreshCatalog(ctx); err != nil {
272+
return false, fmt.Errorf("checking namespace existence %s: %w", strings.Join(c.namespace, "."), err)
273+
}
274+
exists, err = c.loadCatalog().CheckNamespaceExists(ctx, c.namespace)
275+
}
217276
if err != nil {
218277
return false, fmt.Errorf("checking namespace existence %s: %w", strings.Join(c.namespace, "."), err)
219278
}
220279
return exists, nil
221280
}
222281

282+
func (c *Client) refreshCatalog(ctx context.Context) error {
283+
if !c.mu.TryAcquire(1) {
284+
// In this case someone else is trying to refresh the catalog,
285+
// let them do it and we can just wait for them to finish without
286+
// too much extra IO
287+
err := c.mu.Acquire(ctx, 1)
288+
if err != nil {
289+
return err
290+
}
291+
c.mu.Release(1)
292+
return nil
293+
}
294+
defer c.mu.Release(1)
295+
// Create REST catalog
296+
restCatalog, err := rest.NewCatalog(
297+
ctx,
298+
"rest",
299+
c.url,
300+
c.opts...,
301+
)
302+
if err != nil {
303+
return fmt.Errorf("creating REST catalog: %w", err)
304+
}
305+
c.catalog.Store(restCatalog)
306+
return nil
307+
}
308+
309+
func (c *Client) loadCatalog() catalog.Catalog {
310+
return c.catalog.Load()
311+
}
312+
223313
// isNamespaceAlreadyExists checks if the error indicates the namespace already exists.
224314
func isNamespaceAlreadyExists(err error) bool {
225315
return errors.Is(err, catalog.ErrNamespaceAlreadyExists)

0 commit comments

Comments
 (0)