Skip to content

Commit 190a420

Browse files
authored
Merge branch 'main' into esql-skip-null-metrics
2 parents 9eb5544 + 7d678a3 commit 190a420

File tree

20 files changed

+1006
-109
lines changed

20 files changed

+1006
-109
lines changed

docs/changelog/133245.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 133245
2+
summary: Add query heads priority to `SliceQueue`
3+
area: ES|QL
4+
type: enhancement
5+
issues: []

docs/changelog/133347.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
pr: 133347
2+
summary: Force rollover on write to true when data stream indices list is empty
3+
area: Data streams
4+
type: bug
5+
issues:
6+
- 133176

docs/reference/query-languages/query-dsl/full-text-filter-tutorial.md

Lines changed: 574 additions & 0 deletions
Large diffs are not rendered by default.

docs/reference/query-languages/querydsl.md

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,6 @@ mapped_pages:
66
# QueryDSL [query-dsl]
77

88
:::{note}
9-
This section provides detailed **reference information**.
10-
119
Refer to the [Query DSL overview](docs-content://explore-analyze/query-filter/languages/querydsl.md) in the **Explore and analyze** section for overview and conceptual information about Query DSL.
1210
:::
1311

docs/reference/query-languages/toc.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ toc:
22
- file: index.md
33
- file: querydsl.md
44
children:
5+
- file: query-dsl/full-text-filter-tutorial.md
56
- file: query-dsl/query-filter-context.md
67
- file: query-dsl/compound-queries.md
78
children:
Lines changed: 229 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,229 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.upgrades;
11+
12+
import com.carrotsearch.randomizedtesting.annotations.Name;
13+
14+
import org.elasticsearch.client.Request;
15+
import org.elasticsearch.client.Response;
16+
import org.elasticsearch.client.ResponseException;
17+
import org.elasticsearch.cluster.metadata.DataStream;
18+
import org.elasticsearch.common.time.DateFormatter;
19+
import org.elasticsearch.common.time.FormatNames;
20+
import org.elasticsearch.test.rest.ObjectPath;
21+
22+
import java.io.IOException;
23+
import java.time.Instant;
24+
import java.util.Map;
25+
26+
import static org.elasticsearch.cluster.metadata.DataStreamTestHelper.backingIndexEqualTo;
27+
import static org.elasticsearch.cluster.metadata.DataStreamTestHelper.dataStreamIndexEqualTo;
28+
import static org.hamcrest.Matchers.anyOf;
29+
import static org.hamcrest.Matchers.empty;
30+
import static org.hamcrest.Matchers.equalTo;
31+
import static org.hamcrest.Matchers.hasSize;
32+
import static org.hamcrest.Matchers.is;
33+
import static org.hamcrest.Matchers.not;
34+
import static org.hamcrest.Matchers.notNullValue;
35+
36+
public class FailureStoreUpgradeIT extends AbstractRollingUpgradeWithSecurityTestCase {
37+
38+
public FailureStoreUpgradeIT(@Name("upgradedNodes") int upgradedNodes) {
39+
super(upgradedNodes);
40+
}
41+
42+
final String INDEX_TEMPLATE = """
43+
{
44+
"index_patterns": ["$PATTERN"],
45+
"data_stream": {},
46+
"template": {
47+
"mappings":{
48+
"properties": {
49+
"@timestamp" : {
50+
"type": "date"
51+
},
52+
"numeral": {
53+
"type": "long"
54+
}
55+
}
56+
}
57+
}
58+
}""";
59+
60+
private static final String VALID_DOC = """
61+
{"@timestamp": "$now", "numeral": 0}
62+
""";
63+
64+
private static final String INVALID_DOC = """
65+
{"@timestamp": "$now", "numeral": "foobar"}
66+
""";
67+
68+
private static final String BULK = """
69+
{"create": {}}
70+
{"@timestamp": "$now", "numeral": 0}
71+
{"create": {}}
72+
{"@timestamp": "$now", "numeral": 1}
73+
{"create": {}}
74+
{"@timestamp": "$now", "numeral": 2}
75+
{"create": {}}
76+
{"@timestamp": "$now", "numeral": 3}
77+
{"create": {}}
78+
{"@timestamp": "$now", "numeral": 4}
79+
""";
80+
81+
private static final String ENABLE_FAILURE_STORE_OPTIONS = """
82+
{
83+
"failure_store": {
84+
"enabled": true
85+
}
86+
}
87+
""";
88+
89+
public void testFailureStoreOnPreviouslyExistingDataStream() throws Exception {
90+
assumeFalse(
91+
"testing migration from data streams created before failure store feature existed",
92+
oldClusterHasFeature(DataStream.DATA_STREAM_FAILURE_STORE_FEATURE)
93+
);
94+
String dataStreamName = "fs-ds-upgrade-test";
95+
String failureStoreName = dataStreamName + "::failures";
96+
String templateName = "fs-ds-template";
97+
if (isOldCluster()) {
98+
// Create template
99+
var putIndexTemplateRequest = new Request("POST", "/_index_template/" + templateName);
100+
putIndexTemplateRequest.setJsonEntity(INDEX_TEMPLATE.replace("$PATTERN", dataStreamName));
101+
assertOK(client().performRequest(putIndexTemplateRequest));
102+
103+
// Initialize data stream
104+
executeBulk(dataStreamName);
105+
106+
// Ensure document failure
107+
indexDoc(dataStreamName, INVALID_DOC, false);
108+
109+
// Check data stream state
110+
var dataStreams = getDataStream(dataStreamName);
111+
assertThat(ObjectPath.evaluate(dataStreams, "data_streams"), hasSize(1));
112+
assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.name"), equalTo(dataStreamName));
113+
assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.generation"), equalTo(1));
114+
assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.template"), equalTo(templateName));
115+
assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.indices"), hasSize(1));
116+
String firstBackingIndex = ObjectPath.evaluate(dataStreams, "data_streams.0.indices.0.index_name");
117+
assertThat(firstBackingIndex, backingIndexEqualTo(dataStreamName, 1));
118+
119+
assertDocCount(client(), dataStreamName, 5);
120+
} else if (isMixedCluster()) {
121+
ensureHealth(dataStreamName, request -> request.addParameter("wait_for_status", "yellow"));
122+
if (isFirstMixedCluster()) {
123+
indexDoc(dataStreamName, VALID_DOC, true);
124+
indexDoc(dataStreamName, INVALID_DOC, false);
125+
}
126+
assertDocCount(client(), dataStreamName, 6);
127+
} else if (isUpgradedCluster()) {
128+
ensureGreen(dataStreamName);
129+
130+
// Ensure correct default failure store state for upgraded data stream
131+
var dataStreams = getDataStream(dataStreamName);
132+
assertThat(ObjectPath.evaluate(dataStreams, "data_streams"), hasSize(1));
133+
assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.name"), equalTo(dataStreamName));
134+
assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.failure_store"), notNullValue());
135+
assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.failure_store.enabled"), equalTo(false));
136+
assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.failure_store.indices"), is(empty()));
137+
assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.failure_store.rollover_on_write"), equalTo(true));
138+
139+
// Ensure invalid document is not indexed
140+
indexDoc(dataStreamName, INVALID_DOC, false);
141+
142+
// Enable failure store on upgraded data stream
143+
var putOptionsRequest = new Request("PUT", "/_data_stream/" + dataStreamName + "/_options");
144+
putOptionsRequest.setJsonEntity(ENABLE_FAILURE_STORE_OPTIONS);
145+
assertOK(client().performRequest(putOptionsRequest));
146+
147+
// Ensure correct enabled failure store state
148+
dataStreams = getDataStream(dataStreamName);
149+
assertThat(ObjectPath.evaluate(dataStreams, "data_streams"), hasSize(1));
150+
assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.name"), equalTo(dataStreamName));
151+
assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.failure_store"), notNullValue());
152+
assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.failure_store.enabled"), equalTo(true));
153+
assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.failure_store.indices"), is(empty()));
154+
assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.failure_store.rollover_on_write"), equalTo(true));
155+
156+
// Initialize failure store
157+
int expectedFailureDocuments = 0;
158+
if (randomBoolean()) {
159+
// Index a failure via a mapping exception
160+
indexDoc(dataStreamName, INVALID_DOC, true);
161+
expectedFailureDocuments = 1;
162+
} else {
163+
// Manually rollover failure store to force initialization
164+
var failureStoreRolloverRequest = new Request("POST", "/" + failureStoreName + "/_rollover");
165+
assertOK(client().performRequest(failureStoreRolloverRequest));
166+
}
167+
168+
// Ensure correct initialized failure store state
169+
dataStreams = getDataStream(dataStreamName);
170+
assertThat(ObjectPath.evaluate(dataStreams, "data_streams"), hasSize(1));
171+
assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.name"), equalTo(dataStreamName));
172+
assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.failure_store"), notNullValue());
173+
assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.failure_store.enabled"), equalTo(true));
174+
assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.failure_store.indices"), is(not(empty())));
175+
assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.failure_store.rollover_on_write"), equalTo(false));
176+
177+
String failureIndexName = ObjectPath.evaluate(dataStreams, "data_streams.0.failure_store.indices.0.index_name");
178+
assertThat(failureIndexName, dataStreamIndexEqualTo(dataStreamName, 2, true));
179+
180+
assertDocCount(client(), dataStreamName, 6);
181+
assertDocCount(client(), failureStoreName, expectedFailureDocuments);
182+
}
183+
}
184+
185+
private static void indexDoc(String dataStreamName, String docBody, boolean expectSuccess) throws IOException {
186+
var indexRequest = new Request("POST", "/" + dataStreamName + "/_doc");
187+
indexRequest.addParameter("refresh", "true");
188+
indexRequest.setJsonEntity(docBody.replace("$now", formatInstant(Instant.now())));
189+
Response response = null;
190+
try {
191+
response = client().performRequest(indexRequest);
192+
} catch (ResponseException re) {
193+
response = re.getResponse();
194+
}
195+
assertNotNull(response);
196+
if (expectSuccess) {
197+
assertOK(response);
198+
} else {
199+
assertThat(response.getStatusLine().getStatusCode(), not(anyOf(equalTo(200), equalTo(201))));
200+
}
201+
}
202+
203+
private static void executeBulk(String dataStreamName) throws IOException {
204+
var bulkRequest = new Request("POST", "/" + dataStreamName + "/_bulk");
205+
bulkRequest.setJsonEntity(BULK.replace("$now", formatInstant(Instant.now())));
206+
bulkRequest.addParameter("refresh", "true");
207+
Response response = null;
208+
try {
209+
response = client().performRequest(bulkRequest);
210+
} catch (ResponseException re) {
211+
response = re.getResponse();
212+
}
213+
assertNotNull(response);
214+
var responseBody = entityAsMap(response);
215+
assertOK(response);
216+
assertThat("errors in response:\n " + responseBody, responseBody.get("errors"), equalTo(false));
217+
}
218+
219+
static String formatInstant(Instant instant) {
220+
return DateFormatter.forPattern(FormatNames.STRICT_DATE_OPTIONAL_TIME.getName()).format(instant);
221+
}
222+
223+
private static Map<String, Object> getDataStream(String dataStreamName) throws IOException {
224+
var getDataStreamsRequest = new Request("GET", "/_data_stream/" + dataStreamName);
225+
var response = client().performRequest(getDataStreamsRequest);
226+
assertOK(response);
227+
return entityAsMap(response);
228+
}
229+
}

server/src/main/java/org/elasticsearch/action/support/SubscribableListener.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,9 @@ public static <T> SubscribableListener<T> newFailed(Exception exception) {
129129
/**
130130
* Create a {@link SubscribableListener}, fork a computation to complete it, and return the listener. If the forking itself throws an
131131
* exception then the exception is caught and fed to the returned listener.
132+
* <p>
133+
* The listener passed to {@code fork} is the returned {@link SubscribableListener}. In particular, it is valid to complete this
134+
* listener more than once, but all results after the first completion will be silently ignored.
132135
*/
133136
public static <T> SubscribableListener<T> newForked(CheckedConsumer<ActionListener<T>, ? extends Exception> fork) {
134137
final var listener = new SubscribableListener<T>();
@@ -448,6 +451,9 @@ public void complete(ActionListener<?> listener) {
448451
* <li>Ensure that this {@link SubscribableListener} is always completed using that executor, and</li>
449452
* <li>Invoke {@link #andThen} using that executor.</li>
450453
* </ul>
454+
* <p>
455+
* The listener passed to {@code nextStep} is the returned {@link SubscribableListener}. In particular, it is valid to complete this
456+
* listener more than once, but all results after the first completion will be silently ignored.
451457
*/
452458
public <U> SubscribableListener<U> andThen(CheckedConsumer<ActionListener<U>, ? extends Exception> nextStep) {
453459
return newForked(l -> addListener(l.delegateFailureIgnoreResponseAndWrap(nextStep)));
@@ -475,6 +481,9 @@ public <U> SubscribableListener<U> andThen(CheckedConsumer<ActionListener<U>, ?
475481
* <li>Ensure that this {@link SubscribableListener} is always completed using that executor, and</li>
476482
* <li>Invoke {@link #andThen} using that executor.</li>
477483
* </ul>
484+
* <p>
485+
* The listener passed to {@code nextStep} is the returned {@link SubscribableListener}. In particular, it is valid to complete this
486+
* listener more than once, but all results after the first completion will be silently ignored.
478487
*/
479488
public <U> SubscribableListener<U> andThen(CheckedBiConsumer<ActionListener<U>, T, ? extends Exception> nextStep) {
480489
return andThen(EsExecutors.DIRECT_EXECUTOR_SERVICE, null, nextStep);
@@ -513,6 +522,9 @@ public <U> SubscribableListener<U> andThen(CheckedBiConsumer<ActionListener<U>,
513522
* with a rejection exception on the thread which completes this listener. Likewise if this listener is completed exceptionally but
514523
* {@code executor} rejects the execution of the completion of the returned listener then the returned listener is completed with a
515524
* rejection exception on the thread which completes this listener.
525+
* <p>
526+
* The listener passed to {@code nextStep} is the returned {@link SubscribableListener}. In particular, it is valid to complete this
527+
* listener more than once, but all results after the first completion will be silently ignored.
516528
*/
517529
public <U> SubscribableListener<U> andThen(
518530
Executor executor,

server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -214,7 +214,12 @@ public DataStream(
214214
lifecycle,
215215
dataStreamOptions,
216216
new DataStreamIndices(BACKING_INDEX_PREFIX, List.copyOf(indices), rolloverOnWrite, autoShardingEvent),
217-
new DataStreamIndices(FAILURE_STORE_PREFIX, List.copyOf(failureIndices), false, null)
217+
new DataStreamIndices(
218+
FAILURE_STORE_PREFIX,
219+
List.copyOf(failureIndices),
220+
(replicated == false && failureIndices.isEmpty()),
221+
null
222+
)
218223
);
219224
}
220225

@@ -283,8 +288,15 @@ public static DataStream read(StreamInput in) throws IOException {
283288
backingIndicesBuilder.setAutoShardingEvent(in.readOptionalWriteable(DataStreamAutoShardingEvent::new));
284289
}
285290
if (in.getTransportVersion().onOrAfter(TransportVersions.V_8_15_0)) {
286-
failureIndicesBuilder.setRolloverOnWrite(in.readBoolean())
291+
// Read the rollover on write flag from the stream, but force it on if the failure indices are empty and we're not replicating
292+
boolean failureStoreRolloverOnWrite = in.readBoolean() || (replicated == false && failureIndices.isEmpty());
293+
failureIndicesBuilder.setRolloverOnWrite(failureStoreRolloverOnWrite)
287294
.setAutoShardingEvent(in.readOptionalWriteable(DataStreamAutoShardingEvent::new));
295+
} else {
296+
// If we are reading from an older version that does not have these fields, just default
297+
// to a reasonable value for rollover on write for the failure store
298+
boolean failureStoreRolloverOnWrite = replicated == false && failureIndices.isEmpty();
299+
failureIndicesBuilder.setRolloverOnWrite(failureStoreRolloverOnWrite);
288300
}
289301
DataStreamOptions dataStreamOptions;
290302
if (in.getTransportVersion().onOrAfter(TransportVersions.V_8_16_0)) {
@@ -1490,7 +1502,11 @@ public void writeTo(StreamOutput out) throws IOException {
14901502
new DataStreamIndices(
14911503
FAILURE_STORE_PREFIX,
14921504
args[13] != null ? (List<Index>) args[13] : List.of(),
1493-
args[14] != null && (boolean) args[14],
1505+
// If replicated (args[5]) is null or exists and is false, and the failure index list (args[13]) is null or
1506+
// exists and is empty, then force the rollover on write field to true. If none of those conditions are met,
1507+
// then use the rollover on write value (args[14]) present in the parser.
1508+
((args[5] == null || ((boolean) args[5] == false)) && (args[13] == null || ((List<Index>) args[13]).isEmpty()))
1509+
|| (args[14] != null && (boolean) args[14]),
14941510
(DataStreamAutoShardingEvent) args[15]
14951511
)
14961512
)

server/src/test/java/org/elasticsearch/cluster/metadata/IndexAbstractionTests.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -201,13 +201,18 @@ private IndexMetadata newIndexMetadata(String indexName, AliasMetadata aliasMeta
201201

202202
private static DataStream newDataStreamInstance(List<Index> backingIndices, List<Index> failureStoreIndices) {
203203
boolean isSystem = randomBoolean();
204+
boolean isReplicated = randomBoolean();
204205
return DataStream.builder(randomAlphaOfLength(50), backingIndices)
205-
.setFailureIndices(DataStream.DataStreamIndices.failureIndicesBuilder(failureStoreIndices).build())
206+
.setFailureIndices(
207+
DataStream.DataStreamIndices.failureIndicesBuilder(failureStoreIndices)
208+
.setRolloverOnWrite(isReplicated == false && failureStoreIndices.isEmpty())
209+
.build()
210+
)
206211
.setGeneration(randomLongBetween(1, 1000))
207212
.setMetadata(Map.of())
208213
.setSystem(isSystem)
209214
.setHidden(isSystem || randomBoolean())
210-
.setReplicated(randomBoolean())
215+
.setReplicated(isReplicated)
211216
.build();
212217
}
213218
}

server/src/test/java/org/elasticsearch/cluster/metadata/ProjectMetadataTests.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2473,7 +2473,7 @@ public void testToXContent() throws IOException {
24732473
"system": false,
24742474
"allow_custom_routing": false,
24752475
"settings" : { },
2476-
"failure_rollover_on_write": false,
2476+
"failure_rollover_on_write": true,
24772477
"rollover_on_write": false
24782478
}
24792479
},
@@ -2740,7 +2740,7 @@ public void testToXContentMultiProject() throws IOException {
27402740
"system": false,
27412741
"allow_custom_routing": false,
27422742
"settings" : { },
2743-
"failure_rollover_on_write": false,
2743+
"failure_rollover_on_write": true,
27442744
"rollover_on_write": false
27452745
}
27462746
},

0 commit comments

Comments
 (0)