Skip to content

Commit bee1a4b

Browse files
authored
[ML] Migrate stream to core error parsing (#120722)
Avoid trapping Throwable by rethrowing it on another thread, allowing us to reuse the `generateFailureXContent` for Exceptions and match the new 9.0 format.
1 parent 7e43605 commit bee1a4b

File tree

3 files changed

+20
-46
lines changed

3 files changed

+20
-46
lines changed

docs/changelog/120722.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 120722
2+
summary: Migrate stream to core error parsing
3+
area: Machine Learning
4+
type: enhancement
5+
issues: []

x-pack/plugin/inference/src/internalClusterTest/java/org/elasticsearch/xpack/inference/rest/ServerSentEventsRestActionListenerTests.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -85,8 +85,7 @@ public class ServerSentEventsRestActionListenerTests extends ESIntegTestCase {
8585
private static final Exception expectedException = new IllegalStateException("hello there");
8686
private static final String expectedExceptionAsServerSentEvent = """
8787
{\
88-
"error":{"root_cause":[{"type":"illegal_state_exception","reason":"hello there",\
89-
"caused_by":{"type":"illegal_state_exception","reason":"hello there"}}],\
88+
"error":{"root_cause":[{"type":"illegal_state_exception","reason":"hello there"}],\
9089
"type":"illegal_state_exception","reason":"hello there"},"status":500\
9190
}""";
9291

x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/rest/ServerSentEventsRestActionListener.java

Lines changed: 14 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -40,15 +40,10 @@
4040
import java.io.OutputStream;
4141
import java.nio.charset.StandardCharsets;
4242
import java.util.Iterator;
43-
import java.util.Map;
4443
import java.util.Objects;
4544
import java.util.concurrent.Flow;
4645
import java.util.concurrent.atomic.AtomicBoolean;
4746

48-
import static org.elasticsearch.ElasticsearchException.REST_EXCEPTION_SKIP_CAUSE;
49-
import static org.elasticsearch.ElasticsearchException.REST_EXCEPTION_SKIP_STACK_TRACE;
50-
import static org.elasticsearch.rest.RestController.ERROR_TRACE_DEFAULT;
51-
5247
/**
5348
* A version of {@link org.elasticsearch.rest.action.RestChunkedToXContentListener} that reads from a {@link Flow.Publisher} and encodes
5449
* the response in Server-Sent Events.
@@ -154,48 +149,23 @@ public void onFailure(Exception e) {
154149
}
155150
}
156151

157-
// taken indirectly from "new Response(channel, e)"
158-
// except we need to emit the error as SSE
159152
private ChunkedToXContent errorChunk(Throwable t) {
160153
var status = ExceptionsHelper.status(t);
161-
return params -> Iterators.concat(ChunkedToXContentHelper.startObject(), ChunkedToXContentHelper.chunk((b, p) -> {
162-
// Render the exception with a simple message
163-
if (channel.detailedErrorsEnabled() == false) {
164-
String message = "No ElasticsearchException found";
165-
var inner = t;
166-
for (int counter = 0; counter < 10 && inner != null; counter++) {
167-
if (inner instanceof ElasticsearchException) {
168-
message = inner.getClass().getSimpleName() + "[" + inner.getMessage() + "]";
169-
break;
170-
}
171-
inner = inner.getCause();
172-
}
173-
return b.field("error", message);
174-
}
175-
176-
var errorParams = p;
177-
if (errorParams.paramAsBoolean("error_trace", ERROR_TRACE_DEFAULT) && status != RestStatus.UNAUTHORIZED) {
178-
errorParams = new ToXContent.DelegatingMapParams(
179-
Map.of(REST_EXCEPTION_SKIP_STACK_TRACE, "false", REST_EXCEPTION_SKIP_CAUSE, "true"),
180-
params
181-
);
182-
}
183154

184-
// Render the exception with all details
185-
final ElasticsearchException[] rootCauses = ElasticsearchException.guessRootCauses(t);
186-
b.startObject("error");
187-
{
188-
b.startArray("root_cause");
189-
for (ElasticsearchException rootCause : rootCauses) {
190-
b.startObject();
191-
rootCause.toXContent(b, errorParams);
192-
b.endObject();
193-
}
194-
b.endArray();
195-
}
196-
ElasticsearchException.generateThrowableXContent(b, errorParams, t);
197-
return b.endObject();
198-
}), ChunkedToXContentHelper.field("status", status.getStatus()), ChunkedToXContentHelper.endObject());
155+
Exception e;
156+
if (t instanceof Exception) {
157+
e = (Exception) t;
158+
} else {
159+
// if not exception, then error, and we should not let it escape. rethrow on another thread, and inform the user we're stopping.
160+
ExceptionsHelper.maybeDieOnAnotherThread(t);
161+
e = new RuntimeException("Fatal error while streaming response", t);
162+
}
163+
return params -> Iterators.concat(
164+
ChunkedToXContentHelper.startObject(),
165+
Iterators.single((b, p) -> ElasticsearchException.generateFailureXContent(b, p, e, channel.detailedErrorsEnabled())),
166+
Iterators.single((b, p) -> b.field("status", status.getStatus())),
167+
ChunkedToXContentHelper.endObject()
168+
);
199169
}
200170

201171
private void requestNextChunk(ActionListener<ChunkedRestResponseBodyPart> listener) {

0 commit comments

Comments
 (0)