Skip to content

Commit 1c34507

Browse files
committed
Create async search index if necessary on updates and deletes (#64606)
This change ensures that we create the async search index with the right mappings and settings when updating or deleting a document. Users can delete the async search index at any time so we have to re-create it internally if necessary before applying any new operation.
1 parent bd159d8 commit 1c34507

File tree

2 files changed

+65
-13
lines changed

2 files changed

+65
-13
lines changed

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/AsyncTaskIndexService.java

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -65,14 +65,16 @@ public final class AsyncTaskIndexService<R extends AsyncResponse<R>> {
6565
public static final String EXPIRATION_TIME_FIELD = "expiration_time";
6666
public static final String RESULT_FIELD = "result";
6767

68-
private static Settings settings() {
68+
static Settings settings() {
6969
return Settings.builder()
70+
.put("index.codec", "best_compression")
7071
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
72+
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
7173
.put(IndexMetadata.SETTING_AUTO_EXPAND_REPLICAS, "0-1")
7274
.build();
7375
}
7476

75-
private static XContentBuilder mappings() throws IOException {
77+
static XContentBuilder mappings() throws IOException {
7678
XContentBuilder builder = jsonBuilder()
7779
.startObject()
7880
.startObject(SINGLE_MAPPING_NAME)
@@ -195,7 +197,9 @@ public void updateResponse(String docId,
195197
.id(docId)
196198
.doc(source, XContentType.JSON)
197199
.retryOnConflict(5);
198-
client.update(request, listener);
200+
// updates create the index automatically if it doesn't exist so we force the creation
201+
// preemptively.
202+
createIndexIfNecessary(ActionListener.wrap(v -> client.update(request, listener), listener::onFailure));
199203
} catch(Exception e) {
200204
listener.onFailure(e);
201205
}
@@ -213,7 +217,9 @@ public void updateExpirationTime(String docId,
213217
.id(docId)
214218
.doc(source, XContentType.JSON)
215219
.retryOnConflict(5);
216-
client.update(request, listener);
220+
// updates create the index automatically if it doesn't exist so we force the creation
221+
// preemptively.
222+
createIndexIfNecessary(ActionListener.wrap(v -> client.update(request, listener), listener::onFailure));
217223
}
218224

219225
/**
@@ -223,7 +229,9 @@ public void deleteResponse(AsyncExecutionId asyncExecutionId,
223229
ActionListener<DeleteResponse> listener) {
224230
try {
225231
DeleteRequest request = new DeleteRequest(index).id(asyncExecutionId.getDocId());
226-
client.delete(request, listener);
232+
// deletes create the index automatically if it doesn't exist so we force the creation
233+
// preemptively.
234+
createIndexIfNecessary(ActionListener.wrap(v -> client.delete(request, listener), listener::onFailure));
227235
} catch(Exception e) {
228236
listener.onFailure(e);
229237
}

x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/async/AsyncTaskServiceTests.java

Lines changed: 52 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,15 @@
77

88
import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
99
import org.elasticsearch.action.admin.indices.get.GetIndexResponse;
10+
import org.elasticsearch.action.delete.DeleteResponse;
11+
import org.elasticsearch.action.index.IndexResponse;
1012
import org.elasticsearch.action.support.PlainActionFuture;
11-
import org.elasticsearch.cluster.metadata.IndexMetadata;
13+
import org.elasticsearch.action.support.master.AcknowledgedResponse;
14+
import org.elasticsearch.action.update.UpdateResponse;
1215
import org.elasticsearch.cluster.service.ClusterService;
1316
import org.elasticsearch.common.settings.Settings;
1417
import org.elasticsearch.common.util.concurrent.ThreadContext;
18+
import org.elasticsearch.tasks.TaskId;
1519
import org.elasticsearch.test.ESSingleNodeTestCase;
1620
import org.elasticsearch.transport.TransportService;
1721
import org.elasticsearch.xpack.core.search.action.AsyncSearchResponse;
@@ -21,7 +25,6 @@
2125

2226
import java.io.IOException;
2327
import java.util.Collections;
24-
import java.util.concurrent.ExecutionException;
2528

2629
// TODO: test CRUD operations
2730
public class AsyncTaskServiceTests extends ESSingleNodeTestCase {
@@ -95,14 +98,55 @@ public void testEnsuredAuthenticatedUserIsSame() throws IOException {
9598
assertFalse(indexService.ensureAuthenticatedUserIsSame(threadContext.getHeaders(), runAsDiffType));
9699
}
97100

98-
public void testSettings() throws ExecutionException, InterruptedException {
99-
PlainActionFuture<Void> future = PlainActionFuture.newFuture();
100-
indexService.createIndexIfNecessary(future);
101-
future.get();
101+
public void testAutoCreateIndex() throws Exception {
102+
{
103+
PlainActionFuture<Void> future = PlainActionFuture.newFuture();
104+
indexService.createIndexIfNecessary(future);
105+
future.get();
106+
assertSettings();
107+
}
108+
AcknowledgedResponse ack = client().admin().indices().prepareDelete(index).get();
109+
assertTrue(ack.isAcknowledged());
110+
111+
AsyncExecutionId id = new AsyncExecutionId("0", new TaskId("N/A", 0));
112+
AsyncSearchResponse resp = new AsyncSearchResponse(id.getEncoded(), true, true, 0L, 0L);
113+
{
114+
PlainActionFuture<IndexResponse> future = PlainActionFuture.newFuture();
115+
indexService.createResponse(id.getDocId(), Collections.emptyMap(), resp, future);
116+
future.get();
117+
assertSettings();
118+
}
119+
ack = client().admin().indices().prepareDelete(index).get();
120+
assertTrue(ack.isAcknowledged());
121+
{
122+
PlainActionFuture<DeleteResponse> future = PlainActionFuture.newFuture();
123+
indexService.deleteResponse(id, future);
124+
future.get();
125+
assertSettings();
126+
}
127+
ack = client().admin().indices().prepareDelete(index).get();
128+
assertTrue(ack.isAcknowledged());
129+
{
130+
PlainActionFuture<UpdateResponse> future = PlainActionFuture.newFuture();
131+
indexService.updateResponse(id.getDocId(), Collections.emptyMap(), resp, future);
132+
expectThrows(Exception.class, () -> future.get());
133+
assertSettings();
134+
}
135+
ack = client().admin().indices().prepareDelete(index).get();
136+
assertTrue(ack.isAcknowledged());
137+
{
138+
PlainActionFuture<UpdateResponse> future = PlainActionFuture.newFuture();
139+
indexService.updateExpirationTime("0", 10L, future);
140+
expectThrows(Exception.class, () -> future.get());
141+
assertSettings();
142+
}
143+
}
144+
145+
private void assertSettings() throws IOException {
102146
GetIndexResponse getIndexResponse = client().admin().indices().getIndex(
103147
new GetIndexRequest().indices(index)).actionGet();
104148
Settings settings = getIndexResponse.getSettings().get(index);
105-
assertEquals("1", settings.get(IndexMetadata.SETTING_NUMBER_OF_SHARDS));
106-
assertEquals("0-1", settings.get(IndexMetadata.SETTING_AUTO_EXPAND_REPLICAS));
149+
Settings expected = AsyncTaskIndexService.settings();
150+
assertEquals(expected, settings.filter(key -> expected.hasValue(key)));
107151
}
108152
}

0 commit comments

Comments
 (0)