Skip to content

Commit 580c015

Browse files
Add Asynchronous Token Refresh to RefreshableTokenSource (#455)
## Motivation Synchronous token refresh introduces unnecessary blocking and latency, which becomes especially problematic in high QPS (Queries Per Second) workloads. In such scenarios, blocking token refresh operations can create bottlenecks, impact throughput, and cause request queuing. ## Solution This PR introduces asynchronous token refresh to improve responsiveness and reliability, with particular benefits for high-throughput applications. ## What changes are proposed in this pull request? **Async Refresh Option**: Added `withAsyncRefresh(boolean enabled)` method to enable/disable asynchronous token refresh. When enabled, tokens are refreshed in the background when they become "stale" (close to expiry). ### Token State Management - **Three-State Token System**: - `FRESH`: Token is valid and not close to expiry - `STALE`: Token is valid but will expire soon (triggers async refresh if enabled) - `EXPIRED`: Token has expired (requires blocking refresh) - `Token` class is now a pure data class, holding only token information (access token, refresh token, expiry time). - `TokenSource` implementations are responsible for token state management and refresh logic. ### Performance Optimizations - **Non-blocking for Stale Tokens**: When async is enabled and the token is stale but not expired, API calls continue using the current token while a background refresh is triggered, reducing latency and avoiding thread blocking. - **Blocking Only on Expiry**: If the token is expired, calls will still block until a new token is fetched, ensuring correctness. - **Default Stale Duration**: Tokens are considered stale 3 minutes before expiry, allowing proactive refresh while maintaining validity. ### Thread Safety & Reliability - **Synchronized Refresh Logic**: All async refresh operations are synchronized to prevent race conditions and redundant refreshes. - **Refresh State Tracking**: The implementation tracks: - Whether a refresh is already in progress - Whether the last refresh succeeded - This prevents unnecessary or repeated background refreshes - **Failure Handling**: If an async refresh fails, subsequent refreshes will be forced to be synchronous until a successful refresh occurs. ### Configuration Options **Experimental Features (may change in future releases)**: - withClockSupplier() - Custom clock supplier for testing - withAsyncRefresh() - Enable/disable asynchronous token refresh - withExpiryBuffer() - Configure the expiry buffer duration ## Testing - **Unit tests** cover async refresh triggering, correct token usage during background refresh, and thread safety. - **Time Control for Testing:** The `RefreshableTokenSource` class can be configured with a custom clock supplier via `withClockSupplier(ClockSupplier clockSupplier)`, allowing tests to precisely control and simulate token expiry and refresh timing. This enables deterministic testing of token state transitions and refresh behavior without relying on real system time. NO_CHANGELOG=true --------- Co-authored-by: Parth Bansal <[email protected]>
1 parent e51c2c6 commit 580c015

File tree

11 files changed

+436
-135
lines changed

11 files changed

+436
-135
lines changed

databricks-sdk-java/src/main/java/com/databricks/sdk/core/oauth/RefreshableTokenSource.java

Lines changed: 221 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -5,36 +5,254 @@
55
import com.databricks.sdk.core.http.FormRequest;
66
import com.databricks.sdk.core.http.HttpClient;
77
import com.databricks.sdk.core.http.Request;
8+
import com.databricks.sdk.core.utils.ClockSupplier;
9+
import com.databricks.sdk.core.utils.UtcClockSupplier;
10+
import java.time.Duration;
811
import java.time.Instant;
912
import java.util.Base64;
1013
import java.util.Map;
14+
import java.util.concurrent.CompletableFuture;
1115
import org.apache.http.HttpHeaders;
16+
import org.slf4j.Logger;
17+
import org.slf4j.LoggerFactory;
1218

1319
/**
1420
* An OAuth TokenSource which can be refreshed.
1521
*
16-
* <p>Calls to getToken() will first check if the token is still valid (currently defined by having
17-
* at least 10 seconds until expiry). If not, refresh() is called first to refresh the token.
22+
* <p>This class supports both synchronous and asynchronous token refresh. When async is enabled,
23+
* stale tokens will trigger a background refresh, while expired tokens will block until a new token
24+
* is fetched.
1825
*/
1926
public abstract class RefreshableTokenSource implements TokenSource {
20-
protected Token token;
2127

28+
/**
29+
* Enum representing the state of the token. FRESH: Token is valid and not close to expiry. STALE:
30+
* Token is valid but will expire soon - an async refresh will be triggered if enabled. EXPIRED:
31+
* Token has expired and must be refreshed using a blocking call.
32+
*/
33+
protected enum TokenState {
34+
FRESH,
35+
STALE,
36+
EXPIRED
37+
}
38+
39+
private static final Logger logger = LoggerFactory.getLogger(RefreshableTokenSource.class);
40+
// Default duration before expiry to consider a token as 'stale'.
41+
private static final Duration DEFAULT_STALE_DURATION = Duration.ofMinutes(3);
42+
// Default additional buffer before expiry to consider a token as expired.
43+
private static final Duration DEFAULT_EXPIRY_BUFFER = Duration.ofSeconds(40);
44+
45+
// The current OAuth token. May be null if not yet fetched.
46+
protected volatile Token token;
47+
// Whether asynchronous refresh is enabled.
48+
private boolean asyncEnabled = false;
49+
// Duration before expiry to consider a token as 'stale'.
50+
private Duration staleDuration = DEFAULT_STALE_DURATION;
51+
// Additional buffer before expiry to consider a token as expired.
52+
private Duration expiryBuffer = DEFAULT_EXPIRY_BUFFER;
53+
// Whether a refresh is currently in progress (for async refresh).
54+
private boolean refreshInProgress = false;
55+
// Whether the last refresh attempt succeeded.
56+
private boolean lastRefreshSucceeded = true;
57+
// Clock supplier for current time.
58+
private ClockSupplier clockSupplier = new UtcClockSupplier();
59+
60+
/** Constructs a new {@code RefreshableTokenSource} with no initial token. */
2261
public RefreshableTokenSource() {}
2362

63+
/**
64+
* Constructor with initial token.
65+
*
66+
* @param token The initial token to use.
67+
*/
2468
public RefreshableTokenSource(Token token) {
2569
this.token = token;
2670
}
2771

72+
/**
73+
* Set the clock supplier for current time.
74+
*
75+
* <p><b>Experimental:</b> This method may change or be removed in future releases.
76+
*
77+
* @param clockSupplier The clock supplier to use.
78+
* @return this instance for chaining
79+
*/
80+
public RefreshableTokenSource withClockSupplier(ClockSupplier clockSupplier) {
81+
this.clockSupplier = clockSupplier;
82+
return this;
83+
}
84+
85+
/**
86+
* Enable or disable asynchronous token refresh.
87+
*
88+
* <p><b>Experimental:</b> This method may change or be removed in future releases.
89+
*
90+
* @param enabled true to enable async refresh, false to disable
91+
* @return this instance for chaining
92+
*/
93+
public RefreshableTokenSource withAsyncRefresh(boolean enabled) {
94+
this.asyncEnabled = enabled;
95+
return this;
96+
}
97+
98+
/**
99+
* Set the expiry buffer. If the token's lifetime is less than this buffer, it is considered
100+
* expired.
101+
*
102+
* <p><b>Experimental:</b> This method may change or be removed in future releases.
103+
*
104+
* @param buffer the expiry buffer duration
105+
* @return this instance for chaining
106+
*/
107+
public RefreshableTokenSource withExpiryBuffer(Duration buffer) {
108+
this.expiryBuffer = buffer;
109+
return this;
110+
}
111+
112+
/**
113+
* Refresh the OAuth token. Subclasses must implement this to define how the token is refreshed.
114+
*
115+
* <p>This method may throw an exception if the token cannot be refreshed. The specific exception
116+
* type depends on the implementation.
117+
*
118+
* @return The newly refreshed Token.
119+
*/
120+
protected abstract Token refresh();
121+
122+
/**
123+
* Gets the current token, refreshing if necessary. If async refresh is enabled, may return a
124+
* stale token while a refresh is in progress.
125+
*
126+
* <p>This method may throw an exception if the token cannot be refreshed, depending on the
127+
* implementation of {@link #refresh()}.
128+
*
129+
* @return The current valid token
130+
*/
131+
public Token getToken() {
132+
if (asyncEnabled) {
133+
return getTokenAsync();
134+
}
135+
return getTokenBlocking();
136+
}
137+
138+
/**
139+
* Determine the state of the current token (fresh, stale, or expired).
140+
*
141+
* @return The token state
142+
*/
143+
protected TokenState getTokenState(Token t) {
144+
if (t == null) {
145+
return TokenState.EXPIRED;
146+
}
147+
Duration lifeTime = Duration.between(Instant.now(clockSupplier.getClock()), t.getExpiry());
148+
if (lifeTime.compareTo(expiryBuffer) <= 0) {
149+
return TokenState.EXPIRED;
150+
}
151+
if (lifeTime.compareTo(staleDuration) <= 0) {
152+
return TokenState.STALE;
153+
}
154+
return TokenState.FRESH;
155+
}
156+
157+
/**
158+
* Get the current token, blocking to refresh if expired.
159+
*
160+
* <p>This method may throw an exception if the token cannot be refreshed, depending on the
161+
* implementation of {@link #refresh()}.
162+
*
163+
* @return The current valid token
164+
*/
165+
protected Token getTokenBlocking() {
166+
// Use double-checked locking to minimize synchronization overhead on reads:
167+
// 1. Check if the token is expired without locking.
168+
// 2. If expired, synchronize and check again (another thread may have refreshed it).
169+
// 3. If still expired, perform the refresh.
170+
if (getTokenState(token) != TokenState.EXPIRED) {
171+
return token;
172+
}
173+
synchronized (this) {
174+
if (getTokenState(token) != TokenState.EXPIRED) {
175+
return token;
176+
}
177+
lastRefreshSucceeded = false;
178+
try {
179+
token = refresh();
180+
} catch (Exception e) {
181+
logger.error("Failed to refresh token synchronously", e);
182+
throw e;
183+
}
184+
lastRefreshSucceeded = true;
185+
return token;
186+
}
187+
}
188+
189+
/**
190+
* Get the current token, possibly triggering an async refresh if stale. If the token is expired,
191+
* blocks to refresh.
192+
*
193+
* <p>This method may throw an exception if the token cannot be refreshed, depending on the
194+
* implementation of {@link #refresh()}.
195+
*
196+
* @return The current valid or stale token
197+
*/
198+
protected Token getTokenAsync() {
199+
Token currentToken = token;
200+
201+
switch (getTokenState(currentToken)) {
202+
case FRESH:
203+
return currentToken;
204+
case STALE:
205+
triggerAsyncRefresh();
206+
return currentToken;
207+
case EXPIRED:
208+
return getTokenBlocking();
209+
default:
210+
throw new IllegalStateException("Invalid token state.");
211+
}
212+
}
213+
214+
/**
215+
* Trigger an asynchronous refresh of the token if not already in progress and last refresh
216+
* succeeded.
217+
*/
218+
private synchronized void triggerAsyncRefresh() {
219+
// Check token state again inside the synchronized block to avoid triggering a refresh if
220+
// another thread updated the token in the meantime.
221+
if (!refreshInProgress && lastRefreshSucceeded && getTokenState(token) != TokenState.FRESH) {
222+
refreshInProgress = true;
223+
CompletableFuture.runAsync(
224+
() -> {
225+
try {
226+
// Attempt to refresh the token in the background
227+
Token newToken = refresh();
228+
synchronized (this) {
229+
token = newToken;
230+
refreshInProgress = false;
231+
}
232+
} catch (Exception e) {
233+
synchronized (this) {
234+
lastRefreshSucceeded = false;
235+
refreshInProgress = false;
236+
logger.error("Asynchronous token refresh failed", e);
237+
}
238+
}
239+
});
240+
}
241+
}
242+
28243
/**
29244
* Helper method implementing OAuth token refresh.
30245
*
246+
* @param hc The HTTP client to use for the request.
31247
* @param clientId The client ID to authenticate with.
32248
* @param clientSecret The client secret to authenticate with.
33249
* @param tokenUrl The authorization URL for fetching tokens.
34250
* @param params Additional request parameters.
35251
* @param headers Additional headers.
36252
* @param position The position of the authentication parameters in the request.
37253
* @return The newly fetched Token.
254+
* @throws DatabricksException if the refresh fails
255+
* @throws IllegalArgumentException if the OAuth response contains an error
38256
*/
39257
protected static Token retrieveToken(
40258
HttpClient hc,
@@ -75,13 +293,4 @@ protected static Token retrieveToken(
75293
throw new DatabricksException("Failed to refresh credentials: " + e.getMessage(), e);
76294
}
77295
}
78-
79-
protected abstract Token refresh();
80-
81-
public synchronized Token getToken() {
82-
if (token == null || !token.isValid()) {
83-
token = refresh();
84-
}
85-
return token;
86-
}
87296
}
Lines changed: 25 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,5 @@
11
package com.databricks.sdk.core.oauth;
22

3-
import com.databricks.sdk.core.utils.ClockSupplier;
4-
import com.databricks.sdk.core.utils.SystemClockSupplier;
53
import com.fasterxml.jackson.annotation.JsonCreator;
64
import com.fasterxml.jackson.annotation.JsonProperty;
75
import java.time.Instant;
@@ -23,16 +21,9 @@ public class Token {
2321
*/
2422
@JsonProperty private Instant expiry;
2523

26-
private final ClockSupplier clockSupplier;
27-
2824
/** Constructor for non-refreshable tokens (e.g. M2M). */
2925
public Token(String accessToken, String tokenType, Instant expiry) {
30-
this(accessToken, tokenType, null, expiry, new SystemClockSupplier());
31-
}
32-
33-
/** Constructor for non-refreshable tokens (e.g. M2M) with ClockSupplier */
34-
public Token(String accessToken, String tokenType, Instant expiry, ClockSupplier clockSupplier) {
35-
this(accessToken, tokenType, null, expiry, clockSupplier);
26+
this(accessToken, tokenType, null, expiry);
3627
}
3728

3829
/** Constructor for refreshable tokens. */
@@ -42,51 +33,48 @@ public Token(
4233
@JsonProperty("tokenType") String tokenType,
4334
@JsonProperty("refreshToken") String refreshToken,
4435
@JsonProperty("expiry") Instant expiry) {
45-
this(accessToken, tokenType, refreshToken, expiry, new SystemClockSupplier());
46-
}
47-
48-
/** Constructor for refreshable tokens with ClockSupplier. */
49-
public Token(
50-
String accessToken,
51-
String tokenType,
52-
String refreshToken,
53-
Instant expiry,
54-
ClockSupplier clockSupplier) {
5536
Objects.requireNonNull(accessToken, "accessToken must be defined");
5637
Objects.requireNonNull(tokenType, "tokenType must be defined");
5738
Objects.requireNonNull(expiry, "expiry must be defined");
58-
Objects.requireNonNull(clockSupplier, "clockSupplier must be defined");
5939
this.accessToken = accessToken;
6040
this.tokenType = tokenType;
6141
this.refreshToken = refreshToken;
6242
this.expiry = expiry;
63-
this.clockSupplier = clockSupplier;
64-
}
65-
66-
public boolean isExpired() {
67-
if (expiry == null) {
68-
return false;
69-
}
70-
// Azure Databricks rejects tokens that expire in 30 seconds or less,
71-
// so we refresh the token 40 seconds before it expires.
72-
Instant potentiallyExpired = expiry.minusSeconds(40);
73-
Instant now = Instant.now(clockSupplier.getClock());
74-
return potentiallyExpired.isBefore(now);
75-
}
76-
77-
public boolean isValid() {
78-
return accessToken != null && !isExpired();
7943
}
8044

45+
/**
46+
* Returns the type of the token (e.g., "Bearer").
47+
*
48+
* @return the token type
49+
*/
8150
public String getTokenType() {
8251
return tokenType;
8352
}
8453

54+
/**
55+
* Returns the refresh token, if available. May be null for non-refreshable tokens.
56+
*
57+
* @return the refresh token or null
58+
*/
8559
public String getRefreshToken() {
8660
return refreshToken;
8761
}
8862

63+
/**
64+
* Returns the access token string.
65+
*
66+
* @return the access token
67+
*/
8968
public String getAccessToken() {
9069
return accessToken;
9170
}
71+
72+
/**
73+
* Returns the expiry time of the token as a Instant.
74+
*
75+
* @return the expiry time
76+
*/
77+
public Instant getExpiry() {
78+
return this.expiry;
79+
}
9280
}
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
import java.time.Clock;
44

5-
public class SystemClockSupplier implements ClockSupplier {
5+
public class UtcClockSupplier implements ClockSupplier {
66
@Override
77
public Clock getClock() {
88
return Clock.systemUTC();

databricks-sdk-java/src/test/java/com/databricks/sdk/core/CliTokenSourceTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -154,7 +154,7 @@ public void testRefreshWithExpiry(
154154
Token token = tokenSource.refresh();
155155
assertEquals("Bearer", token.getTokenType());
156156
assertEquals("test-token", token.getAccessToken());
157-
assertEquals(shouldBeExpired, token.isExpired());
157+
assertEquals(shouldBeExpired, token.getExpiry().isBefore(Instant.now()));
158158
}
159159
}
160160
}

0 commit comments

Comments
 (0)