From 10dc614354d296139e4e44d24260fcc9c51c44e4 Mon Sep 17 00:00:00 2001 From: Laura Trotta Date: Mon, 11 Aug 2025 16:23:29 +0200 Subject: [PATCH 1/2] safe response consumer for rest5client --- .../rest5_client/Rest5ClientOptions.java | 3 +- .../rest5_client/SafeResponseConsumer.java | 137 +++++++++++++++ .../low_level/SafeResponseConsumerTest.java | 158 ++++++++++++++++++ 3 files changed, 297 insertions(+), 1 deletion(-) create mode 100644 java-client/src/main/java/co/elastic/clients/transport/rest5_client/SafeResponseConsumer.java create mode 100644 java-client/src/test/java/co/elastic/clients/transport/rest5_client/low_level/SafeResponseConsumerTest.java diff --git a/java-client/src/main/java/co/elastic/clients/transport/rest5_client/Rest5ClientOptions.java b/java-client/src/main/java/co/elastic/clients/transport/rest5_client/Rest5ClientOptions.java index 0e553088a..2849d2d35 100644 --- a/java-client/src/main/java/co/elastic/clients/transport/rest5_client/Rest5ClientOptions.java +++ b/java-client/src/main/java/co/elastic/clients/transport/rest5_client/Rest5ClientOptions.java @@ -24,6 +24,7 @@ import co.elastic.clients.transport.http.HeaderMap; import co.elastic.clients.transport.rest5_client.low_level.RequestOptions; import co.elastic.clients.transport.rest5_client.low_level.WarningsHandler; +import co.elastic.clients.transport.rest5_client.SafeResponseConsumer; import co.elastic.clients.util.LanguageRuntimeVersions; import co.elastic.clients.util.VisibleForTesting; import org.apache.hc.client5.http.impl.async.HttpAsyncClientBuilder; @@ -209,7 +210,7 @@ public Rest5ClientOptions build() { } static Rest5ClientOptions initialOptions() { - return new Rest5ClientOptions(RequestOptions.DEFAULT, false); + return new Rest5ClientOptions(SafeResponseConsumer.DEFAULT_REQUEST_OPTIONS, false); } private static RequestOptions.Builder addBuiltinHeaders(RequestOptions.Builder builder) { diff --git a/java-client/src/main/java/co/elastic/clients/transport/rest5_client/SafeResponseConsumer.java b/java-client/src/main/java/co/elastic/clients/transport/rest5_client/SafeResponseConsumer.java new file mode 100644 index 000000000..69fccfdfe --- /dev/null +++ b/java-client/src/main/java/co/elastic/clients/transport/rest5_client/SafeResponseConsumer.java @@ -0,0 +1,137 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package co.elastic.clients.transport.rest5_client; + +import co.elastic.clients.transport.rest5_client.low_level.HttpAsyncResponseConsumerFactory; +import co.elastic.clients.transport.rest5_client.low_level.RequestOptions; +import org.apache.hc.core5.concurrent.FutureCallback; +import org.apache.hc.core5.http.EntityDetails; +import org.apache.hc.core5.http.Header; +import org.apache.hc.core5.http.HttpResponse; +import org.apache.hc.core5.http.nio.AsyncResponseConsumer; +import org.apache.hc.core5.http.nio.CapacityChannel; +import org.apache.hc.core5.http.protocol.HttpContext; + +import java.nio.ByteBuffer; +import java.util.List; + +/** + * A response consumer that will propagate Errors as RuntimeExceptions to avoid crashing the IOReactor. + */ +public class SafeResponseConsumer implements AsyncResponseConsumer { + + private final AsyncResponseConsumer delegate; + + public SafeResponseConsumer(AsyncResponseConsumer delegate) { + this.delegate = delegate; + } + + /** + * A consumer factory that safely wraps the one provided by {@code RequestOptions.DEFAULT}. + */ + public static final HttpAsyncResponseConsumerFactory DEFAULT_FACTORY = () -> new SafeResponseConsumer<>( + RequestOptions.DEFAULT.getHttpAsyncResponseConsumerFactory().createHttpAsyncResponseConsumer() + ); + + /** + * Same as {@code RequestOptions.DEFAULT} with a safe consumer factory + */ + public static final RequestOptions DEFAULT_REQUEST_OPTIONS = RequestOptions.DEFAULT + .toBuilder() + .setHttpAsyncResponseConsumerFactory(DEFAULT_FACTORY) + .build(); + + @SuppressWarnings("unchecked") + private static void throwUnchecked(Throwable thr) throws T { + throw (T) thr; + } + + @Override + public void consumeResponse(HttpResponse response, EntityDetails entityDetails, HttpContext context, + FutureCallback resultCallback) { + try { + delegate.consumeResponse(response, entityDetails, context, resultCallback); + } catch (Exception e) { + throwUnchecked(e); + } catch (Throwable e) { + throw new RuntimeException("Error consuming response", e); + } + } + + @Override + public void informationResponse(HttpResponse response, HttpContext context) { + try { + delegate.informationResponse(response, context); + } catch (Exception e) { + throwUnchecked(e); + } catch (Throwable e) { + throw new RuntimeException("Error information response", e); + } + } + + @Override + public void failed(Exception cause) { + try { + delegate.failed(cause); + } catch (Exception e) { + throwUnchecked(e); + } catch (Throwable e) { + throw new RuntimeException("Error handling failure", e); + } + } + + @Override + public void updateCapacity(CapacityChannel capacityChannel) { + try { + delegate.updateCapacity(capacityChannel); + } catch (Exception e) { + throwUnchecked(e); + } catch (Throwable e) { + throw new RuntimeException("Error updating capacity", e); + } + } + + @Override + public void consume(ByteBuffer src) { + try { + delegate.consume(src); + } catch (Exception e) { + throwUnchecked(e); + } catch (Throwable e) { + throw new RuntimeException("Error consuming data", e); + } + } + + @Override + public void streamEnd(List trailers) { + try { + delegate.streamEnd(trailers); + } catch (Exception e) { + throwUnchecked(e); + } catch (Throwable e) { + throw new RuntimeException("Error triggering stream end", e); + } + } + + @Override + public void releaseResources() { + delegate.releaseResources(); + } +} diff --git a/java-client/src/test/java/co/elastic/clients/transport/rest5_client/low_level/SafeResponseConsumerTest.java b/java-client/src/test/java/co/elastic/clients/transport/rest5_client/low_level/SafeResponseConsumerTest.java new file mode 100644 index 000000000..d22f1fbc9 --- /dev/null +++ b/java-client/src/test/java/co/elastic/clients/transport/rest5_client/low_level/SafeResponseConsumerTest.java @@ -0,0 +1,158 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package co.elastic.clients.transport.rest5_client.low_level; + +import co.elastic.clients.transport.rest5_client.SafeResponseConsumer; +import com.sun.net.httpserver.HttpServer; +import org.apache.hc.core5.http.ContentType; +import org.apache.hc.core5.http.HttpHost; +import org.apache.hc.core5.http.HttpResponse; +import org.apache.hc.core5.http.io.entity.ByteArrayEntity; +import org.apache.hc.core5.http.message.BasicClassicHttpResponse; +import org.apache.hc.core5.http.protocol.HttpContext; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.nio.charset.StandardCharsets; + +public class SafeResponseConsumerTest { + + static HttpServer Server; + static HttpHost ESHost; + + // A consumer factory that throws an Error, to simulate the effect of an OOME + static HttpAsyncResponseConsumerFactory FailingConsumerFactory = + () -> new BasicAsyncResponseConsumer(new BufferedByteConsumer(100 * 1024 * 1024)) { + @Override + public void informationResponse(HttpResponse response, HttpContext context) { + super.informationResponse(response, context); + } + + @Override + protected BasicClassicHttpResponse buildResult(HttpResponse response, ByteArrayEntity entity, + ContentType contentType) { + super.buildResult(response, entity, contentType); + throw new Error("Error in buildResult"); + } + }; + + @BeforeAll + public static void setup() throws Exception { + Server = HttpServer.create(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0), 0); + Server.start(); + + Server.createContext("/", exchange -> { + String path = exchange.getRequestURI().getPath(); + exchange.getResponseHeaders().set("Content-Type", "application/json"); + exchange.getResponseHeaders().set("X-Elastic-Product", "Elasticsearch"); + + if (path.equals("/")) { + byte[] bytes = Info.getBytes(StandardCharsets.UTF_8); + exchange.sendResponseHeaders(200, bytes.length); + exchange.getResponseBody().write(bytes); + exchange.close(); + return; + } + + exchange.sendResponseHeaders(404, -1); + exchange.close(); + }); + + Runtime.getRuntime().addShutdownHook(new Thread(() -> { + try { + Server.stop(1); + } catch (Exception e) { + // Ignore + } + })); + + ESHost = new HttpHost(Server.getAddress().getAddress(), Server.getAddress().getPort()); + } + + @AfterAll + public static void tearDown() { + Server.stop(0); + } + + // testReactorDeath cannot be tested, as the io reactor thread gets stuck and the test never completes + + @Test + public void testReactorSurvival() throws Exception { + + // Request options that will simulate an OOME and wrapped in the safe consumer that will + // avoid the reactor's death + RequestOptions.Builder protectedFailingOptionsBuilder = RequestOptions.DEFAULT.toBuilder(); + protectedFailingOptionsBuilder.setHttpAsyncResponseConsumerFactory(() -> + new SafeResponseConsumer<>(FailingConsumerFactory.createHttpAsyncResponseConsumer()) + ); + RequestOptions protectedFailingOptions = protectedFailingOptionsBuilder.build(); + + Rest5Client restClient = Rest5Client.builder(ESHost).build(); + + // First request, to warm things up. + // An "indice exists" request, that has no response body + Request existsReq = new Request("HEAD", "/index-name"); + restClient.performRequest(existsReq); + + try { + Request infoReq = new Request("GET", "/"); + infoReq.setOptions(protectedFailingOptions); + + restClient.performRequest(infoReq); + Assertions.fail("First request should not succeed"); + } catch (Exception t) { + System.err.println("Request 1 error"); + } + { + // 2nd request with no specific options + Request infoReq = new Request("GET", "/"); + + Response resp = restClient.performRequest(infoReq); + Assertions.assertEquals(200, resp.getStatusCode()); + } + { + // final request to make sure that the reactor isn't closed + restClient.performRequest(existsReq); + } + restClient.close(); + } + + private static final String Info = "{\n" + + " \"cluster_name\": \"foo\",\n" + + " \"cluster_uuid\": \"bar\",\n" + + " \"version\": {\n" + + " \"build_date\": \"2022-01-28T08:36:04.875279988Z\",\n" + + " \"minimum_wire_compatibility_version\": \"6.8.0\",\n" + + " \"build_hash\": \"bee86328705acaa9a6daede7140defd4d9ec56bd\",\n" + + " \"number\": \"7.17.0\",\n" + + " \"lucene_version\": \"8.11.1\",\n" + + " \"minimum_index_compatibility_version\": \"6.0.0-beta1\",\n" + + " \"build_flavor\": \"default\",\n" + + " \"build_snapshot\": false,\n" + + " \"build_type\": \"docker\"\n" + + " },\n" + + " \"name\": \"instance-0000000000\",\n" + + " \"tagline\": \"You Know, for Search\"\n" + + "}"; +} From 5c9efd63ffc92d76f90b9a5791ef1b457a66db03 Mon Sep 17 00:00:00 2001 From: Laura Trotta Date: Mon, 11 Aug 2025 16:33:11 +0200 Subject: [PATCH 2/2] unused import --- .../clients/transport/rest5_client/Rest5ClientOptions.java | 1 - 1 file changed, 1 deletion(-) diff --git a/java-client/src/main/java/co/elastic/clients/transport/rest5_client/Rest5ClientOptions.java b/java-client/src/main/java/co/elastic/clients/transport/rest5_client/Rest5ClientOptions.java index 2849d2d35..6135a6738 100644 --- a/java-client/src/main/java/co/elastic/clients/transport/rest5_client/Rest5ClientOptions.java +++ b/java-client/src/main/java/co/elastic/clients/transport/rest5_client/Rest5ClientOptions.java @@ -24,7 +24,6 @@ import co.elastic.clients.transport.http.HeaderMap; import co.elastic.clients.transport.rest5_client.low_level.RequestOptions; import co.elastic.clients.transport.rest5_client.low_level.WarningsHandler; -import co.elastic.clients.transport.rest5_client.SafeResponseConsumer; import co.elastic.clients.util.LanguageRuntimeVersions; import co.elastic.clients.util.VisibleForTesting; import org.apache.hc.client5.http.impl.async.HttpAsyncClientBuilder;