diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexer.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexer.java index 16b55c63b81da..1b2fa664f676d 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexer.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexer.java @@ -57,6 +57,7 @@ public abstract class AsyncTwoPhaseIndexer position; private final ThreadPool threadPool; private final Object lock; + private final EventHook eventHook; private final AtomicBoolean isJobFinishing; // throttling implementation @@ -88,7 +89,7 @@ public void reschedule(TimeValue delay) { if (delay.duration() > 0) { scheduled = threadPool.schedule(command, delay, threadPool.generic()); } else { - threadPool.generic().execute(command); + threadPool.generic().execute(threadPool.getThreadContext().preserveContext(command)); } } } @@ -99,9 +100,10 @@ protected AsyncTwoPhaseIndexer( ThreadPool threadPool, AtomicReference initialState, JobPosition initialPosition, - JobStats jobStats + JobStats jobStats, + EventHook eventHook ) { - this(threadPool, initialState, initialPosition, jobStats, new Object()); + this(threadPool, initialState, initialPosition, jobStats, eventHook, new Object()); } protected AsyncTwoPhaseIndexer( @@ -109,6 +111,7 @@ protected AsyncTwoPhaseIndexer( AtomicReference initialState, JobPosition initialPosition, JobStats jobStats, + EventHook eventHook, Object lock ) { this.threadPool = threadPool; @@ -116,6 +119,7 @@ protected AsyncTwoPhaseIndexer( this.position = new AtomicReference<>(initialPosition); this.stats = jobStats; this.lock = lock; + this.eventHook = eventHook; this.isJobFinishing = new AtomicBoolean(false); } @@ -230,19 +234,22 @@ public boolean maybeTriggerAsyncJob(long now) { logger.debug("Schedule was triggered for job [" + getJobId() + "], state: [" + currentState + "]"); stats.incrementNumInvocations(1); if (startJob()) { - // fire off the search. Note this is async, the method will return from here - threadPool.executor(ThreadPool.Names.GENERIC).execute(() -> { - onStart(now, ActionListener.wrap(r -> { - assert r != null; - if (r) { - nextSearch(); - } else { - onFinish(finishJobListener()); - } - }, this::finishWithFailure)); - }); - logger.debug("Beginning to index [" + getJobId() + "], state: [" + currentState + "]"); - return true; + try (var ignored = threadPool.getThreadContext().newTraceContext()) { + eventHook.safeOnEvent(EventHook.Event.START); + // fire off the search. Note this is async, the method will return from here + threadPool.generic().execute(threadPool.getThreadContext().preserveContext(() -> { + onStart(now, ActionListener.wrap(r -> { + assert r != null; + if (r) { + nextSearch(); + } else { + onFinish(finishJobListener()); + } + }, this::finishWithFailure)); + })); + logger.debug("Beginning to index [" + getJobId() + "], state: [" + currentState + "]"); + return true; + } } else { return false; } @@ -282,6 +289,7 @@ private void finishJob() { isJobFinishing.set(true); doSaveState(finishAndSetState(), position.get(), () -> { afterFinishOrFailure(); + eventHook.safeOnEvent(EventHook.Event.FINISH); isJobFinishing.set(false); }); } @@ -707,4 +715,44 @@ static TimeValue calculateThrottlingDelay(float docsPerSecond, long docCount, lo return executionDelay; } + /** + * Listener API for events happening within the {@link AsyncTwoPhaseIndexer}. Events are denoted by the {@link EventHook.Event} enum. + * onEvent is invoked synchronously when the event happens. + * onError is invoked via {@link #onFailure(Exception)} and is not associated with any other event. + * EventHook implementations are expected to never throw exceptions, RuntimeException or otherwise, as doing so can unintentionally + * cancel the AsyncTwoPhaseIndexer or break the Search's {@link ScheduledRunnable}. All exceptions will be ignored. + */ + public interface EventHook { + enum Event { + START, + FINISH + } + + private void safeOnEvent(Event event) { + try { + onEvent(event); + } catch (Exception e) { + logger.atDebug().withThrowable(e).log("Ignoring error from event handler for event {}", event); + } + } + + private void safeOnError(Throwable t) { + try { + onError(t); + } catch (Exception e) { + logger.atDebug().withThrowable(e).log("Ignoring error from event handler for throwable {}", t::getMessage); + } + } + + void onEvent(Event event); + + void onError(Throwable t); + + EventHook NOOP = new EventHook() { + public void onEvent(Event e) {} + + public void onError(Throwable t) {} + }; + } + } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexerTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexerTests.java index 6f0c5fba2cf5a..91c9f7de37d60 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexerTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexerTests.java @@ -65,7 +65,7 @@ protected MockIndexer( boolean stoppedBeforeFinished, boolean noIndices ) { - super(threadPool, initialState, initialPosition, new MockJobStats()); + super(threadPool, initialState, initialPosition, new MockJobStats(), EventHook.NOOP); this.latch = latch; this.stoppedBeforeFinished = stoppedBeforeFinished; this.noIndices = noIndices; @@ -178,7 +178,7 @@ protected MockIndexerFiveRuns( float maxDocsPerSecond, CountDownLatch latch ) { - super(threadPool, initialState, initialPosition, new MockJobStats()); + super(threadPool, initialState, initialPosition, new MockJobStats(), EventHook.NOOP); startTime = System.nanoTime(); this.latch = latch; this.maxDocsPerSecond = maxDocsPerSecond; @@ -304,7 +304,7 @@ protected MockIndexerThrowsFirstSearch( AtomicReference initialState, Integer initialPosition ) { - super(threadPool, initialState, initialPosition, new MockJobStats()); + super(threadPool, initialState, initialPosition, new MockJobStats(), EventHook.NOOP); } @Override diff --git a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/job/RollupIndexer.java b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/job/RollupIndexer.java index 57bdebafd1b20..19445f87f1eac 100644 --- a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/job/RollupIndexer.java +++ b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/job/RollupIndexer.java @@ -90,7 +90,7 @@ public abstract class RollupIndexer extends AsyncTwoPhaseIndexer initialPosition, RollupIndexerJobStats jobStats ) { - super(threadPool, initialState, initialPosition, jobStats); + super(threadPool, initialState, initialPosition, jobStats, EventHook.NOOP); this.job = job; this.compositeBuilder = createCompositeBuilder(job.getConfig()); } diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/Transform.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/Transform.java index 3108e9be4ea39..e738aa8c68d47 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/Transform.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/Transform.java @@ -323,7 +323,16 @@ public Collection createComponents(PluginServices services) { var clusterStateListener = new TransformClusterStateListener(clusterService, client); var transformNode = new TransformNode(clusterStateListener); - transformServices.set(new TransformServices(configManager, checkpointService, auditor, scheduler, transformNode)); + transformServices.set( + new TransformServices( + configManager, + checkpointService, + auditor, + scheduler, + transformNode, + services.telemetryProvider().getTracer() + ) + ); var transformMeterRegistry = TransformMeterRegistry.create(services.telemetryProvider().getMeterRegistry()); transformConfigAutoMigration.set( diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/TransformServices.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/TransformServices.java index 9a7db0fde2d9c..17af191820d90 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/TransformServices.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/TransformServices.java @@ -7,6 +7,7 @@ package org.elasticsearch.xpack.transform; +import org.elasticsearch.telemetry.tracing.Tracer; import org.elasticsearch.xpack.transform.checkpoint.TransformCheckpointService; import org.elasticsearch.xpack.transform.notifications.TransformAuditor; import org.elasticsearch.xpack.transform.persistence.TransformConfigManager; @@ -25,19 +26,22 @@ public record TransformServices( TransformCheckpointService checkpointService, TransformAuditor auditor, TransformScheduler scheduler, - TransformNode transformNode + TransformNode transformNode, + Tracer tracer ) { public TransformServices( TransformConfigManager configManager, TransformCheckpointService checkpointService, TransformAuditor auditor, TransformScheduler scheduler, - TransformNode transformNode + TransformNode transformNode, + Tracer tracer ) { this.configManager = Objects.requireNonNull(configManager); this.checkpointService = Objects.requireNonNull(checkpointService); this.auditor = Objects.requireNonNull(auditor); this.scheduler = Objects.requireNonNull(scheduler); this.transformNode = Objects.requireNonNull(transformNode); + this.tracer = Objects.requireNonNull(tracer); } } diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/telemetry/TransformTraceEvents.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/telemetry/TransformTraceEvents.java new file mode 100644 index 0000000000000..3d671300f248a --- /dev/null +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/telemetry/TransformTraceEvents.java @@ -0,0 +1,73 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.transform.telemetry; + +import org.elasticsearch.common.util.concurrent.ThreadContext; +import org.elasticsearch.telemetry.tracing.Traceable; +import org.elasticsearch.telemetry.tracing.Tracer; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.xpack.core.indexing.AsyncTwoPhaseIndexer; +import org.elasticsearch.xpack.transform.transforms.TransformTask; + +import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Supplier; + +public record TransformTraceEvents( + Tracer tracer, + Map attributes, + Supplier threadContext, + String traceName, + String spanPrefix, + AtomicLong checkpoint, + AtomicReference previousContext +) implements AsyncTwoPhaseIndexer.EventHook { + + @Override + public void onEvent(Event event) { + switch (event) { + // TODO, probably just swap back to using Tracer directly in AsyncTwoPhase, but we need some way to declare the spanId, name, + // and attributes + case START -> { + tracer.startTrace(threadContext.get(), spanId(checkpoint.get()), traceName, attributes); + } + case FINISH -> { + tracer.stopTrace(spanId(checkpoint.getAndIncrement())); + } + } + } + + private Traceable spanId(long suffix) { + return () -> spanPrefix + "-" + suffix; + } + + @Override + public void onError(Throwable t) { + tracer.addError(spanId(checkpoint.get()), t); + } + + public static TransformTraceEvents create(Tracer tracer, ThreadPool threadPool, TransformTask task, long checkpoint) { + return new TransformTraceEvents( + tracer, + Map.of( + Tracer.AttributeKeys.TASK_ID, + task.getId(), + Tracer.AttributeKeys.PARENT_TASK_ID, + task.getParentTaskId().toString(), + "Transform", + task.getTransformId() + ), + threadPool::getThreadContext, + "transform/" + task.getTransformId(), + String.valueOf(task.getId()), + new AtomicLong(checkpoint), + new AtomicReference<>() + ); + } +} diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexer.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexer.java index c7fd6df4467dd..f232c831e32a1 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexer.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexer.java @@ -112,7 +112,8 @@ class ClientTransformIndexer extends TransformIndexer { TransformCheckpoint nextCheckpoint, SeqNoPrimaryTermAndIndex seqNoPrimaryTermAndIndex, TransformContext context, - boolean shouldStopAtCheckpoint + boolean shouldStopAtCheckpoint, + EventHook eventHook ) { super( ExceptionsHelper.requireNonNull(threadPool, "threadPool"), @@ -125,7 +126,8 @@ class ClientTransformIndexer extends TransformIndexer { transformProgress, lastCheckpoint, nextCheckpoint, - context + context, + eventHook ); this.client = ExceptionsHelper.requireNonNull(client, "client"); this.clusterService = clusterService; diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexerBuilder.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexerBuilder.java index 6dcf2657b12a8..38e2f2e7adb88 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexerBuilder.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexerBuilder.java @@ -11,6 +11,7 @@ import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.xpack.core.indexing.AsyncTwoPhaseIndexer; import org.elasticsearch.xpack.core.indexing.IndexerState; import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpoint; import org.elasticsearch.xpack.core.transform.transforms.TransformConfig; @@ -39,6 +40,7 @@ class ClientTransformIndexerBuilder { private TransformCheckpoint nextCheckpoint; private SeqNoPrimaryTermAndIndex seqNoPrimaryTermAndIndex; private boolean shouldStopAtCheckpoint; + private AsyncTwoPhaseIndexer.EventHook eventHook = AsyncTwoPhaseIndexer.EventHook.NOOP; ClientTransformIndexerBuilder() { this.initialStats = new TransformIndexerStats(); @@ -68,7 +70,8 @@ ClientTransformIndexer build(ThreadPool threadPool, TransformContext context) { TransformCheckpoint.isNullOrEmpty(nextCheckpoint) ? TransformCheckpoint.EMPTY : nextCheckpoint, seqNoPrimaryTermAndIndex, context, - shouldStopAtCheckpoint + shouldStopAtCheckpoint, + eventHook ); } @@ -146,4 +149,9 @@ ClientTransformIndexerBuilder setSeqNoPrimaryTermAndIndex(SeqNoPrimaryTermAndInd return this; } + ClientTransformIndexerBuilder setEventHook(AsyncTwoPhaseIndexer.EventHook eventHook) { + this.eventHook = eventHook != null ? eventHook : this.eventHook; + return this; + } + } diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformContext.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformContext.java index 73872fac097c1..5c5394a738229 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformContext.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformContext.java @@ -48,6 +48,9 @@ public interface Listener { private volatile boolean shouldRecreateDestinationIndex = false; private volatile AuthorizationState authState; private volatile int pageSize = 0; + private AtomicInteger skippedCheckpoints = new AtomicInteger(0); + private AtomicInteger totalTimeWasGreaterThanFrequency = new AtomicInteger(0); + private volatile Instant lastTotalTimeRecorded; /** * If the destination index is blocked (e.g. during a reindex), the Transform will fail to write to it. @@ -273,6 +276,36 @@ boolean doesNotHaveFailures() { return getFailureCount() == 0 && getStatePersistenceFailureCount() == 0 && getStartUpFailureCount() == 0; } + int skippedCheckpoints() { + return skippedCheckpoints.get(); + } + + void checkpointSkipped() { + skippedCheckpoints.incrementAndGet(); + } + + void resetSkippedCheckpoint() { + skippedCheckpoints.set(0); + } + + int numberOfTimesTotalTimeWasGreaterThanFrequency() { + return totalTimeWasGreaterThanFrequency.get(); + } + + void totalTimeWasGreaterThanFrequency() { + totalTimeWasGreaterThanFrequency.incrementAndGet(); + lastTotalTimeRecorded = Instant.now(); + } + + void totalTimeWasLessThanFrequency() { + totalTimeWasGreaterThanFrequency.decrementAndGet(); + lastTotalTimeRecorded = Instant.now(); + } + + Instant lastTotalTimeRecorded() { + return lastTotalTimeRecorded; + } + void shutdown() { taskListener.shutdown(); } diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformHealthChecker.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformHealthChecker.java index 24c5d45a38f75..1b747707a0ae1 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformHealthChecker.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformHealthChecker.java @@ -39,7 +39,9 @@ public enum IssueType { TRANSFORM_TASK_FAILED("Transform task state is [failed]"), TRANSFORM_INDEXER_FAILED("Transform indexer failed"), TRANSFORM_INTERNAL_STATE_UPDATE_FAILED("Task encountered failures updating internal state"), - TRANSFORM_STARTUP_FAILED("Transform task is automatically retrying its startup process"); + TRANSFORM_STARTUP_FAILED("Transform task is automatically retrying its startup process"), + TRANSFORM_IS_SLOW("Transform task is taking longer than its configured frequency. Data processing will be delayed."), + TRANSFORM_WAS_SLOW("Transform task was recently taking longer than its configured frequency, but it is recovering."); private final String issue; @@ -163,6 +165,30 @@ public static TransformHealth checkTransform(TransformTask transformTask, @Nulla ); } + if (transformContext.numberOfTimesTotalTimeWasGreaterThanFrequency() > 0) { + if (HealthStatus.RED.equals(maxStatus) == false) { + maxStatus = HealthStatus.YELLOW; + } + var checkpoints = transformContext.skippedCheckpoints(); + if (checkpoints > 0) { + issues.add( + IssueType.TRANSFORM_IS_SLOW.newIssue( + "Transform has skipped " + checkpoints + " checkpoints.", + transformContext.numberOfTimesTotalTimeWasGreaterThanFrequency(), + transformContext.lastTotalTimeRecorded() + ) + ); + } else { + issues.add( + IssueType.TRANSFORM_WAS_SLOW.newIssue( + null, + transformContext.numberOfTimesTotalTimeWasGreaterThanFrequency(), + transformContext.lastTotalTimeRecorded() + ) + ); + } + } + return new TransformHealth(maxStatus, issues); } diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java index bd0224f49814e..e91b12481d3ae 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java @@ -149,10 +149,11 @@ public TransformIndexer( TransformProgress transformProgress, TransformCheckpoint lastCheckpoint, TransformCheckpoint nextCheckpoint, - TransformContext context + TransformContext context, + EventHook eventHook ) { // important: note that we pass the context object as lock object - super(threadPool, initialState, initialPosition, jobStats, context); + super(threadPool, initialState, initialPosition, jobStats, eventHook, context); ExceptionsHelper.requireNonNull(transformServices, "transformServices"); this.transformsConfigManager = transformServices.configManager(); this.checkpointProvider = ExceptionsHelper.requireNonNull(checkpointProvider, "checkpointProvider"); @@ -605,6 +606,14 @@ private void finalizeCheckpoint(ActionListener listener) { long durationMs = System.currentTimeMillis() - lastCheckpoint.getTimestamp(); getStats().incrementCheckpointExponentialAverages(durationMs < 0 ? 0 : durationMs, docsIndexed, docsProcessed); + + if (getConfig().getFrequency() != null) { + if (durationMs > getConfig().getFrequency().getMillis()) { + context.totalTimeWasGreaterThanFrequency(); + } else { + context.totalTimeWasLessThanFrequency(); + } + } } if (shouldAuditOnFinish(checkpoint)) { auditor.info(getJobId(), "Finished indexing for transform checkpoint [" + checkpoint + "]."); @@ -659,10 +668,19 @@ public boolean maybeTriggerAsyncJob(long now) { IndexerState indexerState = getState(); if (IndexerState.INDEXING.equals(indexerState) || IndexerState.STOPPING.equals(indexerState)) { logger.debug("[{}] indexer for transform has state [{}]. Ignoring trigger.", getJobId(), indexerState); + if (IndexerState.INDEXING.equals(indexerState)) { + context.checkpointSkipped(); + } return false; } - return super.maybeTriggerAsyncJob(now); + var triggered = super.maybeTriggerAsyncJob(now); + if (triggered) { + context.resetSkippedCheckpoint(); + } else { + context.checkpointSkipped(); + } + return triggered; } } diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformPersistentTasksExecutor.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformPersistentTasksExecutor.java index b7bd434194b80..077dc6d1da948 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformPersistentTasksExecutor.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformPersistentTasksExecutor.java @@ -55,6 +55,7 @@ import org.elasticsearch.xpack.transform.notifications.TransformAuditor; import org.elasticsearch.xpack.transform.persistence.SeqNoPrimaryTermAndIndex; import org.elasticsearch.xpack.transform.persistence.TransformInternalIndex; +import org.elasticsearch.xpack.transform.telemetry.TransformTraceEvents; import java.util.ArrayList; import java.util.Collection; @@ -488,6 +489,14 @@ private void startTask( Long previousCheckpoint, ActionListener listener ) { + indexerBuilder.setEventHook( + TransformTraceEvents.create( + transformServices.tracer(), + threadPool, + buildTask, + previousCheckpoint == null ? 0 : previousCheckpoint + ) + ); // switch the threadpool to generic, because the caller is on the system_read threadpool threadPool.generic().execute(() -> { buildTask.initializeIndexer(indexerBuilder); diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexerTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexerTests.java index c1d36c329de4d..4a36d278aba7a 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexerTests.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexerTests.java @@ -45,10 +45,12 @@ import org.elasticsearch.search.profile.SearchProfileResults; import org.elasticsearch.search.suggest.Suggest; import org.elasticsearch.tasks.TaskId; +import org.elasticsearch.telemetry.tracing.Tracer; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.client.NoOpClient; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.ActionNotFoundTransportException; +import org.elasticsearch.xpack.core.indexing.AsyncTwoPhaseIndexer; import org.elasticsearch.xpack.core.indexing.IndexerState; import org.elasticsearch.xpack.core.transform.TransformMetadata; import org.elasticsearch.xpack.core.transform.transforms.SettingsConfig; @@ -157,7 +159,8 @@ public void testPitInjection() throws InterruptedException { mock(TransformCheckpointService.class), mock(TransformAuditor.class), new TransformScheduler(Clock.systemUTC(), mock(ThreadPool.class), Settings.EMPTY, TimeValue.ZERO), - mock(TransformNode.class) + mock(TransformNode.class), + Tracer.NOOP ), mock(CheckpointProvider.class), new AtomicReference<>(IndexerState.STOPPED), @@ -255,7 +258,8 @@ public void testPitInjectionIfPitNotSupported() throws InterruptedException { mock(TransformCheckpointService.class), mock(TransformAuditor.class), new TransformScheduler(Clock.systemUTC(), mock(ThreadPool.class), Settings.EMPTY, TimeValue.ZERO), - mock(TransformNode.class) + mock(TransformNode.class), + Tracer.NOOP ), mock(CheckpointProvider.class), new AtomicReference<>(IndexerState.STOPPED), @@ -337,7 +341,8 @@ public void testDisablePit() throws InterruptedException { mock(TransformCheckpointService.class), mock(TransformAuditor.class), new TransformScheduler(Clock.systemUTC(), mock(ThreadPool.class), Settings.EMPTY, TimeValue.ZERO), - mock(TransformNode.class) + mock(TransformNode.class), + Tracer.NOOP ), mock(CheckpointProvider.class), new AtomicReference<>(IndexerState.STOPPED), @@ -405,7 +410,8 @@ public void testDisablePitWhenThereIsRemoteIndexInSource() throws InterruptedExc mock(TransformCheckpointService.class), mock(TransformAuditor.class), new TransformScheduler(Clock.systemUTC(), mock(ThreadPool.class), Settings.EMPTY, TimeValue.ZERO), - mock(TransformNode.class) + mock(TransformNode.class), + Tracer.NOOP ), mock(CheckpointProvider.class), new AtomicReference<>(IndexerState.STOPPED), @@ -573,7 +579,8 @@ private static class MockClientTransformIndexer extends ClientTransformIndexer { nextCheckpoint, seqNoPrimaryTermAndIndex, context, - shouldStopAtCheckpoint + shouldStopAtCheckpoint, + EventHook.NOOP ); } @@ -717,7 +724,8 @@ private ClientTransformIndexer createTestIndexer( mock(TransformCheckpointService.class), mock(TransformAuditor.class), new TransformScheduler(Clock.systemUTC(), mock(ThreadPool.class), Settings.EMPTY, TimeValue.ZERO), - mock(TransformNode.class) + mock(TransformNode.class), + Tracer.NOOP ), mock(CheckpointProvider.class), new AtomicReference<>(IndexerState.STOPPED), @@ -730,7 +738,8 @@ private ClientTransformIndexer createTestIndexer( new TransformCheckpoint("transform", Instant.now().toEpochMilli(), 2L, Collections.emptyMap(), Instant.now().toEpochMilli()), new SeqNoPrimaryTermAndIndex(1, 1, TransformInternalIndexConstants.LATEST_INDEX_NAME), context, - false + false, + AsyncTwoPhaseIndexer.EventHook.NOOP ); } } diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerFailureHandlingTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerFailureHandlingTests.java index b9380ade1238b..a35a0321809b9 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerFailureHandlingTests.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerFailureHandlingTests.java @@ -36,6 +36,7 @@ import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchHits; import org.elasticsearch.search.SearchResponseUtils; +import org.elasticsearch.telemetry.tracing.Tracer; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.common.notifications.Level; @@ -146,7 +147,8 @@ static class MockedTransformIndexer extends ClientTransformIndexer { mock(TransformCheckpointService.class), auditor, new TransformScheduler(Clock.systemUTC(), threadPool, Settings.EMPTY, TimeValue.ZERO), - mock(TransformNode.class) + mock(TransformNode.class), + Tracer.NOOP ), checkpointProvider, initialState, @@ -159,7 +161,8 @@ static class MockedTransformIndexer extends ClientTransformIndexer { TransformCheckpoint.EMPTY, new SeqNoPrimaryTermAndIndex(1, 1, "foo"), context, - false + false, + EventHook.NOOP ); this.searchFunction = searchFunction; this.bulkFunction = bulkFunction; diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerFailureOnStatePersistenceTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerFailureOnStatePersistenceTests.java index 480fa8ceaf4b3..abe2ad26affe8 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerFailureOnStatePersistenceTests.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerFailureOnStatePersistenceTests.java @@ -20,6 +20,7 @@ import org.elasticsearch.index.engine.VersionConflictEngineException; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.tasks.TaskId; +import org.elasticsearch.telemetry.tracing.Tracer; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.client.NoOpClient; import org.elasticsearch.threadpool.ThreadPool; @@ -101,7 +102,8 @@ private static class MockClientTransformIndexer extends ClientTransformIndexer { nextCheckpoint, seqNoPrimaryTermAndIndex, context, - shouldStopAtCheckpoint + shouldStopAtCheckpoint, + EventHook.NOOP ); } @@ -242,7 +244,8 @@ public void fail(Throwable exception, String failureMessage, ActionListener(IndexerState.STOPPED), @@ -328,7 +331,8 @@ public void fail(Throwable exception, String failureMessage, ActionListener(IndexerState.STOPPED), @@ -463,7 +467,8 @@ public void fail(Throwable exception, String failureMessage, ActionListener(IndexerState.STOPPED), diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerStateTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerStateTests.java index 4306e6dd8bf4f..2ca58a02a1e9d 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerStateTests.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerStateTests.java @@ -27,6 +27,7 @@ import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchHits; import org.elasticsearch.search.SearchResponseUtils; +import org.elasticsearch.telemetry.tracing.Tracer; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.client.NoOpClient; import org.elasticsearch.test.junit.annotations.TestIssueLogging; @@ -129,7 +130,8 @@ class MockedTransformIndexer extends TransformIndexer { /* TransformProgress */ null, TransformCheckpoint.EMPTY, TransformCheckpoint.EMPTY, - context + context, + EventHook.NOOP ); this.threadPool = threadPool; @@ -329,7 +331,8 @@ class MockedTransformIndexerForStatePersistenceTesting extends TransformIndexer /* TransformProgress */ null, TransformCheckpoint.EMPTY, TransformCheckpoint.EMPTY, - context + context, + EventHook.NOOP ); } @@ -837,7 +840,8 @@ public void testDeleteTransformBeforeConfigReload() throws Exception { mock(TransformCheckpointService.class), auditor, new TransformScheduler(Clock.systemUTC(), threadPool, Settings.EMPTY, TimeValue.ZERO), - mock(TransformNode.class) + mock(TransformNode.class), + Tracer.NOOP ), new MockTimebasedCheckpointProvider(config), config, @@ -1055,7 +1059,8 @@ private MockedTransformIndexer createMockIndexer( mock(TransformCheckpointService.class), transformAuditor, new TransformScheduler(Clock.systemUTC(), threadPool, Settings.EMPTY, TimeValue.ZERO), - mock(TransformNode.class) + mock(TransformNode.class), + Tracer.NOOP ); MockedTransformIndexer indexer = new MockedTransformIndexer( @@ -1090,7 +1095,8 @@ private MockedTransformIndexerForStatePersistenceTesting createMockIndexerForSta mock(TransformCheckpointService.class), transformAuditor, new TransformScheduler(Clock.systemUTC(), threadPool, Settings.EMPTY, TimeValue.ZERO), - mock(TransformNode.class) + mock(TransformNode.class), + Tracer.NOOP ); MockedTransformIndexerForStatePersistenceTesting indexer = new MockedTransformIndexerForStatePersistenceTesting( diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerTests.java index 9e352c0e89032..dbcb16215bddb 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerTests.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerTests.java @@ -28,6 +28,7 @@ import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchHits; import org.elasticsearch.search.SearchResponseUtils; +import org.elasticsearch.telemetry.tracing.Tracer; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.client.NoOpClient; import org.elasticsearch.threadpool.TestThreadPool; @@ -129,7 +130,8 @@ class MockedTransformIndexer extends TransformIndexer { /* TransformProgress */ null, TransformCheckpoint.EMPTY, TransformCheckpoint.EMPTY, - context + context, + EventHook.NOOP ); this.threadPool = threadPool; this.numberOfLoops = numberOfLoops; @@ -671,7 +673,8 @@ private MockedTransformIndexer createMockIndexer( mock(TransformCheckpointService.class), transformAuditor, new TransformScheduler(Clock.systemUTC(), threadPool, Settings.EMPTY, TimeValue.ZERO), - mock(TransformNode.class) + mock(TransformNode.class), + Tracer.NOOP ); MockedTransformIndexer indexer = new MockedTransformIndexer( diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformPersistentTasksExecutorTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformPersistentTasksExecutorTests.java index fa509143f9ba9..7ed8dc35729ac 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformPersistentTasksExecutorTests.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformPersistentTasksExecutorTests.java @@ -34,6 +34,7 @@ import org.elasticsearch.persistent.PersistentTasksCustomMetadata; import org.elasticsearch.persistent.PersistentTasksCustomMetadata.Assignment; import org.elasticsearch.tasks.TaskId; +import org.elasticsearch.telemetry.tracing.Tracer; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; @@ -427,6 +428,7 @@ private TransformTask mockTransformTask() { when(task.setNumFailureRetries(anyInt())).thenReturn(task); when(task.getParentTaskId()).thenReturn(TaskId.EMPTY_TASK_ID); when(task.getContext()).thenReturn(mock()); + when(task.getTransformId()).thenReturn("1234"); doAnswer(a -> fail(a.getArgument(0, Throwable.class))).when(task).fail(any(Throwable.class), any(String.class), any()); when(task.getState()).thenReturn( new TransformState(TransformTaskState.STOPPED, IndexerState.STOPPED, null, 0, null, null, null, false, null) @@ -611,7 +613,14 @@ private TransformServices transformServices(TransformConfigManager configManager configManager, mockAuditor ); - return new TransformServices(configManager, transformCheckpointService, mockAuditor, scheduler, mock(TransformNode.class)); + return new TransformServices( + configManager, + transformCheckpointService, + mockAuditor, + scheduler, + mock(TransformNode.class), + Tracer.NOOP + ); } private TransformPersistentTasksExecutor buildTaskExecutor(TransformServices transformServices) { diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformTaskTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformTaskTests.java index 535484ed3a196..8b7f54b861e10 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformTaskTests.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformTaskTests.java @@ -29,6 +29,7 @@ import org.elasticsearch.rest.RestStatus; import org.elasticsearch.tasks.TaskId; import org.elasticsearch.tasks.TaskManager; +import org.elasticsearch.telemetry.tracing.Tracer; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.client.NoOpClient; import org.elasticsearch.threadpool.TestThreadPool; @@ -210,7 +211,8 @@ private TransformServices transformServices(Clock clock, TransformAuditor audito transformsCheckpointService, auditor, new TransformScheduler(clock, threadPool, Settings.EMPTY, TimeValue.ZERO), - mock(TransformNode.class) + mock(TransformNode.class), + Tracer.NOOP ); }