diff --git a/test/s3-benchmarks/pom.xml b/test/s3-benchmarks/pom.xml index 4918b20eb69c..e304c85f1933 100644 --- a/test/s3-benchmarks/pom.xml +++ b/test/s3-benchmarks/pom.xml @@ -102,6 +102,17 @@ netty-nio-client ${awsjavasdk.version} + + apache-client + software.amazon.awssdk + ${awsjavasdk.version} + + + apache5-client + software.amazon.awssdk + ${awsjavasdk.version} + + software.amazon.awssdk aws-crt-client diff --git a/test/sdk-benchmarks/pom.xml b/test/sdk-benchmarks/pom.xml index 89774b720d89..209d6c148329 100644 --- a/test/sdk-benchmarks/pom.xml +++ b/test/sdk-benchmarks/pom.xml @@ -158,6 +158,11 @@ apache-client ${awsjavasdk.version} + + software.amazon.awssdk + apache5-client + ${awsjavasdk.version} + software.amazon.awssdk protocol-tests diff --git a/test/sdk-benchmarks/src/main/java/software/amazon/awssdk/benchmark/apicall/httpclient/SdkHttpClientBenchmark.java b/test/sdk-benchmarks/src/main/java/software/amazon/awssdk/benchmark/apicall/httpclient/SdkHttpClientBenchmark.java index c8448dfb37bc..8a130e2c83e9 100644 --- a/test/sdk-benchmarks/src/main/java/software/amazon/awssdk/benchmark/apicall/httpclient/SdkHttpClientBenchmark.java +++ b/test/sdk-benchmarks/src/main/java/software/amazon/awssdk/benchmark/apicall/httpclient/SdkHttpClientBenchmark.java @@ -40,4 +40,39 @@ public interface SdkHttpClientBenchmark { */ default void concurrentApiCall(Blackhole blackhole) { } + + + /** + * Benchmark for PUT operations with streaming + * + * @param blackhole the blackhole + */ + default void streamingPutOperation(Blackhole blackhole) { + } + + /** + * Benchmark for concurrent PUT operations + * + * @param blackhole the blackhole + */ + default void concurrentStreamingPutOperation(Blackhole blackhole) { + } + + /** + * Benchmark for GET operations with streaming response + * + * @param blackhole the blackhole + */ + default void streamingOutputOperation(Blackhole blackhole) { + } + + /** + * Benchmark for concurrent GET operations with streaming response + * + * @param blackhole the blackhole + */ + default void concurrentStreamingOutputOperation(Blackhole blackhole) { + } + + } diff --git a/test/sdk-benchmarks/src/main/java/software/amazon/awssdk/benchmark/apicall/httpclient/sync/ApacheHttpClientBenchmark.java b/test/sdk-benchmarks/src/main/java/software/amazon/awssdk/benchmark/apicall/httpclient/sync/ApacheHttpClientBenchmark.java index 232a892c23d5..2171c4467ba7 100644 --- a/test/sdk-benchmarks/src/main/java/software/amazon/awssdk/benchmark/apicall/httpclient/sync/ApacheHttpClientBenchmark.java +++ b/test/sdk-benchmarks/src/main/java/software/amazon/awssdk/benchmark/apicall/httpclient/sync/ApacheHttpClientBenchmark.java @@ -33,12 +33,14 @@ import org.openjdk.jmh.annotations.Measurement; import org.openjdk.jmh.annotations.Mode; import org.openjdk.jmh.annotations.OperationsPerInvocation; +import org.openjdk.jmh.annotations.Param; import org.openjdk.jmh.annotations.Scope; import org.openjdk.jmh.annotations.Setup; import org.openjdk.jmh.annotations.State; import org.openjdk.jmh.annotations.TearDown; import org.openjdk.jmh.annotations.Warmup; import org.openjdk.jmh.infra.Blackhole; +import org.openjdk.jmh.profile.GCProfiler; import org.openjdk.jmh.profile.StackProfiler; import org.openjdk.jmh.results.RunResult; import org.openjdk.jmh.runner.Runner; @@ -46,13 +48,20 @@ import org.openjdk.jmh.runner.options.OptionsBuilder; import software.amazon.awssdk.benchmark.apicall.httpclient.SdkHttpClientBenchmark; import software.amazon.awssdk.benchmark.utils.MockServer; +import software.amazon.awssdk.core.ResponseBytes; +import software.amazon.awssdk.core.sync.RequestBody; +import software.amazon.awssdk.core.sync.ResponseTransformer; import software.amazon.awssdk.http.SdkHttpClient; import software.amazon.awssdk.http.apache.ApacheHttpClient; +import software.amazon.awssdk.http.apache5.Apache5HttpClient; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.protocolrestjson.ProtocolRestJsonClient; +import software.amazon.awssdk.services.protocolrestjson.model.StreamingInputOperationRequest; +import software.amazon.awssdk.services.protocolrestjson.model.StreamingOutputOperationRequest; +import software.amazon.awssdk.services.protocolrestjson.model.StreamingOutputOperationResponse; /** - * Benchmarking for running with different http clients. + * Benchmarking for running with different Apache HTTP clients. */ @State(Scope.Benchmark) @Warmup(iterations = 3, time = 15, timeUnit = TimeUnit.SECONDS) @@ -61,6 +70,19 @@ @BenchmarkMode(Mode.Throughput) public class ApacheHttpClientBenchmark implements SdkHttpClientBenchmark { + private static final int STREAM_SIZE = 1024 * 1024; // 1MB + private static final byte[] STREAM_DATA = new byte[STREAM_SIZE]; + + static { + // Initialize stream data + for (int i = 0; i < STREAM_SIZE; i++) { + STREAM_DATA[i] = (byte) (i % 256); + } + } + + @Param({"apache4", "apache5"}) + private String clientType; + private MockServer mockServer; private SdkHttpClient sdkHttpClient; private ProtocolRestJsonClient client; @@ -70,8 +92,21 @@ public class ApacheHttpClientBenchmark implements SdkHttpClientBenchmark { public void setup() throws Exception { mockServer = new MockServer(); mockServer.start(); - sdkHttpClient = ApacheHttpClient.builder() - .buildWithDefaults(trustAllTlsAttributeMapBuilder().build()); + + // Create HTTP client based on parameter + switch (clientType) { + case "apache4": + sdkHttpClient = ApacheHttpClient.builder() + .buildWithDefaults(trustAllTlsAttributeMapBuilder().build()); + break; + case "apache5": + sdkHttpClient = Apache5HttpClient.builder() + .buildWithDefaults(trustAllTlsAttributeMapBuilder().build()); + break; + default: + throw new IllegalArgumentException("Unknown client type: " + clientType); + } + client = ProtocolRestJsonClient.builder() .endpointOverride(mockServer.getHttpsUri()) .httpClient(sdkHttpClient) @@ -109,12 +144,78 @@ public void concurrentApiCall(Blackhole blackhole) { awaitCountdownLatchUninterruptibly(countDownLatch, 10, TimeUnit.SECONDS); } - public static void main(String... args) throws Exception { + @Benchmark + @Override + public void streamingPutOperation(Blackhole blackhole) { + StreamingInputOperationRequest request = StreamingInputOperationRequest.builder() + .build(); + RequestBody requestBody = RequestBody.fromBytes(STREAM_DATA); + + blackhole.consume(client.streamingInputOperation(request, requestBody)); + } + + @Benchmark + @Override + @OperationsPerInvocation(CONCURRENT_CALLS) + public void concurrentStreamingPutOperation(Blackhole blackhole) { + CountDownLatch countDownLatch = new CountDownLatch(CONCURRENT_CALLS); + for (int i = 0; i < CONCURRENT_CALLS; i++) { + CompletableFuture future = CompletableFuture.runAsync(() -> { + StreamingInputOperationRequest request = StreamingInputOperationRequest.builder() + .build(); + RequestBody requestBody = RequestBody.fromBytes(STREAM_DATA); + client.streamingInputOperation(request, requestBody); + }, executorService); + + countDownUponCompletion(blackhole, future, countDownLatch); + } + + awaitCountdownLatchUninterruptibly(countDownLatch, 10, TimeUnit.SECONDS); + } + + @Benchmark + @Override + public void streamingOutputOperation(Blackhole blackhole) { + StreamingOutputOperationRequest request = StreamingOutputOperationRequest.builder() + .build(); + + ResponseBytes responseBytes = + client.streamingOutputOperation(request, ResponseTransformer.toBytes()); + + blackhole.consume(responseBytes.asByteArray()); + } + + @Benchmark + @Override + @OperationsPerInvocation(CONCURRENT_CALLS) + public void concurrentStreamingOutputOperation(Blackhole blackhole) { + CountDownLatch countDownLatch = new CountDownLatch(CONCURRENT_CALLS); + + for (int i = 0; i < CONCURRENT_CALLS; i++) { + CompletableFuture future = CompletableFuture.runAsync(() -> { + StreamingOutputOperationRequest request = StreamingOutputOperationRequest.builder() + .build(); + + ResponseBytes responseBytes = + client.streamingOutputOperation(request, ResponseTransformer.toBytes()); + + blackhole.consume(responseBytes.asByteArray()); + }, executorService); + + countDownUponCompletion(blackhole, future, countDownLatch); + } + + awaitCountdownLatchUninterruptibly(countDownLatch, 10, TimeUnit.SECONDS); + } + + public static void main(String... args) throws Exception { Options opt = new OptionsBuilder() - .include(ApacheHttpClientBenchmark.class.getSimpleName() + ".concurrentApiCall") + .include(ApacheHttpClientBenchmark.class.getSimpleName()) .addProfiler(StackProfiler.class) + .addProfiler(GCProfiler.class) .build(); + Collection run = new Runner(opt).run(); } } diff --git a/test/sdk-benchmarks/src/main/java/software/amazon/awssdk/benchmark/utils/MockServer.java b/test/sdk-benchmarks/src/main/java/software/amazon/awssdk/benchmark/utils/MockServer.java index 15cca300789c..7a4e9a13df6a 100644 --- a/test/sdk-benchmarks/src/main/java/software/amazon/awssdk/benchmark/utils/MockServer.java +++ b/test/sdk-benchmarks/src/main/java/software/amazon/awssdk/benchmark/utils/MockServer.java @@ -59,7 +59,7 @@ public MockServer() throws IOException { server.setConnectors(new Connector[] {connector, sslConnector}); ServletContextHandler context = new ServletContextHandler(server, "/", ServletContextHandler.SESSIONS); - context.addServlet(new ServletHolder(new AlwaysSuccessServlet()), "/*"); + context.addServlet(new ServletHolder(new StreamingMockServlet()), "/*"); server.setHandler(context); } diff --git a/test/sdk-benchmarks/src/main/java/software/amazon/awssdk/benchmark/utils/StreamingMockServlet.java b/test/sdk-benchmarks/src/main/java/software/amazon/awssdk/benchmark/utils/StreamingMockServlet.java new file mode 100644 index 000000000000..f47573356918 --- /dev/null +++ b/test/sdk-benchmarks/src/main/java/software/amazon/awssdk/benchmark/utils/StreamingMockServlet.java @@ -0,0 +1,112 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package software.amazon.awssdk.benchmark.utils; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.io.PrintWriter; +import javax.servlet.http.HttpServlet; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; + +public class StreamingMockServlet extends HttpServlet { + private static final byte[] STREAMING_RESPONSE_DATA = new byte[1024 * 1024]; // 1MB response + + static { + // Initialize response data + for (int i = 0; i < STREAMING_RESPONSE_DATA.length; i++) { + STREAMING_RESPONSE_DATA[i] = (byte) (i % 256); + } + } + + @Override + protected void doGet(HttpServletRequest request, HttpServletResponse response) throws IOException { + handleRequest(request, response); + } + + @Override + protected void doPost(HttpServletRequest request, HttpServletResponse response) throws IOException { + handleRequest(request, response); + } + + @Override + protected void doPut(HttpServletRequest request, HttpServletResponse response) throws IOException { + handleRequest(request, response); + } + + private void handleRequest(HttpServletRequest request, HttpServletResponse response) throws IOException { + // Check if this should be a streaming response + if (isStreamingOperation(request)) { + handleStreamingRequest(request, response); + } else { + handleJsonRequest(request, response); + } + } + + private boolean isStreamingOperation(HttpServletRequest request) { + String uri = request.getRequestURI(); + String contentType = request.getContentType(); + + return uri.contains("streaming") || + uri.contains("StreamingInput") || + uri.contains("StreamingOutput") || + "application/octet-stream".equals(contentType); + } + + private void handleStreamingRequest(HttpServletRequest request, HttpServletResponse response) throws IOException { + + // Consume input stream if present + try (InputStream inputStream = request.getInputStream()) { + byte[] buffer = new byte[8192]; + while (inputStream.read(buffer) != -1) { + // Just consume the data + } + } + + // Send streaming response + response.setStatus(HttpServletResponse.SC_OK); + response.setContentType("application/octet-stream"); + response.setContentLength(STREAMING_RESPONSE_DATA.length); + response.setHeader("x-amz-request-id", "streaming-" + System.currentTimeMillis()); + + try (OutputStream outputStream = response.getOutputStream()) { + outputStream.write(STREAMING_RESPONSE_DATA); + outputStream.flush(); + } + } + + private void handleJsonRequest(HttpServletRequest request, HttpServletResponse response) throws IOException { + + response.setStatus(HttpServletResponse.SC_OK); + response.setContentType("application/json"); + response.setCharacterEncoding("UTF-8"); + response.setHeader("x-amz-request-id", "json-" + System.currentTimeMillis()); + + String jsonResponse = "{" + + "\"status\":\"success\"," + + "\"message\":\"Mock operation completed\"," + + "\"ResponseMetadata\":{" + + "\"RequestId\":\"mock-request-id\"" + + "}" + + "}"; + + try (PrintWriter writer = response.getWriter()) { + writer.write(jsonResponse); + writer.flush(); + } + } +} \ No newline at end of file