Skip to content
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 50 additions & 22 deletions cache/advertise.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,16 @@ func getTickerRate(tok string) time.Duration {
// LaunchFedTokManager starts the federation token refresh loop. When Server.DropPrivileges
// is true, the token file is chown'ed to the xrootd user and group by xrdhttp-pelican plugin
// (via xrootd.FileCopyToXrootdDir(false, 9, file)); pass nil to skip (e.g. in tests).
func LaunchFedTokManager(ctx context.Context, egrp *errgroup.Group, cache server_structs.XRootDServer, copyToXrootdDir server_utils.FedTokCopyToXrootdFunc) {
//
// onTokenUpdate, if non-nil, is called with the token string every time a
// new token is obtained. The persistent cache uses this to keep the
// token in memory.
//
// retryNow, if non-nil, is a channel that triggers an immediate token
// fetch. The launcher signals it after the cache is first registered
// and advertised to the director so the token manager does not have to
// wait for the next ticker fire.
func LaunchFedTokManager(ctx context.Context, egrp *errgroup.Group, cache server_structs.XRootDServer, copyToXrootdDir server_utils.FedTokCopyToXrootdFunc, onTokenUpdate func(string), retryNow <-chan struct{}) {
// Do our initial token fetch+set, then turn things over to the ticker
tok, err := server_utils.CreateFedTok(ctx, cache)
if err != nil {
Expand All @@ -309,6 +318,37 @@ func LaunchFedTokManager(ctx context.Context, egrp *errgroup.Group, cache server
log.Errorf("Failed to set the federation token: %v", err)
}

// Deliver the initial token to the in-memory consumer (if any).
if onTokenUpdate != nil && tok != "" {
onTokenUpdate(tok)
}

// refreshFedTok performs a single fetch-and-set cycle, returning
// the new ticker rate and the token string (empty on failure).
refreshFedTok := func(currentRate time.Duration) (time.Duration, string) {
log.Debugln("Refreshing federation token")
newTok, err := server_utils.CreateFedTok(ctx, cache)
if err != nil {
log.Errorf("Failed to get a federation token: %v", err)
return currentRate, ""
}
log.Traceln("Successfully received new federation token")

newRate := getTickerRate(newTok)

if err := server_utils.SetFedTok(ctx, cache, newTok, copyToXrootdDir); err != nil {
log.Errorf("Failed to write the federation token: %v", err)
} else {
log.Traceln("Successfully wrote new federation token to disk")
}

if onTokenUpdate != nil {
onTokenUpdate(newTok)
}

return newRate, newTok
}

// TODO: Figure out what to do if the Director starts issuing tokens with a different
// lifetime --> we can adjust ticker period dynamically, but what's the sensible thing to do?
fedTokTicker := time.NewTicker(tickerRate)
Expand All @@ -317,29 +357,17 @@ func LaunchFedTokManager(ctx context.Context, egrp *errgroup.Group, cache server
for {
select {
case <-fedTokTicker.C:
// Time to ask the Director for a new token
log.Debugln("Refreshing federation token")
tok, err := server_utils.CreateFedTok(ctx, cache)
if err != nil {
log.Errorf("Failed to get a federation token: %v", err)
continue
}
log.Traceln("Successfully received new federation token")

// Once again, parse the token, use it to set the next ticker fire
// while also building in a circuit breaker to set a min ticker rate
newTickerRate := getTickerRate(tok)
if newTickerRate != tickerRate {
fedTokTicker.Reset(newTickerRate)
tickerRate = newTickerRate
newRate, _ := refreshFedTok(tickerRate)
if newRate != tickerRate {
fedTokTicker.Reset(newRate)
tickerRate = newRate
}

// Set the token in the cache
err = server_utils.SetFedTok(ctx, cache, tok, copyToXrootdDir)
if err != nil {
log.Errorf("Failed to write the federation token: %v", err)
case <-retryNow:
newRate, _ := refreshFedTok(tickerRate)
if newRate != tickerRate {
fedTokTicker.Reset(newRate)
tickerRate = newRate
}
log.Traceln("Successfully wrote new federation token to disk")
case <-ctx.Done():
return nil
}
Expand Down
39 changes: 37 additions & 2 deletions client/acquire_token.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,24 @@ type (
Expiry time.Time
}

// TokenProvider returns a token value, refreshing as needed.
// Implementations must be safe for concurrent use.
// See tokenGenerator for the standard implementation.
TokenProvider interface {
Get() (string, error)
}

// staticTokenProvider is a TokenProvider that always returns
// the same fixed token. It is used for backward-compatibility
// when a caller has a plain string.
staticTokenProvider struct {
token string
}

// An object that can fetch an appropriate token for a given transfer.
//
// Thread-safe and will auto-renew the token throughout the lifetime
// of the process.
// of the process. Satisfies the TokenProvider interface.
tokenGenerator struct {
DirResp *server_structs.DirectorResponse
Destination *pelican_url.PelicanURL
Expand All @@ -88,7 +102,21 @@ type (
}
)

func NewTokenGenerator(dest *pelican_url.PelicanURL, dirResp *server_structs.DirectorResponse, operation config.TokenOperation, enableAcquire bool) *tokenGenerator {
// StaticTokenProvider returns a TokenProvider that always yields the
// given token string. Useful when the caller has a fixed token and
// does not need refresh logic.
func StaticTokenProvider(token string) TokenProvider {
if token == "" {
return nil
}
return &staticTokenProvider{token: token}
}

func (s *staticTokenProvider) Get() (string, error) {
return s.token, nil
}

func newTokenGenerator(dest *pelican_url.PelicanURL, dirResp *server_structs.DirectorResponse, operation config.TokenOperation, enableAcquire bool) *tokenGenerator {
return &tokenGenerator{
DirResp: dirResp,
Destination: dest,
Expand All @@ -98,6 +126,13 @@ func NewTokenGenerator(dest *pelican_url.PelicanURL, dirResp *server_structs.Dir
}
}

// NewTokenGenerator creates a token generator for the given destination and
// operation. This is the exported entry point used by cmd/token.go; most
// internal callers should use the unexported newTokenGenerator.
func NewTokenGenerator(dest *pelican_url.PelicanURL, dirResp *server_structs.DirectorResponse, operation config.TokenOperation, enableAcquire bool) *tokenGenerator {
return newTokenGenerator(dest, dirResp, operation, enableAcquire)
}

func newTokenContentIterator(loc string, name string) *tokenContentIterator {
return &tokenContentIterator{
Location: loc,
Expand Down
4 changes: 2 additions & 2 deletions client/bearer_auth_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func TestBearerAuthenticator_Authorize(t *testing.T) {
}))
defer server.Close()

token := NewTokenGenerator(nil, nil, config.TokenSharedRead, false)
token := newTokenGenerator(nil, nil, config.TokenSharedRead, false)
token.SetToken("some_token_1234_abc")
authenticator := &bearerAuthenticator{token: token}
client := &http.Client{}
Expand All @@ -58,7 +58,7 @@ func TestBearerAuthenticator_Authorize(t *testing.T) {

// Test the retry logic for bearer authentication
func TestBearerAuthenticator_Verify(t *testing.T) {
token := NewTokenGenerator(nil, nil, config.TokenSharedRead, false)
token := newTokenGenerator(nil, nil, config.TokenSharedRead, false)
token.SetToken("some_token_1234_abc")
authenticator := &bearerAuthenticator{token: token}

Expand Down
Loading
Loading