Skip to content

Commit ab5d787

Browse files
committed
Rework HttpClient dns based lookup to use a resolver for origin endpoints.
1 parent abf4603 commit ab5d787

File tree

11 files changed

+295
-74
lines changed

11 files changed

+295
-74
lines changed

vertx-core/src/main/java/io/vertx/core/http/impl/HttpClientImpl.java

Lines changed: 142 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,8 @@
2323
import io.vertx.core.internal.pool.Lease;
2424
import io.vertx.core.internal.resource.ResourceManager;
2525
import io.vertx.core.net.*;
26-
import io.vertx.core.net.endpoint.Endpoint;
27-
import io.vertx.core.net.endpoint.EndpointResolver;
28-
import io.vertx.core.net.endpoint.ServerInteraction;
26+
import io.vertx.core.net.endpoint.*;
27+
import io.vertx.core.net.endpoint.impl.EndpointResolverImpl;
2928
import io.vertx.core.spi.metrics.ClientMetrics;
3029
import io.vertx.core.spi.metrics.HttpClientMetrics;
3130
import io.vertx.core.spi.metrics.MetricsProvider;
@@ -57,6 +56,7 @@ public class HttpClientImpl extends HttpClientBase implements HttpClientInternal
5756
private final long maxLifetime;
5857
private final Transport transport;
5958
private final EndpointResolverInternal resolver;
59+
private final EndpointResolverInternal originResolver;
6060

6161
HttpClientImpl(VertxInternal vertx,
6262
EndpointResolver resolver,
@@ -70,6 +70,7 @@ public class HttpClientImpl extends HttpClientBase implements HttpClientInternal
7070

7171
this.transport = transport;
7272
this.resolver = (EndpointResolverInternal) resolver;
73+
this.originResolver = new EndpointResolverImpl<>(vertx, new OriginEndpointResolver<>(vertx), LoadBalancer.ROUND_ROBIN, 10_000);
7374
this.poolOptions = poolOptions;
7475
this.resourceManager = new ResourceManager<>();
7576
this.maxLifetime = MILLISECONDS.convert(poolOptions.getMaxLifetime(), poolOptions.getMaxLifetimeUnit());
@@ -129,7 +130,7 @@ protected void checkExpired(Handler<Long> checker) {
129130
}
130131
}
131132

132-
private Function<EndpointKey, SharedHttpClientConnectionGroup> httpEndpointProvider(Transport transport) {
133+
private Function<EndpointKey, SharedHttpClientConnectionGroup> httpEndpointProvider(Endpoint endpoint, Transport transport) {
133134
return (key) -> {
134135
int maxPoolSize = Math.max(poolOptions.getHttp1MaxSize(), poolOptions.getHttp2MaxSize());
135136
ClientMetrics clientMetrics = HttpClientImpl.this.metrics != null ? HttpClientImpl.this.metrics.createEndpointMetrics(key.server, maxPoolSize) : null;
@@ -307,7 +308,41 @@ private Future<HttpClientRequest> doRequest(Transport transport, Address server,
307308
authority = null;
308309
}
309310
ClientSSLOptions sslOptions = sslOptions(transport.verifyHost, request, transport.sslOptions);
310-
return doRequest(transport, request.getRoutingKey(), method, authority, server, useSSL, requestURI, headers, request.getTraceOperation(), connectTimeout, idleTimeout, followRedirects, sslOptions, request.getProxyOptions());
311+
if (server instanceof SocketAddress) {
312+
SocketAddress serverSocketAddress = (SocketAddress) server;
313+
ProxyOptions proxyOptions = computeProxyOptions(request.getProxyOptions(), serverSocketAddress);
314+
if (proxyOptions != null || serverSocketAddress.isDomainSocket()) {
315+
return doRequestDirectly(method, requestURI, headers, request.getTraceOperation(), idleTimeout, followRedirects, proxyOptions, serverSocketAddress, useSSL,
316+
sslOptions, authority, connectTimeout);
317+
}
318+
}
319+
return doRequest(transport, request.getRoutingKey(), method, authority, server, useSSL, requestURI, headers, request.getTraceOperation(), connectTimeout, idleTimeout, followRedirects, sslOptions);
320+
}
321+
322+
private Future<HttpClientRequest> doRequestDirectly(
323+
HttpMethod httpMethod,
324+
String requestURI,
325+
MultiMap headers,
326+
String traceOperation,
327+
long idleTimeout,
328+
boolean followRedirects,
329+
ProxyOptions proxyOptions, SocketAddress server, boolean useSSL, ClientSSLOptions sslOptions,
330+
HostAndPort authority, long connectTimeout) {
331+
ContextInternal streamCtx = vertx.getOrCreateContext();
332+
EndpointKey key = new EndpointKey(useSSL, sslOptions, proxyOptions, server, authority);
333+
Future<ConnectionObtainedResult> fut2 = resourceManager.withResourceAsync(key, httpEndpointProvider(null, transport), (endpoint, created) -> {
334+
Future<Lease<HttpClientConnection>> fut = endpoint.requestConnection(streamCtx, transport.protocol, connectTimeout);
335+
return fut.compose(lease -> {
336+
HttpClientConnection conn = lease.get();
337+
return conn.createStream(streamCtx).map(stream -> {
338+
stream.closeHandler(v -> {
339+
lease.recycle();
340+
});
341+
return new ConnectionObtainedResult(stream, lease);
342+
});
343+
});
344+
});
345+
return wrap(httpMethod, requestURI, headers, traceOperation, idleTimeout, followRedirects, proxyOptions, fut2);
311346
}
312347

313348
private Future<HttpClientRequest> doRequest(
@@ -323,91 +358,126 @@ private Future<HttpClientRequest> doRequest(
323358
long connectTimeout,
324359
long idleTimeout,
325360
Boolean followRedirects,
326-
ClientSSLOptions sslOptions,
327-
ProxyOptions proxyConfig) {
361+
ClientSSLOptions sslOptions) {
362+
if (server instanceof SocketAddress && (resolver == null || !resolver.resolves(server))) {
363+
SocketAddress serverSocketAddress = (SocketAddress) server;
364+
return doRequest(
365+
originResolver,
366+
transport,
367+
routingKey,
368+
method,
369+
authority,
370+
new Origin(useSSL ? "https" : "http", serverSocketAddress.host(), serverSocketAddress.port()),
371+
useSSL,
372+
requestURI,
373+
headers,
374+
traceOperation,
375+
connectTimeout,
376+
idleTimeout,
377+
followRedirects,
378+
sslOptions
379+
);
380+
} else {
381+
return doRequest(
382+
resolver,
383+
transport,
384+
routingKey,
385+
method,
386+
authority,
387+
server,
388+
useSSL,
389+
requestURI,
390+
headers,
391+
traceOperation,
392+
connectTimeout,
393+
idleTimeout,
394+
followRedirects,
395+
sslOptions
396+
);
397+
}
398+
}
399+
400+
private Future<HttpClientRequest> doRequest(
401+
EndpointResolverInternal resolver,
402+
Transport transport,
403+
String routingKey,
404+
HttpMethod method,
405+
HostAndPort authority,
406+
Address server,
407+
boolean useSSL,
408+
String requestURI,
409+
MultiMap headers,
410+
String traceOperation,
411+
long connectTimeout,
412+
long idleTimeout,
413+
Boolean followRedirects,
414+
ClientSSLOptions sslOptions) {
328415
ContextInternal streamCtx = vertx.getOrCreateContext();
329416
Future<ConnectionObtainedResult> future;
330-
if (resolver != null) {
331-
PromiseInternal<Endpoint> promise = vertx.promise();
332-
resolver.lookupEndpoint(server, promise);
333-
future = promise.future()
334-
.map(endpoint -> endpoint.selectServer(routingKey))
335-
.compose(lookup -> {
417+
PromiseInternal<Endpoint> promise = vertx.promise();
418+
resolver.lookupEndpoint(server, promise);
419+
future = promise.future()
420+
.compose(endpoint -> {
421+
ServerEndpoint lookup = endpoint.selectServer(routingKey);
336422
SocketAddress address = lookup.address();
337-
ProxyOptions proxyOptions = computeProxyOptions(proxyConfig, address);
338-
EndpointKey key = new EndpointKey(useSSL, sslOptions, proxyOptions, address, authority != null ? authority : HostAndPort.create(address.host(), address.port()));
339-
return resourceManager.withResourceAsync(key, httpEndpointProvider(transport), (endpoint, created) -> {
423+
EndpointKey key = new EndpointKey(useSSL, sslOptions, null, address, authority != null ? authority : HostAndPort.create(address.host(), address.port()));
424+
return resourceManager.withResourceAsync(key, httpEndpointProvider(endpoint, transport), (endpoint, created) -> {
340425
Future<Lease<HttpClientConnection>> fut2 = endpoint.requestConnection(streamCtx, transport.protocol, connectTimeout);
341-
if (fut2 == null) {
342-
return null;
343-
} else {
344-
ServerInteraction endpointRequest = lookup.newInteraction();
345-
return fut2.andThen(ar -> {
346-
if (ar.failed()) {
347-
endpointRequest.reportFailure(ar.cause());
348-
}
349-
}).compose(lease -> {
350-
HttpClientConnection conn = lease.get();
351-
return conn.createStream(streamCtx).map(stream -> {
352-
HttpClientStream wrapped = new StatisticsGatheringHttpClientStream(stream, endpointRequest);
353-
wrapped.closeHandler(v -> lease.recycle());
354-
return new ConnectionObtainedResult(proxyOptions, wrapped, lease);
355-
});
356-
});
357-
}
358-
});
359-
});
360-
} else if (server instanceof SocketAddress) {
361-
ProxyOptions proxyOptions = computeProxyOptions(proxyConfig, (SocketAddress) server);
362-
EndpointKey key = new EndpointKey(useSSL, sslOptions, proxyOptions, (SocketAddress) server, authority);
363-
future = resourceManager.withResourceAsync(key, httpEndpointProvider(transport), (endpoint, created) -> {
364-
Future<Lease<HttpClientConnection>> fut = endpoint.requestConnection(streamCtx, transport.protocol, connectTimeout);
365-
if (fut == null) {
366-
return null;
367-
} else {
368-
return fut.compose(lease -> {
426+
ServerInteraction endpointRequest = lookup.newInteraction();
427+
return fut2.andThen(ar -> {
428+
if (ar.failed()) {
429+
endpointRequest.reportFailure(ar.cause());
430+
}
431+
}).compose(lease -> {
369432
HttpClientConnection conn = lease.get();
370433
return conn.createStream(streamCtx).map(stream -> {
371-
stream.closeHandler(v -> {
372-
lease.recycle();
373-
});
374-
return new ConnectionObtainedResult(proxyOptions, stream, lease);
434+
HttpClientStream wrapped = new StatisticsGatheringHttpClientStream(stream, endpointRequest);
435+
wrapped.closeHandler(v -> lease.recycle());
436+
return new ConnectionObtainedResult(wrapped, lease);
375437
});
376438
});
377-
}
439+
});
378440
});
379-
} else {
380-
future = streamCtx.failedFuture("Cannot resolve address " + server);
381-
}
382441
if (future == null) {
442+
// I think this is not possible - so remove it
383443
return streamCtx.failedFuture("Cannot resolve address " + server);
384444
} else {
385-
return future.map(res -> {
386-
RequestOptions options = new RequestOptions();
387-
options.setMethod(method);
388-
options.setHeaders(headers);
389-
options.setURI(requestURI);
390-
options.setProxyOptions(res.proxyOptions);
391-
options.setIdleTimeout(idleTimeout);
392-
options.setFollowRedirects(followRedirects);
393-
options.setTraceOperation(traceOperation);
394-
HttpClientStream stream = res.stream;
395-
HttpClientRequestImpl request = createRequest(stream.connection(), stream, options);
396-
stream.closeHandler(v -> {
397-
res.lease.recycle();
398-
request.handleClosed();
399-
});
400-
return request;
401-
});
445+
return wrap(method, requestURI, headers, traceOperation, idleTimeout, followRedirects, null, future);
402446
}
403447
}
404448

449+
private Future<HttpClientRequest> wrap(HttpMethod method,
450+
String requestURI,
451+
MultiMap headers,
452+
String traceOperation,
453+
long idleTimeout,
454+
Boolean followRedirects,
455+
ProxyOptions proxyOptions,
456+
Future<ConnectionObtainedResult> future) {
457+
return future.map(res -> {
458+
RequestOptions options = new RequestOptions();
459+
options.setMethod(method);
460+
options.setHeaders(headers);
461+
options.setURI(requestURI);
462+
options.setProxyOptions(proxyOptions);
463+
options.setIdleTimeout(idleTimeout);
464+
options.setFollowRedirects(followRedirects);
465+
options.setTraceOperation(traceOperation);
466+
HttpClientStream stream = res.stream;
467+
HttpClientRequestImpl request = createRequest(stream.connection(), stream, options);
468+
stream.closeHandler(v -> {
469+
res.lease.recycle();
470+
request.handleClosed();
471+
});
472+
return request;
473+
});
474+
}
475+
476+
405477
private static class ConnectionObtainedResult {
406-
private final ProxyOptions proxyOptions;
407478
private final HttpClientStream stream;
408479
private final Lease<HttpClientConnection> lease;
409-
public ConnectionObtainedResult(ProxyOptions proxyOptions, HttpClientStream stream, Lease<HttpClientConnection> lease) {
410-
this.proxyOptions = proxyOptions;
480+
public ConnectionObtainedResult(HttpClientStream stream, Lease<HttpClientConnection> lease) {
411481
this.stream = stream;
412482
this.lease = lease;
413483
}

vertx-core/src/main/java/io/vertx/core/http/impl/Origin.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,13 +10,13 @@
1010
*/
1111
package io.vertx.core.http.impl;
1212

13-
import io.netty.buffer.ByteBuf;
1413
import io.vertx.core.buffer.Buffer;
14+
import io.vertx.core.net.Address;
1515
import io.vertx.core.net.HostAndPort;
1616

1717
import java.nio.charset.StandardCharsets;
1818

19-
public class Origin {
19+
public class Origin implements Address {
2020

2121
public static Origin fromASCII(String s) {
2222
int idx = s.indexOf("://");
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
package io.vertx.core.http.impl;
2+
3+
import io.vertx.core.spi.endpoint.EndpointBuilder;
4+
5+
public class OriginEndpoint<L> {
6+
7+
final EndpointBuilder<L, OriginServer> builder;
8+
L list;
9+
10+
public OriginEndpoint(EndpointBuilder<L, OriginServer> builder, L list) {
11+
this.builder = builder;
12+
this.list = list;
13+
}
14+
}
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
package io.vertx.core.http.impl;
2+
3+
import io.vertx.core.Future;
4+
import io.vertx.core.internal.VertxInternal;
5+
import io.vertx.core.net.Address;
6+
import io.vertx.core.net.SocketAddress;
7+
import io.vertx.core.spi.endpoint.EndpointBuilder;
8+
import io.vertx.core.spi.endpoint.EndpointResolver;
9+
10+
import java.util.Map;
11+
12+
public class OriginEndpointResolver<L> implements EndpointResolver<Origin, OriginServer, OriginEndpoint<L>, L> {
13+
14+
private final VertxInternal vertx;
15+
16+
public OriginEndpointResolver(VertxInternal vertx) {
17+
this.vertx = vertx;
18+
}
19+
20+
@Override
21+
public Origin tryCast(Address address) {
22+
return address instanceof Origin ? (Origin)address : null;
23+
}
24+
25+
@Override
26+
public SocketAddress addressOf(OriginServer server) {
27+
return server.origin;
28+
}
29+
30+
@Override
31+
public Map<String, String> propertiesOf(OriginServer server) {
32+
return EndpointResolver.super.propertiesOf(server);
33+
}
34+
35+
@Override
36+
public Future<OriginEndpoint<L>> resolve(Origin address, EndpointBuilder<L, OriginServer> builder) {
37+
return vertx
38+
.nameResolver()
39+
.resolve(address.host)
40+
.map(addr -> {
41+
// Todo : ensure we only do a single resolver lookup with the subsequent actual HTTP request
42+
EndpointBuilder<L, OriginServer> builder2 = builder;
43+
builder2 = builder2.addServer(new OriginServer(SocketAddress.inetSocketAddress(address.port, address.host)));
44+
return new OriginEndpoint<>(builder, builder2.build());
45+
});
46+
}
47+
48+
@Override
49+
public L endpoint(OriginEndpoint<L> state) {
50+
return state.list;
51+
}
52+
53+
@Override
54+
public boolean isValid(OriginEndpoint<L> state) {
55+
// Need time eviction ???
56+
return true;
57+
}
58+
59+
@Override
60+
public void dispose(OriginEndpoint<L> data) {
61+
}
62+
63+
@Override
64+
public void close() {
65+
}
66+
}
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
package io.vertx.core.http.impl;
2+
3+
import io.vertx.core.Vertx;
4+
import io.vertx.core.net.Address;
5+
import io.vertx.core.net.AddressResolver;
6+
import io.vertx.core.spi.endpoint.EndpointResolver;
7+
8+
public class OriginResolver implements AddressResolver<Origin> {
9+
10+
@Override
11+
public EndpointResolver<Origin, ?, ?, ?> endpointResolver(Vertx vertx) {
12+
return null;
13+
}
14+
}
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
package io.vertx.core.http.impl;
2+
3+
import io.vertx.core.net.SocketAddress;
4+
5+
public class OriginServer {
6+
7+
public final SocketAddress origin;
8+
9+
public OriginServer(SocketAddress origin) {
10+
this.origin = origin;
11+
}
12+
}

0 commit comments

Comments
 (0)