
Commit ce30cca

kibanamachine, nkhristinin, vitaliidm, and elasticmachine authored
[8.19] ES|QL support for partial results (#223198) (#224641)
# Backport

This will backport the following commits from `main` to `8.19`:

- [ES|QL support for partial results (#223198)](#223198)

### Questions?

Please refer to the [Backport tool documentation](https://github.com/sorenlouv/backport)

---

## ES|QL support for partial results

[Issue](#211622)

We cover two use cases:

- For an aggregating query, we set `allow_partial_results` to false.
- For a non-aggregating query, we set a warning status if there are cluster failures.

### How to test

1. Create a data stream:

```
PUT _index_template/my_datastream_template
{
  "index_patterns": ["my_datastream*"],
  "data_stream": {},
  "template": {
    "mappings": {
      "properties": {
        "@timestamp": {
          "type": "date"
        },
        "host": {
          "properties": {
            "name": {
              "type": "keyword"
            }
          }
        }
      }
    }
  }
}

PUT /_data_stream/my_datastream
```

2. Set a broken runtime mapping on a single backing index:

```
GET my_datastream

PUT .ds-my_datastream-2025.06.11-000001/_mapping
{
  "runtime": {
    "broken": {
      "type": "keyword",
      "script": {
        "source": "emit(doc['nonexistent_field'].value)"
      }
    }
  }
}
```

3. Ingest a document:

```
POST my_datastream/_doc
{
  "@timestamp": "2025-06-05T09:04:11.493Z"
}
```

4. Check that the query returns a partial result (`"is_partial": true`):

```
POST _query/async?drop_null_columns=true&allow_partial_results=true
{
  "query": "from my_datastream* METADATA _id | limit 101",
  "keep_alive": "60s"
}
```

Response:

```
{
  "is_running": false,
  "took": 5,
  "is_partial": true,
...
```

5. Create an ES|QL rule with the same query and a lookback that overlaps the documents you created in step 3. Observe the warning:

<img width="1261" alt="Screenshot 2025-06-11 at 08 52 07" src="https://github.com/user-attachments/assets/c371f57b-51ff-4a13-96e3-19e2094d794c" />

---------

Co-authored-by: Khristinin Nikita <[email protected]>
Co-authored-by: Vitalii Dmyterko <[email protected]>
Co-authored-by: Elastic Machine <[email protected]>
Co-authored-by: Nikita Khristinin <[email protected]>
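For reference, the check in step 4 can also be scripted. The sketch below is only an illustration in TypeScript of the same request; the Elasticsearch URL and credentials are placeholder assumptions and are not part of this change.

```ts
// Hedged sketch only: runs the async ES|QL query from step 4 and reports
// whether Elasticsearch marked the result as partial. ES_URL and AUTH are
// placeholder assumptions, not values from this PR.
const ES_URL = 'http://localhost:9200';
const AUTH = 'Basic ' + Buffer.from('elastic:changeme').toString('base64');

async function checkPartialResults(): Promise<void> {
  const res = await fetch(
    `${ES_URL}/_query/async?drop_null_columns=true&allow_partial_results=true`,
    {
      method: 'POST',
      headers: { 'Content-Type': 'application/json', Authorization: AUTH },
      body: JSON.stringify({
        query: 'from my_datastream* METADATA _id | limit 101',
        keep_alive: '60s',
      }),
    }
  );

  const body = (await res.json()) as { is_running: boolean; is_partial?: boolean };
  // With the broken runtime mapping from step 2 in place, is_partial is expected to be true.
  console.log(`is_running=${body.is_running} is_partial=${body.is_partial}`);
}

checkPartialResults().catch(console.error);
```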
1 parent 93a3478 commit ce30cca

6 files changed: +266 −1 lines changed

x-pack/solutions/security/plugins/security_solution/server/lib/detection_engine/rule_types/esql/esql.ts

Lines changed: 8 additions & 1 deletion
```diff
@@ -33,6 +33,7 @@ import type { RulePreviewLoggedRequest } from '../../../../../common/api/detecti
 import type { SecurityRuleServices, SecuritySharedParams, SignalSource } from '../types';
 import { getDataTierFilter } from '../utils/get_data_tier_filter';
 import { checkErrorDetails } from '../utils/check_error_details';
+import { logClusterShardFailuresEsql } from '../utils/log_cluster_shard_failures_esql';
 import type { ExcludedDocument, EsqlState } from './types';

 import {
@@ -131,7 +132,12 @@ export const esqlExecutor = async ({
       excludedDocumentIds: excludedDocuments.map(({ id }) => id),
       ruleExecutionTimeout,
     });
-    const esqlQueryString = { drop_null_columns: true };
+
+    const esqlQueryString = {
+      drop_null_columns: true,
+      // allow_partial_results is true by default, but we need to set it to false for aggregating queries
+      allow_partial_results: !isRuleAggregating,
+    };
     const hasLoggedRequestsReachedLimit = iteration >= 2;

     ruleExecutionLogger.debug(`ES|QL query request: ${JSON.stringify(esqlRequest)}`);
@@ -151,6 +157,7 @@ export const esqlExecutor = async ({
       loggedRequests: isLoggedRequestsEnabled ? loggedRequests : undefined,
     });

+    logClusterShardFailuresEsql({ response, result });
     const esqlSearchDuration = performance.now() - esqlSignalSearchStart;
     result.searchAfterTimes.push(makeFloatString(esqlSearchDuration));

```
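As a reading aid, here is a minimal standalone sketch (not the Kibana executor itself) of how the query-string options added in this hunk fit together; `isRuleAggregating` is assumed to be a boolean computed elsewhere from the parsed ES|QL query.

```ts
// Illustrative sketch only, mirroring the hunk above.
interface EsqlQueryStringOptions {
  drop_null_columns: boolean;
  allow_partial_results: boolean;
}

// Aggregating queries must not run on a subset of shards: a partial aggregation
// would silently produce wrong counts, so partial results are disabled for them.
// Non-aggregating queries keep the default (true) and instead surface a warning
// via logClusterShardFailuresEsql when any cluster reports shard failures.
const buildEsqlQueryString = (isRuleAggregating: boolean): EsqlQueryStringOptions => ({
  drop_null_columns: true,
  allow_partial_results: !isRuleAggregating,
});

// Example: an aggregating rule query opts out of partial results.
console.log(buildEsqlQueryString(true)); // { drop_null_columns: true, allow_partial_results: false }
```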

x-pack/solutions/security/plugins/security_solution/server/lib/detection_engine/rule_types/esql/esql_request.ts

Lines changed: 9 additions & 0 deletions
```diff
@@ -29,6 +29,8 @@ export interface EsqlResultColumn {
   type: 'date' | 'keyword';
 }

+export type EsqlEsqlShardFailure = Record<string, unknown>;
+
 type AsyncEsqlResponse = {
   id: string;
   is_running: boolean;
@@ -39,6 +41,13 @@ export type EsqlResultRow = Array<string | null>;
 export interface EsqlTable {
   columns: EsqlResultColumn[];
   values: EsqlResultRow[];
+  _clusters?: {
+    details?: {
+      [key: string]: {
+        failures?: EsqlEsqlShardFailure[];
+      };
+    };
+  };
 }

 export const performEsqlRequest = async ({
```
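To make the new optional `_clusters` shape concrete, here is a hypothetical value that satisfies the `EsqlTable` interface above when one remote cluster reports shard failures; the cluster name, the relative import path, and the failure object keys are illustrative, not taken from a real response.

```ts
import type { EsqlTable } from './esql_request';

// Hypothetical example of a partial ES|QL response. Only the fields typed
// above are populated, and EsqlEsqlShardFailure is an open Record, so the
// failure object keys here are illustrative.
const partialResponse: EsqlTable = {
  columns: [],
  values: [],
  _clusters: {
    details: {
      'remote-cluster-1': {
        failures: [
          {
            shard: '0',
            index: '.ds-my_datastream-2025.06.11-000001',
            reason: { type: 'script_exception' },
          },
        ],
      },
    },
  },
};

console.log(Object.keys(partialResponse._clusters?.details ?? {})); // ['remote-cluster-1']
```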

x-pack/solutions/security/plugins/security_solution/server/lib/detection_engine/rule_types/translations.ts

Lines changed: 8 additions & 0 deletions
```diff
@@ -63,6 +63,14 @@ export const EQL_SHARD_FAILURE_MESSAGE = (
     },
   });

+export const ESQL_SHARD_FAILURE_MESSAGE = (shardFailuresMessage: string) =>
+  i18n.translate('xpack.securitySolution.detectionEngine.esqlRuleType.esqlShardFailures', {
+    defaultMessage: `The ES|QL event query was only executed on the available shards. The query failed to run successfully on the following shards: {shardFailures}`,
+    values: {
+      shardFailures: shardFailuresMessage,
+    },
+  });
+
 export const FIND_THRESHOLD_BUCKETS_DESCRIPTION = (afterBucket?: string) =>
   afterBucket
     ? i18n.translate(
```
x-pack/solutions/security/plugins/security_solution/server/lib/detection_engine/rule_types/utils/log_cluster_shard_failures_esql.test.ts

Lines changed: 127 additions & 0 deletions

```diff
@@ -0,0 +1,127 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+import type { EsqlTable, EsqlEsqlShardFailure } from '../esql/esql_request';
+import type { SearchAfterAndBulkCreateReturnType } from '../types';
+import { logClusterShardFailuresEsql } from './log_cluster_shard_failures_esql';
+
+describe('logClusterShardFailuresEsql', () => {
+  let mockResult: SearchAfterAndBulkCreateReturnType;
+
+  beforeEach(() => {
+    mockResult = {
+      warningMessages: [],
+      bulkCreateTimes: [],
+      createdSignalsCount: 0,
+      createdSignals: [],
+      errors: [],
+      searchAfterTimes: [],
+      success: true,
+      warning: false,
+      enrichmentTimes: [],
+    };
+  });
+
+  it('should not add warning message when no shard failures exist', () => {
+    const response: EsqlTable = {
+      columns: [],
+      values: [],
+      _clusters: {
+        details: {},
+      },
+    };
+
+    logClusterShardFailuresEsql({ response, result: mockResult });
+    expect(mockResult.warningMessages).toHaveLength(0);
+  });
+
+  it('should add warning message when shard failures exist in a single cluster', () => {
+    const shardFailure: EsqlEsqlShardFailure = {
+      reason: { type: 'test_failure', reason: 'test failure' },
+      shard: '0',
+      index: 'test-index',
+    };
+
+    const response: EsqlTable = {
+      columns: [],
+      values: [],
+      _clusters: {
+        details: {
+          'cluster-1': {
+            failures: [shardFailure],
+          },
+        },
+      },
+    };
+
+    logClusterShardFailuresEsql({ response, result: mockResult });
+    expect(mockResult.warningMessages).toHaveLength(1);
+    expect(mockResult.warningMessages[0]).toBe(
+      `The ES|QL event query was only executed on the available shards. The query failed to run successfully on the following shards: ${JSON.stringify(
+        [shardFailure]
+      )}`
+    );
+  });
+
+  it('should add warning message when shard failures exist in multiple clusters', () => {
+    const shardFailure1: EsqlEsqlShardFailure = {
+      reason: { type: 'test_failure_1', reason: 'test failure 1' },
+      shard: '0',
+      index: 'test-index-1',
+    };
+
+    const shardFailure2: EsqlEsqlShardFailure = {
+      reason: { type: 'test_failure_2', reason: 'test failure 2' },
+      shard: '1',
+      index: 'test-index-2',
+    };
+
+    const response: EsqlTable = {
+      columns: [],
+      values: [],
+      _clusters: {
+        details: {
+          'cluster-1': {
+            failures: [shardFailure1],
+          },
+          'cluster-2': {
+            failures: [shardFailure2],
+          },
+        },
+      },
+    };
+
+    logClusterShardFailuresEsql({ response, result: mockResult });
+    expect(mockResult.warningMessages).toHaveLength(1);
+    expect(mockResult.warningMessages[0]).toBe(
+      `The ES|QL event query was only executed on the available shards. The query failed to run successfully on the following shards: ${JSON.stringify(
+        [shardFailure1, shardFailure2]
+      )}`
+    );
+  });
+
+  it('should handle undefined _clusters property', () => {
+    const response: EsqlTable = {
+      columns: [],
+      values: [],
+    };
+
+    logClusterShardFailuresEsql({ response, result: mockResult });
+    expect(mockResult.warningMessages).toHaveLength(0);
+  });
+
+  it('should handle undefined details property', () => {
+    const response: EsqlTable = {
+      columns: [],
+      values: [],
+      _clusters: {},
+    };
+
+    logClusterShardFailuresEsql({ response, result: mockResult });
+    expect(mockResult.warningMessages).toHaveLength(0);
+  });
+});
```
x-pack/solutions/security/plugins/security_solution/server/lib/detection_engine/rule_types/utils/log_cluster_shard_failures_esql.ts

Lines changed: 32 additions & 0 deletions

```diff
@@ -0,0 +1,32 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+import type { EsqlTable, EsqlEsqlShardFailure } from '../esql/esql_request';
+import * as i18n from '../translations';
+import type { SearchAfterAndBulkCreateReturnType } from '../types';
+
+export const logClusterShardFailuresEsql = ({
+  response,
+  result,
+}: {
+  response: EsqlTable;
+  result: SearchAfterAndBulkCreateReturnType;
+}) => {
+  const clusters = response?._clusters?.details ?? {};
+  const shardFailures = Object.keys(clusters).reduce<EsqlEsqlShardFailure[]>((acc, cluster) => {
+    const failures = clusters[cluster]?.failures ?? [];
+
+    if (failures.length > 0) {
+      acc.push(...failures);
+    }
+
+    return acc;
+  }, []);
+
+  if (shardFailures.length > 0) {
+    result.warningMessages.push(i18n.ESQL_SHARD_FAILURE_MESSAGE(JSON.stringify(shardFailures)));
+  }
+};
```

x-pack/test/security_solution_api_integration/test_suites/detections_response/detection_engine/rule_execution_logic/esql/trial_license_complete_tier/esql.ts

Lines changed: 82 additions & 0 deletions
```diff
@@ -34,6 +34,8 @@ import {
   waitForBackfillExecuted,
   setAdvancedSettings,
   getOpenAlerts,
+  setBrokenRuntimeField,
+  unsetBrokenRuntimeField,
 } from '../../../../utils';
 import {
   deleteAllRules,
@@ -42,6 +44,7 @@ import {
 } from '../../../../../../../common/utils/security_solution';
 import { deleteAllExceptions } from '../../../../../lists_and_exception_lists/utils';
 import { FtrProviderContext } from '../../../../../../ftr_provider_context';
+import { EsArchivePathBuilder } from '../../../../../../es_archive_path_builder';

 export default ({ getService }: FtrProviderContext) => {
   const supertest = getService('supertest');
@@ -2227,6 +2230,85 @@ export default ({ getService }: FtrProviderContext) => {
     });
   });

+  describe('shard failures', () => {
+    const config = getService('config');
+    const isServerless = config.get('serverless');
+    const dataPathBuilder = new EsArchivePathBuilder(isServerless);
+    const packetBeatPath = dataPathBuilder.getPath('packetbeat/default');
+
+    before(async () => {
+      await esArchiver.load(packetBeatPath);
+      await setBrokenRuntimeField({ es, index: 'packetbeat-*' });
+    });
+
+    after(async () => {
+      await unsetBrokenRuntimeField({ es, index: 'packetbeat-*' });
+      await esArchiver.unload(packetBeatPath);
+    });
+
+    it('should handle shard failures and include warning in logs for query that is not aggregating', async () => {
+      const doc1 = { agent: { name: 'test-1' } };
+      await indexEnhancedDocuments({
+        documents: [doc1],
+        id: uuidv4(),
+      });
+
+      const rule: EsqlRuleCreateProps = {
+        ...getCreateEsqlRulesSchemaMock('rule-1', true),
+        query: `from packetbeat-*, ecs_compliant METADATA _id | limit 101`,
+        from: 'now-100000h',
+      };
+
+      const { logs, previewId } = await previewRule({
+        supertest,
+        rule,
+      });
+
+      const previewAlerts = await getPreviewAlerts({ es, previewId });
+
+      expect(logs).toEqual(
+        expect.arrayContaining([
+          expect.objectContaining({
+            warnings: expect.arrayContaining([
+              expect.stringContaining(
+                'The ES|QL event query was only executed on the available shards. The query failed to run successfully on the following shards'
+              ),
+            ]),
+          }),
+        ])
+      );
+
+      expect(previewAlerts?.length).toBeGreaterThan(0);
+    });
+
+    it('should handle shard failures and include errors in logs for query that is aggregating', async () => {
+      const rule: EsqlRuleCreateProps = {
+        ...getCreateEsqlRulesSchemaMock(),
+        query: `from packetbeat-* | stats _count=count(broken) by @timestamp`,
+        from: 'now-100000h',
+      };
+
+      const { logs, previewId } = await previewRule({
+        supertest,
+        rule,
+      });
+
+      const previewAlerts = await getPreviewAlerts({ es, previewId });
+
+      expect(logs).toEqual(
+        expect.arrayContaining([
+          expect.objectContaining({
+            errors: expect.arrayContaining([
+              expect.stringContaining('No field found for [non_existing] in mapping'),
+            ]),
+          }),
+        ])
+      );
+
+      expect(previewAlerts).toHaveLength(0);
+    });
+  });
+
   describe('alerts on alerts', () => {
     let id: string;
     let ruleId: string;
```
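The `setBrokenRuntimeField` and `unsetBrokenRuntimeField` helpers are imported from the shared test utils and their implementation is not part of this diff. As a hedged sketch only, a helper like this could mirror step 2 of the "How to test" instructions above by installing a runtime field whose script reads a nonexistent field, so shards fail for any query that touches it; the helper name and shape here are hypothetical.

```ts
import type { Client } from '@elastic/elasticsearch';

// Hypothetical sketch, not the repository's actual utility: adds a runtime
// field whose script always fails, which makes shards report failures for
// queries that read the `broken` field.
export const setBrokenRuntimeFieldSketch = async ({
  es,
  index,
}: {
  es: Client;
  index: string;
}): Promise<void> => {
  await es.indices.putMapping({
    index,
    runtime: {
      broken: {
        type: 'keyword',
        script: { source: "emit(doc['nonexistent_field'].value)" },
      },
    },
  });
};
```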
