Skip to content

Commit 21377e2

Browse files
EQL: backport fix for async missing events and re-enable the feature (#98130)
1 parent af997d3 commit 21377e2

File tree

16 files changed

+662
-534
lines changed

16 files changed

+662
-534
lines changed

docs/changelog/98130.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 98130
2+
summary: Backport fix for async missing events and re-enable the feature
3+
area: EQL
4+
type: bug
5+
issues: []

x-pack/plugin/eql/qa/common/src/main/java/org/elasticsearch/test/eql/BaseEqlSpecTestCase.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -244,10 +244,10 @@ private long[] extractIds(List<Map<String, Object>> events) {
244244
final long[] ids = new long[len];
245245
for (int i = 0; i < len; i++) {
246246
Map<String, Object> event = events.get(i);
247-
Map<String, Object> source = (Map<String, Object>) event.get("_source");
248-
if (source == null) {
247+
if (Boolean.TRUE.equals(event.get("missing"))) {
249248
ids[i] = -1;
250249
} else {
250+
Map<String, Object> source = (Map<String, Object>) event.get("_source");
251251
Object field = source.get(idField());
252252
ids[i] = ((Number) field).longValue();
253253
}

x-pack/plugin/eql/qa/common/src/main/java/org/elasticsearch/test/eql/DataLoader.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,7 @@ public static void loadDatasetIntoEs(
107107
//
108108
// missing_events index
109109
//
110-
// load(client, TEST_MISSING_EVENTS_INDEX, null, null, p);
110+
load(client, TEST_MISSING_EVENTS_INDEX, null, null, p);
111111
load(client, TEST_SAMPLE_MULTI, null, null, p);
112112
}
113113

x-pack/plugin/eql/qa/rest/src/javaRestTest/java/org/elasticsearch/xpack/eql/EqlMissingEventsIT.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,10 @@
77

88
package org.elasticsearch.xpack.eql;
99

10-
import org.apache.lucene.tests.util.LuceneTestCase;
1110
import org.elasticsearch.test.eql.EqlMissingEventsSpecTestCase;
1211

1312
import java.util.List;
1413

15-
@LuceneTestCase.AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/97644")
1614
public class EqlMissingEventsIT extends EqlMissingEventsSpecTestCase {
1715

1816
public EqlMissingEventsIT(String query, String name, List<long[]> eventIds, String[] joinKeys, Integer size, Integer maxSamplesPerKey) {

x-pack/plugin/eql/qa/rest/src/yamlRestTest/resources/rest-api-spec/test/eql/10_basic.yml

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -436,9 +436,7 @@ setup:
436436

437437
---
438438
"Sequence with missing events.":
439-
- skip:
440-
version: "all"
441-
reason: "AwaitsFix https://github.com/elastic/elasticsearch/issues/97644"
439+
442440
- do:
443441
eql.search:
444442
index: eql_test
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
---
2+
setup:
3+
- do:
4+
indices.create:
5+
index: eql_test
6+
body:
7+
mappings:
8+
properties:
9+
"@timestamp":
10+
type: date
11+
event.category:
12+
type: keyword
13+
user:
14+
type: keyword
15+
16+
- do:
17+
bulk:
18+
refresh: true
19+
body:
20+
- index:
21+
_index: eql_test
22+
_id: "1"
23+
- event:
24+
- category: process
25+
"@timestamp": 2023-07-11T11:09:05.529Z
26+
user: foo
27+
- index:
28+
_index: eql_test
29+
_id: "2"
30+
- event:
31+
- category: process
32+
"@timestamp": 2023-07-11T11:09:06.529Z
33+
user: bar
34+
35+
---
36+
37+
"Execute async EQL with missing events":
38+
- do:
39+
eql.search:
40+
index: eql_test
41+
wait_for_completion_timeout: "0ms"
42+
keep_on_completion: true
43+
body:
44+
query: 'sequence with maxspan=24h [ process where true ] ![ process where true ]'
45+
46+
- is_true: id
47+
- set: {id: id}
48+
- gte: {took: 0}
49+
50+
- do:
51+
eql.get:
52+
id: $id
53+
wait_for_completion_timeout: "10s"
54+
55+
- match: {is_running: false}
56+
- match: {is_partial: false}
57+
- match: {timed_out: false}
58+
- match: {hits.total.value: 1}
59+
- match: {hits.total.relation: "eq"}
60+
- match: {hits.sequences.0.events.0._source.user: "bar"}
61+
- match: {hits.sequences.0.events.1.missing: true}

x-pack/plugin/eql/src/main/antlr/EqlBase.g4

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ sequenceTerm
6666
;
6767

6868
subquery
69-
: LB eventFilter RB
69+
: (LB | MISSING_EVENT_OPEN) eventFilter RB
7070
;
7171

7272
eventQuery
@@ -212,6 +212,7 @@ LP: '(';
212212
RP: ')';
213213
PIPE: '|';
214214
OPTIONAL: '?';
215+
MISSING_EVENT_OPEN: '![';
215216

216217
fragment STRING_ESCAPE
217218
: '\\' [btnfr"'\\]

x-pack/plugin/eql/src/main/antlr/EqlBase.tokens

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -41,15 +41,16 @@ LP=40
4141
RP=41
4242
PIPE=42
4343
OPTIONAL=43
44-
STRING=44
45-
INTEGER_VALUE=45
46-
DECIMAL_VALUE=46
47-
IDENTIFIER=47
48-
QUOTED_IDENTIFIER=48
49-
TILDE_IDENTIFIER=49
50-
LINE_COMMENT=50
51-
BRACKETED_COMMENT=51
52-
WS=52
44+
MISSING_EVENT_OPEN=44
45+
STRING=45
46+
INTEGER_VALUE=46
47+
DECIMAL_VALUE=47
48+
IDENTIFIER=48
49+
QUOTED_IDENTIFIER=49
50+
TILDE_IDENTIFIER=50
51+
LINE_COMMENT=51
52+
BRACKETED_COMMENT=52
53+
WS=53
5354
'and'=1
5455
'any'=2
5556
'by'=3
@@ -93,3 +94,4 @@ WS=52
9394
')'=41
9495
'|'=42
9596
'?'=43
97+
'!['=44

x-pack/plugin/eql/src/main/antlr/EqlBaseLexer.tokens

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -41,15 +41,16 @@ LP=40
4141
RP=41
4242
PIPE=42
4343
OPTIONAL=43
44-
STRING=44
45-
INTEGER_VALUE=45
46-
DECIMAL_VALUE=46
47-
IDENTIFIER=47
48-
QUOTED_IDENTIFIER=48
49-
TILDE_IDENTIFIER=49
50-
LINE_COMMENT=50
51-
BRACKETED_COMMENT=51
52-
WS=52
44+
MISSING_EVENT_OPEN=44
45+
STRING=45
46+
INTEGER_VALUE=46
47+
DECIMAL_VALUE=47
48+
IDENTIFIER=48
49+
QUOTED_IDENTIFIER=49
50+
TILDE_IDENTIFIER=50
51+
LINE_COMMENT=51
52+
BRACKETED_COMMENT=52
53+
WS=53
5354
'and'=1
5455
'any'=2
5556
'by'=3
@@ -93,3 +94,4 @@ WS=52
9394
')'=41
9495
'|'=42
9596
'?'=43
97+
'!['=44

x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/action/EqlSearchResponse.java

Lines changed: 42 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import org.elasticsearch.TransportVersion;
1111
import org.elasticsearch.action.ActionResponse;
1212
import org.elasticsearch.common.Strings;
13+
import org.elasticsearch.common.bytes.BytesArray;
1314
import org.elasticsearch.common.bytes.BytesReference;
1415
import org.elasticsearch.common.document.DocumentField;
1516
import org.elasticsearch.common.io.stream.StreamInput;
@@ -33,6 +34,7 @@
3334
import org.elasticsearch.xpack.ql.async.QlStatusResponse;
3435

3536
import java.io.IOException;
37+
import java.nio.charset.StandardCharsets;
3638
import java.util.Collections;
3739
import java.util.HashMap;
3840
import java.util.List;
@@ -202,23 +204,33 @@ public String toString() {
202204
// Event
203205
public static class Event implements Writeable, ToXContentObject {
204206

207+
public static Event MISSING_EVENT = new Event("", "", new BytesArray("{}".getBytes(StandardCharsets.UTF_8)), null, true);
208+
205209
private static final class Fields {
206210
static final String INDEX = GetResult._INDEX;
207211
static final String ID = GetResult._ID;
208212
static final String SOURCE = SourceFieldMapper.NAME;
209213
static final String FIELDS = "fields";
214+
static final String MISSING = "missing";
210215
}
211216

212217
private static final ParseField INDEX = new ParseField(Fields.INDEX);
213218
private static final ParseField ID = new ParseField(Fields.ID);
214219
private static final ParseField SOURCE = new ParseField(Fields.SOURCE);
215220
private static final ParseField FIELDS = new ParseField(Fields.FIELDS);
221+
private static final ParseField MISSING = new ParseField(Fields.MISSING);
216222

217223
@SuppressWarnings("unchecked")
218224
private static final ConstructingObjectParser<Event, Void> PARSER = new ConstructingObjectParser<>(
219225
"eql/search_response_event",
220226
true,
221-
args -> new Event((String) args[0], (String) args[1], (BytesReference) args[2], (Map<String, DocumentField>) args[3])
227+
args -> new Event(
228+
(String) args[0],
229+
(String) args[1],
230+
(BytesReference) args[2],
231+
(Map<String, DocumentField>) args[3],
232+
(Boolean) args[4]
233+
)
222234
);
223235

224236
static {
@@ -238,21 +250,29 @@ private static final class Fields {
238250
}
239251
return fields;
240252
}, FIELDS);
253+
PARSER.declareBoolean(optionalConstructorArg(), MISSING);
241254
}
242255

243256
private String index;
244257
private final String id;
245258
private final BytesReference source;
246259
private final Map<String, DocumentField> fetchFields;
247260

261+
private final boolean missing;
262+
248263
public Event(String index, String id, BytesReference source, Map<String, DocumentField> fetchFields) {
264+
this(index, id, source, fetchFields, false);
265+
}
266+
267+
public Event(String index, String id, BytesReference source, Map<String, DocumentField> fetchFields, Boolean missing) {
249268
this.index = index;
250269
this.id = id;
251270
this.source = source;
252271
this.fetchFields = fetchFields;
272+
this.missing = missing != null && missing;
253273
}
254274

255-
public Event(StreamInput in) throws IOException {
275+
private Event(StreamInput in) throws IOException {
256276
index = in.readString();
257277
id = in.readString();
258278
source = in.readBytesReference();
@@ -261,6 +281,12 @@ public Event(StreamInput in) throws IOException {
261281
} else {
262282
fetchFields = null;
263283
}
284+
missing = index.isEmpty();
285+
}
286+
287+
public static Event readFrom(StreamInput in) throws IOException {
288+
Event result = new Event(in);
289+
return result.missing() ? MISSING_EVENT : result;
264290
}
265291

266292
@Override
@@ -295,6 +321,10 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
295321
}
296322
builder.endObject();
297323
}
324+
if (missing) {
325+
// preserve original event structure (before introduction of missing events): avoid "missing: false" for normal events
326+
builder.field(Fields.MISSING, missing);
327+
}
298328
builder.endObject();
299329
return builder;
300330
}
@@ -323,9 +353,13 @@ public Map<String, DocumentField> fetchFields() {
323353
return fetchFields;
324354
}
325355

356+
public boolean missing() {
357+
return missing;
358+
}
359+
326360
@Override
327361
public int hashCode() {
328-
return Objects.hash(index, id, source, fetchFields);
362+
return Objects.hash(index, id, source, fetchFields, missing);
329363
}
330364

331365
@Override
@@ -342,7 +376,8 @@ public boolean equals(Object obj) {
342376
return Objects.equals(index, other.index)
343377
&& Objects.equals(id, other.id)
344378
&& Objects.equals(source, other.source)
345-
&& Objects.equals(fetchFields, other.fetchFields);
379+
&& Objects.equals(fetchFields, other.fetchFields)
380+
&& Objects.equals(missing, other.missing);
346381
}
347382

348383
@Override
@@ -395,7 +430,7 @@ public Sequence(List<Object> joinKeys, List<Event> events) {
395430
@SuppressWarnings("unchecked")
396431
public Sequence(StreamInput in) throws IOException {
397432
this.joinKeys = (List<Object>) in.readGenericValue();
398-
this.events = in.readList(Event::new);
433+
this.events = in.readList(Event::readFrom);
399434
}
400435

401436
public static Sequence fromXContent(XContentParser parser) {
@@ -417,13 +452,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
417452
if (events.isEmpty() == false) {
418453
builder.startArray(Fields.EVENTS);
419454
for (Event event : events) {
420-
if (event == null) {
421-
builder.startObject();
422-
builder.field("missing", true);
423-
builder.endObject();
424-
} else {
425-
event.toXContent(builder, params);
426-
}
455+
event.toXContent(builder, params);
427456
}
428457
builder.endArray();
429458
}
@@ -483,7 +512,7 @@ public Hits(StreamInput in) throws IOException {
483512
} else {
484513
totalHits = null;
485514
}
486-
events = in.readBoolean() ? in.readList(Event::new) : null;
515+
events = in.readBoolean() ? in.readList(Event::readFrom) : null;
487516
sequences = in.readBoolean() ? in.readList(Sequence::new) : null;
488517
}
489518

0 commit comments

Comments
 (0)