Skip to content

Commit d8cf6df

Browse files
authored
[ML] Migrate stream to core error parsing (#120722) (#120912)
Avoid trapping Throwable by rethrowing it on another thread, allowing us to reuse the `generateFailureXContent` for Exceptions and match the new 9.0 format.
1 parent 666c778 commit d8cf6df

File tree

3 files changed

+20
-46
lines changed

3 files changed

+20
-46
lines changed

docs/changelog/120722.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 120722
2+
summary: Migrate stream to core error parsing
3+
area: Machine Learning
4+
type: enhancement
5+
issues: []

x-pack/plugin/inference/src/internalClusterTest/java/org/elasticsearch/xpack/inference/rest/ServerSentEventsRestActionListenerTests.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -83,8 +83,7 @@ public class ServerSentEventsRestActionListenerTests extends ESIntegTestCase {
8383
private static final Exception expectedException = new IllegalStateException("hello there");
8484
private static final String expectedExceptionAsServerSentEvent = """
8585
{\
86-
"error":{"root_cause":[{"type":"illegal_state_exception","reason":"hello there",\
87-
"caused_by":{"type":"illegal_state_exception","reason":"hello there"}}],\
86+
"error":{"root_cause":[{"type":"illegal_state_exception","reason":"hello there"}],\
8887
"type":"illegal_state_exception","reason":"hello there"},"status":500\
8988
}""";
9089

x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/rest/ServerSentEventsRestActionListener.java

Lines changed: 14 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -41,15 +41,10 @@
4141
import java.io.OutputStream;
4242
import java.nio.charset.StandardCharsets;
4343
import java.util.Iterator;
44-
import java.util.Map;
4544
import java.util.Objects;
4645
import java.util.concurrent.Flow;
4746
import java.util.concurrent.atomic.AtomicBoolean;
4847

49-
import static org.elasticsearch.ElasticsearchException.REST_EXCEPTION_SKIP_CAUSE;
50-
import static org.elasticsearch.ElasticsearchException.REST_EXCEPTION_SKIP_STACK_TRACE;
51-
import static org.elasticsearch.rest.RestController.ERROR_TRACE_DEFAULT;
52-
5348
/**
5449
* A version of {@link org.elasticsearch.rest.action.RestChunkedToXContentListener} that reads from a {@link Flow.Publisher} and encodes
5550
* the response in Server-Sent Events.
@@ -155,48 +150,23 @@ public void onFailure(Exception e) {
155150
}
156151
}
157152

158-
// taken indirectly from "new Response(channel, e)"
159-
// except we need to emit the error as SSE
160153
private ChunkedToXContent errorChunk(Throwable t) {
161154
var status = ExceptionsHelper.status(t);
162-
return params -> Iterators.concat(ChunkedToXContentHelper.startObject(), ChunkedToXContentHelper.singleChunk((b, p) -> {
163-
// Render the exception with a simple message
164-
if (channel.detailedErrorsEnabled() == false) {
165-
String message = "No ElasticsearchException found";
166-
var inner = t;
167-
for (int counter = 0; counter < 10 && inner != null; counter++) {
168-
if (inner instanceof ElasticsearchException) {
169-
message = inner.getClass().getSimpleName() + "[" + inner.getMessage() + "]";
170-
break;
171-
}
172-
inner = inner.getCause();
173-
}
174-
return b.field("error", message);
175-
}
176-
177-
var errorParams = p;
178-
if (errorParams.paramAsBoolean("error_trace", ERROR_TRACE_DEFAULT) && status != RestStatus.UNAUTHORIZED) {
179-
errorParams = new ToXContent.DelegatingMapParams(
180-
Map.of(REST_EXCEPTION_SKIP_STACK_TRACE, "false", REST_EXCEPTION_SKIP_CAUSE, "true"),
181-
params
182-
);
183-
}
184155

185-
// Render the exception with all details
186-
final ElasticsearchException[] rootCauses = ElasticsearchException.guessRootCauses(t);
187-
b.startObject("error");
188-
{
189-
b.startArray("root_cause");
190-
for (ElasticsearchException rootCause : rootCauses) {
191-
b.startObject();
192-
rootCause.toXContent(b, errorParams);
193-
b.endObject();
194-
}
195-
b.endArray();
196-
}
197-
ElasticsearchException.generateThrowableXContent(b, errorParams, t);
198-
return b.endObject();
199-
}), ChunkedToXContentHelper.field("status", status.getStatus()), ChunkedToXContentHelper.endObject());
156+
Exception e;
157+
if (t instanceof Exception) {
158+
e = (Exception) t;
159+
} else {
160+
// if not exception, then error, and we should not let it escape. rethrow on another thread, and inform the user we're stopping.
161+
ExceptionsHelper.maybeDieOnAnotherThread(t);
162+
e = new RuntimeException("Fatal error while streaming response", t);
163+
}
164+
return params -> Iterators.concat(
165+
ChunkedToXContentHelper.startObject(),
166+
Iterators.single((b, p) -> ElasticsearchException.generateFailureXContent(b, p, e, channel.detailedErrorsEnabled())),
167+
Iterators.single((b, p) -> b.field("status", status.getStatus())),
168+
ChunkedToXContentHelper.endObject()
169+
);
200170
}
201171

202172
private void requestNextChunk(ActionListener<ChunkedRestResponseBodyPart> listener) {

0 commit comments

Comments
 (0)