From c66df788b72bef7ecaff6af0afc84c10381a3888 Mon Sep 17 00:00:00 2001 From: ofekshenawa Date: Thu, 16 Jan 2025 11:23:56 +0200 Subject: [PATCH 1/3] Create token manager --- token_manager.go | 41 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 41 insertions(+) create mode 100644 token_manager.go diff --git a/token_manager.go b/token_manager.go new file mode 100644 index 0000000000..1361f636f8 --- /dev/null +++ b/token_manager.go @@ -0,0 +1,41 @@ +package redis + +import ( + "sync" + "time" +) + +type TokenManager struct { + token string + expiresAt time.Time + mutex sync.Mutex +} + +func NewTokenManager() *TokenManager { + return &TokenManager{} +} + +func (tm *TokenManager) SetToken(token string, ttl time.Duration) { + tm.mutex.Lock() + defer tm.mutex.Unlock() + tm.token = token + tm.expiresAt = time.Now().Add(ttl) +} + +func (tm *TokenManager) GetToken() (string, bool) { + tm.mutex.Lock() + defer tm.mutex.Unlock() + if time.Now().After(tm.expiresAt) { + return "", false + } + return tm.token, true +} + +func (tm *TokenManager) RefreshToken(fetchToken func() (string, time.Duration, error)) error { + token, ttl, err := fetchToken() + if err != nil { + return err + } + tm.SetToken(token, ttl) + return nil +} From 4d13d4bc736a50bfb55241ab7fab94f1a170598e Mon Sep 17 00:00:00 2001 From: ofekshenawa Date: Tue, 28 Jan 2025 12:39:01 +0200 Subject: [PATCH 2/3] Add telemetry and auto refresh --- token_manager.go | 86 ++++++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 79 insertions(+), 7 deletions(-) diff --git a/token_manager.go b/token_manager.go index 1361f636f8..c6c4d7ddb3 100644 --- a/token_manager.go +++ b/token_manager.go @@ -1,27 +1,37 @@ package redis import ( + "log" "sync" "time" ) type TokenManager struct { - token string - expiresAt time.Time - mutex sync.Mutex + token string + expiresAt time.Time + mutex sync.Mutex + refreshFunc func() (string, time.Duration, error) + stopChan chan struct{} } -func NewTokenManager() *TokenManager { - return &TokenManager{} +// NewTokenManager initializes a new TokenManager. +func NewTokenManager(refreshFunc func() (string, time.Duration, error)) *TokenManager { + return &TokenManager{ + refreshFunc: refreshFunc, + stopChan: make(chan struct{}), + } } +// SetToken updates the token and its expiration. func (tm *TokenManager) SetToken(token string, ttl time.Duration) { tm.mutex.Lock() defer tm.mutex.Unlock() tm.token = token tm.expiresAt = time.Now().Add(ttl) + log.Printf("Token updated with TTL: %s", ttl) } +// GetToken returns the current token if it's still valid. func (tm *TokenManager) GetToken() (string, bool) { tm.mutex.Lock() defer tm.mutex.Unlock() @@ -31,11 +41,73 @@ func (tm *TokenManager) GetToken() (string, bool) { return tm.token, true } -func (tm *TokenManager) RefreshToken(fetchToken func() (string, time.Duration, error)) error { - token, ttl, err := fetchToken() +// RefreshToken fetches a new token using the provided refresh function. +func (tm *TokenManager) RefreshToken() error { + if tm.refreshFunc == nil { + return nil + } + token, ttl, err := tm.refreshFunc() if err != nil { return err } tm.SetToken(token, ttl) return nil } + +// StartAutoRefresh starts a goroutine to proactively refresh the token. +func (tm *TokenManager) StartAutoRefresh() { + go func() { + ticker := time.NewTicker(1 * time.Minute) // Check periodically + defer ticker.Stop() + + for { + select { + case <-ticker.C: + if tm.shouldRefresh() { + log.Println("Proactively refreshing token...") + if err := tm.RefreshToken(); err != nil { + log.Printf("Failed to refresh token: %v", err) + } + } + case <-tm.stopChan: + log.Println("Stopping auto-refresh...") + return + } + } + }() +} + +// StopAutoRefresh stops the auto-refresh goroutine. +func (tm *TokenManager) StopAutoRefresh() { + close(tm.stopChan) +} + +// shouldRefresh checks if the token is nearing expiration. +func (tm *TokenManager) shouldRefresh() bool { + tm.mutex.Lock() + defer tm.mutex.Unlock() + remaining := time.Until(tm.expiresAt) + return remaining < 5*time.Minute // Refresh if less than 5 minutes remain +} + +// MonitorTelemetry adds monitoring for token usage and expiration. +func (tm *TokenManager) MonitorTelemetry() { + go func() { + ticker := time.NewTicker(30 * time.Second) // Adjust as needed + defer ticker.Stop() + + for { + select { + case <-ticker.C: + token, valid := tm.GetToken() + if !valid { + log.Println("Token has expired.") + } else { + log.Printf("Token is valid: %s, expires in: %s", token, time.Until(tm.expiresAt)) + } + case <-tm.stopChan: + return + } + } + }() +} From 22826634e6cc7b6e14d3405607f8e3057ba8fa39 Mon Sep 17 00:00:00 2001 From: ofekshenawa Date: Tue, 28 Jan 2025 21:18:17 +0200 Subject: [PATCH 3/3] update token manger to match credential provider --- token_manager.go | 52 ++++++++++++++++++++++++++++++------------------ 1 file changed, 33 insertions(+), 19 deletions(-) diff --git a/token_manager.go b/token_manager.go index c6c4d7ddb3..3a88dc74e9 100644 --- a/token_manager.go +++ b/token_manager.go @@ -7,18 +7,23 @@ import ( ) type TokenManager struct { - token string - expiresAt time.Time - mutex sync.Mutex - refreshFunc func() (string, time.Duration, error) - stopChan chan struct{} + token string + expiresAt time.Time + mutex sync.Mutex + refreshFunc func() (string, time.Duration, error) + stopChan chan struct{} + refreshTicker *time.Ticker + refreshInterval time.Duration + telemetryEnabled bool } // NewTokenManager initializes a new TokenManager. -func NewTokenManager(refreshFunc func() (string, time.Duration, error)) *TokenManager { +func NewTokenManager(refreshFunc func() (string, time.Duration, error), refreshInterval time.Duration, telemetryEnabled bool) *TokenManager { return &TokenManager{ - refreshFunc: refreshFunc, - stopChan: make(chan struct{}), + refreshFunc: refreshFunc, + stopChan: make(chan struct{}), + refreshInterval: refreshInterval, + telemetryEnabled: telemetryEnabled, } } @@ -28,7 +33,7 @@ func (tm *TokenManager) SetToken(token string, ttl time.Duration) { defer tm.mutex.Unlock() tm.token = token tm.expiresAt = time.Now().Add(ttl) - log.Printf("Token updated with TTL: %s", ttl) + log.Printf("[TokenManager] Token updated with TTL: %s", ttl) } // GetToken returns the current token if it's still valid. @@ -48,38 +53,42 @@ func (tm *TokenManager) RefreshToken() error { } token, ttl, err := tm.refreshFunc() if err != nil { + log.Printf("[TokenManager] Failed to refresh token: %v", err) return err } tm.SetToken(token, ttl) + log.Println("[TokenManager] Token refreshed successfully.") return nil } // StartAutoRefresh starts a goroutine to proactively refresh the token. func (tm *TokenManager) StartAutoRefresh() { + tm.refreshTicker = time.NewTicker(tm.refreshInterval) go func() { - ticker := time.NewTicker(1 * time.Minute) // Check periodically - defer ticker.Stop() - for { select { - case <-ticker.C: + case <-tm.refreshTicker.C: if tm.shouldRefresh() { - log.Println("Proactively refreshing token...") + log.Println("[TokenManager] Proactively refreshing token...") if err := tm.RefreshToken(); err != nil { - log.Printf("Failed to refresh token: %v", err) + log.Printf("[TokenManager] Error during token refresh: %v", err) } } case <-tm.stopChan: - log.Println("Stopping auto-refresh...") + log.Println("[TokenManager] Stopping auto-refresh...") return } } }() } -// StopAutoRefresh stops the auto-refresh goroutine. +// StopAutoRefresh stops the auto-refresh goroutine and cleans up resources. func (tm *TokenManager) StopAutoRefresh() { + if tm.refreshTicker != nil { + tm.refreshTicker.Stop() + } close(tm.stopChan) + log.Println("[TokenManager] Auto-refresh stopped and resources cleaned.") } // shouldRefresh checks if the token is nearing expiration. @@ -92,6 +101,10 @@ func (tm *TokenManager) shouldRefresh() bool { // MonitorTelemetry adds monitoring for token usage and expiration. func (tm *TokenManager) MonitorTelemetry() { + if !tm.telemetryEnabled { + return + } + go func() { ticker := time.NewTicker(30 * time.Second) // Adjust as needed defer ticker.Stop() @@ -101,11 +114,12 @@ func (tm *TokenManager) MonitorTelemetry() { case <-ticker.C: token, valid := tm.GetToken() if !valid { - log.Println("Token has expired.") + log.Println("[TokenManager] Token has expired.") } else { - log.Printf("Token is valid: %s, expires in: %s", token, time.Until(tm.expiresAt)) + log.Printf("[TokenManager] Token is valid: expires in %s", time.Until(tm.expiresAt)) } case <-tm.stopChan: + log.Println("[TokenManager] Telemetry monitoring stopped.") return } }