From 51aab96e233267bd0e0bf4cfb01933596294bbb3 Mon Sep 17 00:00:00 2001 From: Pat Whelan Date: Thu, 27 Feb 2025 08:09:45 -0500 Subject: [PATCH 1/5] Warn when checkpoint duration is longer than frequency --- .../transforms/TransformContext.java | 33 +++++++++++++++++++ .../transforms/TransformHealthChecker.java | 28 +++++++++++++++- .../transforms/TransformIndexer.java | 19 ++++++++++- 3 files changed, 78 insertions(+), 2 deletions(-) diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformContext.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformContext.java index 73872fac097c1..5c5394a738229 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformContext.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformContext.java @@ -48,6 +48,9 @@ public interface Listener { private volatile boolean shouldRecreateDestinationIndex = false; private volatile AuthorizationState authState; private volatile int pageSize = 0; + private AtomicInteger skippedCheckpoints = new AtomicInteger(0); + private AtomicInteger totalTimeWasGreaterThanFrequency = new AtomicInteger(0); + private volatile Instant lastTotalTimeRecorded; /** * If the destination index is blocked (e.g. during a reindex), the Transform will fail to write to it. @@ -273,6 +276,36 @@ boolean doesNotHaveFailures() { return getFailureCount() == 0 && getStatePersistenceFailureCount() == 0 && getStartUpFailureCount() == 0; } + int skippedCheckpoints() { + return skippedCheckpoints.get(); + } + + void checkpointSkipped() { + skippedCheckpoints.incrementAndGet(); + } + + void resetSkippedCheckpoint() { + skippedCheckpoints.set(0); + } + + int numberOfTimesTotalTimeWasGreaterThanFrequency() { + return totalTimeWasGreaterThanFrequency.get(); + } + + void totalTimeWasGreaterThanFrequency() { + totalTimeWasGreaterThanFrequency.incrementAndGet(); + lastTotalTimeRecorded = Instant.now(); + } + + void totalTimeWasLessThanFrequency() { + totalTimeWasGreaterThanFrequency.decrementAndGet(); + lastTotalTimeRecorded = Instant.now(); + } + + Instant lastTotalTimeRecorded() { + return lastTotalTimeRecorded; + } + void shutdown() { taskListener.shutdown(); } diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformHealthChecker.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformHealthChecker.java index 24c5d45a38f75..815cc1250c7ee 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformHealthChecker.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformHealthChecker.java @@ -39,7 +39,9 @@ public enum IssueType { TRANSFORM_TASK_FAILED("Transform task state is [failed]"), TRANSFORM_INDEXER_FAILED("Transform indexer failed"), TRANSFORM_INTERNAL_STATE_UPDATE_FAILED("Task encountered failures updating internal state"), - TRANSFORM_STARTUP_FAILED("Transform task is automatically retrying its startup process"); + TRANSFORM_STARTUP_FAILED("Transform task is automatically retrying its startup process"), + TRANSFORM_IS_SLOW("Transform task is taking longer than its configured frequency. Data processing will be delayed."), + TRANSFORM_WAS_SLOW("Transform task was recently taking longer than its configured frequency, but it is recovering."); private final String issue; @@ -163,6 +165,30 @@ public static TransformHealth checkTransform(TransformTask transformTask, @Nulla ); } + if (transformContext.numberOfTimesTotalTimeWasGreaterThanFrequency() > 0) { + if (HealthStatus.RED.equals(maxStatus) == false) { + maxStatus = HealthStatus.YELLOW; + } + var checkpoints = transformContext.skippedCheckpoints(); + /*if (checkpoints > 0) { + issues.add( + IssueType.TRANSFORM_IS_SLOW.newIssue( + "Transform has skipped " + checkpoints + " checkpoints.", + transformContext.numberOfTimesTotalTimeWasGreaterThanFrequency(), + transformContext.lastTotalTimeRecorded() + ) + ); + } else { + issues.add( + IssueType.TRANSFORM_WAS_SLOW.newIssue( + null, + transformContext.numberOfTimesTotalTimeWasGreaterThanFrequency(), + transformContext.lastTotalTimeRecorded() + ) + ); + }*/ + } + return new TransformHealth(maxStatus, issues); } diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java index bd0224f49814e..68a6bbd06085d 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java @@ -605,6 +605,14 @@ private void finalizeCheckpoint(ActionListener listener) { long durationMs = System.currentTimeMillis() - lastCheckpoint.getTimestamp(); getStats().incrementCheckpointExponentialAverages(durationMs < 0 ? 0 : durationMs, docsIndexed, docsProcessed); + + if (getConfig().getFrequency() != null) { + if (durationMs > getConfig().getFrequency().getMillis()) { + context.totalTimeWasGreaterThanFrequency(); + } else { + context.totalTimeWasLessThanFrequency(); + } + } } if (shouldAuditOnFinish(checkpoint)) { auditor.info(getJobId(), "Finished indexing for transform checkpoint [" + checkpoint + "]."); @@ -659,10 +667,19 @@ public boolean maybeTriggerAsyncJob(long now) { IndexerState indexerState = getState(); if (IndexerState.INDEXING.equals(indexerState) || IndexerState.STOPPING.equals(indexerState)) { logger.debug("[{}] indexer for transform has state [{}]. Ignoring trigger.", getJobId(), indexerState); + if (IndexerState.INDEXING.equals(indexerState)) { + context.checkpointSkipped(); + } return false; } - return super.maybeTriggerAsyncJob(now); + var triggered = super.maybeTriggerAsyncJob(now); + if (triggered) { + context.resetSkippedCheckpoint(); + } else { + context.checkpointSkipped(); + } + return triggered; } } From b1fedbae2933a416bae20d99150c58e150b98171 Mon Sep 17 00:00:00 2001 From: Pat Whelan Date: Thu, 27 Feb 2025 08:13:10 -0500 Subject: [PATCH 2/5] Uncomment issue --- .../xpack/transform/transforms/TransformHealthChecker.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformHealthChecker.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformHealthChecker.java index 815cc1250c7ee..1b747707a0ae1 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformHealthChecker.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformHealthChecker.java @@ -170,7 +170,7 @@ public static TransformHealth checkTransform(TransformTask transformTask, @Nulla maxStatus = HealthStatus.YELLOW; } var checkpoints = transformContext.skippedCheckpoints(); - /*if (checkpoints > 0) { + if (checkpoints > 0) { issues.add( IssueType.TRANSFORM_IS_SLOW.newIssue( "Transform has skipped " + checkpoints + " checkpoints.", @@ -186,7 +186,7 @@ public static TransformHealth checkTransform(TransformTask transformTask, @Nulla transformContext.lastTotalTimeRecorded() ) ); - }*/ + } } return new TransformHealth(maxStatus, issues); From f64d2275f9d86846f1d2b573c48455767d2e7513 Mon Sep 17 00:00:00 2001 From: Pat Whelan Date: Thu, 13 Jun 2024 13:26:47 -0400 Subject: [PATCH 3/5] [Transform] Create hooks for telemetry If a user runs with telemetry enabled, a Transform checkpoint shows up in the APM trace as a collection of discrete API calls. Eventually, Transform will open a trace span when it starts a checkpoint and close a trace span when it finishes a checkpoint. This will unify the discrete API calls into a single APM trace so the user can see all API calls made for a given Transform checkpoint. AsyncTwoPhaseIndexer now has event hooks in place to notify when the indexer starts a new job and finished a job. Eventually, Transforms will use this to open/close the trace span. --- .../core/indexing/AsyncTwoPhaseIndexer.java | 50 ++++++++++++++++++- .../indexing/AsyncTwoPhaseIndexerTests.java | 6 +-- .../xpack/rollup/job/RollupIndexer.java | 2 +- .../transforms/ClientTransformIndexer.java | 6 ++- .../ClientTransformIndexerBuilder.java | 10 +++- .../transforms/TransformIndexer.java | 5 +- .../ClientTransformIndexerTests.java | 7 ++- .../TransformIndexerFailureHandlingTests.java | 3 +- ...IndexerFailureOnStatePersistenceTests.java | 3 +- .../TransformIndexerStateTests.java | 6 ++- .../transforms/TransformIndexerTests.java | 3 +- 11 files changed, 83 insertions(+), 18 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexer.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexer.java index 16b55c63b81da..be20def169519 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexer.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexer.java @@ -57,6 +57,7 @@ public abstract class AsyncTwoPhaseIndexer position; private final ThreadPool threadPool; private final Object lock; + private final EventHook eventHook; private final AtomicBoolean isJobFinishing; // throttling implementation @@ -99,9 +100,10 @@ protected AsyncTwoPhaseIndexer( ThreadPool threadPool, AtomicReference initialState, JobPosition initialPosition, - JobStats jobStats + JobStats jobStats, + EventHook eventHook ) { - this(threadPool, initialState, initialPosition, jobStats, new Object()); + this(threadPool, initialState, initialPosition, jobStats, eventHook, new Object()); } protected AsyncTwoPhaseIndexer( @@ -109,6 +111,7 @@ protected AsyncTwoPhaseIndexer( AtomicReference initialState, JobPosition initialPosition, JobStats jobStats, + EventHook eventHook, Object lock ) { this.threadPool = threadPool; @@ -116,6 +119,7 @@ protected AsyncTwoPhaseIndexer( this.position = new AtomicReference<>(initialPosition); this.stats = jobStats; this.lock = lock; + this.eventHook = eventHook; this.isJobFinishing = new AtomicBoolean(false); } @@ -230,6 +234,7 @@ public boolean maybeTriggerAsyncJob(long now) { logger.debug("Schedule was triggered for job [" + getJobId() + "], state: [" + currentState + "]"); stats.incrementNumInvocations(1); if (startJob()) { + eventHook.safeOnEvent(EventHook.Event.START); // fire off the search. Note this is async, the method will return from here threadPool.executor(ThreadPool.Names.GENERIC).execute(() -> { onStart(now, ActionListener.wrap(r -> { @@ -282,6 +287,7 @@ private void finishJob() { isJobFinishing.set(true); doSaveState(finishAndSetState(), position.get(), () -> { afterFinishOrFailure(); + eventHook.safeOnEvent(EventHook.Event.FINISH); isJobFinishing.set(false); }); } @@ -707,4 +713,44 @@ static TimeValue calculateThrottlingDelay(float docsPerSecond, long docCount, lo return executionDelay; } + /** + * Listener API for events happening within the {@link AsyncTwoPhaseIndexer}. Events are denoted by the {@link EventHook.Event} enum. + * onEvent is invoked synchronously when the event happens. + * onError is invoked via {@link #onFailure(Exception)} and is not associated with any other event. + * EventHook implementations are expected to never throw exceptions, RuntimeException or otherwise, as doing so can unintentionally + * cancel the AsyncTwoPhaseIndexer or break the Search's {@link ScheduledRunnable}. All exceptions will be ignored. + */ + public interface EventHook { + enum Event { + START, + FINISH + } + + private void safeOnEvent(Event event) { + try { + onEvent(event); + } catch (Exception e) { + logger.atDebug().withThrowable(e).log("Ignoring error from event handler for event {}", event); + } + } + + private void safeOnError(Throwable t) { + try { + onError(t); + } catch (Exception e) { + logger.atDebug().withThrowable(e).log("Ignoring error from event handler for throwable {}", t::getMessage); + } + } + + void onEvent(Event event); + + void onError(Throwable t); + + EventHook NOOP = new EventHook() { + public void onEvent(Event e) {} + + public void onError(Throwable t) {} + }; + } + } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexerTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexerTests.java index 6f0c5fba2cf5a..91c9f7de37d60 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexerTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexerTests.java @@ -65,7 +65,7 @@ protected MockIndexer( boolean stoppedBeforeFinished, boolean noIndices ) { - super(threadPool, initialState, initialPosition, new MockJobStats()); + super(threadPool, initialState, initialPosition, new MockJobStats(), EventHook.NOOP); this.latch = latch; this.stoppedBeforeFinished = stoppedBeforeFinished; this.noIndices = noIndices; @@ -178,7 +178,7 @@ protected MockIndexerFiveRuns( float maxDocsPerSecond, CountDownLatch latch ) { - super(threadPool, initialState, initialPosition, new MockJobStats()); + super(threadPool, initialState, initialPosition, new MockJobStats(), EventHook.NOOP); startTime = System.nanoTime(); this.latch = latch; this.maxDocsPerSecond = maxDocsPerSecond; @@ -304,7 +304,7 @@ protected MockIndexerThrowsFirstSearch( AtomicReference initialState, Integer initialPosition ) { - super(threadPool, initialState, initialPosition, new MockJobStats()); + super(threadPool, initialState, initialPosition, new MockJobStats(), EventHook.NOOP); } @Override diff --git a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/job/RollupIndexer.java b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/job/RollupIndexer.java index 57bdebafd1b20..19445f87f1eac 100644 --- a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/job/RollupIndexer.java +++ b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/job/RollupIndexer.java @@ -90,7 +90,7 @@ public abstract class RollupIndexer extends AsyncTwoPhaseIndexer initialPosition, RollupIndexerJobStats jobStats ) { - super(threadPool, initialState, initialPosition, jobStats); + super(threadPool, initialState, initialPosition, jobStats, EventHook.NOOP); this.job = job; this.compositeBuilder = createCompositeBuilder(job.getConfig()); } diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexer.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexer.java index c7fd6df4467dd..f232c831e32a1 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexer.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexer.java @@ -112,7 +112,8 @@ class ClientTransformIndexer extends TransformIndexer { TransformCheckpoint nextCheckpoint, SeqNoPrimaryTermAndIndex seqNoPrimaryTermAndIndex, TransformContext context, - boolean shouldStopAtCheckpoint + boolean shouldStopAtCheckpoint, + EventHook eventHook ) { super( ExceptionsHelper.requireNonNull(threadPool, "threadPool"), @@ -125,7 +126,8 @@ class ClientTransformIndexer extends TransformIndexer { transformProgress, lastCheckpoint, nextCheckpoint, - context + context, + eventHook ); this.client = ExceptionsHelper.requireNonNull(client, "client"); this.clusterService = clusterService; diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexerBuilder.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexerBuilder.java index 6dcf2657b12a8..38e2f2e7adb88 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexerBuilder.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexerBuilder.java @@ -11,6 +11,7 @@ import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.xpack.core.indexing.AsyncTwoPhaseIndexer; import org.elasticsearch.xpack.core.indexing.IndexerState; import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpoint; import org.elasticsearch.xpack.core.transform.transforms.TransformConfig; @@ -39,6 +40,7 @@ class ClientTransformIndexerBuilder { private TransformCheckpoint nextCheckpoint; private SeqNoPrimaryTermAndIndex seqNoPrimaryTermAndIndex; private boolean shouldStopAtCheckpoint; + private AsyncTwoPhaseIndexer.EventHook eventHook = AsyncTwoPhaseIndexer.EventHook.NOOP; ClientTransformIndexerBuilder() { this.initialStats = new TransformIndexerStats(); @@ -68,7 +70,8 @@ ClientTransformIndexer build(ThreadPool threadPool, TransformContext context) { TransformCheckpoint.isNullOrEmpty(nextCheckpoint) ? TransformCheckpoint.EMPTY : nextCheckpoint, seqNoPrimaryTermAndIndex, context, - shouldStopAtCheckpoint + shouldStopAtCheckpoint, + eventHook ); } @@ -146,4 +149,9 @@ ClientTransformIndexerBuilder setSeqNoPrimaryTermAndIndex(SeqNoPrimaryTermAndInd return this; } + ClientTransformIndexerBuilder setEventHook(AsyncTwoPhaseIndexer.EventHook eventHook) { + this.eventHook = eventHook != null ? eventHook : this.eventHook; + return this; + } + } diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java index 68a6bbd06085d..e91b12481d3ae 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java @@ -149,10 +149,11 @@ public TransformIndexer( TransformProgress transformProgress, TransformCheckpoint lastCheckpoint, TransformCheckpoint nextCheckpoint, - TransformContext context + TransformContext context, + EventHook eventHook ) { // important: note that we pass the context object as lock object - super(threadPool, initialState, initialPosition, jobStats, context); + super(threadPool, initialState, initialPosition, jobStats, eventHook, context); ExceptionsHelper.requireNonNull(transformServices, "transformServices"); this.transformsConfigManager = transformServices.configManager(); this.checkpointProvider = ExceptionsHelper.requireNonNull(checkpointProvider, "checkpointProvider"); diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexerTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexerTests.java index 1fbe5c53caccb..5ba20480edbdc 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexerTests.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexerTests.java @@ -48,6 +48,7 @@ import org.elasticsearch.test.client.NoOpClient; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.ActionNotFoundTransportException; +import org.elasticsearch.xpack.core.indexing.AsyncTwoPhaseIndexer; import org.elasticsearch.xpack.core.indexing.IndexerState; import org.elasticsearch.xpack.core.transform.TransformMetadata; import org.elasticsearch.xpack.core.transform.transforms.SettingsConfig; @@ -563,7 +564,8 @@ private static class MockClientTransformIndexer extends ClientTransformIndexer { nextCheckpoint, seqNoPrimaryTermAndIndex, context, - shouldStopAtCheckpoint + shouldStopAtCheckpoint, + EventHook.NOOP ); } @@ -720,7 +722,8 @@ private ClientTransformIndexer createTestIndexer( new TransformCheckpoint("transform", Instant.now().toEpochMilli(), 2L, Collections.emptyMap(), Instant.now().toEpochMilli()), new SeqNoPrimaryTermAndIndex(1, 1, TransformInternalIndexConstants.LATEST_INDEX_NAME), context, - false + false, + AsyncTwoPhaseIndexer.EventHook.NOOP ); } } diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerFailureHandlingTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerFailureHandlingTests.java index b9380ade1238b..0b4470fa38a02 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerFailureHandlingTests.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerFailureHandlingTests.java @@ -159,7 +159,8 @@ static class MockedTransformIndexer extends ClientTransformIndexer { TransformCheckpoint.EMPTY, new SeqNoPrimaryTermAndIndex(1, 1, "foo"), context, - false + false, + EventHook.NOOP ); this.searchFunction = searchFunction; this.bulkFunction = bulkFunction; diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerFailureOnStatePersistenceTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerFailureOnStatePersistenceTests.java index 480fa8ceaf4b3..04169bc76dd74 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerFailureOnStatePersistenceTests.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerFailureOnStatePersistenceTests.java @@ -101,7 +101,8 @@ private static class MockClientTransformIndexer extends ClientTransformIndexer { nextCheckpoint, seqNoPrimaryTermAndIndex, context, - shouldStopAtCheckpoint + shouldStopAtCheckpoint, + EventHook.NOOP ); } diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerStateTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerStateTests.java index 4306e6dd8bf4f..43334511cf9a8 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerStateTests.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerStateTests.java @@ -129,7 +129,8 @@ class MockedTransformIndexer extends TransformIndexer { /* TransformProgress */ null, TransformCheckpoint.EMPTY, TransformCheckpoint.EMPTY, - context + context, + EventHook.NOOP ); this.threadPool = threadPool; @@ -329,7 +330,8 @@ class MockedTransformIndexerForStatePersistenceTesting extends TransformIndexer /* TransformProgress */ null, TransformCheckpoint.EMPTY, TransformCheckpoint.EMPTY, - context + context, + EventHook.NOOP ); } diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerTests.java index 9e352c0e89032..7cb778352c0b8 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerTests.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerTests.java @@ -129,7 +129,8 @@ class MockedTransformIndexer extends TransformIndexer { /* TransformProgress */ null, TransformCheckpoint.EMPTY, TransformCheckpoint.EMPTY, - context + context, + EventHook.NOOP ); this.threadPool = threadPool; this.numberOfLoops = numberOfLoops; From d90f6b7b92264232b81dc63065e24829e4fb7925 Mon Sep 17 00:00:00 2001 From: Pat Whelan Date: Thu, 23 May 2024 15:14:24 -0400 Subject: [PATCH 4/5] Implementation for tracing --- .../xpack/transform/Transform.java | 11 ++- .../xpack/transform/TransformServices.java | 8 ++- .../telemetry/TransformTraceEvents.java | 72 +++++++++++++++++++ .../TransformPersistentTasksExecutor.java | 9 +++ .../ClientTransformIndexerTests.java | 16 +++-- .../TransformIndexerFailureHandlingTests.java | 4 +- ...IndexerFailureOnStatePersistenceTests.java | 10 ++- .../TransformIndexerStateTests.java | 10 ++- .../transforms/TransformIndexerTests.java | 4 +- ...TransformPersistentTasksExecutorTests.java | 10 ++- .../transforms/TransformTaskTests.java | 4 +- 11 files changed, 140 insertions(+), 18 deletions(-) create mode 100644 x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/telemetry/TransformTraceEvents.java diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/Transform.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/Transform.java index 7f4b9543698b4..72c6eeaac2688 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/Transform.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/Transform.java @@ -322,7 +322,16 @@ public Collection createComponents(PluginServices services) { var clusterStateListener = new TransformClusterStateListener(clusterService, client); var transformNode = new TransformNode(clusterStateListener); - transformServices.set(new TransformServices(configManager, checkpointService, auditor, scheduler, transformNode)); + transformServices.set( + new TransformServices( + configManager, + checkpointService, + auditor, + scheduler, + transformNode, + services.telemetryProvider().getTracer() + ) + ); var transformMeterRegistry = TransformMeterRegistry.create(services.telemetryProvider().getMeterRegistry()); transformConfigAutoMigration.set( diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/TransformServices.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/TransformServices.java index 9a7db0fde2d9c..17af191820d90 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/TransformServices.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/TransformServices.java @@ -7,6 +7,7 @@ package org.elasticsearch.xpack.transform; +import org.elasticsearch.telemetry.tracing.Tracer; import org.elasticsearch.xpack.transform.checkpoint.TransformCheckpointService; import org.elasticsearch.xpack.transform.notifications.TransformAuditor; import org.elasticsearch.xpack.transform.persistence.TransformConfigManager; @@ -25,19 +26,22 @@ public record TransformServices( TransformCheckpointService checkpointService, TransformAuditor auditor, TransformScheduler scheduler, - TransformNode transformNode + TransformNode transformNode, + Tracer tracer ) { public TransformServices( TransformConfigManager configManager, TransformCheckpointService checkpointService, TransformAuditor auditor, TransformScheduler scheduler, - TransformNode transformNode + TransformNode transformNode, + Tracer tracer ) { this.configManager = Objects.requireNonNull(configManager); this.checkpointService = Objects.requireNonNull(checkpointService); this.auditor = Objects.requireNonNull(auditor); this.scheduler = Objects.requireNonNull(scheduler); this.transformNode = Objects.requireNonNull(transformNode); + this.tracer = Objects.requireNonNull(tracer); } } diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/telemetry/TransformTraceEvents.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/telemetry/TransformTraceEvents.java new file mode 100644 index 0000000000000..d1577f6b0c29e --- /dev/null +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/telemetry/TransformTraceEvents.java @@ -0,0 +1,72 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.transform.telemetry; + +import org.elasticsearch.common.util.concurrent.ThreadContext; +import org.elasticsearch.telemetry.tracing.Traceable; +import org.elasticsearch.telemetry.tracing.Tracer; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.xpack.core.indexing.AsyncTwoPhaseIndexer; +import org.elasticsearch.xpack.transform.transforms.TransformTask; + +import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Supplier; + +public record TransformTraceEvents( + Tracer tracer, + Map attributes, + Supplier threadContext, + String traceName, + AtomicLong checkpoint +) implements AsyncTwoPhaseIndexer.EventHook { + + public TransformTraceEvents( + Tracer tracer, + Supplier threadContext, + String traceName, + Map attributes, + long checkpoint + ) { + this(tracer, attributes, threadContext, traceName, new AtomicLong(checkpoint)); + } + + @Override + public void onEvent(Event event) { + switch (event) { + case START -> tracer.startTrace(threadContext.get(), spanId(checkpoint.get()), traceName, attributes); + case FINISH -> tracer.stopTrace(spanId(checkpoint.getAndIncrement())); + } + } + + private Traceable spanId(long suffix) { + return () -> traceName + "-" + suffix; + } + + @Override + public void onError(Throwable t) { + tracer.addError(spanId(checkpoint.get()), t); + } + + public static TransformTraceEvents create(Tracer tracer, ThreadPool threadPool, TransformTask task, long checkpoint) { + return new TransformTraceEvents( + tracer, + threadPool::getThreadContext, + "transform/" + task.getTransformId(), + Map.of( + Tracer.AttributeKeys.TASK_ID, + task.getId(), + Tracer.AttributeKeys.PARENT_TASK_ID, + task.getParentTaskId().toString(), + "Transform", + task.getTransformId() + ), + checkpoint + ); + } +} diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformPersistentTasksExecutor.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformPersistentTasksExecutor.java index b7bd434194b80..077dc6d1da948 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformPersistentTasksExecutor.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformPersistentTasksExecutor.java @@ -55,6 +55,7 @@ import org.elasticsearch.xpack.transform.notifications.TransformAuditor; import org.elasticsearch.xpack.transform.persistence.SeqNoPrimaryTermAndIndex; import org.elasticsearch.xpack.transform.persistence.TransformInternalIndex; +import org.elasticsearch.xpack.transform.telemetry.TransformTraceEvents; import java.util.ArrayList; import java.util.Collection; @@ -488,6 +489,14 @@ private void startTask( Long previousCheckpoint, ActionListener listener ) { + indexerBuilder.setEventHook( + TransformTraceEvents.create( + transformServices.tracer(), + threadPool, + buildTask, + previousCheckpoint == null ? 0 : previousCheckpoint + ) + ); // switch the threadpool to generic, because the caller is on the system_read threadpool threadPool.generic().execute(() -> { buildTask.initializeIndexer(indexerBuilder); diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexerTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexerTests.java index 5ba20480edbdc..15daf6435da5c 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexerTests.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexerTests.java @@ -44,6 +44,7 @@ import org.elasticsearch.search.profile.SearchProfileResults; import org.elasticsearch.search.suggest.Suggest; import org.elasticsearch.tasks.TaskId; +import org.elasticsearch.telemetry.tracing.Tracer; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.client.NoOpClient; import org.elasticsearch.threadpool.ThreadPool; @@ -156,7 +157,8 @@ public void testPitInjection() throws InterruptedException { mock(TransformCheckpointService.class), mock(TransformAuditor.class), new TransformScheduler(Clock.systemUTC(), mock(ThreadPool.class), Settings.EMPTY, TimeValue.ZERO), - mock(TransformNode.class) + mock(TransformNode.class), + Tracer.NOOP ), mock(CheckpointProvider.class), new AtomicReference<>(IndexerState.STOPPED), @@ -254,7 +256,8 @@ public void testPitInjectionIfPitNotSupported() throws InterruptedException { mock(TransformCheckpointService.class), mock(TransformAuditor.class), new TransformScheduler(Clock.systemUTC(), mock(ThreadPool.class), Settings.EMPTY, TimeValue.ZERO), - mock(TransformNode.class) + mock(TransformNode.class), + Tracer.NOOP ), mock(CheckpointProvider.class), new AtomicReference<>(IndexerState.STOPPED), @@ -336,7 +339,8 @@ public void testDisablePit() throws InterruptedException { mock(TransformCheckpointService.class), mock(TransformAuditor.class), new TransformScheduler(Clock.systemUTC(), mock(ThreadPool.class), Settings.EMPTY, TimeValue.ZERO), - mock(TransformNode.class) + mock(TransformNode.class), + Tracer.NOOP ), mock(CheckpointProvider.class), new AtomicReference<>(IndexerState.STOPPED), @@ -404,7 +408,8 @@ public void testDisablePitWhenThereIsRemoteIndexInSource() throws InterruptedExc mock(TransformCheckpointService.class), mock(TransformAuditor.class), new TransformScheduler(Clock.systemUTC(), mock(ThreadPool.class), Settings.EMPTY, TimeValue.ZERO), - mock(TransformNode.class) + mock(TransformNode.class), + Tracer.NOOP ), mock(CheckpointProvider.class), new AtomicReference<>(IndexerState.STOPPED), @@ -709,7 +714,8 @@ private ClientTransformIndexer createTestIndexer( mock(TransformCheckpointService.class), mock(TransformAuditor.class), new TransformScheduler(Clock.systemUTC(), mock(ThreadPool.class), Settings.EMPTY, TimeValue.ZERO), - mock(TransformNode.class) + mock(TransformNode.class), + Tracer.NOOP ), mock(CheckpointProvider.class), new AtomicReference<>(IndexerState.STOPPED), diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerFailureHandlingTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerFailureHandlingTests.java index 0b4470fa38a02..a35a0321809b9 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerFailureHandlingTests.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerFailureHandlingTests.java @@ -36,6 +36,7 @@ import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchHits; import org.elasticsearch.search.SearchResponseUtils; +import org.elasticsearch.telemetry.tracing.Tracer; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.common.notifications.Level; @@ -146,7 +147,8 @@ static class MockedTransformIndexer extends ClientTransformIndexer { mock(TransformCheckpointService.class), auditor, new TransformScheduler(Clock.systemUTC(), threadPool, Settings.EMPTY, TimeValue.ZERO), - mock(TransformNode.class) + mock(TransformNode.class), + Tracer.NOOP ), checkpointProvider, initialState, diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerFailureOnStatePersistenceTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerFailureOnStatePersistenceTests.java index 04169bc76dd74..abe2ad26affe8 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerFailureOnStatePersistenceTests.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerFailureOnStatePersistenceTests.java @@ -20,6 +20,7 @@ import org.elasticsearch.index.engine.VersionConflictEngineException; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.tasks.TaskId; +import org.elasticsearch.telemetry.tracing.Tracer; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.client.NoOpClient; import org.elasticsearch.threadpool.ThreadPool; @@ -243,7 +244,8 @@ public void fail(Throwable exception, String failureMessage, ActionListener(IndexerState.STOPPED), @@ -329,7 +331,8 @@ public void fail(Throwable exception, String failureMessage, ActionListener(IndexerState.STOPPED), @@ -464,7 +467,8 @@ public void fail(Throwable exception, String failureMessage, ActionListener(IndexerState.STOPPED), diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerStateTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerStateTests.java index 43334511cf9a8..2ca58a02a1e9d 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerStateTests.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerStateTests.java @@ -27,6 +27,7 @@ import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchHits; import org.elasticsearch.search.SearchResponseUtils; +import org.elasticsearch.telemetry.tracing.Tracer; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.client.NoOpClient; import org.elasticsearch.test.junit.annotations.TestIssueLogging; @@ -839,7 +840,8 @@ public void testDeleteTransformBeforeConfigReload() throws Exception { mock(TransformCheckpointService.class), auditor, new TransformScheduler(Clock.systemUTC(), threadPool, Settings.EMPTY, TimeValue.ZERO), - mock(TransformNode.class) + mock(TransformNode.class), + Tracer.NOOP ), new MockTimebasedCheckpointProvider(config), config, @@ -1057,7 +1059,8 @@ private MockedTransformIndexer createMockIndexer( mock(TransformCheckpointService.class), transformAuditor, new TransformScheduler(Clock.systemUTC(), threadPool, Settings.EMPTY, TimeValue.ZERO), - mock(TransformNode.class) + mock(TransformNode.class), + Tracer.NOOP ); MockedTransformIndexer indexer = new MockedTransformIndexer( @@ -1092,7 +1095,8 @@ private MockedTransformIndexerForStatePersistenceTesting createMockIndexerForSta mock(TransformCheckpointService.class), transformAuditor, new TransformScheduler(Clock.systemUTC(), threadPool, Settings.EMPTY, TimeValue.ZERO), - mock(TransformNode.class) + mock(TransformNode.class), + Tracer.NOOP ); MockedTransformIndexerForStatePersistenceTesting indexer = new MockedTransformIndexerForStatePersistenceTesting( diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerTests.java index 7cb778352c0b8..dbcb16215bddb 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerTests.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerTests.java @@ -28,6 +28,7 @@ import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchHits; import org.elasticsearch.search.SearchResponseUtils; +import org.elasticsearch.telemetry.tracing.Tracer; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.client.NoOpClient; import org.elasticsearch.threadpool.TestThreadPool; @@ -672,7 +673,8 @@ private MockedTransformIndexer createMockIndexer( mock(TransformCheckpointService.class), transformAuditor, new TransformScheduler(Clock.systemUTC(), threadPool, Settings.EMPTY, TimeValue.ZERO), - mock(TransformNode.class) + mock(TransformNode.class), + Tracer.NOOP ); MockedTransformIndexer indexer = new MockedTransformIndexer( diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformPersistentTasksExecutorTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformPersistentTasksExecutorTests.java index fa509143f9ba9..6aad31c86a38c 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformPersistentTasksExecutorTests.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformPersistentTasksExecutorTests.java @@ -34,6 +34,7 @@ import org.elasticsearch.persistent.PersistentTasksCustomMetadata; import org.elasticsearch.persistent.PersistentTasksCustomMetadata.Assignment; import org.elasticsearch.tasks.TaskId; +import org.elasticsearch.telemetry.tracing.Tracer; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; @@ -611,7 +612,14 @@ private TransformServices transformServices(TransformConfigManager configManager configManager, mockAuditor ); - return new TransformServices(configManager, transformCheckpointService, mockAuditor, scheduler, mock(TransformNode.class)); + return new TransformServices( + configManager, + transformCheckpointService, + mockAuditor, + scheduler, + mock(TransformNode.class), + Tracer.NOOP + ); } private TransformPersistentTasksExecutor buildTaskExecutor(TransformServices transformServices) { diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformTaskTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformTaskTests.java index 535484ed3a196..8b7f54b861e10 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformTaskTests.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformTaskTests.java @@ -29,6 +29,7 @@ import org.elasticsearch.rest.RestStatus; import org.elasticsearch.tasks.TaskId; import org.elasticsearch.tasks.TaskManager; +import org.elasticsearch.telemetry.tracing.Tracer; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.client.NoOpClient; import org.elasticsearch.threadpool.TestThreadPool; @@ -210,7 +211,8 @@ private TransformServices transformServices(Clock clock, TransformAuditor audito transformsCheckpointService, auditor, new TransformScheduler(clock, threadPool, Settings.EMPTY, TimeValue.ZERO), - mock(TransformNode.class) + mock(TransformNode.class), + Tracer.NOOP ); } From 6ae37a5e6bb03ba1c7cf41b49301623c2844860f Mon Sep 17 00:00:00 2001 From: Pat Whelan Date: Thu, 27 Feb 2025 12:32:53 -0500 Subject: [PATCH 5/5] Open new trace context on start, close on finish --- .../core/indexing/AsyncTwoPhaseIndexer.java | 32 +++++++++-------- .../telemetry/TransformTraceEvents.java | 35 ++++++++++--------- ...TransformPersistentTasksExecutorTests.java | 1 + 3 files changed, 36 insertions(+), 32 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexer.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexer.java index be20def169519..1b2fa664f676d 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexer.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexer.java @@ -89,7 +89,7 @@ public void reschedule(TimeValue delay) { if (delay.duration() > 0) { scheduled = threadPool.schedule(command, delay, threadPool.generic()); } else { - threadPool.generic().execute(command); + threadPool.generic().execute(threadPool.getThreadContext().preserveContext(command)); } } } @@ -234,20 +234,22 @@ public boolean maybeTriggerAsyncJob(long now) { logger.debug("Schedule was triggered for job [" + getJobId() + "], state: [" + currentState + "]"); stats.incrementNumInvocations(1); if (startJob()) { - eventHook.safeOnEvent(EventHook.Event.START); - // fire off the search. Note this is async, the method will return from here - threadPool.executor(ThreadPool.Names.GENERIC).execute(() -> { - onStart(now, ActionListener.wrap(r -> { - assert r != null; - if (r) { - nextSearch(); - } else { - onFinish(finishJobListener()); - } - }, this::finishWithFailure)); - }); - logger.debug("Beginning to index [" + getJobId() + "], state: [" + currentState + "]"); - return true; + try (var ignored = threadPool.getThreadContext().newTraceContext()) { + eventHook.safeOnEvent(EventHook.Event.START); + // fire off the search. Note this is async, the method will return from here + threadPool.generic().execute(threadPool.getThreadContext().preserveContext(() -> { + onStart(now, ActionListener.wrap(r -> { + assert r != null; + if (r) { + nextSearch(); + } else { + onFinish(finishJobListener()); + } + }, this::finishWithFailure)); + })); + logger.debug("Beginning to index [" + getJobId() + "], state: [" + currentState + "]"); + return true; + } } else { return false; } diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/telemetry/TransformTraceEvents.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/telemetry/TransformTraceEvents.java index d1577f6b0c29e..3d671300f248a 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/telemetry/TransformTraceEvents.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/telemetry/TransformTraceEvents.java @@ -16,6 +16,7 @@ import java.util.Map; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Supplier; public record TransformTraceEvents( @@ -23,29 +24,27 @@ public record TransformTraceEvents( Map attributes, Supplier threadContext, String traceName, - AtomicLong checkpoint + String spanPrefix, + AtomicLong checkpoint, + AtomicReference previousContext ) implements AsyncTwoPhaseIndexer.EventHook { - public TransformTraceEvents( - Tracer tracer, - Supplier threadContext, - String traceName, - Map attributes, - long checkpoint - ) { - this(tracer, attributes, threadContext, traceName, new AtomicLong(checkpoint)); - } - @Override public void onEvent(Event event) { switch (event) { - case START -> tracer.startTrace(threadContext.get(), spanId(checkpoint.get()), traceName, attributes); - case FINISH -> tracer.stopTrace(spanId(checkpoint.getAndIncrement())); + // TODO, probably just swap back to using Tracer directly in AsyncTwoPhase, but we need some way to declare the spanId, name, + // and attributes + case START -> { + tracer.startTrace(threadContext.get(), spanId(checkpoint.get()), traceName, attributes); + } + case FINISH -> { + tracer.stopTrace(spanId(checkpoint.getAndIncrement())); + } } } private Traceable spanId(long suffix) { - return () -> traceName + "-" + suffix; + return () -> spanPrefix + "-" + suffix; } @Override @@ -56,8 +55,6 @@ public void onError(Throwable t) { public static TransformTraceEvents create(Tracer tracer, ThreadPool threadPool, TransformTask task, long checkpoint) { return new TransformTraceEvents( tracer, - threadPool::getThreadContext, - "transform/" + task.getTransformId(), Map.of( Tracer.AttributeKeys.TASK_ID, task.getId(), @@ -66,7 +63,11 @@ public static TransformTraceEvents create(Tracer tracer, ThreadPool threadPool, "Transform", task.getTransformId() ), - checkpoint + threadPool::getThreadContext, + "transform/" + task.getTransformId(), + String.valueOf(task.getId()), + new AtomicLong(checkpoint), + new AtomicReference<>() ); } } diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformPersistentTasksExecutorTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformPersistentTasksExecutorTests.java index 6aad31c86a38c..7ed8dc35729ac 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformPersistentTasksExecutorTests.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformPersistentTasksExecutorTests.java @@ -428,6 +428,7 @@ private TransformTask mockTransformTask() { when(task.setNumFailureRetries(anyInt())).thenReturn(task); when(task.getParentTaskId()).thenReturn(TaskId.EMPTY_TASK_ID); when(task.getContext()).thenReturn(mock()); + when(task.getTransformId()).thenReturn("1234"); doAnswer(a -> fail(a.getArgument(0, Throwable.class))).when(task).fail(any(Throwable.class), any(String.class), any()); when(task.getState()).thenReturn( new TransformState(TransformTaskState.STOPPED, IndexerState.STOPPED, null, 0, null, null, null, false, null)