Skip to content

Commit 02517c5

Browse files
[Streams] Mark ES|QL rule execution errors as user-triggered (#255011)
## πŸ““ Summary Closes #255003 The ES|QL request executor in Streams rules currently throws a plain `Error` when query execution fails. Task Manager treats these as framework errors, which can trigger unnecessary retries and negatively impact task health metrics. This change wraps the thrown error with `createTaskRunError` using `TaskErrorSource.USER`, correctly attributing failures (e.g., invalid ES|QL queries written by the user) to user input rather than the platform. ## πŸ§ͺ Testing - Create a Stream rule with an invalid ES|QL query and verify the error is reported without triggering framework-level retries. - Confirm valid ES|QL rules continue to execute and return results as expected. --------- Co-authored-by: kibanamachine <42973632+kibanamachine@users.noreply.github.com>
1 parent 8500782 commit 02517c5

File tree

4 files changed

+83
-1
lines changed

4 files changed

+83
-1
lines changed

β€Žx-pack/platform/plugins/shared/streams/moon.ymlβ€Ž

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,8 @@ dependsOn:
7979
- '@kbn/core-http-common'
8080
- '@kbn/scout'
8181
- '@kbn/esql-language'
82+
- '@kbn/core-elasticsearch-server-mocks'
83+
- '@kbn/core-logging-server-mocks'
8284
tags:
8385
- plugin
8486
- prod
Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
import { elasticsearchServiceMock } from '@kbn/core-elasticsearch-server-mocks';
9+
import { loggingSystemMock } from '@kbn/core-logging-server-mocks';
10+
import { getErrorSource, TaskErrorSource } from '@kbn/task-manager-plugin/server/task_running';
11+
import { executeEsqlRequest } from './execute_esql_request';
12+
13+
describe('executeEsqlRequest', () => {
14+
const logger = loggingSystemMock.createLogger();
15+
const esClient = elasticsearchServiceMock.createElasticsearchClient();
16+
17+
const esqlRequest = {
18+
query: 'FROM logs-* | WHERE host.name == "test"',
19+
filter: { match_all: {} },
20+
};
21+
22+
beforeEach(() => {
23+
jest.clearAllMocks();
24+
});
25+
26+
it('returns parsed results when the query succeeds', async () => {
27+
esClient.esql.query.mockResolvedValueOnce({
28+
columns: [
29+
{ name: '_source', type: 'object' },
30+
{ name: '_id', type: 'keyword' },
31+
],
32+
values: [[{ host: { name: 'test' } }, 'doc-1']],
33+
} as never);
34+
35+
const results = await executeEsqlRequest({ esClient, esqlRequest, logger });
36+
37+
expect(results).toEqual([{ _id: 'doc-1', _source: { host: { name: 'test' } } }]);
38+
});
39+
40+
it('returns an empty array when _source or _id columns are missing', async () => {
41+
esClient.esql.query.mockResolvedValueOnce({
42+
columns: [{ name: 'host.name', type: 'keyword' }],
43+
values: [['test']],
44+
} as never);
45+
46+
const results = await executeEsqlRequest({ esClient, esqlRequest, logger });
47+
48+
expect(results).toEqual([]);
49+
});
50+
51+
it('throws a user error when the ES|QL query fails', async () => {
52+
esClient.esql.query.mockRejectedValueOnce(
53+
new Error('verification_exception: Unknown index [logs.kafka]')
54+
);
55+
56+
try {
57+
await executeEsqlRequest({ esClient, esqlRequest, logger });
58+
fail('Expected executeEsqlRequest to throw');
59+
} catch (e) {
60+
const error = e as Error;
61+
expect(error).toBeInstanceOf(Error);
62+
expect(error.message).toContain('Error executing ES|QL request');
63+
expect(error.message).toContain('verification_exception');
64+
expect(getErrorSource(error)).toBe(TaskErrorSource.USER);
65+
}
66+
});
67+
68+
it('logs the error message at debug level', async () => {
69+
esClient.esql.query.mockRejectedValueOnce(new Error('some query error'));
70+
71+
await expect(executeEsqlRequest({ esClient, esqlRequest, logger })).rejects.toThrow();
72+
73+
expect(logger.debug).toHaveBeenCalledWith(
74+
expect.stringContaining('Error executing ES|QL request: some query error')
75+
);
76+
});
77+
});

β€Žx-pack/platform/plugins/shared/streams/server/lib/rules/esql/lib/execute_esql_request.tsβ€Ž

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import type { estypes } from '@elastic/elasticsearch';
99
import type { ElasticsearchClient, Logger } from '@kbn/core/server';
1010
import type { ESQLSearchResponse } from '@kbn/es-types';
11+
import { createTaskRunError, TaskErrorSource } from '@kbn/task-manager-plugin/server';
1112

1213
type Response = Array<{
1314
_id: string;
@@ -52,6 +53,6 @@ export const executeEsqlRequest = async ({
5253
error instanceof Error ? error.message : String(error)
5354
}`;
5455
logger.debug(message);
55-
throw new Error(message);
56+
throw createTaskRunError(new Error(message), TaskErrorSource.USER);
5657
}
5758
};

β€Žx-pack/platform/plugins/shared/streams/tsconfig.jsonβ€Ž

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,5 +78,7 @@
7878
"@kbn/core-http-common",
7979
"@kbn/scout",
8080
"@kbn/esql-language",
81+
"@kbn/core-elasticsearch-server-mocks",
82+
"@kbn/core-logging-server-mocks",
8183
]
8284
}

0 commit comments

Comments
Β (0)