Skip to content

Commit c0a5d58

Browse files
authored
fix: remove race condition bug in refresh logic (#1390)
Update the logic in forceRefresh() to reduce churn on the thread pool when the certificate refresh API calls are failing. The new forceRefresh() logic ensures that: (1) only one refresh cycle may run at a time, and (2) a refresh cycle that is in progress will not be canceled until it succeeds. Add new test cases to validate behavior under race conditions, deadlocks, and concurrency. Add additional logging to help diagnose production problems with certificate refresh. Related to #1314. Fixes #1209. Fixes #1159.
1 parent 75fef46 commit c0a5d58

File tree

7 files changed

+410 -47 lines changed

core/src/main/java/com/google/cloud/sql/core/CloudSqlInstance.java

Lines changed: 54 additions & 24 deletions
Original file line number | Diff line number | Diff line change
@@ -28,7 +28,6 @@
2828
import dev.failsafe.RateLimiter;
2929
import java.io.IOException;
3030
import java.security.KeyPair;
31-
import java.time.Duration;
3231
import java.time.Instant;
3332
import java.time.temporal.ChronoUnit;
3433
import java.util.List;
@@ -57,8 +56,7 @@ class CloudSqlInstance {
5756
private final ListenableFuture<KeyPair> keyPair;
5857
private final Object instanceDataGuard = new Object();
5958
// Rate-limits forced refreshes. The limiter is supplied by the caller (a burst of 2 per 30 seconds in CoreSocketFactory).
60-
private final RateLimiter<Object> forcedRenewRateLimiter =
61-
RateLimiter.burstyBuilder(2, Duration.ofSeconds(30)).build();
59+
private final RateLimiter<Object> forcedRenewRateLimiter;
6260

6361
private final RefreshCalculator refreshCalculator = new RefreshCalculator();
6462

@@ -68,6 +66,9 @@ class CloudSqlInstance {
6866
@GuardedBy("instanceDataGuard")
6967
private ListenableFuture<InstanceData> nextInstanceData;
7068

69+
@GuardedBy("instanceDataGuard")
70+
private boolean forceRefreshRunning;
71+
7172
/**
7273
* Initializes a new Cloud SQL instance based on the given connection name.
7374
*
@@ -82,12 +83,14 @@ class CloudSqlInstance {
8283
AuthType authType,
8384
CredentialFactory tokenSourceFactory,
8485
ListeningScheduledExecutorService executor,
85-
ListenableFuture<KeyPair> keyPair) {
86+
ListenableFuture<KeyPair> keyPair,
87+
RateLimiter<Object> forcedRenewRateLimiter) {
8688
this.instanceName = new CloudSqlInstanceName(connectionName);
8789
this.instanceDataSupplier = instanceDataSupplier;
8890
this.authType = authType;
8991
this.executor = executor;
9092
this.keyPair = keyPair;
93+
this.forcedRenewRateLimiter = forcedRenewRateLimiter;
9194

9295
if (authType == AuthType.IAM) {
9396
HttpRequestInitializer source = tokenSourceFactory.create();
@@ -159,20 +162,21 @@ String getPreferredIp(List<String> preferredTypes) {
159162
*/
160163
void forceRefresh() {
161164
synchronized (instanceDataGuard) {
162-
nextInstanceData.cancel(false);
163-
if (nextInstanceData.isCancelled()) {
164-
logger.fine(
165-
"Force Refresh: the next refresh operation was cancelled."
166-
+ " Scheduling new refresh operation immediately.");
167-
currentInstanceData = executor.submit(this::performRefresh);
168-
nextInstanceData = currentInstanceData;
169-
} else {
170-
logger.fine(
171-
"Force Refresh: the next refresh operation is already running."
172-
+ " Marking it as the current operation.");
173-
// Otherwise it's already running, so just move next to current.
174-
currentInstanceData = nextInstanceData;
165+
// Don't force a refresh until the current forceRefresh operation
166+
// has produced a successful refresh.
167+
if (forceRefreshRunning) {
168+
return;
175169
}
170+
171+
forceRefreshRunning = true;
172+
nextInstanceData.cancel(false);
173+
logger.fine(
174+
String.format(
175+
"[%s] Force Refresh: the next refresh operation was cancelled."
176+
+ " Scheduling new refresh operation immediately.",
177+
instanceName));
178+
currentInstanceData = executor.submit(this::performRefresh);
179+
nextInstanceData = currentInstanceData;
176180
}
177181
}
178182

@@ -182,10 +186,14 @@ void forceRefresh() {
182186
* would expire.
183187
*/
184188
private InstanceData performRefresh() throws InterruptedException, ExecutionException {
185-
logger.fine("Refresh Operation: Acquiring rate limiter permit.");
189+
logger.fine(
190+
String.format("[%s] Refresh Operation: Acquiring rate limiter permit.", instanceName));
186191
// To avoid unreasonable SQL Admin API usage, use a rate limit to throttle our usage.
187192
forcedRenewRateLimiter.acquirePermit();
188-
logger.fine("Refresh Operation: Acquired rate limiter permit. Starting refresh...");
193+
logger.fine(
194+
String.format(
195+
"[%s] Refresh Operation: Acquired rate limiter permit. Starting refresh...",
196+
instanceName));
189197

190198
try {
191199
InstanceData data =
@@ -194,15 +202,16 @@ private InstanceData performRefresh() throws InterruptedException, ExecutionExce
194202

195203
logger.fine(
196204
String.format(
197-
"Refresh Operation: Completed refresh with new certificate expiration at %s.",
198-
data.getExpiration().toInstant().toString()));
205+
"[%s] Refresh Operation: Completed refresh with new certificate expiration at %s.",
206+
instanceName, data.getExpiration().toInstant().toString()));
199207
long secondsToRefresh =
200208
refreshCalculator.calculateSecondsUntilNextRefresh(
201209
Instant.now(), data.getExpiration().toInstant());
202210

203211
logger.fine(
204212
String.format(
205-
"Refresh Operation: Next operation scheduled at %s.",
213+
"[%s] Refresh Operation: Next operation scheduled at %s.",
214+
instanceName,
206215
Instant.now()
207216
.plus(secondsToRefresh, ChronoUnit.SECONDS)
208217
.truncatedTo(ChronoUnit.SECONDS)
@@ -212,12 +221,17 @@ private InstanceData performRefresh() throws InterruptedException, ExecutionExce
212221
currentInstanceData = Futures.immediateFuture(data);
213222
nextInstanceData =
214223
executor.schedule(this::performRefresh, secondsToRefresh, TimeUnit.SECONDS);
224+
// Refresh completed successfully, reset forceRefreshRunning.
225+
forceRefreshRunning = false;
215226
}
216-
217227
return data;
218228
} catch (ExecutionException | InterruptedException e) {
219229
logger.log(
220-
Level.FINE, "Refresh Operation: Failed! Starting next refresh operation immediately.", e);
230+
Level.FINE,
231+
String.format(
232+
"[%s] Refresh Operation: Failed! Starting next refresh operation immediately.",
233+
instanceName),
234+
e);
221235
synchronized (instanceDataGuard) {
222236
nextInstanceData = executor.submit(this::performRefresh);
223237
}
@@ -228,4 +242,20 @@ private InstanceData performRefresh() throws InterruptedException, ExecutionExce
228242
SslData getSslData() {
229243
return getInstanceData().getSslData();
230244
}
245+
246+
ListenableFuture<InstanceData> getNext() {
247+
synchronized (instanceDataGuard) {
248+
return this.nextInstanceData;
249+
}
250+
}
251+
252+
ListenableFuture<InstanceData> getCurrent() {
253+
synchronized (instanceDataGuard) {
254+
return this.currentInstanceData;
255+
}
256+
}
257+
258+
public CloudSqlInstanceName getInstanceName() {
259+
return instanceName;
260+
}
231261
}

core/src/main/java/com/google/cloud/sql/core/CloudSqlInstanceName.java

Lines changed: 5 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -70,4 +70,9 @@ String getRegionId() {
7070
String getInstanceId() {
7171
return instanceId;
7272
}
73+
74+
@Override
75+
public String toString() {
76+
return connectionName;
77+
}
7378
}

core/src/main/java/com/google/cloud/sql/core/CoreSocketFactory.java

Lines changed: 10 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -24,13 +24,15 @@
2424
import com.google.common.util.concurrent.ListenableFuture;
2525
import com.google.common.util.concurrent.ListeningScheduledExecutorService;
2626
import com.google.common.util.concurrent.MoreExecutors;
27+
import dev.failsafe.RateLimiter;
2728
import java.io.File;
2829
import java.io.IOException;
2930
import java.net.InetSocketAddress;
3031
import java.net.Socket;
3132
import java.security.KeyPair;
3233
import java.security.KeyPairGenerator;
3334
import java.security.NoSuchAlgorithmException;
35+
import java.time.Duration;
3436
import java.util.ArrayList;
3537
import java.util.List;
3638
import java.util.Properties;
@@ -125,6 +127,7 @@ static ListeningScheduledExecutorService getDefaultExecutor() {
125127
// there should be enough free threads so that there will not be a deadlock. Most users
126128
// configure 3 or fewer instances, requiring 6 threads during refresh. By setting
127129
// this to 8, it's enough threads for most users, plus a safety factor of 2.
130+
128131
ScheduledThreadPoolExecutor executor =
129132
(ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(8);
130133

@@ -350,6 +353,12 @@ CloudSqlInstance getCloudSqlInstance(String instanceName, AuthType authType) {
350353
instanceName,
351354
k ->
352355
new CloudSqlInstance(
353-
k, adminApiService, authType, credentialFactory, executor, localKeyPair));
356+
k,
357+
adminApiService,
358+
authType,
359+
credentialFactory,
360+
executor,
361+
localKeyPair,
362+
RateLimiter.burstyBuilder(2, Duration.ofSeconds(30)).build()));
354363
}
355364
}

core/src/main/java/com/google/cloud/sql/core/SqlAdminApiFetcher.java

Lines changed: 14 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -103,8 +103,7 @@ public InstanceData getInstanceData(
103103
ListenableFuture<KeyPair> keyPair)
104104
throws ExecutionException, InterruptedException {
105105

106-
ListenableFuture<Optional<AccessToken>> token =
107-
executor.submit(() -> accessTokenSupplier.get());
106+
ListenableFuture<Optional<AccessToken>> token = executor.submit(accessTokenSupplier::get);
108107

109108
// Fetch the metadata
110109
ListenableFuture<Metadata> metadataFuture =
@@ -152,14 +151,18 @@ public InstanceData getInstanceData(
152151
.orElse(x509Certificate.getNotAfter());
153152
}
154153

154+
logger.fine(String.format("[%s] INSTANCE DATA DONE", instanceName));
155+
155156
return new InstanceData(
156157
Futures.getDone(metadataFuture),
157158
Futures.getDone(sslContextFuture),
158159
expiration);
159160
},
160161
executor);
161162

162-
return done.get();
163+
InstanceData instanceData = done.get();
164+
logger.fine(String.format("[%s] ALL FUTURES DONE", instanceName));
165+
return instanceData;
163166
}
164167

165168
String getApplicationName() {
@@ -219,6 +222,9 @@ private Metadata fetchMetadata(CloudSqlInstanceName instanceName, AuthType authT
219222
try {
220223
Certificate instanceCaCertificate =
221224
createCertificate(instanceMetadata.getServerCaCert().getCert());
225+
226+
logger.fine(String.format("[%s] METADATA DONE", instanceName));
227+
222228
return new Metadata(ipAddrs, instanceCaCertificate);
223229
} catch (CertificateException ex) {
224230
throw new RuntimeException(
@@ -288,6 +294,8 @@ private Certificate fetchEphemeralCertificate(
288294
ex);
289295
}
290296

297+
logger.fine(String.format("[%s %d] CERT DONE", instanceName, Thread.currentThread().getId()));
298+
291299
return ephemeralCertificate;
292300
}
293301

@@ -339,6 +347,9 @@ private SslData createSslData(
339347

340348
sslContext.init(kmf.getKeyManagers(), tmf.getTrustManagers(), new SecureRandom());
341349

350+
logger.fine(
351+
String.format("[%s %d] SSL CONTEXT", instanceName, Thread.currentThread().getId()));
352+
342353
return new SslData(sslContext, kmf, tmf);
343354
} catch (GeneralSecurityException | IOException ex) {
344355
throw new RuntimeException(

0 commit comments

Comments
 (0)