Skip to content

Commit 38ee2da

Browse files
authored
Add configurable op_type for index watcher action (#64590) (#64647)
1 parent 15f2cbd commit 38ee2da

File tree

5 files changed

+103
-22
lines changed

5 files changed

+103
-22
lines changed

x-pack/docs/en/watcher/actions/index.asciidoc

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,8 @@ The following snippet shows a simple `index` action definition:
2626
<1> The id of the action
2727
<2> An optional <<condition,condition>> to restrict action execution
2828
<3> An optional <<transform,transform>> to transform the payload and prepare the data that should be indexed
29-
<4> The elasticsearch index to store the data to
30-
<5> An optional `_id` for the document, if it should always be the same document.
29+
<4> The index, alias, or data stream to which the data will be written
30+
<5> An optional `_id` for the document
3131

3232

3333
[[index-action-attributes]]
@@ -37,11 +37,15 @@ The following snippet shows a simple `index` action definition:
3737
|======
3838
|Name |Required | Default | Description
3939

40-
| `index` | yes | - | The Elasticsearch index to index into.
40+
| `index` | yes | - | The index, alias, or data stream to index into.
4141

4242

4343
| `doc_id` | no | - | The optional `_id` of the document.
4444

45+
| `op_type` | no | `index` | The <<docs-index-api-op_type,op_type>> for the index operation.
46+
Must be one of either `index` or `create`. Must be `create` if
47+
`index` is a data stream.
48+
4549
| `execution_time_field` | no | - | The field that will store/index the watch execution
4650
time.
4751

x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/actions/index/ExecutableIndexAction.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,9 @@ public Action.Result execute(String actionId, WatchExecutionContext ctx, Payload
8484
indexRequest.index(getField(actionId, ctx.id().watchId(), "index", data, INDEX_FIELD, action.index));
8585
indexRequest.type(getField(actionId, ctx.id().watchId(), "type",data, TYPE_FIELD, action.docType));
8686
indexRequest.id(getField(actionId, ctx.id().watchId(), "id",data, ID_FIELD, action.docId));
87+
if (action.opType != null) {
88+
indexRequest.opType(action.opType);
89+
}
8790

8891
data = addTimestampToDocument(data, ctx.executionTime());
8992
BytesReference bytesReference;
@@ -130,6 +133,9 @@ Action.Result indexBulk(Iterable list, String actionId, WatchExecutionContext ct
130133
indexRequest.index(getField(actionId, ctx.id().watchId(), "index", doc, INDEX_FIELD, action.index));
131134
indexRequest.type(getField(actionId, ctx.id().watchId(), "type",doc, TYPE_FIELD, action.docType));
132135
indexRequest.id(getField(actionId, ctx.id().watchId(), "id",doc, ID_FIELD, action.docId));
136+
if (action.opType != null) {
137+
indexRequest.opType(action.opType);
138+
}
133139

134140
doc = addTimestampToDocument(doc, ctx.executionTime());
135141
try (XContentBuilder builder = jsonBuilder()) {

x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/actions/index/IndexAction.java

Lines changed: 42 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,11 @@
66
package org.elasticsearch.xpack.watcher.actions.index;
77

88
import org.elasticsearch.ElasticsearchParseException;
9+
import org.elasticsearch.action.DocWriteRequest;
910
import org.elasticsearch.action.support.WriteRequest.RefreshPolicy;
1011
import org.elasticsearch.common.Nullable;
1112
import org.elasticsearch.common.ParseField;
13+
import org.elasticsearch.common.collect.List;
1214
import org.elasticsearch.common.logging.DeprecationLogger;
1315
import org.elasticsearch.common.time.DateUtils;
1416
import org.elasticsearch.common.unit.TimeValue;
@@ -31,6 +33,7 @@ public class IndexAction implements Action {
3133
@Nullable @Deprecated final String docType;
3234
@Nullable final String index;
3335
@Nullable final String docId;
36+
@Nullable final DocWriteRequest.OpType opType;
3437
@Nullable final String executionTimeField;
3538
@Nullable final TimeValue timeout;
3639
@Nullable final ZoneId dynamicNameTimeZone;
@@ -42,18 +45,20 @@ public class IndexAction implements Action {
4245
public IndexAction(@Nullable String index, @Nullable String docId,
4346
@Nullable String executionTimeField,
4447
@Nullable TimeValue timeout, @Nullable ZoneId dynamicNameTimeZone, @Nullable RefreshPolicy refreshPolicy) {
45-
this(index, null, docId, executionTimeField, timeout, dynamicNameTimeZone, refreshPolicy);
48+
this(index, null, docId, null, executionTimeField, timeout, dynamicNameTimeZone, refreshPolicy);
4649
}
50+
4751
/**
4852
* Document types are deprecated, use constructor without docType
4953
*/
5054
@Deprecated
51-
public IndexAction(@Nullable String index, @Nullable String docType, @Nullable String docId,
52-
@Nullable String executionTimeField,
53-
@Nullable TimeValue timeout, @Nullable ZoneId dynamicNameTimeZone, @Nullable RefreshPolicy refreshPolicy) {
55+
public IndexAction(@Nullable String index, @Nullable String docType, @Nullable String docId, @Nullable DocWriteRequest.OpType opType,
56+
@Nullable String executionTimeField, @Nullable TimeValue timeout, @Nullable ZoneId dynamicNameTimeZone,
57+
@Nullable RefreshPolicy refreshPolicy) {
5458
this.index = index;
5559
this.docType = docType;
5660
this.docId = docId;
61+
this.opType = opType;
5762
this.executionTimeField = executionTimeField;
5863
this.timeout = timeout;
5964
this.dynamicNameTimeZone = dynamicNameTimeZone;
@@ -77,6 +82,10 @@ public String getDocId() {
7782
return docId;
7883
}
7984

85+
public DocWriteRequest.OpType getOpType() {
86+
return opType;
87+
}
88+
8089
public String getExecutionTimeField() {
8190
return executionTimeField;
8291
}
@@ -96,7 +105,10 @@ public boolean equals(Object o) {
96105

97106
IndexAction that = (IndexAction) o;
98107

99-
return Objects.equals(index, that.index) && Objects.equals(docType, that.docType) && Objects.equals(docId, that.docId)
108+
return Objects.equals(index, that.index)
109+
&& Objects.equals(docType, that.docType)
110+
&& Objects.equals(docId, that.docId)
111+
&& Objects.equals(opType, that.opType)
100112
&& Objects.equals(executionTimeField, that.executionTimeField)
101113
&& Objects.equals(timeout, that.timeout)
102114
&& Objects.equals(dynamicNameTimeZone, that.dynamicNameTimeZone)
@@ -105,7 +117,7 @@ public boolean equals(Object o) {
105117

106118
@Override
107119
public int hashCode() {
108-
return Objects.hash(index, docType, docId, executionTimeField, timeout, dynamicNameTimeZone, refreshPolicy);
120+
return Objects.hash(index, docType, docId, opType, executionTimeField, timeout, dynamicNameTimeZone, refreshPolicy);
109121
}
110122

111123
@Override
@@ -120,6 +132,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
120132
if (docId != null) {
121133
builder.field(Field.DOC_ID.getPreferredName(), docId);
122134
}
135+
if (opType != null) {
136+
builder.field(Field.OP_TYPE.getPreferredName(), opType);
137+
}
123138
if (executionTimeField != null) {
124139
builder.field(Field.EXECUTION_TIME_FIELD.getPreferredName(), executionTimeField);
125140
}
@@ -139,6 +154,7 @@ public static IndexAction parse(String watchId, String actionId, XContentParser
139154
String index = null;
140155
String docType = null;
141156
String docId = null;
157+
DocWriteRequest.OpType opType = null;
142158
String executionTimeField = null;
143159
TimeValue timeout = null;
144160
ZoneId dynamicNameTimeZone = null;
@@ -169,6 +185,17 @@ public static IndexAction parse(String watchId, String actionId, XContentParser
169185
docType = parser.text();
170186
} else if (Field.DOC_ID.match(currentFieldName, parser.getDeprecationHandler())) {
171187
docId = parser.text();
188+
} else if (Field.OP_TYPE.match(currentFieldName, parser.getDeprecationHandler())) {
189+
try {
190+
opType = DocWriteRequest.OpType.fromString(parser.text());
191+
if (List.of(DocWriteRequest.OpType.CREATE, DocWriteRequest.OpType.INDEX).contains(opType) == false) {
192+
throw new ElasticsearchParseException("could not parse [{}] action [{}/{}]. op_type value for field [{}] " +
193+
"must be [index] or [create]", TYPE, watchId, actionId, currentFieldName);
194+
}
195+
} catch (IllegalArgumentException e) {
196+
throw new ElasticsearchParseException("could not parse [{}] action [{}/{}]. failed to parse op_type value for " +
197+
"field [{}]", TYPE, watchId, actionId, currentFieldName);
198+
}
172199
} else if (Field.EXECUTION_TIME_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
173200
executionTimeField = parser.text();
174201
} else if (Field.TIMEOUT_HUMAN.match(currentFieldName, parser.getDeprecationHandler())) {
@@ -193,7 +220,7 @@ public static IndexAction parse(String watchId, String actionId, XContentParser
193220
}
194221
}
195222

196-
return new IndexAction(index, docType, docId, executionTimeField, timeout, dynamicNameTimeZone, refreshPolicy);
223+
return new IndexAction(index, docType, docId, opType, executionTimeField, timeout, dynamicNameTimeZone, refreshPolicy);
197224
}
198225

199226
/**
@@ -289,6 +316,7 @@ public static class Builder implements Action.Builder<IndexAction> {
289316
final String index;
290317
final String docType;
291318
String docId;
319+
DocWriteRequest.OpType opType;
292320
String executionTimeField;
293321
TimeValue timeout;
294322
ZoneId dynamicNameTimeZone;
@@ -313,6 +341,11 @@ public Builder setDocId(String docId) {
313341
return this;
314342
}
315343

344+
public Builder setOpType(DocWriteRequest.OpType opType) {
345+
this.opType = opType;
346+
return this;
347+
}
348+
316349
public Builder setExecutionTimeField(String executionTimeField) {
317350
this.executionTimeField = executionTimeField;
318351
return this;
@@ -335,14 +368,15 @@ public Builder setRefreshPolicy(RefreshPolicy refreshPolicy) {
335368

336369
@Override
337370
public IndexAction build() {
338-
return new IndexAction(index, docType, docId, executionTimeField, timeout, dynamicNameTimeZone, refreshPolicy);
371+
return new IndexAction(index, docType, docId, opType, executionTimeField, timeout, dynamicNameTimeZone, refreshPolicy);
339372
}
340373
}
341374

342375
interface Field {
343376
ParseField INDEX = new ParseField("index");
344377
ParseField DOC_TYPE = new ParseField("doc_type");
345378
ParseField DOC_ID = new ParseField("doc_id");
379+
ParseField OP_TYPE = new ParseField("op_type");
346380
ParseField EXECUTION_TIME_FIELD = new ParseField("execution_time_field");
347381
ParseField SOURCE = new ParseField("source");
348382
ParseField RESPONSE = new ParseField("response");

x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/actions/index/IndexActionTests.java

Lines changed: 43 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
import static org.elasticsearch.action.support.WriteRequest.RefreshPolicy;
5050
import static org.elasticsearch.common.util.set.Sets.newHashSet;
5151
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
52+
import static org.hamcrest.Matchers.containsString;
5253
import static org.hamcrest.Matchers.equalTo;
5354
import static org.hamcrest.Matchers.hasEntry;
5455
import static org.hamcrest.Matchers.hasSize;
@@ -87,6 +88,10 @@ public void testParser() throws Exception {
8788
if (writeTimeout != null) {
8889
builder.field(IndexAction.Field.TIMEOUT.getPreferredName(), writeTimeout.millis());
8990
}
91+
DocWriteRequest.OpType opType = randomBoolean() ? DocWriteRequest.OpType.fromId(randomFrom(new Byte[] { 0, 1 })) : null;
92+
if (opType != null) {
93+
builder.field(IndexAction.Field.OP_TYPE.getPreferredName(), opType.getLowercase());
94+
}
9095
builder.endObject();
9196
IndexActionFactory actionParser = new IndexActionFactory(Settings.EMPTY, client);
9297
XContentParser parser = createParser(builder);
@@ -100,6 +105,9 @@ public void testParser() throws Exception {
100105
if (timestampField != null) {
101106
assertThat(executable.action().executionTimeField, equalTo(timestampField));
102107
}
108+
if (opType != null) {
109+
assertThat(executable.action().opType, equalTo(opType));
110+
}
103111
assertThat(executable.action().timeout, equalTo(writeTimeout));
104112
}
105113

@@ -146,20 +154,47 @@ public void testParserFailure() throws Exception {
146154
.endObject());
147155
}
148156

157+
public void testOpTypeThatCannotBeParsed() throws Exception {
158+
expectParseFailure(jsonBuilder()
159+
.startObject()
160+
.field(IndexAction.Field.OP_TYPE.getPreferredName(), randomAlphaOfLength(10))
161+
.endObject(),
162+
"failed to parse op_type value for field [op_type]");
163+
}
164+
165+
public void testUnsupportedOpType() throws Exception {
166+
expectParseFailure(jsonBuilder()
167+
.startObject()
168+
.field(IndexAction.Field.OP_TYPE.getPreferredName(),
169+
randomFrom(DocWriteRequest.OpType.UPDATE.name(), DocWriteRequest.OpType.DELETE.name()))
170+
.endObject(),
171+
"op_type value for field [op_type] must be [index] or [create]");
172+
}
173+
174+
private void expectParseFailure(XContentBuilder builder, String expectedMessage) throws Exception {
175+
expectFailure(ElasticsearchParseException.class, builder, expectedMessage);
176+
}
177+
149178
private void expectParseFailure(XContentBuilder builder) throws Exception {
150179
expectFailure(ElasticsearchParseException.class, builder);
151180
}
152181

153182
private void expectFailure(Class clazz, XContentBuilder builder) throws Exception {
183+
expectFailure(clazz, builder, null);
184+
}
185+
186+
private void expectFailure(Class clazz, XContentBuilder builder, String expectedMessage) throws Exception {
154187
IndexActionFactory actionParser = new IndexActionFactory(Settings.EMPTY, client);
155188
XContentParser parser = createParser(builder);
156189
parser.nextToken();
157-
expectThrows(clazz, () ->
158-
actionParser.parseExecutable(randomAlphaOfLength(4), randomAlphaOfLength(5), parser));
190+
Throwable t = expectThrows(clazz, () -> actionParser.parseExecutable(randomAlphaOfLength(4), randomAlphaOfLength(5), parser));
191+
if (expectedMessage != null) {
192+
assertThat(t.getMessage(), containsString(expectedMessage));
193+
}
159194
}
160195

161196
public void testUsingParameterIdWithBulkOrIdFieldThrowsIllegalState() {
162-
final IndexAction action = new IndexAction("test-index", "test-type", "123", null, null, null, refreshPolicy);
197+
final IndexAction action = new IndexAction("test-index", "test-type", "123", null, null, null, null, refreshPolicy);
163198
final ExecutableIndexAction executable = new ExecutableIndexAction(action, logger, client,
164199
TimeValue.timeValueSeconds(30), TimeValue.timeValueSeconds(30));
165200
final Map<String, Object> docWithId = MapBuilder.<String, Object>newMapBuilder().put("foo", "bar").put("_id", "0").immutableMap();
@@ -209,7 +244,7 @@ public void testThatIndexTypeIdDynamically() throws Exception {
209244
final IndexAction action = new IndexAction(configureIndexDynamically ? null : "my_index",
210245
configureTypeDynamically ? null : "my_type",
211246
configureIdDynamically ? null : "my_id",
212-
null, null, null, refreshPolicy);
247+
null, null, null, null, refreshPolicy);
213248
final ExecutableIndexAction executable = new ExecutableIndexAction(action, logger, client,
214249
TimeValue.timeValueSeconds(30), TimeValue.timeValueSeconds(30));
215250

@@ -230,7 +265,7 @@ public void testThatIndexTypeIdDynamically() throws Exception {
230265
}
231266

232267
public void testThatIndexActionCanBeConfiguredWithDynamicIndexNameAndBulk() throws Exception {
233-
final IndexAction action = new IndexAction(null, "my-type", null, null, null, null, refreshPolicy);
268+
final IndexAction action = new IndexAction(null, "my-type", null, null, null, null, null, refreshPolicy);
234269
final ExecutableIndexAction executable = new ExecutableIndexAction(action, logger, client,
235270
TimeValue.timeValueSeconds(30), TimeValue.timeValueSeconds(30));
236271

@@ -263,7 +298,7 @@ public void testConfigureIndexInMapAndAction() {
263298
String fieldName = randomFrom("_index", "_type");
264299
final IndexAction action = new IndexAction(fieldName.equals("_index") ? "my_index" : null,
265300
fieldName.equals("_type") ? "my_type" : null,
266-
null,null, null, null, refreshPolicy);
301+
null, null, null, null, null, refreshPolicy);
267302
final ExecutableIndexAction executable = new ExecutableIndexAction(action, logger, client,
268303
TimeValue.timeValueSeconds(30), TimeValue.timeValueSeconds(30));
269304

@@ -283,7 +318,7 @@ public void testIndexActionExecuteSingleDoc() throws Exception {
283318
String docId = randomAlphaOfLength(5);
284319
String timestampField = randomFrom("@timestamp", null);
285320

286-
IndexAction action = new IndexAction("test-index", "test-type", docIdAsParam ? docId : null, timestampField, null, null,
321+
IndexAction action = new IndexAction("test-index", "test-type", docIdAsParam ? docId : null, null, timestampField, null, null,
287322
refreshPolicy);
288323
ExecutableIndexAction executable = new ExecutableIndexAction(action, logger, client, TimeValue.timeValueSeconds(30),
289324
TimeValue.timeValueSeconds(30));
@@ -334,7 +369,7 @@ public void testIndexActionExecuteSingleDoc() throws Exception {
334369
}
335370

336371
public void testFailureResult() throws Exception {
337-
IndexAction action = new IndexAction("test-index", "test-type", null, "@timestamp", null, null, refreshPolicy);
372+
IndexAction action = new IndexAction("test-index", "test-type", null, null, "@timestamp", null, null, refreshPolicy);
338373
ExecutableIndexAction executable = new ExecutableIndexAction(action, logger, client,
339374
TimeValue.timeValueSeconds(30), TimeValue.timeValueSeconds(30));
340375

x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/watch/WatchTests.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import org.apache.logging.log4j.LogManager;
99
import org.apache.logging.log4j.Logger;
1010
import org.elasticsearch.ElasticsearchParseException;
11+
import org.elasticsearch.action.DocWriteRequest;
1112
import org.elasticsearch.action.search.SearchRequest;
1213
import org.elasticsearch.action.support.WriteRequest;
1314
import org.elasticsearch.client.Client;
@@ -585,15 +586,16 @@ private List<ActionWrapper> randomActions() {
585586
randomFrom(DataAttachment.JSON, DataAttachment.YAML), EmailAttachments.EMPTY_ATTACHMENTS);
586587
list.add(new ActionWrapper("_email_" + randomAlphaOfLength(8), randomThrottler(),
587588
AlwaysConditionTests.randomCondition(scriptService), randomTransform(),
588-
new ExecutableEmailAction(action, logger, emailService, templateEngine, htmlSanitizer,
589+
new ExecutableEmailAction(action, logger, emailService, templateEngine, htmlSanitizer,
589590
Collections.emptyMap()), null, null));
590591
}
591592
if (randomBoolean()) {
592593
ZoneOffset timeZone = randomBoolean() ? ZoneOffset.UTC : null;
593594
TimeValue timeout = randomBoolean() ? timeValueSeconds(between(1, 10000)) : null;
594595
WriteRequest.RefreshPolicy refreshPolicy = randomBoolean() ? null : randomFrom(WriteRequest.RefreshPolicy.values());
595-
IndexAction action = new IndexAction("_index", null, randomBoolean() ? "123" : null, null, timeout, timeZone,
596-
refreshPolicy);
596+
IndexAction action = new IndexAction("_index", null, randomBoolean() ? "123" : null,
597+
randomBoolean() ? DocWriteRequest.OpType.fromId(randomFrom(new Byte[] { 0, 1 })) : null, null, timeout, timeZone,
598+
refreshPolicy);
597599
list.add(new ActionWrapper("_index_" + randomAlphaOfLength(8), randomThrottler(),
598600
AlwaysConditionTests.randomCondition(scriptService), randomTransform(),
599601
new ExecutableIndexAction(action, logger, client, TimeValue.timeValueSeconds(30),

0 commit comments

Comments
 (0)