diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/common/notifications/AbstractAuditor.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/common/notifications/AbstractAuditor.java index c9624a8cc99b5..bfe40347e8ca5 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/common/notifications/AbstractAuditor.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/common/notifications/AbstractAuditor.java @@ -18,6 +18,7 @@ import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.core.TimeValue; +import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.xcontent.ToXContent; import org.elasticsearch.xcontent.XContentBuilder; import org.elasticsearch.xpack.core.ml.utils.MlIndexAndAlias; @@ -27,6 +28,7 @@ import java.util.Objects; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicBoolean; import static org.elasticsearch.xcontent.XContentFactory.jsonBuilder; @@ -50,6 +52,7 @@ public abstract class AbstractAuditor { private Queue backlog; private final AtomicBoolean indexAndAliasCreationInProgress; + private final ExecutorService executorService; protected AbstractAuditor( OriginSettingClient client, @@ -57,7 +60,8 @@ protected AbstractAuditor( String nodeName, AbstractAuditMessageFactory messageFactory, ClusterService clusterService, - IndexNameExpressionResolver indexNameExpressionResolver + IndexNameExpressionResolver indexNameExpressionResolver, + ExecutorService executorService ) { this.client = Objects.requireNonNull(client); this.auditIndexWriteAlias = Objects.requireNonNull(auditIndexWriteAlias); @@ -68,6 +72,7 @@ protected AbstractAuditor( this.backlog = new ConcurrentLinkedQueue<>(); this.indexAndAliasCreated = new AtomicBoolean(); this.indexAndAliasCreationInProgress = new AtomicBoolean(); + this.executorService = executorService; } public void audit(Level level, String resourceId, String message) { @@ -147,7 +152,16 @@ protected void indexDoc(ToXContent toXContent) { } private void writeDoc(ToXContent toXContent) { - client.index(indexRequest(toXContent), ActionListener.wrap(AbstractAuditor::onIndexResponse, AbstractAuditor::onIndexFailure)); + client.index(indexRequest(toXContent), ActionListener.wrap(AbstractAuditor::onIndexResponse, e -> { + if (e instanceof IndexNotFoundException) { + executorService.execute(() -> { + reset(); + indexDoc(toXContent); + }); + } else { + onIndexFailure(e); + } + })); } private IndexRequest indexRequest(ToXContent toXContent) { diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/common/notifications/AbstractAuditorTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/common/notifications/AbstractAuditorTests.java index bcf777906bb7c..13d15a4ab0b07 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/common/notifications/AbstractAuditorTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/common/notifications/AbstractAuditorTests.java @@ -34,8 +34,10 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.core.TimeValue; +import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.IndexVersion; import org.elasticsearch.indices.TestIndexNameExpressionResolver; import org.elasticsearch.test.ESTestCase; @@ -233,6 +235,40 @@ public void testAuditingBeforeTemplateInstalled() throws Exception { verify(client, times(1)).execute(eq(TransportIndexAction.TYPE), any(), any()); } + public void testRecreateTemplateWhenDeleted() throws Exception { + CountDownLatch writeSomeDocsBeforeTemplateLatch = new CountDownLatch(1); + AbstractAuditor auditor = createTestAuditorWithoutTemplate( + writeSomeDocsBeforeTemplateLatch + ); + + auditor.info("foobar", "Here is my info to queue"); + + verify(client, never()).execute(eq(TransportIndexAction.TYPE), any(), any()); + // fire the put template response + writeSomeDocsBeforeTemplateLatch.countDown(); + + assertBusy(() -> verify(client, times(1)).execute(eq(TransportPutComposableIndexTemplateAction.TYPE), any(), any())); + assertBusy(() -> verify(client, times(1)).execute(eq(TransportCreateIndexAction.TYPE), any(), any())); + + // the back log will be written some point later + assertBusy(() -> verify(client, times(1)).execute(eq(TransportBulkAction.TYPE), any(), any())); + + // "delete" the index + doAnswer(ans -> { + ActionListener listener = ans.getArgument(2); + listener.onFailure(new IndexNotFoundException("some index")); + return null; + }).when(client).execute(eq(TransportIndexAction.TYPE), any(), any()); + + // audit more data + auditor.info("foobar", "Here is another message"); + + // verify the template is recreated and the audit message is processed + assertBusy(() -> verify(client, times(2)).execute(eq(TransportPutComposableIndexTemplateAction.TYPE), any(), any())); + assertBusy(() -> verify(client, times(2)).execute(eq(TransportCreateIndexAction.TYPE), any(), any())); + assertBusy(() -> verify(client, times(2)).execute(eq(TransportBulkAction.TYPE), any(), any())); + } + public void testMaxBufferSize() throws Exception { CountDownLatch writeSomeDocsBeforeTemplateLatch = new CountDownLatch(1); AbstractAuditor auditor = createTestAuditorWithoutTemplate( @@ -358,7 +394,8 @@ public static class TestAuditor extends AbstractAuditor { if (event.metadataChanged()) { diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/notifications/TransformAuditor.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/notifications/TransformAuditor.java index 51e679ff9fe6c..402a8cbe12bd5 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/notifications/TransformAuditor.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/notifications/TransformAuditor.java @@ -52,7 +52,8 @@ public TransformAuditor( nodeName, TransformAuditMessage::new, clusterService, - indexNameExpressionResolver + indexNameExpressionResolver, + clusterService.threadPool().generic() ); clusterService.addListener(event -> { if (event.metadataChanged()) { diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/action/TransformUpdaterTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/action/TransformUpdaterTests.java index b9d91287ce45f..3231d705f389c 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/action/TransformUpdaterTests.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/action/TransformUpdaterTests.java @@ -18,7 +18,6 @@ import org.elasticsearch.client.internal.Client; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; -import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.core.Tuple; @@ -66,7 +65,6 @@ import static org.hamcrest.Matchers.not; import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.nullValue; -import static org.mockito.Mockito.mock; public class TransformUpdaterTests extends ESTestCase { @@ -77,8 +75,7 @@ public class TransformUpdaterTests extends ESTestCase { private final IndexNameExpressionResolver indexNameExpressionResolver = TestIndexNameExpressionResolver.newInstance(); private TestThreadPool threadPool; private Client client; - private ClusterService clusterService = mock(ClusterService.class); - private TransformAuditor auditor = new MockTransformAuditor(clusterService, mock(IndexNameExpressionResolver.class)); + private TransformAuditor auditor; private final Settings settings = Settings.builder().put(XPackSettings.SECURITY_ENABLED.getKey(), true).build(); private final Settings destIndexSettings = new DefaultTransformExtension().getTransformDestinationIndexSettings(); @@ -124,8 +121,7 @@ public void setupClient() { } threadPool = createThreadPool(); client = new MyMockClient(threadPool); - clusterService = mock(ClusterService.class); - auditor = new MockTransformAuditor(clusterService, mock(IndexNameExpressionResolver.class)); + auditor = MockTransformAuditor.createMockAuditor(); } @After diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/notifications/MockTransformAuditor.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/notifications/MockTransformAuditor.java index 1dffd8c20abbf..4eb255b69cfd3 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/notifications/MockTransformAuditor.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/notifications/MockTransformAuditor.java @@ -16,6 +16,8 @@ import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.regex.Regex; +import org.elasticsearch.common.util.concurrent.EsExecutors; +import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.common.notifications.Level; import org.elasticsearch.xpack.core.transform.notifications.TransformAuditMessage; @@ -51,13 +53,16 @@ public static MockTransformAuditor createMockAuditor() { when(state.getMetadata()).thenReturn(metadata); ClusterService clusterService = mock(ClusterService.class); when(clusterService.state()).thenReturn(state); + ThreadPool threadPool = mock(); + when(threadPool.generic()).thenReturn(EsExecutors.DIRECT_EXECUTOR_SERVICE); + when(clusterService.threadPool()).thenReturn(threadPool); return new MockTransformAuditor(clusterService, mock(IndexNameExpressionResolver.class)); } private final List expectations; - public MockTransformAuditor(ClusterService clusterService, IndexNameExpressionResolver indexNameResolver) { + private MockTransformAuditor(ClusterService clusterService, IndexNameExpressionResolver indexNameResolver) { super(mock(Client.class), MOCK_NODE_NAME, clusterService, indexNameResolver, true); expectations = new CopyOnWriteArrayList<>(); }