Skip to content

Commit 85c55ad

Browse files
committed
Redesign of classic over async bridge
1 parent f24a94d commit 85c55ad

28 files changed

+2180
-141
lines changed
Lines changed: 155 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,155 @@
1+
/*
2+
* ====================================================================
3+
* Licensed to the Apache Software Foundation (ASF) under one
4+
* or more contributor license agreements. See the NOTICE file
5+
* distributed with this work for additional information
6+
* regarding copyright ownership. The ASF licenses this file
7+
* to you under the Apache License, Version 2.0 (the
8+
* "License"); you may not use this file except in compliance
9+
* with the License. You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing,
14+
* software distributed under the License is distributed on an
15+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16+
* KIND, either express or implied. See the License for the
17+
* specific language governing permissions and limitations
18+
* under the License.
19+
* ====================================================================
20+
*
21+
* This software consists of voluntary contributions made by many
22+
* individuals on behalf of the Apache Software Foundation. For more
23+
* information on the Apache Software Foundation, please see
24+
* <http://www.apache.org/>.
25+
*
26+
*/
27+
package org.apache.hc.core5.http2.examples;
28+
29+
import java.io.BufferedReader;
30+
import java.io.InputStreamReader;
31+
import java.nio.charset.Charset;
32+
import java.nio.charset.StandardCharsets;
33+
import java.util.List;
34+
import java.util.concurrent.Future;
35+
36+
import org.apache.hc.core5.annotation.Experimental;
37+
import org.apache.hc.core5.http.ClassicHttpRequest;
38+
import org.apache.hc.core5.http.ClassicHttpResponse;
39+
import org.apache.hc.core5.http.ContentType;
40+
import org.apache.hc.core5.http.Header;
41+
import org.apache.hc.core5.http.HttpConnection;
42+
import org.apache.hc.core5.http.HttpEntity;
43+
import org.apache.hc.core5.http.HttpHost;
44+
import org.apache.hc.core5.http.impl.bootstrap.HttpAsyncRequester;
45+
import org.apache.hc.core5.http.io.support.ClassicRequestBuilder;
46+
import org.apache.hc.core5.http.nio.AsyncClientEndpoint;
47+
import org.apache.hc.core5.http.nio.support.classic.ClassicToAsyncRequestProducer;
48+
import org.apache.hc.core5.http.nio.support.classic.ClassicToAsyncResponseConsumer;
49+
import org.apache.hc.core5.http2.HttpVersionPolicy;
50+
import org.apache.hc.core5.http2.config.H2Config;
51+
import org.apache.hc.core5.http2.frame.RawFrame;
52+
import org.apache.hc.core5.http2.impl.nio.H2StreamListener;
53+
import org.apache.hc.core5.http2.impl.nio.bootstrap.H2RequesterBootstrap;
54+
import org.apache.hc.core5.io.CloseMode;
55+
import org.apache.hc.core5.util.Timeout;
56+
57+
/**
58+
* Example of HTTP/2 request execution with a classic I/O API compatibility bridge
59+
* that enables the use of standard {@link java.io.InputStream} / {@link java.io.OutputStream}
60+
* based data consumers / producers.
61+
* <p>>
62+
* Execution of individual message exchanges is performed at the current thread.
63+
*/
64+
@Experimental
65+
public class ClassicH2RequestExecutionExample {
66+
67+
public static void main(final String[] args) throws Exception {
68+
69+
// Create and start requester
70+
final H2Config h2Config = H2Config.custom()
71+
.setPushEnabled(false)
72+
.build();
73+
74+
final HttpAsyncRequester requester = H2RequesterBootstrap.bootstrap()
75+
.setH2Config(h2Config)
76+
.setVersionPolicy(HttpVersionPolicy.FORCE_HTTP_2)
77+
.setStreamListener(new H2StreamListener() {
78+
79+
@Override
80+
public void onHeaderInput(final HttpConnection connection, final int streamId, final List<? extends Header> headers) {
81+
for (int i = 0; i < headers.size(); i++) {
82+
System.out.println(connection.getRemoteAddress() + " (" + streamId + ") << " + headers.get(i));
83+
}
84+
}
85+
86+
@Override
87+
public void onHeaderOutput(final HttpConnection connection, final int streamId, final List<? extends Header> headers) {
88+
for (int i = 0; i < headers.size(); i++) {
89+
System.out.println(connection.getRemoteAddress() + " (" + streamId + ") >> " + headers.get(i));
90+
}
91+
}
92+
93+
@Override
94+
public void onFrameInput(final HttpConnection connection, final int streamId, final RawFrame frame) {
95+
}
96+
97+
@Override
98+
public void onFrameOutput(final HttpConnection connection, final int streamId, final RawFrame frame) {
99+
}
100+
101+
@Override
102+
public void onInputFlowControl(final HttpConnection connection, final int streamId, final int delta, final int actualSize) {
103+
}
104+
105+
@Override
106+
public void onOutputFlowControl(final HttpConnection connection, final int streamId, final int delta, final int actualSize) {
107+
}
108+
109+
})
110+
.create();
111+
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
112+
System.out.println("HTTP requester shutting down");
113+
requester.close(CloseMode.GRACEFUL);
114+
}));
115+
requester.start();
116+
117+
final HttpHost target = new HttpHost("nghttp2.org");
118+
final Future<AsyncClientEndpoint> future = requester.connect(target, Timeout.ofDays(5));
119+
final AsyncClientEndpoint clientEndpoint = future.get();
120+
121+
final String[] requestUris = new String[] {"/httpbin/ip", "/httpbin/user-agent", "/httpbin/headers"};
122+
123+
for (final String requestUri: requestUris) {
124+
final ClassicHttpRequest request = ClassicRequestBuilder.get()
125+
.setHttpHost(target)
126+
.setPath(requestUri)
127+
.build();
128+
129+
final ClassicToAsyncRequestProducer requestProducer = new ClassicToAsyncRequestProducer(request, Timeout.ofMinutes(5));
130+
final ClassicToAsyncResponseConsumer responseConsumer = new ClassicToAsyncResponseConsumer(Timeout.ofMinutes(5));
131+
132+
clientEndpoint.execute(requestProducer, responseConsumer, null);
133+
134+
requestProducer.blockWaiting().execute();
135+
try (ClassicHttpResponse response = responseConsumer.blockWaiting()) {
136+
System.out.println(requestUri + " -> " + response.getCode());
137+
final HttpEntity entity = response.getEntity();
138+
if (entity != null) {
139+
final ContentType contentType = ContentType.parse(entity.getContentType());
140+
final Charset charset = ContentType.getCharset(contentType, StandardCharsets.UTF_8);
141+
try (final BufferedReader reader = new BufferedReader(new InputStreamReader(entity.getContent(), charset))) {
142+
String line;
143+
while ((line = reader.readLine()) != null) {
144+
System.out.println(line);
145+
}
146+
}
147+
}
148+
}
149+
}
150+
151+
System.out.println("Shutting down I/O reactor");
152+
requester.initiateShutdown();
153+
}
154+
155+
}
Lines changed: 188 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,188 @@
1+
/*
2+
* ====================================================================
3+
* Licensed to the Apache Software Foundation (ASF) under one
4+
* or more contributor license agreements. See the NOTICE file
5+
* distributed with this work for additional information
6+
* regarding copyright ownership. The ASF licenses this file
7+
* to you under the Apache License, Version 2.0 (the
8+
* "License"); you may not use this file except in compliance
9+
* with the License. You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing,
14+
* software distributed under the License is distributed on an
15+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16+
* KIND, either express or implied. See the License for the
17+
* specific language governing permissions and limitations
18+
* under the License.
19+
* ====================================================================
20+
*
21+
* This software consists of voluntary contributions made by many
22+
* individuals on behalf of the Apache Software Foundation. For more
23+
* information on the Apache Software Foundation, please see
24+
* <http://www.apache.org/>.
25+
*
26+
*/
27+
package org.apache.hc.core5.http2.examples;
28+
29+
import java.io.BufferedWriter;
30+
import java.io.OutputStreamWriter;
31+
import java.net.InetSocketAddress;
32+
import java.net.URISyntaxException;
33+
import java.nio.charset.StandardCharsets;
34+
import java.util.List;
35+
import java.util.Map;
36+
import java.util.concurrent.ExecutorService;
37+
import java.util.concurrent.Executors;
38+
import java.util.concurrent.Future;
39+
import java.util.concurrent.TimeUnit;
40+
import java.util.stream.Collectors;
41+
42+
import org.apache.hc.core5.annotation.Experimental;
43+
import org.apache.hc.core5.concurrent.DefaultThreadFactory;
44+
import org.apache.hc.core5.http.ContentType;
45+
import org.apache.hc.core5.http.Header;
46+
import org.apache.hc.core5.http.HttpConnection;
47+
import org.apache.hc.core5.http.HttpEntity;
48+
import org.apache.hc.core5.http.NameValuePair;
49+
import org.apache.hc.core5.http.ProtocolException;
50+
import org.apache.hc.core5.http.URIScheme;
51+
import org.apache.hc.core5.http.impl.bootstrap.HttpAsyncServer;
52+
import org.apache.hc.core5.http.impl.routing.RequestRouter;
53+
import org.apache.hc.core5.http.io.HttpRequestHandler;
54+
import org.apache.hc.core5.http.io.entity.EntityTemplate;
55+
import org.apache.hc.core5.http.io.entity.EntityUtils;
56+
import org.apache.hc.core5.http.nio.support.classic.ClassicToAsyncServerExchangeHandler;
57+
import org.apache.hc.core5.http2.HttpVersionPolicy;
58+
import org.apache.hc.core5.http2.frame.RawFrame;
59+
import org.apache.hc.core5.http2.impl.nio.H2StreamListener;
60+
import org.apache.hc.core5.http2.impl.nio.bootstrap.H2ServerBootstrap;
61+
import org.apache.hc.core5.io.CloseMode;
62+
import org.apache.hc.core5.net.URIBuilder;
63+
import org.apache.hc.core5.reactor.IOReactorConfig;
64+
import org.apache.hc.core5.reactor.ListenerEndpoint;
65+
import org.apache.hc.core5.util.TimeValue;
66+
67+
/**
68+
* Example of asynchronous embedded HTTP/2 server with a classic I/O API compatibility
69+
* bridge that enables the use of standard {@link java.io.InputStream} / {@link java.io.OutputStream}
70+
* based data consumers / producers.
71+
* <p>>
72+
* Execution of individual message exchanges is delegated to an {@link java.util.concurrent.Executor}
73+
* backed by a pool of threads.
74+
*/
75+
@Experimental
76+
public class ClassicH2ServerExample {
77+
78+
public static void main(final String[] args) throws Exception {
79+
int port = 8080;
80+
if (args.length >= 1) {
81+
port = Integer.parseInt(args[0]);
82+
}
83+
84+
final IOReactorConfig config = IOReactorConfig.custom()
85+
.setSoTimeout(15, TimeUnit.SECONDS)
86+
.setTcpNoDelay(true)
87+
.build();
88+
89+
final ExecutorService executorService = Executors.newFixedThreadPool(
90+
25,
91+
new DefaultThreadFactory("worker-pool", true));
92+
93+
final HttpRequestHandler requestHandler = (request, response, context) -> {
94+
try {
95+
final HttpEntity requestEntity = request.getEntity();
96+
if (requestEntity != null) {
97+
EntityUtils.consume(requestEntity);
98+
}
99+
final Map<String, String> queryParams = new URIBuilder(request.getUri()).getQueryParams().stream()
100+
.collect(Collectors.toMap(
101+
NameValuePair::getName,
102+
NameValuePair::getValue,
103+
(s, s2) -> s));
104+
final int n = Integer.parseInt(queryParams.getOrDefault("n", "10"));
105+
final String p = queryParams.getOrDefault("pattern", "huh?");
106+
final HttpEntity responseEntity = new EntityTemplate(
107+
ContentType.TEXT_PLAIN.withCharset(StandardCharsets.UTF_8),
108+
outputStream -> {
109+
try (BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(outputStream, StandardCharsets.UTF_8))) {
110+
for (int i = 0; i < n; i++) {
111+
writer.write(p);
112+
writer.write("\n");
113+
}
114+
}
115+
});
116+
response.setEntity(responseEntity);
117+
} catch (final URISyntaxException ex) {
118+
throw new ProtocolException("Invalid request URI", ex);
119+
} catch (final NumberFormatException ex) {
120+
throw new ProtocolException("Invalid query parameter", ex);
121+
}
122+
};
123+
124+
final RequestRouter<HttpRequestHandler> requestRouter = RequestRouter.<HttpRequestHandler>builder()
125+
.resolveAuthority(RequestRouter.LOCAL_AUTHORITY_RESOLVER)
126+
.addRoute(RequestRouter.LOCAL_AUTHORITY, "*", requestHandler)
127+
.build();
128+
129+
final HttpAsyncServer server = H2ServerBootstrap.bootstrap()
130+
.setIOReactorConfig(config)
131+
.setVersionPolicy(HttpVersionPolicy.FORCE_HTTP_2)
132+
.setStreamListener(new H2StreamListener() {
133+
134+
@Override
135+
public void onHeaderInput(final HttpConnection connection, final int streamId, final List<? extends Header> headers) {
136+
for (int i = 0; i < headers.size(); i++) {
137+
System.out.println(connection.getRemoteAddress() + " (" + streamId + ") << " + headers.get(i));
138+
}
139+
}
140+
141+
@Override
142+
public void onHeaderOutput(final HttpConnection connection, final int streamId, final List<? extends Header> headers) {
143+
for (int i = 0; i < headers.size(); i++) {
144+
System.out.println(connection.getRemoteAddress() + " (" + streamId + ") >> " + headers.get(i));
145+
}
146+
}
147+
148+
@Override
149+
public void onFrameInput(final HttpConnection connection, final int streamId, final RawFrame frame) {
150+
}
151+
152+
@Override
153+
public void onFrameOutput(final HttpConnection connection, final int streamId, final RawFrame frame) {
154+
}
155+
156+
@Override
157+
public void onInputFlowControl(final HttpConnection connection, final int streamId, final int delta, final int actualSize) {
158+
}
159+
160+
@Override
161+
public void onOutputFlowControl(final HttpConnection connection, final int streamId, final int delta, final int actualSize) {
162+
}
163+
164+
})
165+
.setRequestRouter((request, context) -> {
166+
final HttpRequestHandler handler = requestRouter.resolve(request, context);
167+
return () -> new ClassicToAsyncServerExchangeHandler(
168+
executorService,
169+
handler,
170+
e -> e.printStackTrace(System.out));
171+
})
172+
.setExceptionCallback(e -> e.printStackTrace(System.out))
173+
.create();
174+
175+
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
176+
System.out.println("HTTP server shutting down");
177+
server.close(CloseMode.GRACEFUL);
178+
executorService.shutdownNow();
179+
}));
180+
181+
server.start();
182+
final Future<ListenerEndpoint> future = server.listen(new InetSocketAddress(port), URIScheme.HTTP);
183+
final ListenerEndpoint listenerEndpoint = future.get();
184+
System.out.print("Listening on " + listenerEndpoint.getAddress());
185+
server.awaitShutdown(TimeValue.ofDays(Long.MAX_VALUE));
186+
}
187+
188+
}

httpcore5-h2/src/test/java/org/apache/hc/core5/http2/examples/H2ConscriptRequestExecutionExample.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -114,12 +114,13 @@ public void onOutputFlowControl(final HttpConnection connection, final int strea
114114
requester.start();
115115

116116
final HttpHost target = new HttpHost("https", "nghttp2.org", 443);
117+
final Future<AsyncClientEndpoint> future = requester.connect(target, Timeout.ofDays(5));
118+
final AsyncClientEndpoint clientEndpoint = future.get();
119+
117120
final String[] requestUris = new String[] {"/httpbin/ip", "/httpbin/user-agent", "/httpbin/headers"};
118121

119122
final CountDownLatch latch = new CountDownLatch(requestUris.length);
120123
for (final String requestUri: requestUris) {
121-
final Future<AsyncClientEndpoint> future = requester.connect(target, Timeout.ofDays(5));
122-
final AsyncClientEndpoint clientEndpoint = future.get();
123124
clientEndpoint.execute(
124125
AsyncRequestBuilder.get()
125126
.setHttpHost(target)

httpcore5-h2/src/test/java/org/apache/hc/core5/http2/examples/H2RequestExecutionExample.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -105,12 +105,13 @@ public void onOutputFlowControl(final HttpConnection connection, final int strea
105105
requester.start();
106106

107107
final HttpHost target = new HttpHost("nghttp2.org");
108+
final Future<AsyncClientEndpoint> future = requester.connect(target, Timeout.ofSeconds(5));
109+
final AsyncClientEndpoint clientEndpoint = future.get();
110+
108111
final String[] requestUris = new String[] {"/httpbin/ip", "/httpbin/user-agent", "/httpbin/headers"};
109112

110113
final CountDownLatch latch = new CountDownLatch(requestUris.length);
111114
for (final String requestUri: requestUris) {
112-
final Future<AsyncClientEndpoint> future = requester.connect(target, Timeout.ofSeconds(5));
113-
final AsyncClientEndpoint clientEndpoint = future.get();
114115
clientEndpoint.execute(
115116
AsyncRequestBuilder.get()
116117
.setHttpHost(target)

httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/LoggingHttp1StreamListener.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,11 +34,10 @@
3434
import org.apache.hc.core5.http.HttpRequest;
3535
import org.apache.hc.core5.http.HttpResponse;
3636
import org.apache.hc.core5.http.impl.Http1StreamListener;
37-
import org.apache.hc.core5.http.message.RequestLine;
3837
import org.apache.hc.core5.http.message.StatusLine;
3938
import org.apache.hc.core5.testing.classic.LoggingSupport;
40-
import org.slf4j.LoggerFactory;
4139
import org.slf4j.Logger;
40+
import org.slf4j.LoggerFactory;
4241

4342
public class LoggingHttp1StreamListener implements Http1StreamListener {
4443

@@ -61,7 +60,7 @@ private LoggingHttp1StreamListener(final Type type) {
6160
public void onRequestHead(final HttpConnection connection, final HttpRequest request) {
6261
if (headerLog.isDebugEnabled()) {
6362
final String idRequestDirection = LoggingSupport.getId(connection) + requestDirection;
64-
headerLog.debug("{}{}", idRequestDirection, new RequestLine(request));
63+
headerLog.debug("{}{} {}", idRequestDirection, request.getMethod(), request.getRequestUri());
6564
for (final Iterator<Header> it = request.headerIterator(); it.hasNext(); ) {
6665
headerLog.debug("{}{}", idRequestDirection, it.next());
6766
}

0 commit comments

Comments
 (0)