Skip to content

Commit 3b8f8ba

Browse files
authored
Improve error message for non append-only writes that target data stream (#60887)
Backport of #60874 to 7.9 branch. Closes #60581
1 parent 29d37c9 commit 3b8f8ba

File tree

5 files changed

+97
-13
lines changed

5 files changed

+97
-13
lines changed

server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,7 @@
8989
import java.util.stream.Collectors;
9090

9191
import static java.util.Collections.emptyMap;
92+
import static org.elasticsearch.cluster.metadata.IndexNameExpressionResolver.EXCLUDED_DATA_STREAMS_KEY;
9293
import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_PRIMARY_TERM;
9394
import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;
9495

@@ -577,7 +578,7 @@ private boolean addFailureIfIndexIsUnavailable(DocWriteRequest<?> request, int i
577578
if (concreteIndex == null) {
578579
try {
579580
concreteIndex = concreteIndices.resolveIfAbsent(request);
580-
} catch (IndexClosedException | IndexNotFoundException ex) {
581+
} catch (IndexClosedException | IndexNotFoundException | IllegalArgumentException ex) {
581582
addFailure(request, idx, ex);
582583
return true;
583584
}
@@ -623,8 +624,16 @@ Index resolveIfAbsent(DocWriteRequest<?> request) {
623624
Index concreteIndex = indices.get(request.index());
624625
if (concreteIndex == null) {
625626
boolean includeDataStreams = request.opType() == DocWriteRequest.OpType.CREATE;
626-
concreteIndex = indexNameExpressionResolver.concreteWriteIndex(state, request.indicesOptions(), request.indices()[0],
627-
false, includeDataStreams);
627+
try {
628+
concreteIndex = indexNameExpressionResolver.concreteWriteIndex(state, request.indicesOptions(),
629+
request.indices()[0], false, includeDataStreams);
630+
} catch (IndexNotFoundException e) {
631+
if (includeDataStreams == false && e.getMetadataKeys().contains(EXCLUDED_DATA_STREAMS_KEY)) {
632+
throw new IllegalArgumentException("only write ops with an op_type of create are allowed in data streams");
633+
} else {
634+
throw e;
635+
}
636+
}
628637
indices.put(request.index(), concreteIndex);
629638
}
630639
return concreteIndex;

server/src/main/java/org/elasticsearch/action/support/single/instance/TransportInstanceSingleOperationAction.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import org.elasticsearch.common.io.stream.StreamInput;
3838
import org.elasticsearch.common.io.stream.Writeable;
3939
import org.elasticsearch.common.unit.TimeValue;
40+
import org.elasticsearch.index.IndexNotFoundException;
4041
import org.elasticsearch.node.NodeClosedException;
4142
import org.elasticsearch.tasks.Task;
4243
import org.elasticsearch.threadpool.ThreadPool;
@@ -50,6 +51,8 @@
5051

5152
import java.io.IOException;
5253

54+
import static org.elasticsearch.cluster.metadata.IndexNameExpressionResolver.EXCLUDED_DATA_STREAMS_KEY;
55+
5356
public abstract class TransportInstanceSingleOperationAction<
5457
Request extends InstanceShardOperationRequest<Request>,
5558
Response extends ActionResponse
@@ -143,7 +146,15 @@ protected void doStart(ClusterState clusterState) {
143146
throw blockException;
144147
}
145148
}
146-
request.concreteIndex(indexNameExpressionResolver.concreteWriteIndex(clusterState, request).getName());
149+
try {
150+
request.concreteIndex(indexNameExpressionResolver.concreteWriteIndex(clusterState, request).getName());
151+
} catch (IndexNotFoundException e) {
152+
if (request.includeDataStreams() == false && e.getMetadataKeys().contains(EXCLUDED_DATA_STREAMS_KEY)) {
153+
throw new IllegalArgumentException("only write ops with an op_type of create are allowed in data streams");
154+
} else {
155+
throw e;
156+
}
157+
}
147158
resolveRequest(clusterState, request);
148159
blockException = checkRequestBlock(clusterState, request);
149160
if (blockException != null) {

server/src/main/java/org/elasticsearch/cluster/metadata/IndexNameExpressionResolver.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,8 @@
6060

6161
public class IndexNameExpressionResolver {
6262

63+
public static final String EXCLUDED_DATA_STREAMS_KEY = "es.excluded_ds";
64+
6365
private final DateMathExpressionResolver dateMathExpressionResolver = new DateMathExpressionResolver();
6466
private final WildcardExpressionResolver wildcardExpressionResolver = new WildcardExpressionResolver();
6567
private final List<ExpressionResolver> expressionResolvers =
@@ -208,6 +210,7 @@ Index[] concreteIndices(Context context, String... indexExpressions) {
208210
}
209211
}
210212

213+
boolean excludedDataStreams = false;
211214
final Set<Index> concreteIndices = new HashSet<>(expressions.size());
212215
for (String expression : expressions) {
213216
IndexAbstraction indexAbstraction = metadata.getIndicesLookup().get(expression);
@@ -232,6 +235,7 @@ Index[] concreteIndices(Context context, String... indexExpressions) {
232235
}
233236
} else if (indexAbstraction.getType() == IndexAbstraction.Type.DATA_STREAM &&
234237
context.includeDataStreams() == false) {
238+
excludedDataStreams = true;
235239
continue;
236240
}
237241

@@ -273,6 +277,10 @@ Index[] concreteIndices(Context context, String... indexExpressions) {
273277
if (options.allowNoIndices() == false && concreteIndices.isEmpty()) {
274278
IndexNotFoundException infe = new IndexNotFoundException((String)null);
275279
infe.setResources("index_expression", indexExpressions);
280+
if (excludedDataStreams) {
281+
// Allows callers to handle IndexNotFoundException differently based on whether data streams were excluded.
282+
infe.addMetadata(EXCLUDED_DATA_STREAMS_KEY, "true");
283+
}
276284
throw infe;
277285
}
278286
return concreteIndices.toArray(new Index[concreteIndices.size()]);

x-pack/plugin/data-streams/qa/rest/src/test/resources/rest-api-spec/test/data-streams/20_unsupported_apis.yml

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -196,3 +196,50 @@
196196
indices.delete_data_stream:
197197
name: simple-data-stream1
198198
- is_true: acknowledged
199+
200+
---
201+
"Non append-only writes into a data stream":
202+
- skip:
203+
version: " - 7.8.99"
204+
reason: "data streams only supported in 7.9+"
205+
features: allowed_warnings
206+
207+
- do:
208+
allowed_warnings:
209+
- "index template [my-template1] has index patterns [logs-*] matching patterns from existing older templates [global] with patterns (global => [*]); this template [my-template1] will take precedence during new index creation"
210+
indices.put_index_template:
211+
name: my-template1
212+
body:
213+
index_patterns: [logs-*]
214+
data_stream: {}
215+
216+
- do:
217+
catch: bad_request
218+
index:
219+
index: logs-foobar
220+
id: "1"
221+
body:
222+
'@timestamp': '2020-12-12'
223+
224+
- do:
225+
bulk:
226+
body:
227+
- index:
228+
_index: logs-foobar
229+
_id: "1"
230+
- '@timestamp': '2020-12-12'
231+
- create:
232+
_index: logs-foobar
233+
_id: "1"
234+
- '@timestamp': '2020-12-12'
235+
- match: { errors: true }
236+
- match: { items.0.index.status: 400 }
237+
- match: { items.0.index.error.type: illegal_argument_exception }
238+
- match: { items.0.index.error.reason: "only write ops with an op_type of create are allowed in data streams" }
239+
- match: { items.1.create.result: created }
240+
- match: { items.1.create._index: .ds-logs-foobar-000001 }
241+
242+
- do:
243+
indices.delete_data_stream:
244+
name: logs-foobar
245+
- is_true: acknowledged

x-pack/plugin/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/DataStreamIT.java

Lines changed: 18 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -230,36 +230,45 @@ public void testOtherWriteOps() throws Exception {
230230
);
231231
BulkResponse bulkResponse = client().bulk(bulkRequest).actionGet();
232232
assertThat(bulkResponse.getItems(), arrayWithSize(1));
233-
assertThat(bulkResponse.getItems()[0].getFailure().getMessage(), containsString("no such index [null]"));
233+
assertThat(
234+
bulkResponse.getItems()[0].getFailure().getMessage(),
235+
containsString("only write ops with an op_type of create are allowed in data streams")
236+
);
234237
}
235238
{
236239
BulkRequest bulkRequest = new BulkRequest().add(new DeleteRequest(dataStreamName, "_id"));
237240
BulkResponse bulkResponse = client().bulk(bulkRequest).actionGet();
238241
assertThat(bulkResponse.getItems(), arrayWithSize(1));
239-
assertThat(bulkResponse.getItems()[0].getFailure().getMessage(), containsString("no such index [null]"));
242+
assertThat(
243+
bulkResponse.getItems()[0].getFailure().getMessage(),
244+
containsString("only write ops with an op_type of create are allowed in data streams")
245+
);
240246
}
241247
{
242248
BulkRequest bulkRequest = new BulkRequest().add(
243249
new UpdateRequest(dataStreamName, "_id").doc("{\"@timestamp1\": \"2020-12-12\"}", XContentType.JSON)
244250
);
245251
BulkResponse bulkResponse = client().bulk(bulkRequest).actionGet();
246252
assertThat(bulkResponse.getItems(), arrayWithSize(1));
247-
assertThat(bulkResponse.getItems()[0].getFailure().getMessage(), containsString("no such index [null]"));
253+
assertThat(
254+
bulkResponse.getItems()[0].getFailure().getMessage(),
255+
containsString("only write ops with an op_type of create are allowed in data streams")
256+
);
248257
}
249258
{
250259
IndexRequest indexRequest = new IndexRequest(dataStreamName).source("{\"@timestamp\": \"2020-12-12\"}", XContentType.JSON);
251-
Exception e = expectThrows(IndexNotFoundException.class, () -> client().index(indexRequest).actionGet());
252-
assertThat(e.getMessage(), equalTo("no such index [null]"));
260+
Exception e = expectThrows(IllegalArgumentException.class, () -> client().index(indexRequest).actionGet());
261+
assertThat(e.getMessage(), equalTo("only write ops with an op_type of create are allowed in data streams"));
253262
}
254263
{
255264
UpdateRequest updateRequest = new UpdateRequest(dataStreamName, "_id").doc("{}", XContentType.JSON);
256-
Exception e = expectThrows(IndexNotFoundException.class, () -> client().update(updateRequest).actionGet());
257-
assertThat(e.getMessage(), equalTo("no such index [null]"));
265+
Exception e = expectThrows(IllegalArgumentException.class, () -> client().update(updateRequest).actionGet());
266+
assertThat(e.getMessage(), equalTo("only write ops with an op_type of create are allowed in data streams"));
258267
}
259268
{
260269
DeleteRequest deleteRequest = new DeleteRequest(dataStreamName, "_id");
261-
Exception e = expectThrows(IndexNotFoundException.class, () -> client().delete(deleteRequest).actionGet());
262-
assertThat(e.getMessage(), equalTo("no such index [null]"));
270+
Exception e = expectThrows(IllegalArgumentException.class, () -> client().delete(deleteRequest).actionGet());
271+
assertThat(e.getMessage(), equalTo("only write ops with an op_type of create are allowed in data streams"));
263272
}
264273
{
265274
IndexRequest indexRequest = new IndexRequest(dataStreamName).source("{\"@timestamp\": \"2020-12-12\"}", XContentType.JSON)

0 commit comments

Comments
 (0)