Skip to content

Commit f3458a6

Browse files
authored
fix: Improve handling of futures and threads during refresh. (#1573)
Rewrite performRefresh() as a chain of task futures from the ListeningScheduledExecutorService. Now, tasks submitted to the ListeningScheduledExecutorService never block on another task submitted to the ListeningScheduledExecutorService. This should fix a category of bugs that show up in exceptions and logs as "connection timed out" or "refresh failed" or "bad client certificate". These exceptions can occur when the credentials fail to refresh. This is the underlying bug: The ListeningScheduledExecutorService gets into a state where all its threads are busy running tasks, all running tasks are blocked waiting for recently submitted tasks to complete, and the recently submitted tasks can't start because there are no available threads in the ListeningScheduledExecutorService. This changes the behavior of CloudSqlInstance.getInstanceData() and CloudSqlInstance.startRefreshAttempt() in ways that have a very small possibility of destabilizing customer applications. In version 1.14.1 and earlier, CloudSqlInstance.getInstanceData() behaved like this: When no refresh attempt is in progress, it returns immediately. Otherwise, it blocks the application thread until the current refresh attempt finishes. If the refresh attempt succeeds, it returns the InstanceData. If not, it throws a RuntimeException, while a new refresh attempt is submitted to the executor in the background.
1 parent fb976dc commit f3458a6

File tree

6 files changed

+349
-106
lines changed

6 files changed

+349
-106
lines changed

core/src/main/java/com/google/cloud/sql/core/CloudSqlInstance.java

Lines changed: 105 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
import com.google.common.util.concurrent.ListenableFuture;
2424
import com.google.common.util.concurrent.ListeningScheduledExecutorService;
2525
import com.google.common.util.concurrent.RateLimiter;
26-
import com.google.common.util.concurrent.Uninterruptibles;
2726
import com.google.errorprone.annotations.concurrent.GuardedBy;
2827
import java.io.IOException;
2928
import java.security.KeyPair;
@@ -68,7 +67,10 @@ class CloudSqlInstance {
6867
private ListenableFuture<InstanceData> nextInstanceData;
6968

7069
@GuardedBy("instanceDataGuard")
71-
private boolean forceRefreshRunning;
70+
private boolean refreshRunning;
71+
72+
@GuardedBy("instanceDataGuard")
73+
private Throwable currentRefreshFailure;
7274

7375
/**
7476
* Initializes a new Cloud SQL instance based on the given connection name.
@@ -100,26 +102,48 @@ class CloudSqlInstance {
100102
}
101103

102104
synchronized (instanceDataGuard) {
103-
this.currentInstanceData = executor.submit(this::performRefresh);
105+
this.currentInstanceData = this.startRefreshAttempt();
104106
this.nextInstanceData = currentInstanceData;
105107
}
106108
}
107109

108110
/**
109-
* Returns the current data related to the instance from {@link #performRefresh()}. May block if
110-
* no valid data is currently available.
111+
* Returns the current data related to the instance from {@link #startRefreshAttempt()}. May block
112+
* if no valid data is currently available. This method is called by an application thread when it
113+
* is trying to create a new connection to the database. (It is not called by a
114+
* ListeningScheduledExecutorService task.) So it is OK to block waiting for a future to complete.
115+
*
116+
* <p>When no refresh attempt is in progress, this returns immediately. Otherwise, it waits up to
117+
* timeoutMs milliseconds. If a refresh attempt succeeds, returns immediately at the end of that
118+
* successful attempt. If no attempts succeed within the timeout, throws a RuntimeException with
119+
* the exception from the last failed refresh attempt as the cause.
111120
*/
112121
private InstanceData getInstanceData(long timeoutMs) {
113122
ListenableFuture<InstanceData> instanceDataFuture;
114123
synchronized (instanceDataGuard) {
115124
instanceDataFuture = currentInstanceData;
116125
}
126+
117127
try {
118-
return Uninterruptibles.getUninterruptibly(
119-
instanceDataFuture, timeoutMs, TimeUnit.MILLISECONDS);
120-
} catch (TimeoutException ex) {
121-
throw new RuntimeException(ex);
122-
} catch (ExecutionException ex) {
128+
return instanceDataFuture.get(timeoutMs, TimeUnit.MILLISECONDS);
129+
} catch (TimeoutException e) {
130+
synchronized (instanceDataGuard) {
131+
if (currentRefreshFailure != null) {
132+
throw new RuntimeException(
133+
String.format(
134+
"Unable to get valid instance data within %d ms."
135+
+ " Last refresh attempt failed:",
136+
timeoutMs)
137+
+ currentRefreshFailure.getMessage(),
138+
currentRefreshFailure);
139+
}
140+
}
141+
throw new RuntimeException(
142+
String.format(
143+
"Unable to get valid instance data within %d ms. No refresh has completed.",
144+
timeoutMs),
145+
e);
146+
} catch (ExecutionException | InterruptedException ex) {
123147
Throwable cause = ex.getCause();
124148
Throwables.throwIfUnchecked(cause);
125149
throw new RuntimeException(cause);
@@ -165,43 +189,77 @@ String getPreferredIp(List<String> preferredTypes, long timeoutMs) {
165189
*/
166190
void forceRefresh() {
167191
synchronized (instanceDataGuard) {
168-
// Don't force a refresh until the current forceRefresh operation
192+
// Don't force a refresh until the current refresh operation
169193
// has produced a successful refresh.
170-
if (forceRefreshRunning) {
194+
if (refreshRunning) {
171195
return;
172196
}
173-
174-
forceRefreshRunning = true;
175197
nextInstanceData.cancel(false);
176198
logger.fine(
177199
String.format(
178200
"[%s] Force Refresh: the next refresh operation was cancelled."
179201
+ " Scheduling new refresh operation immediately.",
180202
instanceName));
181-
nextInstanceData = executor.submit(this::performRefresh);
203+
nextInstanceData = this.startRefreshAttempt();
182204
}
183205
}
184206

185207
/**
186-
* Triggers an update of internal information obtained from the Cloud SQL Admin API. Replaces the
187-
* value of currentInstanceData and schedules the next refresh shortly before the information
188-
* would expire.
208+
* Triggers an update of internal information obtained from the Cloud SQL Admin API, returning a
209+
* future that resolves once a valid InstanceData has been acquired. This sets up a chain of
210+
* futures that will 1. Acquire a rate limiter. 2. Attempt to fetch instance data. 3. Schedule the
211+
* next attempt to get instance data based on the success/failure of this attempt.
212+
*
213+
* @see com.google.cloud.sql.core.CloudSqlInstance#handleRefreshResult(
214+
* com.google.common.util.concurrent.ListenableFuture)
189215
*/
190-
private InstanceData performRefresh() throws InterruptedException, ExecutionException {
191-
logger.fine(
192-
String.format("[%s] Refresh Operation: Acquiring rate limiter permit.", instanceName));
216+
private ListenableFuture<InstanceData> startRefreshAttempt() {
217+
// As soon as we begin submitting refresh attempts to the executor, mark a refresh
218+
// as "in-progress" so that subsequent forceRefresh() calls balk until this one completes.
219+
synchronized (instanceDataGuard) {
220+
refreshRunning = true;
221+
}
222+
193223
// To avoid unreasonable SQL Admin API usage, use a rate limit to throttle our usage.
194-
//noinspection UnstableApiUsage
195-
forcedRenewRateLimiter.acquire();
196-
logger.fine(
197-
String.format(
198-
"[%s] Refresh Operation: Acquired rate limiter permit. Starting refresh...",
199-
instanceName));
224+
ListenableFuture<?> rateLimit =
225+
executor.submit(
226+
() -> {
227+
logger.fine(
228+
String.format(
229+
"[%s] Refresh Operation: Acquiring rate limiter permit.", instanceName));
230+
//noinspection UnstableApiUsage
231+
forcedRenewRateLimiter.acquire();
232+
logger.fine(
233+
String.format(
234+
"[%s] Refresh Operation: Acquired rate limiter permit. Starting refresh...",
235+
instanceName));
236+
},
237+
executor);
238+
239+
// Once rate limiter is done, attempt to getInstanceData.
240+
ListenableFuture<InstanceData> dataFuture =
241+
Futures.whenAllComplete(rateLimit)
242+
.callAsync(
243+
() ->
244+
instanceDataSupplier.getInstanceData(
245+
this.instanceName,
246+
this.accessTokenSupplier,
247+
this.authType,
248+
executor,
249+
keyPair),
250+
executor);
200251

252+
// Finally, reschedule refresh after getInstanceData is complete.
253+
return Futures.whenAllComplete(dataFuture)
254+
.callAsync(() -> handleRefreshResult(dataFuture), executor);
255+
}
256+
257+
private ListenableFuture<InstanceData> handleRefreshResult(
258+
ListenableFuture<InstanceData> dataFuture) {
201259
try {
202-
InstanceData data =
203-
instanceDataSupplier.getInstanceData(
204-
this.instanceName, this.accessTokenSupplier, this.authType, executor, keyPair);
260+
// This does not block, because it only gets called when dataFuture has completed.
261+
// This will throw an exception if the refresh attempt has failed.
262+
InstanceData data = dataFuture.get();
205263

206264
logger.fine(
207265
String.format(
@@ -220,13 +278,21 @@ private InstanceData performRefresh() throws InterruptedException, ExecutionExce
220278
.toString()));
221279

222280
synchronized (instanceDataGuard) {
281+
// Refresh completed successfully, reset refreshRunning.
282+
refreshRunning = false;
283+
currentRefreshFailure = null;
223284
currentInstanceData = Futures.immediateFuture(data);
285+
286+
// Now update nextInstanceData to perform a refresh after the
287+
// scheduled delay
224288
nextInstanceData =
225-
executor.schedule(this::performRefresh, secondsToRefresh, TimeUnit.SECONDS);
226-
// Refresh completed successfully, reset forceRefreshRunning.
227-
forceRefreshRunning = false;
289+
Futures.scheduleAsync(
290+
this::startRefreshAttempt, secondsToRefresh, TimeUnit.SECONDS, executor);
291+
292+
// Resolves to an InstanceData immediately
293+
return currentInstanceData;
228294
}
229-
return data;
295+
230296
} catch (ExecutionException | InterruptedException e) {
231297
logger.log(
232298
Level.FINE,
@@ -235,9 +301,12 @@ private InstanceData performRefresh() throws InterruptedException, ExecutionExce
235301
instanceName),
236302
e);
237303
synchronized (instanceDataGuard) {
238-
nextInstanceData = executor.submit(this::performRefresh);
304+
currentRefreshFailure = e;
305+
nextInstanceData = this.startRefreshAttempt();
306+
307+
// Resolves after the next successful refresh attempt.
308+
return nextInstanceData;
239309
}
240-
throw e;
241310
}
242311
}
243312

core/src/main/java/com/google/cloud/sql/core/InstanceDataSupplier.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ interface InstanceDataSupplier {
3030
* @throws ExecutionException if an exception is thrown during execution.
3131
* @throws InterruptedException if the executor is interrupted.
3232
*/
33-
InstanceData getInstanceData(
33+
ListenableFuture<InstanceData> getInstanceData(
3434
CloudSqlInstanceName instanceName,
3535
AccessTokenSupplier accessTokenSupplier,
3636
AuthType authType,

core/src/main/java/com/google/cloud/sql/core/SqlAdminApiFetcher.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ private String generatePublicKeyCert(KeyPair keyPair) {
9595
* @throws InterruptedException if the executor is interrupted.
9696
*/
9797
@Override
98-
public InstanceData getInstanceData(
98+
public ListenableFuture<InstanceData> getInstanceData(
9999
CloudSqlInstanceName instanceName,
100100
AccessTokenSupplier accessTokenSupplier,
101101
AuthType authType,
@@ -163,9 +163,9 @@ public InstanceData getInstanceData(
163163
},
164164
executor);
165165

166-
InstanceData instanceData = done.get();
167-
logger.fine(String.format("[%s] ALL FUTURES DONE", instanceName));
168-
return instanceData;
166+
done.addListener(
167+
() -> logger.fine(String.format("[%s] ALL FUTURES DONE", instanceName)), executor);
168+
return done;
169169
}
170170

171171
String getApplicationName() {

0 commit comments

Comments
 (0)