|
27 | 27 | import com.google.cloud.sql.AuthType; |
28 | 28 | import com.google.common.base.CharMatcher; |
29 | 29 | import com.google.common.io.BaseEncoding; |
30 | | -import com.google.common.util.concurrent.FutureCallback; |
31 | 30 | import com.google.common.util.concurrent.Futures; |
32 | 31 | import com.google.common.util.concurrent.ListenableFuture; |
33 | 32 | import com.google.common.util.concurrent.ListeningScheduledExecutorService; |
34 | | -import com.google.common.util.concurrent.SettableFuture; |
35 | 33 | import java.io.ByteArrayInputStream; |
36 | 34 | import java.io.IOException; |
37 | 35 | import java.nio.charset.StandardCharsets; |
|
55 | 53 | import java.util.Map; |
56 | 54 | import java.util.Optional; |
57 | 55 | import java.util.concurrent.Callable; |
58 | | -import java.util.concurrent.atomic.AtomicInteger; |
59 | | -import java.util.logging.Level; |
60 | 56 | import java.util.logging.Logger; |
61 | 57 | import javax.net.ssl.KeyManagerFactory; |
62 | 58 | import javax.net.ssl.SSLContext; |
63 | 59 | import javax.net.ssl.TrustManagerFactory; |
64 | | -import org.checkerframework.checker.nullness.compatqual.NullableDecl; |
65 | 60 |
|
66 | 61 | /** Class that encapsulates all logic for interacting with SQLAdmin API. */ |
67 | 62 | public class SqlAdminApiFetcher { |
@@ -98,92 +93,66 @@ private String generatePublicKeyCert(KeyPair keyPair) { |
98 | 93 | + "-----END RSA PUBLIC KEY-----\n"; |
99 | 94 | } |
100 | 95 |
|
101 | | - // Schedules task to be executed once the provided futures are complete. |
102 | | - private <T> ListenableFuture<T> whenAllSucceed( |
103 | | - Callable<T> task, |
104 | | - ListeningScheduledExecutorService executor, |
105 | | - ListenableFuture<?>... futures) { |
106 | | - SettableFuture<T> taskFuture = SettableFuture.create(); |
107 | | - |
108 | | - // Create a countDown for all Futures to complete. |
109 | | - AtomicInteger countDown = new AtomicInteger(futures.length); |
110 | | - |
111 | | - // Trigger the task when all futures are complete. |
112 | | - FutureCallback<Object> runWhenInputAreComplete = |
113 | | - new FutureCallback<Object>() { |
114 | | - @Override |
115 | | - public void onSuccess(@NullableDecl Object o) { |
116 | | - if (countDown.decrementAndGet() == 0) { |
117 | | - taskFuture.setFuture(executor.submit(task)); |
118 | | - } |
119 | | - } |
120 | | - |
121 | | - @Override |
122 | | - public void onFailure(Throwable throwable) { |
123 | | - if (!taskFuture.setException(throwable)) { |
124 | | - String msg = "Got more than one input failure. Logging failures after the first"; |
125 | | - logger.log(Level.SEVERE, msg, throwable); |
126 | | - } |
127 | | - } |
128 | | - }; |
129 | | - for (ListenableFuture<?> future : futures) { |
130 | | - Futures.addCallback(future, runWhenInputAreComplete, executor); |
131 | | - } |
132 | | - |
133 | | - return taskFuture; |
134 | | - } |
135 | | - |
136 | 96 | ListenableFuture<InstanceData> getInstanceData( |
137 | 97 | CloudSqlInstanceName instanceName, |
138 | 98 | OAuth2Credentials credentials, |
139 | 99 | AuthType authType, |
140 | 100 | ListeningScheduledExecutorService executor, |
141 | 101 | ListenableFuture<KeyPair> keyPair) { |
| 102 | + // Fetch the metadata |
142 | 103 | ListenableFuture<Metadata> metadataFuture = |
143 | 104 | executor.submit(() -> fetchMetadata(instanceName, authType)); |
| 105 | + |
| 106 | + // Fetch the ephemeral certificates |
144 | 107 | ListenableFuture<Certificate> ephemeralCertificateFuture = |
145 | | - whenAllSucceed( |
146 | | - () -> |
147 | | - fetchEphemeralCertificate( |
148 | | - Futures.getDone(keyPair), instanceName, credentials, authType), |
149 | | - executor, |
150 | | - keyPair); |
| 108 | + Futures.whenAllComplete(keyPair) |
| 109 | + .call( |
| 110 | + () -> |
| 111 | + fetchEphemeralCertificate( |
| 112 | + Futures.getDone(keyPair), instanceName, credentials, authType), |
| 113 | + executor); |
| 114 | + |
151 | 115 | // Once the API calls are complete, construct the SSLContext for the sockets |
152 | 116 | ListenableFuture<SslData> sslContextFuture = |
153 | | - whenAllSucceed( |
154 | | - () -> |
155 | | - createSslData( |
156 | | - Futures.getDone(keyPair), |
157 | | - Futures.getDone(metadataFuture), |
158 | | - Futures.getDone(ephemeralCertificateFuture), |
159 | | - instanceName, |
160 | | - authType), |
161 | | - executor, |
162 | | - keyPair, |
163 | | - metadataFuture, |
164 | | - ephemeralCertificateFuture); |
165 | | - // Once both the SSLContext and Metadata are complete, return the results |
166 | | - return whenAllSucceed( |
167 | | - () -> { |
| 117 | + Futures.whenAllComplete(metadataFuture, ephemeralCertificateFuture) |
| 118 | + .call( |
| 119 | + () -> |
| 120 | + createSslData( |
| 121 | + Futures.getDone(keyPair), |
| 122 | + Futures.getDone(metadataFuture), |
| 123 | + Futures.getDone(ephemeralCertificateFuture), |
| 124 | + instanceName, |
| 125 | + authType), |
| 126 | + executor); |
168 | 127 |
|
169 | | - // Get expiration value for new cert |
170 | | - Certificate ephemeralCertificate = Futures.getDone(ephemeralCertificateFuture); |
171 | | - X509Certificate x509Certificate = (X509Certificate) ephemeralCertificate; |
172 | | - Date expiration = x509Certificate.getNotAfter(); |
173 | | - |
174 | | - if (authType == AuthType.IAM) { |
175 | | - expiration = |
176 | | - getTokenExpirationTime(credentials) |
177 | | - .filter(tokenExpiration -> x509Certificate.getNotAfter().after(tokenExpiration)) |
178 | | - .orElse(x509Certificate.getNotAfter()); |
179 | | - } |
180 | | - |
181 | | - return new InstanceData( |
182 | | - Futures.getDone(metadataFuture), Futures.getDone(sslContextFuture), expiration); |
183 | | - }, |
184 | | - executor, |
185 | | - metadataFuture, |
186 | | - sslContextFuture); |
| 128 | + // Once both the SSLContext and Metadata are complete, return the results |
| 129 | + ListenableFuture<InstanceData> done = |
| 130 | + Futures.whenAllComplete(metadataFuture, ephemeralCertificateFuture, sslContextFuture) |
| 131 | + .call( |
| 132 | + () -> { |
| 133 | + |
| 134 | + // Get expiration value for new cert |
| 135 | + Certificate ephemeralCertificate = Futures.getDone(ephemeralCertificateFuture); |
| 136 | + X509Certificate x509Certificate = (X509Certificate) ephemeralCertificate; |
| 137 | + Date expiration = x509Certificate.getNotAfter(); |
| 138 | + |
| 139 | + if (authType == AuthType.IAM) { |
| 140 | + expiration = |
| 141 | + getTokenExpirationTime(credentials) |
| 142 | + .filter( |
| 143 | + tokenExpiration -> |
| 144 | + x509Certificate.getNotAfter().after(tokenExpiration)) |
| 145 | + .orElse(x509Certificate.getNotAfter()); |
| 146 | + } |
| 147 | + |
| 148 | + return new InstanceData( |
| 149 | + Futures.getDone(metadataFuture), |
| 150 | + Futures.getDone(sslContextFuture), |
| 151 | + expiration); |
| 152 | + }, |
| 153 | + executor); |
| 154 | + |
| 155 | + return done; |
187 | 156 | } |
188 | 157 |
|
189 | 158 | private Optional<Date> getTokenExpirationTime(OAuth2Credentials credentials) { |
|
0 commit comments