|
57 | 57 | import io.netty.util.ReferenceCountUtil; |
58 | 58 | import io.netty.util.concurrent.Future; |
59 | 59 | import io.netty.util.concurrent.FutureListener; |
60 | | -import io.netty.util.concurrent.GenericFutureListener; |
61 | 60 | import io.netty.util.concurrent.Promise; |
| 61 | +import io.netty.util.concurrent.PromiseNotifier; |
62 | 62 | import org.slf4j.Logger; |
63 | 63 | import org.slf4j.LoggerFactory; |
64 | 64 |
|
@@ -187,74 +187,69 @@ public long queryTimeoutMillis() { |
187 | 187 |
|
188 | 188 | private static final class BackupRequestResolver implements DnsNameResolverDelegate { |
189 | 189 |
|
190 | | - private final DnsNameResolver primary; |
191 | | - private final DnsNameResolver backup; |
| 190 | + private final DnsNameResolver primaryResolver; |
| 191 | + private final DnsNameResolver backupResolver; |
192 | 192 | private final EventLoop eventLoop; |
193 | 193 |
|
194 | 194 | // TODO: we'll want to make sure we share the cache, make our backup request interval more flexible, and also |
195 | 195 | // give ourselves some sort of budget so we don't overload DNS, but these are all possible. |
196 | 196 | BackupRequestResolver(DnsNameResolverBuilder builder, EventLoop eventLoop) { |
197 | | - primary = builder.build(); |
198 | | - backup = builder.consolidateCacheSize(0).build(); |
| 197 | + primaryResolver = builder.build(); |
| 198 | + backupResolver = builder.consolidateCacheSize(0).build(); |
199 | 199 | this.eventLoop = eventLoop; |
200 | 200 | } |
201 | 201 |
|
202 | 202 | @Override |
203 | 203 | public void close() { |
204 | 204 | try { |
205 | | - primary.close(); |
| 205 | + primaryResolver.close(); |
206 | 206 | } finally { |
207 | | - backup.close(); |
| 207 | + backupResolver.close(); |
208 | 208 | } |
209 | 209 | } |
210 | 210 |
|
211 | 211 | @Override |
212 | 212 | public Future<List<InetAddress>> resolveAll(String name) { |
213 | | - return compose(resolver -> resolver.resolveAll(name)); |
| 213 | + return withBackup(resolver -> resolver.resolveAll(name)); |
214 | 214 | } |
215 | 215 |
|
216 | 216 | @Override |
217 | 217 | public Future<List<DnsRecord>> resolveAll(DnsQuestion name) { |
218 | | - return compose(resolver -> resolver.resolveAll(name)); |
| 218 | + return withBackup(resolver -> resolver.resolveAll(name)); |
219 | 219 | } |
220 | 220 |
|
221 | 221 | @Override |
222 | 222 | public long queryTimeoutMillis() { |
223 | | - return primary.queryTimeoutMillis(); |
| 223 | + return primaryResolver.queryTimeoutMillis(); |
224 | 224 | } |
225 | 225 |
|
226 | | - private <T> Future<T> compose(Function<DnsNameResolver, Future<T>> query) { |
| 226 | + private <T> Future<T> withBackup(Function<? super DnsNameResolver, ? extends Future<T>> query) { |
| 227 | + Future<T> primaryQuery = query.apply(primaryResolver); |
| 228 | + if (primaryQuery.isDone()) { |
| 229 | + return primaryQuery; |
| 230 | + } |
| 231 | + int backupDelay = backupDelayMs(); |
| 232 | + if (backupDelay <= 0) { |
| 233 | + // no backup for this request |
| 234 | + return primaryQuery; |
| 235 | + } |
227 | 236 | Promise<T> result = eventLoop.newPromise(); |
228 | | - Future<T> r1 = query.apply(primary); |
229 | | - Future<?> timer = eventLoop.schedule(() -> join(query.apply(backup), result, null), |
230 | | - 50, MILLISECONDS); |
231 | | - join(r1, result, timer); |
| 237 | + Future<?> timer = eventLoop.schedule(() -> { |
| 238 | + if (allowBackupRequest()) { |
| 239 | + PromiseNotifier.cascade(false, query.apply(backupResolver), result); |
| 240 | + } |
| 241 | + }, backupDelay, MILLISECONDS); |
| 242 | + primaryQuery.addListener(_unused -> timer.cancel(true)); |
| 243 | + PromiseNotifier.cascade(false, primaryQuery, result); |
232 | 244 | return result; |
233 | 245 | } |
234 | 246 |
|
235 | | - private static <T> void join(Future<? extends T> future, Promise<T> promise, @Nullable Future<?> timer) { |
236 | | - // transfer results from future to promise and cancellation from promise to future. |
237 | | - future.addListener(new GenericFutureListener<Future<T>>() { |
238 | | - @Override |
239 | | - public void operationComplete(Future<T> future) { |
240 | | - if (timer != null) { |
241 | | - timer.cancel(true); |
242 | | - } |
243 | | - if (future.isSuccess()) { |
244 | | - promise.trySuccess(future.getNow()); |
245 | | - } else if (!future.isCancelled()) { |
246 | | - promise.tryFailure(future.cause()); |
247 | | - } |
248 | | - } |
249 | | - }); |
250 | | - promise.addListener(new GenericFutureListener<Future<T>>() { |
251 | | - @Override |
252 | | - public void operationComplete(Future<T> promise) { |
253 | | - if (promise.isCancelled()) { |
254 | | - future.cancel(true); |
255 | | - } |
256 | | - } |
257 | | - }); |
| 247 | + private boolean allowBackupRequest() { |
| 248 | + return true; |
| 249 | + } |
| 250 | + |
| 251 | + private int backupDelayMs() { |
| 252 | + return 50; |
258 | 253 | } |
259 | 254 | } |
260 | 255 |
|
|
0 commit comments