Skip to content

Commit 9a39a3c

Browse files
committed
[Transform] Recreate Notifications Index (elastic#121912)
If the notification index and alias gets deleted, recreate the index. Fix elastic#121909
1 parent 25c7faf commit 9a39a3c

File tree

6 files changed

+66
-12
lines changed

6 files changed

+66
-12
lines changed

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/common/notifications/AbstractAuditor.java

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
1919
import org.elasticsearch.cluster.service.ClusterService;
2020
import org.elasticsearch.core.TimeValue;
21+
import org.elasticsearch.index.IndexNotFoundException;
2122
import org.elasticsearch.xcontent.ToXContent;
2223
import org.elasticsearch.xcontent.XContentBuilder;
2324
import org.elasticsearch.xpack.core.ml.utils.MlIndexAndAlias;
@@ -27,6 +28,7 @@
2728
import java.util.Objects;
2829
import java.util.Queue;
2930
import java.util.concurrent.ConcurrentLinkedQueue;
31+
import java.util.concurrent.ExecutorService;
3032
import java.util.concurrent.atomic.AtomicBoolean;
3133

3234
import static org.elasticsearch.xcontent.XContentFactory.jsonBuilder;
@@ -50,14 +52,16 @@ public abstract class AbstractAuditor<T extends AbstractAuditMessage> {
5052

5153
private Queue<ToXContent> backlog;
5254
private final AtomicBoolean indexAndAliasCreationInProgress;
55+
private final ExecutorService executorService;
5356

5457
protected AbstractAuditor(
5558
OriginSettingClient client,
5659
String auditIndexWriteAlias,
5760
String nodeName,
5861
AbstractAuditMessageFactory<T> messageFactory,
5962
ClusterService clusterService,
60-
IndexNameExpressionResolver indexNameExpressionResolver
63+
IndexNameExpressionResolver indexNameExpressionResolver,
64+
ExecutorService executorService
6165
) {
6266
this.client = Objects.requireNonNull(client);
6367
this.auditIndexWriteAlias = Objects.requireNonNull(auditIndexWriteAlias);
@@ -68,6 +72,7 @@ protected AbstractAuditor(
6872
this.backlog = new ConcurrentLinkedQueue<>();
6973
this.indexAndAliasCreated = new AtomicBoolean();
7074
this.indexAndAliasCreationInProgress = new AtomicBoolean();
75+
this.executorService = executorService;
7176
}
7277

7378
public void audit(Level level, String resourceId, String message) {
@@ -147,7 +152,16 @@ protected void indexDoc(ToXContent toXContent) {
147152
}
148153

149154
private void writeDoc(ToXContent toXContent) {
150-
client.index(indexRequest(toXContent), ActionListener.wrap(AbstractAuditor::onIndexResponse, AbstractAuditor::onIndexFailure));
155+
client.index(indexRequest(toXContent), ActionListener.wrap(AbstractAuditor::onIndexResponse, e -> {
156+
if (e instanceof IndexNotFoundException) {
157+
executorService.execute(() -> {
158+
reset();
159+
indexDoc(toXContent);
160+
});
161+
} else {
162+
onIndexFailure(e);
163+
}
164+
}));
151165
}
152166

153167
private IndexRequest indexRequest(ToXContent toXContent) {

x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/common/notifications/AbstractAuditorTests.java

Lines changed: 38 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,10 @@
3434
import org.elasticsearch.cluster.service.ClusterService;
3535
import org.elasticsearch.common.bytes.BytesReference;
3636
import org.elasticsearch.common.settings.Settings;
37+
import org.elasticsearch.common.util.concurrent.EsExecutors;
3738
import org.elasticsearch.common.util.concurrent.ThreadContext;
3839
import org.elasticsearch.core.TimeValue;
40+
import org.elasticsearch.index.IndexNotFoundException;
3941
import org.elasticsearch.index.IndexVersion;
4042
import org.elasticsearch.indices.TestIndexNameExpressionResolver;
4143
import org.elasticsearch.test.ESTestCase;
@@ -233,6 +235,40 @@ public void testAuditingBeforeTemplateInstalled() throws Exception {
233235
verify(client, times(1)).execute(eq(TransportIndexAction.TYPE), any(), any());
234236
}
235237

238+
public void testRecreateTemplateWhenDeleted() throws Exception {
239+
CountDownLatch writeSomeDocsBeforeTemplateLatch = new CountDownLatch(1);
240+
AbstractAuditor<AbstractAuditMessageTests.TestAuditMessage> auditor = createTestAuditorWithoutTemplate(
241+
writeSomeDocsBeforeTemplateLatch
242+
);
243+
244+
auditor.info("foobar", "Here is my info to queue");
245+
246+
verify(client, never()).execute(eq(TransportIndexAction.TYPE), any(), any());
247+
// fire the put template response
248+
writeSomeDocsBeforeTemplateLatch.countDown();
249+
250+
assertBusy(() -> verify(client, times(1)).execute(eq(TransportPutComposableIndexTemplateAction.TYPE), any(), any()));
251+
assertBusy(() -> verify(client, times(1)).execute(eq(TransportCreateIndexAction.TYPE), any(), any()));
252+
253+
// the back log will be written some point later
254+
assertBusy(() -> verify(client, times(1)).execute(eq(TransportBulkAction.TYPE), any(), any()));
255+
256+
// "delete" the index
257+
doAnswer(ans -> {
258+
ActionListener<?> listener = ans.getArgument(2);
259+
listener.onFailure(new IndexNotFoundException("some index"));
260+
return null;
261+
}).when(client).execute(eq(TransportIndexAction.TYPE), any(), any());
262+
263+
// audit more data
264+
auditor.info("foobar", "Here is another message");
265+
266+
// verify the template is recreated and the audit message is processed
267+
assertBusy(() -> verify(client, times(2)).execute(eq(TransportPutComposableIndexTemplateAction.TYPE), any(), any()));
268+
assertBusy(() -> verify(client, times(2)).execute(eq(TransportCreateIndexAction.TYPE), any(), any()));
269+
assertBusy(() -> verify(client, times(2)).execute(eq(TransportBulkAction.TYPE), any(), any()));
270+
}
271+
236272
public void testMaxBufferSize() throws Exception {
237273
CountDownLatch writeSomeDocsBeforeTemplateLatch = new CountDownLatch(1);
238274
AbstractAuditor<AbstractAuditMessageTests.TestAuditMessage> auditor = createTestAuditorWithoutTemplate(
@@ -358,7 +394,8 @@ public static class TestAuditor extends AbstractAuditor<AbstractAuditMessageTest
358394
nodeName,
359395
AbstractAuditMessageTests.TestAuditMessage::new,
360396
clusterService,
361-
TestIndexNameExpressionResolver.newInstance()
397+
TestIndexNameExpressionResolver.newInstance(),
398+
EsExecutors.DIRECT_EXECUTOR_SERVICE
362399
);
363400
}
364401

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/notifications/AbstractMlAuditor.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,8 @@ protected AbstractMlAuditor(
4747
clusterService.getNodeName(),
4848
messageFactory,
4949
clusterService,
50-
indexNameExpressionResolver
50+
indexNameExpressionResolver,
51+
clusterService.threadPool().generic()
5152
);
5253
clusterService.addListener(event -> {
5354
if (event.metadataChanged()) {

x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/notifications/TransformAuditor.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,8 @@ public TransformAuditor(
5252
nodeName,
5353
TransformAuditMessage::new,
5454
clusterService,
55-
indexNameExpressionResolver
55+
indexNameExpressionResolver,
56+
clusterService.threadPool().generic()
5657
);
5758
clusterService.addListener(event -> {
5859
if (event.metadataChanged()) {

x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/action/TransformUpdaterTests.java

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
import org.elasticsearch.client.internal.Client;
1919
import org.elasticsearch.cluster.ClusterState;
2020
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
21-
import org.elasticsearch.cluster.service.ClusterService;
2221
import org.elasticsearch.common.settings.Settings;
2322
import org.elasticsearch.common.util.concurrent.ThreadContext;
2423
import org.elasticsearch.core.Tuple;
@@ -66,7 +65,6 @@
6665
import static org.hamcrest.Matchers.not;
6766
import static org.hamcrest.Matchers.notNullValue;
6867
import static org.hamcrest.Matchers.nullValue;
69-
import static org.mockito.Mockito.mock;
7068

7169
public class TransformUpdaterTests extends ESTestCase {
7270

@@ -77,8 +75,7 @@ public class TransformUpdaterTests extends ESTestCase {
7775
private final IndexNameExpressionResolver indexNameExpressionResolver = TestIndexNameExpressionResolver.newInstance();
7876
private TestThreadPool threadPool;
7977
private Client client;
80-
private ClusterService clusterService = mock(ClusterService.class);
81-
private TransformAuditor auditor = new MockTransformAuditor(clusterService, mock(IndexNameExpressionResolver.class));
78+
private TransformAuditor auditor;
8279
private final Settings settings = Settings.builder().put(XPackSettings.SECURITY_ENABLED.getKey(), true).build();
8380
private final Settings destIndexSettings = new DefaultTransformExtension().getTransformDestinationIndexSettings();
8481

@@ -124,8 +121,7 @@ public void setupClient() {
124121
}
125122
threadPool = createThreadPool();
126123
client = new MyMockClient(threadPool);
127-
clusterService = mock(ClusterService.class);
128-
auditor = new MockTransformAuditor(clusterService, mock(IndexNameExpressionResolver.class));
124+
auditor = MockTransformAuditor.createMockAuditor();
129125
}
130126

131127
@After

x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/notifications/MockTransformAuditor.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616
import org.elasticsearch.cluster.metadata.Metadata;
1717
import org.elasticsearch.cluster.service.ClusterService;
1818
import org.elasticsearch.common.regex.Regex;
19+
import org.elasticsearch.common.util.concurrent.EsExecutors;
20+
import org.elasticsearch.threadpool.ThreadPool;
1921
import org.elasticsearch.xpack.core.common.notifications.Level;
2022
import org.elasticsearch.xpack.core.transform.notifications.TransformAuditMessage;
2123

@@ -51,13 +53,16 @@ public static MockTransformAuditor createMockAuditor() {
5153
when(state.getMetadata()).thenReturn(metadata);
5254
ClusterService clusterService = mock(ClusterService.class);
5355
when(clusterService.state()).thenReturn(state);
56+
ThreadPool threadPool = mock();
57+
when(threadPool.generic()).thenReturn(EsExecutors.DIRECT_EXECUTOR_SERVICE);
58+
when(clusterService.threadPool()).thenReturn(threadPool);
5459

5560
return new MockTransformAuditor(clusterService, mock(IndexNameExpressionResolver.class));
5661
}
5762

5863
private final List<AuditExpectation> expectations;
5964

60-
public MockTransformAuditor(ClusterService clusterService, IndexNameExpressionResolver indexNameResolver) {
65+
private MockTransformAuditor(ClusterService clusterService, IndexNameExpressionResolver indexNameResolver) {
6166
super(mock(Client.class), MOCK_NODE_NAME, clusterService, indexNameResolver, true);
6267
expectations = new CopyOnWriteArrayList<>();
6368
}

0 commit comments

Comments
 (0)