Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
167 changes: 85 additions & 82 deletions go.mod

Large diffs are not rendered by default.

484 changes: 226 additions & 258 deletions go.sum

Large diffs are not rendered by default.

135 changes: 113 additions & 22 deletions internal/impl/iceberg/catalogx/catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,24 @@ import (
"fmt"
"net/url"
"strings"
"sync/atomic"

"github.com/apache/iceberg-go"
"github.com/apache/iceberg-go/catalog"
"github.com/apache/iceberg-go/catalog/rest"
"github.com/apache/iceberg-go/table"

"github.com/redpanda-data/connect/v4/internal/syncx"
)

// Client wraps the iceberg-go REST catalog client.
type Client struct {
catalog catalog.Catalog
url string
opts []rest.Option
namespace []string
mu *syncx.RWMutex

catalog atomic.Pointer[rest.Catalog]
}

// Config holds the catalog configuration.
Expand Down Expand Up @@ -111,27 +118,33 @@ func NewCatalogClient(ctx context.Context, cfg Config, namespace []string) (*Cli
}))
}

c := &Client{
url: cfg.URL,
opts: opts,
namespace: namespace,
mu: syncx.NewRWMutex(),
}
// Create REST catalog
restCatalog, err := rest.NewCatalog(
ctx,
"rest",
cfg.URL,
opts...,
)
if err != nil {
return nil, fmt.Errorf("creating REST catalog: %w", err)
if err := c.refreshCatalog(ctx); err != nil {
return nil, err
}
return c, nil
}

return &Client{
catalog: restCatalog,
namespace: namespace,
}, nil
func isAuthErr(err error) bool {
return errors.Is(err, rest.ErrAuthorizationExpired) || errors.Is(err, rest.ErrForbidden) || errors.Is(err, rest.ErrUnauthorized)
}

// LoadTable loads an existing table from the catalog.
func (c *Client) LoadTable(ctx context.Context, tableName string) (*table.Table, error) {
identifier := toTableIdentifier(c.namespace, tableName)
tbl, err := c.catalog.LoadTable(ctx, identifier)
tbl, err := c.loadCatalog().LoadTable(ctx, identifier)
if isAuthErr(err) {
if err = c.refreshCatalogOnAuthErr(ctx, err); err != nil {
return nil, fmt.Errorf("loading table %s: %w", strings.Join(identifier, "."), err)
}
tbl, err = c.loadCatalog().LoadTable(ctx, identifier)
}
if err != nil {
return nil, fmt.Errorf("loading table %s: %w", strings.Join(identifier, "."), err)
}
Expand All @@ -141,7 +154,13 @@ func (c *Client) LoadTable(ctx context.Context, tableName string) (*table.Table,
// CreateTable creates a new table with the given schema and optional create options.
func (c *Client) CreateTable(ctx context.Context, tableName string, schema *iceberg.Schema, opts ...catalog.CreateTableOpt) (*table.Table, error) {
identifier := toTableIdentifier(c.namespace, tableName)
tbl, err := c.catalog.CreateTable(ctx, identifier, schema, opts...)
tbl, err := c.loadCatalog().CreateTable(ctx, identifier, schema, opts...)
if isAuthErr(err) {
if err = c.refreshCatalogOnAuthErr(ctx, err); err != nil {
return nil, fmt.Errorf("creating table %s: %w", strings.Join(identifier, "."), err)
}
tbl, err = c.loadCatalog().CreateTable(ctx, identifier, schema, opts...)
}
if err != nil {
return nil, fmt.Errorf("creating table %s: %w", strings.Join(identifier, "."), err)
}
Expand All @@ -158,7 +177,7 @@ func (c *Client) CreateTable(ctx context.Context, tableName string, schema *iceb
// us.AddColumn([]string{"email"}, iceberg.StringType{}, "Email address", false, nil)
// us.AddColumn([]string{"age"}, iceberg.Int32Type{}, "", false, nil)
// })
func (*Client) UpdateSchema(ctx context.Context, tbl *table.Table, fn func(*table.UpdateSchema), opts ...table.UpdateSchemaOption) (*table.Table, error) {
func (c *Client) UpdateSchema(ctx context.Context, tbl *table.Table, fn func(*table.UpdateSchema), opts ...table.UpdateSchemaOption) (*table.Table, error) {
txn := tbl.NewTransaction()
updateSchema := txn.UpdateSchema(
true, // caseSensitive
Expand All @@ -171,26 +190,46 @@ func (*Client) UpdateSchema(ctx context.Context, tbl *table.Table, fn func(*tabl

// Commit the schema update to the transaction
if err := updateSchema.Commit(); err != nil {
if refreshErr := c.refreshCatalogOnAuthErr(ctx, err); refreshErr != nil {
return nil, fmt.Errorf("refreshing catalog during updating schema txn %w: %v", err, refreshErr)
}
return nil, fmt.Errorf("applying schema update: %w", err)
}

// Commit the transaction to persist changes
return txn.Commit(ctx)
table, err := txn.Commit(ctx)
if refreshErr := c.refreshCatalogOnAuthErr(ctx, err); refreshErr != nil {
return nil, fmt.Errorf("refreshing catalog during updating schema txn %w: %v", err, refreshErr)
}
return table, err
}

// AppendDataFiles commits a batch of data files to the table.
func (*Client) AppendDataFiles(ctx context.Context, tbl *table.Table, dataFiles []string) (*table.Table, error) {
func (c *Client) AppendDataFiles(ctx context.Context, tbl *table.Table, dataFiles []string) (*table.Table, error) {
txn := tbl.NewTransaction()
if err := txn.AddFiles(ctx, dataFiles, nil, true); err != nil {
if refreshErr := c.refreshCatalogOnAuthErr(ctx, err); refreshErr != nil {
return nil, fmt.Errorf("refreshing catalog during appending data files %w: %v", err, refreshErr)
}
return nil, err
}
return txn.Commit(ctx)
table, err := txn.Commit(ctx)
if refreshErr := c.refreshCatalogOnAuthErr(ctx, err); refreshErr != nil {
return nil, fmt.Errorf("refreshing catalog during committing data file txn %w: %v", err, refreshErr)
}
return table, err
}

// CheckTableExists checks if the table exists in the catalog.
func (c *Client) CheckTableExists(ctx context.Context, tableName string) (bool, error) {
identifier := toTableIdentifier(c.namespace, tableName)
exists, err := c.catalog.CheckTableExists(ctx, identifier)
exists, err := c.loadCatalog().CheckTableExists(ctx, identifier)
if isAuthErr(err) {
if err = c.refreshCatalogOnAuthErr(ctx, err); err != nil {
return false, fmt.Errorf("checking table existence %s: %w", strings.Join(identifier, "."), err)
}
exists, err = c.loadCatalog().CheckTableExists(ctx, identifier)
}
if err != nil {
return false, fmt.Errorf("checking table existence %s: %w", strings.Join(identifier, "."), err)
}
Expand All @@ -200,7 +239,13 @@ func (c *Client) CheckTableExists(ctx context.Context, tableName string) (bool,
// CreateNamespace creates the configured namespace with the given properties.
// Returns nil if the namespace already exists (idempotent).
func (c *Client) CreateNamespace(ctx context.Context, props iceberg.Properties) error {
err := c.catalog.CreateNamespace(ctx, c.namespace, props)
err := c.loadCatalog().CreateNamespace(ctx, c.namespace, props)
if isAuthErr(err) {
if err = c.refreshCatalogOnAuthErr(ctx, err); err != nil {
return fmt.Errorf("creating namespace %s: %w", strings.Join(c.namespace, "."), err)
}
err = c.loadCatalog().CreateNamespace(ctx, c.namespace, props)
}
if err != nil {
// Check if namespace already exists - treat as success
if isNamespaceAlreadyExists(err) {
Expand All @@ -213,13 +258,59 @@ func (c *Client) CreateNamespace(ctx context.Context, props iceberg.Properties)

// CheckNamespaceExists checks if the configured namespace exists.
func (c *Client) CheckNamespaceExists(ctx context.Context) (bool, error) {
exists, err := c.catalog.CheckNamespaceExists(ctx, c.namespace)
exists, err := c.loadCatalog().CheckNamespaceExists(ctx, c.namespace)
if isAuthErr(err) {
if err = c.refreshCatalogOnAuthErr(ctx, err); err != nil {
return false, fmt.Errorf("checking namespace existence %s: %w", strings.Join(c.namespace, "."), err)
}
exists, err = c.loadCatalog().CheckNamespaceExists(ctx, c.namespace)
}
if err != nil {
return false, fmt.Errorf("checking namespace existence %s: %w", strings.Join(c.namespace, "."), err)
}
return exists, nil
}

// refreshCatalogOnAuthErr refreshes the catalog if err is an authorization error.
// Returns the refresh error if the refresh fails, nil otherwise (including if err is not an auth error).
func (c *Client) refreshCatalogOnAuthErr(ctx context.Context, err error) error {
if !isAuthErr(err) {
return nil
}
return c.refreshCatalog(ctx)
}

func (c *Client) refreshCatalog(ctx context.Context) error {
if !c.mu.TryLock() {
// In this case someone else is trying to refresh the catalog,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm trying to understand what is the benefit compared to RW mutex?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because the context can be cancelled and that is respected. When doing IO and the lock can be held for a while it's still nice to support context cancellation

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's cool, thanks

// let them do it and we can just wait for them to finish without
// too much extra IO
err := c.mu.Lock(ctx)
if err != nil {
return err
}
c.mu.Unlock()
return nil
}
defer c.mu.Unlock()
// Create REST catalog
restCatalog, err := rest.NewCatalog(
ctx,
"rest",
c.url,
c.opts...,
)
if err != nil {
return fmt.Errorf("creating REST catalog: %w", err)
}
c.catalog.Store(restCatalog)
return nil
}

func (c *Client) loadCatalog() catalog.Catalog {
return c.catalog.Load()
}

// isNamespaceAlreadyExists checks if the error indicates the namespace already exists.
func isNamespaceAlreadyExists(err error) bool {
return errors.Is(err, catalog.ErrNamespaceAlreadyExists)
Expand Down
Loading
Loading