Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.xcontent.ToXContent;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xpack.core.ml.utils.MlIndexAndAlias;
Expand All @@ -27,6 +28,7 @@
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;

import static org.elasticsearch.xcontent.XContentFactory.jsonBuilder;
Expand All @@ -50,14 +52,16 @@ public abstract class AbstractAuditor<T extends AbstractAuditMessage> {

private Queue<ToXContent> backlog;
private final AtomicBoolean indexAndAliasCreationInProgress;
private final ExecutorService executorService;

protected AbstractAuditor(
OriginSettingClient client,
String auditIndexWriteAlias,
String nodeName,
AbstractAuditMessageFactory<T> messageFactory,
ClusterService clusterService,
IndexNameExpressionResolver indexNameExpressionResolver
IndexNameExpressionResolver indexNameExpressionResolver,
ExecutorService executorService
) {
this.client = Objects.requireNonNull(client);
this.auditIndexWriteAlias = Objects.requireNonNull(auditIndexWriteAlias);
Expand All @@ -68,6 +72,7 @@ protected AbstractAuditor(
this.backlog = new ConcurrentLinkedQueue<>();
this.indexAndAliasCreated = new AtomicBoolean();
this.indexAndAliasCreationInProgress = new AtomicBoolean();
this.executorService = executorService;
}

public void audit(Level level, String resourceId, String message) {
Expand Down Expand Up @@ -147,7 +152,16 @@ protected void indexDoc(ToXContent toXContent) {
}

private void writeDoc(ToXContent toXContent) {
client.index(indexRequest(toXContent), ActionListener.wrap(AbstractAuditor::onIndexResponse, AbstractAuditor::onIndexFailure));
client.index(indexRequest(toXContent), ActionListener.wrap(AbstractAuditor::onIndexResponse, e -> {
if (e instanceof IndexNotFoundException) {
executorService.execute(() -> {
reset();
indexDoc(toXContent);
});
} else {
onIndexFailure(e);
}
}));
}

private IndexRequest indexRequest(ToXContent toXContent) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,10 @@
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.IndexVersion;
import org.elasticsearch.indices.TestIndexNameExpressionResolver;
import org.elasticsearch.test.ESTestCase;
Expand Down Expand Up @@ -233,6 +235,40 @@ public void testAuditingBeforeTemplateInstalled() throws Exception {
verify(client, times(1)).execute(eq(TransportIndexAction.TYPE), any(), any());
}

public void testRecreateTemplateWhenDeleted() throws Exception {
CountDownLatch writeSomeDocsBeforeTemplateLatch = new CountDownLatch(1);
AbstractAuditor<AbstractAuditMessageTests.TestAuditMessage> auditor = createTestAuditorWithoutTemplate(
writeSomeDocsBeforeTemplateLatch
);

auditor.info("foobar", "Here is my info to queue");

verify(client, never()).execute(eq(TransportIndexAction.TYPE), any(), any());
// fire the put template response
writeSomeDocsBeforeTemplateLatch.countDown();

assertBusy(() -> verify(client, times(1)).execute(eq(TransportPutComposableIndexTemplateAction.TYPE), any(), any()));
assertBusy(() -> verify(client, times(1)).execute(eq(TransportCreateIndexAction.TYPE), any(), any()));

// the back log will be written some point later
assertBusy(() -> verify(client, times(1)).execute(eq(TransportBulkAction.TYPE), any(), any()));

// "delete" the index
doAnswer(ans -> {
ActionListener<?> listener = ans.getArgument(2);
listener.onFailure(new IndexNotFoundException("some index"));
return null;
}).when(client).execute(eq(TransportIndexAction.TYPE), any(), any());

// audit more data
auditor.info("foobar", "Here is another message");

// verify the template is recreated and the audit message is processed
assertBusy(() -> verify(client, times(2)).execute(eq(TransportPutComposableIndexTemplateAction.TYPE), any(), any()));
assertBusy(() -> verify(client, times(2)).execute(eq(TransportCreateIndexAction.TYPE), any(), any()));
assertBusy(() -> verify(client, times(2)).execute(eq(TransportBulkAction.TYPE), any(), any()));
}

public void testMaxBufferSize() throws Exception {
CountDownLatch writeSomeDocsBeforeTemplateLatch = new CountDownLatch(1);
AbstractAuditor<AbstractAuditMessageTests.TestAuditMessage> auditor = createTestAuditorWithoutTemplate(
Expand Down Expand Up @@ -358,7 +394,8 @@ public static class TestAuditor extends AbstractAuditor<AbstractAuditMessageTest
nodeName,
AbstractAuditMessageTests.TestAuditMessage::new,
clusterService,
TestIndexNameExpressionResolver.newInstance()
TestIndexNameExpressionResolver.newInstance(),
EsExecutors.DIRECT_EXECUTOR_SERVICE
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@ protected AbstractMlAuditor(
clusterService.getNodeName(),
messageFactory,
clusterService,
indexNameExpressionResolver
indexNameExpressionResolver,
clusterService.threadPool().generic()
);
clusterService.addListener(event -> {
if (event.metadataChanged()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ public TransformAuditor(
nodeName,
TransformAuditMessage::new,
clusterService,
indexNameExpressionResolver
indexNameExpressionResolver,
clusterService.threadPool().generic()
);
clusterService.addListener(event -> {
if (event.metadataChanged()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.core.Tuple;
Expand Down Expand Up @@ -66,7 +65,6 @@
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
import static org.mockito.Mockito.mock;

public class TransformUpdaterTests extends ESTestCase {

Expand All @@ -77,8 +75,7 @@ public class TransformUpdaterTests extends ESTestCase {
private final IndexNameExpressionResolver indexNameExpressionResolver = TestIndexNameExpressionResolver.newInstance();
private TestThreadPool threadPool;
private Client client;
private ClusterService clusterService = mock(ClusterService.class);
private TransformAuditor auditor = new MockTransformAuditor(clusterService, mock(IndexNameExpressionResolver.class));
private TransformAuditor auditor;
private final Settings settings = Settings.builder().put(XPackSettings.SECURITY_ENABLED.getKey(), true).build();
private final Settings destIndexSettings = new DefaultTransformExtension().getTransformDestinationIndexSettings();

Expand Down Expand Up @@ -124,8 +121,7 @@ public void setupClient() {
}
threadPool = createThreadPool();
client = new MyMockClient(threadPool);
clusterService = mock(ClusterService.class);
auditor = new MockTransformAuditor(clusterService, mock(IndexNameExpressionResolver.class));
auditor = MockTransformAuditor.createMockAuditor();
}

@After
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.common.notifications.Level;
import org.elasticsearch.xpack.core.transform.notifications.TransformAuditMessage;

Expand Down Expand Up @@ -51,13 +53,16 @@ public static MockTransformAuditor createMockAuditor() {
when(state.getMetadata()).thenReturn(metadata);
ClusterService clusterService = mock(ClusterService.class);
when(clusterService.state()).thenReturn(state);
ThreadPool threadPool = mock();
when(threadPool.generic()).thenReturn(EsExecutors.DIRECT_EXECUTOR_SERVICE);
when(clusterService.threadPool()).thenReturn(threadPool);

return new MockTransformAuditor(clusterService, mock(IndexNameExpressionResolver.class));
}

private final List<AuditExpectation> expectations;

public MockTransformAuditor(ClusterService clusterService, IndexNameExpressionResolver indexNameResolver) {
private MockTransformAuditor(ClusterService clusterService, IndexNameExpressionResolver indexNameResolver) {
super(mock(Client.class), MOCK_NODE_NAME, clusterService, indexNameResolver, true);
expectations = new CopyOnWriteArrayList<>();
}
Expand Down