Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/changelog/121912.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 121912
summary: Recreate Notifications Index
area: Transform
type: bug
issues:
- 121909
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.xcontent.ToXContent;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xpack.core.ml.utils.MlIndexAndAlias;
Expand All @@ -28,6 +29,7 @@
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;

import static org.elasticsearch.xcontent.XContentFactory.jsonBuilder;
Expand All @@ -51,14 +53,16 @@ public abstract class AbstractAuditor<T extends AbstractAuditMessage> {

private Queue<ToXContent> backlog;
private final AtomicBoolean indexAndAliasCreationInProgress;
private final ExecutorService executorService;

protected AbstractAuditor(
OriginSettingClient client,
String auditIndexWriteAlias,
String nodeName,
AbstractAuditMessageFactory<T> messageFactory,
ClusterService clusterService,
IndexNameExpressionResolver indexNameExpressionResolver
IndexNameExpressionResolver indexNameExpressionResolver,
ExecutorService executorService
) {
this.client = Objects.requireNonNull(client);
this.auditIndexWriteAlias = Objects.requireNonNull(auditIndexWriteAlias);
Expand All @@ -69,6 +73,7 @@ protected AbstractAuditor(
this.backlog = new ConcurrentLinkedQueue<>();
this.indexAndAliasCreated = new AtomicBoolean();
this.indexAndAliasCreationInProgress = new AtomicBoolean();
this.executorService = executorService;
}

public void audit(Level level, String resourceId, String message) {
Expand Down Expand Up @@ -148,7 +153,16 @@ protected void indexDoc(ToXContent toXContent) {
}

private void writeDoc(ToXContent toXContent) {
client.index(indexRequest(toXContent), ActionListener.wrap(AbstractAuditor::onIndexResponse, AbstractAuditor::onIndexFailure));
client.index(indexRequest(toXContent), ActionListener.wrap(AbstractAuditor::onIndexResponse, e -> {
if (e instanceof IndexNotFoundException) {
executorService.execute(() -> {
reset();
indexDoc(toXContent);
});
} else {
onIndexFailure(e);
}
}));
}

private IndexRequest indexRequest(ToXContent toXContent) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,10 @@
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.IndexVersion;
import org.elasticsearch.indices.TestIndexNameExpressionResolver;
import org.elasticsearch.test.ESTestCase;
Expand Down Expand Up @@ -233,6 +235,40 @@ public void testAuditingBeforeTemplateInstalled() throws Exception {
verify(client, times(1)).execute(eq(TransportIndexAction.TYPE), any(), any());
}

public void testRecreateTemplateWhenDeleted() throws Exception {
CountDownLatch writeSomeDocsBeforeTemplateLatch = new CountDownLatch(1);
AbstractAuditor<AbstractAuditMessageTests.TestAuditMessage> auditor = createTestAuditorWithoutTemplate(
writeSomeDocsBeforeTemplateLatch
);

auditor.info("foobar", "Here is my info to queue");

verify(client, never()).execute(eq(TransportIndexAction.TYPE), any(), any());
// fire the put template response
writeSomeDocsBeforeTemplateLatch.countDown();

assertBusy(() -> verify(client, times(1)).execute(eq(TransportPutComposableIndexTemplateAction.TYPE), any(), any()));
assertBusy(() -> verify(client, times(1)).execute(eq(TransportCreateIndexAction.TYPE), any(), any()));

// the back log will be written some point later
assertBusy(() -> verify(client, times(1)).execute(eq(TransportBulkAction.TYPE), any(), any()));

// "delete" the index
doAnswer(ans -> {
ActionListener<?> listener = ans.getArgument(2);
listener.onFailure(new IndexNotFoundException("some index"));
return null;
}).when(client).execute(eq(TransportIndexAction.TYPE), any(), any());

// audit more data
auditor.info("foobar", "Here is another message");

// verify the template is recreated and the audit message is processed
assertBusy(() -> verify(client, times(2)).execute(eq(TransportPutComposableIndexTemplateAction.TYPE), any(), any()));
assertBusy(() -> verify(client, times(2)).execute(eq(TransportCreateIndexAction.TYPE), any(), any()));
assertBusy(() -> verify(client, times(2)).execute(eq(TransportBulkAction.TYPE), any(), any()));
}

public void testMaxBufferSize() throws Exception {
CountDownLatch writeSomeDocsBeforeTemplateLatch = new CountDownLatch(1);
AbstractAuditor<AbstractAuditMessageTests.TestAuditMessage> auditor = createTestAuditorWithoutTemplate(
Expand Down Expand Up @@ -358,7 +394,8 @@ public static class TestAuditor extends AbstractAuditor<AbstractAuditMessageTest
nodeName,
AbstractAuditMessageTests.TestAuditMessage::new,
clusterService,
TestIndexNameExpressionResolver.newInstance()
TestIndexNameExpressionResolver.newInstance(),
EsExecutors.DIRECT_EXECUTOR_SERVICE
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@ protected AbstractMlAuditor(
clusterService.getNodeName(),
messageFactory,
clusterService,
indexNameExpressionResolver
indexNameExpressionResolver,
clusterService.threadPool().generic()
);
clusterService.addListener(event -> {
if (event.metadataChanged()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ public TransformAuditor(
nodeName,
TransformAuditMessage::new,
clusterService,
indexNameExpressionResolver
indexNameExpressionResolver,
clusterService.threadPool().generic()
);
clusterService.addListener(event -> {
if (event.metadataChanged()) {
Expand Down