Skip to content

Commit f342e7f

Browse files
committed
feat(catalog): use specific credential refresh endpoint
1 parent 89da82b commit f342e7f

File tree

3 files changed

+120
-31
lines changed

3 files changed

+120
-31
lines changed

catalog/rest/rest.go

Lines changed: 28 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -136,11 +136,21 @@ func (t *commitTableResponse) UnmarshalJSON(b []byte) (err error) {
136136
return err
137137
}
138138

139+
type storageCredential struct {
140+
Prefix string `json:"prefix"`
141+
Config iceberg.Properties `json:"config"`
142+
}
143+
139144
type loadTableResponse struct {
140-
MetadataLoc string `json:"metadata-location"`
141-
RawMetadata json.RawMessage `json:"metadata"`
142-
Config iceberg.Properties `json:"config"`
143-
Metadata table.Metadata `json:"-"`
145+
MetadataLoc string `json:"metadata-location"`
146+
RawMetadata json.RawMessage `json:"metadata"`
147+
Config iceberg.Properties `json:"config"`
148+
StorageCredentials []storageCredential `json:"storage-credentials"`
149+
Metadata table.Metadata `json:"-"`
150+
}
151+
152+
type loadCredentialsResponse struct {
153+
StorageCredentials []storageCredential `json:"storage-credentials"`
144154
}
145155

146156
func (t *loadTableResponse) UnmarshalJSON(b []byte) (err error) {
@@ -677,11 +687,13 @@ func checkValidNamespace(ident table.Identifier) error {
677687

678688
func (r *Catalog) tableFromResponse(_ context.Context, identifier []string, metadata table.Metadata, loc string, config iceberg.Properties) (*table.Table, error) {
679689
refresher := &vendedCredentialRefresher{
680-
mu: semaphore.NewWeighted(1),
681-
identifier: identifier,
682-
location: loc,
683-
props: config,
684-
fetchConfig: r.fetchTableConfig,
690+
mu: semaphore.NewWeighted(1),
691+
identifier: identifier,
692+
location: loc,
693+
props: config,
694+
fetchCreds: func(ctx context.Context, ident []string) (iceberg.Properties, error) {
695+
return r.fetchTableCreds(ctx, ident, loc)
696+
},
685697
}
686698

687699
return table.New(
@@ -693,25 +705,19 @@ func (r *Catalog) tableFromResponse(_ context.Context, identifier []string, meta
693705
), nil
694706
}
695707

696-
func (r *Catalog) fetchTableConfig(ctx context.Context, ident []string) (iceberg.Properties, error) {
708+
func (r *Catalog) fetchTableCreds(ctx context.Context, ident []string, location string) (iceberg.Properties, error) {
697709
ns, tbl, err := splitIdentForPath(ident)
698710
if err != nil {
699711
return nil, err
700712
}
701713

702-
ret, err := doGet[loadTableResponse](ctx, r.baseURI, []string{"namespaces", ns, "tables", tbl},
714+
ret, err := doGet[loadCredentialsResponse](ctx, r.baseURI, []string{"namespaces", ns, "tables", tbl, "credentials"},
703715
r.cl, map[int]error{http.StatusNotFound: catalog.ErrNoSuchTable})
704716
if err != nil {
705717
return nil, err
706718
}
707719

708-
config := maps.Clone(r.props)
709-
maps.Copy(config, ret.Metadata.Properties())
710-
for k, v := range ret.Config {
711-
config[k] = v
712-
}
713-
714-
return config, nil
720+
return resolveStorageCredentials(ret.StorageCredentials, location), nil
715721
}
716722

717723
func (r *Catalog) ListTables(ctx context.Context, namespace table.Identifier) iter.Seq2[table.Identifier, error] {
@@ -825,6 +831,7 @@ func (r *Catalog) CreateTable(ctx context.Context, identifier table.Identifier,
825831
config := maps.Clone(r.props)
826832
maps.Copy(config, ret.Metadata.Properties())
827833
maps.Copy(config, ret.Config)
834+
maps.Copy(config, resolveStorageCredentials(ret.StorageCredentials, ret.MetadataLoc))
828835

829836
return r.tableFromResponse(ctx, identifier, ret.Metadata, ret.MetadataLoc, config)
830837
}
@@ -936,6 +943,7 @@ func (r *Catalog) RegisterTable(ctx context.Context, identifier table.Identifier
936943
config := maps.Clone(r.props)
937944
maps.Copy(config, ret.Metadata.Properties())
938945
maps.Copy(config, ret.Config)
946+
maps.Copy(config, resolveStorageCredentials(ret.StorageCredentials, ret.MetadataLoc))
939947

940948
return r.tableFromResponse(ctx, identifier, ret.Metadata, ret.MetadataLoc, config)
941949
}
@@ -954,9 +962,8 @@ func (r *Catalog) LoadTable(ctx context.Context, identifier table.Identifier) (*
954962

955963
config := maps.Clone(r.props)
956964
maps.Copy(config, ret.Metadata.Properties())
957-
for k, v := range ret.Config {
958-
config[k] = v
959-
}
965+
maps.Copy(config, ret.Config)
966+
maps.Copy(config, resolveStorageCredentials(ret.StorageCredentials, ret.MetadataLoc))
960967

961968
return r.tableFromResponse(ctx, identifier, ret.Metadata, ret.MetadataLoc, config)
962969
}

catalog/rest/vended_creds.go

Lines changed: 23 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"context"
2222
"maps"
2323
"strconv"
24+
"strings"
2425
"time"
2526

2627
"github.com/apache/iceberg-go"
@@ -37,6 +38,24 @@ const (
3738
defaultVendedCredentialsTTL = 60 * time.Minute
3839
)
3940

41+
// resolveStorageCredentials finds the best-matching credential for the given
42+
// location using longest-prefix match, mirroring the Java and Python implementations.
43+
func resolveStorageCredentials(creds []storageCredential, location string) iceberg.Properties {
44+
var best *storageCredential
45+
for i := range creds {
46+
c := &creds[i]
47+
if strings.HasPrefix(location, c.Prefix) {
48+
if best == nil || len(c.Prefix) > len(best.Prefix) {
49+
best = c
50+
}
51+
}
52+
}
53+
if best == nil {
54+
return nil
55+
}
56+
return best.Config
57+
}
58+
4059
var credentialExpiryKeys = []string{
4160
keyS3TokenExpiresAtMs,
4261
keyAdlsSasExpiresAtMs,
@@ -66,7 +85,7 @@ type vendedCredentialRefresher struct {
6685
location string
6786
props iceberg.Properties
6887

69-
fetchConfig func(ctx context.Context, ident []string) (iceberg.Properties, error)
88+
fetchCreds func(ctx context.Context, ident []string) (iceberg.Properties, error)
7089

7190
nowFunc func() time.Time // for testing
7291
}
@@ -93,14 +112,14 @@ func (v *vendedCredentialRefresher) loadFS(ctx context.Context) (iceio.IO, error
93112
if v.cachedIO == nil {
94113
config = v.props
95114
} else {
96-
freshConfig, err := v.fetchConfig(ctx, v.identifier)
115+
freshCreds, err := v.fetchCreds(ctx, v.identifier)
97116
if err != nil {
98117
return v.cachedIO, nil
99118
}
100119

101-
config = make(iceberg.Properties, len(v.props)+len(freshConfig))
120+
config = make(iceberg.Properties, len(v.props)+len(freshCreds))
102121
maps.Copy(config, v.props)
103-
maps.Copy(config, freshConfig)
122+
maps.Copy(config, freshCreds)
104123
}
105124

106125
newIO, err := iceio.LoadFS(ctx, config, v.location)

catalog/rest/vended_creds_test.go

Lines changed: 69 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -33,13 +33,13 @@ import (
3333
"golang.org/x/sync/semaphore"
3434
)
3535

36-
func newTestRefresher(fetchConfig func(ctx context.Context, ident []string) (iceberg.Properties, error)) *vendedCredentialRefresher {
36+
func newTestRefresher(fetchCreds func(ctx context.Context, ident []string) (iceberg.Properties, error)) *vendedCredentialRefresher {
3737
return &vendedCredentialRefresher{
38-
mu: semaphore.NewWeighted(1),
39-
identifier: []string{"db", "tbl"},
40-
location: "file:///tmp/test",
41-
props: iceberg.Properties{},
42-
fetchConfig: fetchConfig,
38+
mu: semaphore.NewWeighted(1),
39+
identifier: []string{"db", "tbl"},
40+
location: "file:///tmp/test",
41+
props: iceberg.Properties{},
42+
fetchCreds: fetchCreds,
4343
}
4444
}
4545

@@ -321,3 +321,66 @@ func TestVendedCredsServerExpiryUsedOnRefresh(t *testing.T) {
321321
// expiresAt should be the server-provided value, not now+default.
322322
assert.Equal(t, serverExpiry.UnixMilli(), r.expiresAt.UnixMilli())
323323
}
324+
325+
func TestResolveStorageCredentials(t *testing.T) {
326+
t.Parallel()
327+
328+
s3Creds := iceberg.Properties{"s3.access-key-id": "AKID", "s3.secret-access-key": "secret"}
329+
specificCreds := iceberg.Properties{"s3.access-key-id": "SPECIFIC"}
330+
331+
tests := []struct {
332+
name string
333+
creds []storageCredential
334+
location string
335+
want iceberg.Properties
336+
}{
337+
{
338+
name: "empty credentials",
339+
creds: nil,
340+
location: "s3://bucket/path",
341+
want: nil,
342+
},
343+
{
344+
name: "matching prefix",
345+
creds: []storageCredential{
346+
{Prefix: "s3://bucket/", Config: s3Creds},
347+
},
348+
location: "s3://bucket/path/to/file",
349+
want: s3Creds,
350+
},
351+
{
352+
name: "no matching prefix",
353+
creds: []storageCredential{
354+
{Prefix: "s3://other-bucket/", Config: s3Creds},
355+
},
356+
location: "s3://bucket/path",
357+
want: nil,
358+
},
359+
{
360+
name: "longest prefix wins",
361+
creds: []storageCredential{
362+
{Prefix: "s3://bucket/", Config: s3Creds},
363+
{Prefix: "s3://bucket/specific/", Config: specificCreds},
364+
},
365+
location: "s3://bucket/specific/path",
366+
want: specificCreds,
367+
},
368+
{
369+
name: "longest prefix wins regardless of order",
370+
creds: []storageCredential{
371+
{Prefix: "s3://bucket/specific/", Config: specificCreds},
372+
{Prefix: "s3://bucket/", Config: s3Creds},
373+
},
374+
location: "s3://bucket/specific/path",
375+
want: specificCreds,
376+
},
377+
}
378+
379+
for _, tt := range tests {
380+
t.Run(tt.name, func(t *testing.T) {
381+
t.Parallel()
382+
got := resolveStorageCredentials(tt.creds, tt.location)
383+
assert.Equal(t, tt.want, got)
384+
})
385+
}
386+
}

0 commit comments

Comments
 (0)