Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ public abstract class AsyncTwoPhaseIndexer<JobPosition, JobStats extends Indexer
private final AtomicReference<JobPosition> position;
private final ThreadPool threadPool;
private final Object lock;
private final EventHook eventHook;
private final AtomicBoolean isJobFinishing;

// throttling implementation
Expand Down Expand Up @@ -88,7 +89,7 @@ public void reschedule(TimeValue delay) {
if (delay.duration() > 0) {
scheduled = threadPool.schedule(command, delay, threadPool.generic());
} else {
threadPool.generic().execute(command);
threadPool.generic().execute(threadPool.getThreadContext().preserveContext(command));
}
}
}
Expand All @@ -99,23 +100,26 @@ protected AsyncTwoPhaseIndexer(
ThreadPool threadPool,
AtomicReference<IndexerState> initialState,
JobPosition initialPosition,
JobStats jobStats
JobStats jobStats,
EventHook eventHook
) {
this(threadPool, initialState, initialPosition, jobStats, new Object());
this(threadPool, initialState, initialPosition, jobStats, eventHook, new Object());
}

protected AsyncTwoPhaseIndexer(
ThreadPool threadPool,
AtomicReference<IndexerState> initialState,
JobPosition initialPosition,
JobStats jobStats,
EventHook eventHook,
Object lock
) {
this.threadPool = threadPool;
this.state = initialState;
this.position = new AtomicReference<>(initialPosition);
this.stats = jobStats;
this.lock = lock;
this.eventHook = eventHook;
this.isJobFinishing = new AtomicBoolean(false);
}

Expand Down Expand Up @@ -230,19 +234,22 @@ public boolean maybeTriggerAsyncJob(long now) {
logger.debug("Schedule was triggered for job [" + getJobId() + "], state: [" + currentState + "]");
stats.incrementNumInvocations(1);
if (startJob()) {
// fire off the search. Note this is async, the method will return from here
threadPool.executor(ThreadPool.Names.GENERIC).execute(() -> {
onStart(now, ActionListener.wrap(r -> {
assert r != null;
if (r) {
nextSearch();
} else {
onFinish(finishJobListener());
}
}, this::finishWithFailure));
});
logger.debug("Beginning to index [" + getJobId() + "], state: [" + currentState + "]");
return true;
try (var ignored = threadPool.getThreadContext().newTraceContext()) {
eventHook.safeOnEvent(EventHook.Event.START);
// fire off the search. Note this is async, the method will return from here
threadPool.generic().execute(threadPool.getThreadContext().preserveContext(() -> {
onStart(now, ActionListener.wrap(r -> {
assert r != null;
if (r) {
nextSearch();
} else {
onFinish(finishJobListener());
}
}, this::finishWithFailure));
}));
logger.debug("Beginning to index [" + getJobId() + "], state: [" + currentState + "]");
return true;
}
} else {
return false;
}
Expand Down Expand Up @@ -282,6 +289,7 @@ private void finishJob() {
isJobFinishing.set(true);
doSaveState(finishAndSetState(), position.get(), () -> {
afterFinishOrFailure();
eventHook.safeOnEvent(EventHook.Event.FINISH);
isJobFinishing.set(false);
});
}
Expand Down Expand Up @@ -707,4 +715,44 @@ static TimeValue calculateThrottlingDelay(float docsPerSecond, long docCount, lo
return executionDelay;
}

/**
* Listener API for events happening within the {@link AsyncTwoPhaseIndexer}. Events are denoted by the {@link EventHook.Event} enum.
* onEvent is invoked synchronously when the event happens.
* onError is invoked via {@link #onFailure(Exception)} and is not associated with any other event.
* EventHook implementations are expected to never throw exceptions, RuntimeException or otherwise, as doing so can unintentionally
* cancel the AsyncTwoPhaseIndexer or break the Search's {@link ScheduledRunnable}. All exceptions will be ignored.
*/
public interface EventHook {
enum Event {
START,
FINISH
}

private void safeOnEvent(Event event) {
try {
onEvent(event);
} catch (Exception e) {
logger.atDebug().withThrowable(e).log("Ignoring error from event handler for event {}", event);
}
}

private void safeOnError(Throwable t) {
try {
onError(t);
} catch (Exception e) {
logger.atDebug().withThrowable(e).log("Ignoring error from event handler for throwable {}", t::getMessage);
}
}

void onEvent(Event event);

void onError(Throwable t);

EventHook NOOP = new EventHook() {
public void onEvent(Event e) {}

public void onError(Throwable t) {}
};
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ protected MockIndexer(
boolean stoppedBeforeFinished,
boolean noIndices
) {
super(threadPool, initialState, initialPosition, new MockJobStats());
super(threadPool, initialState, initialPosition, new MockJobStats(), EventHook.NOOP);
this.latch = latch;
this.stoppedBeforeFinished = stoppedBeforeFinished;
this.noIndices = noIndices;
Expand Down Expand Up @@ -178,7 +178,7 @@ protected MockIndexerFiveRuns(
float maxDocsPerSecond,
CountDownLatch latch
) {
super(threadPool, initialState, initialPosition, new MockJobStats());
super(threadPool, initialState, initialPosition, new MockJobStats(), EventHook.NOOP);
startTime = System.nanoTime();
this.latch = latch;
this.maxDocsPerSecond = maxDocsPerSecond;
Expand Down Expand Up @@ -304,7 +304,7 @@ protected MockIndexerThrowsFirstSearch(
AtomicReference<IndexerState> initialState,
Integer initialPosition
) {
super(threadPool, initialState, initialPosition, new MockJobStats());
super(threadPool, initialState, initialPosition, new MockJobStats(), EventHook.NOOP);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ public abstract class RollupIndexer extends AsyncTwoPhaseIndexer<Map<String, Obj
Map<String, Object> initialPosition,
RollupIndexerJobStats jobStats
) {
super(threadPool, initialState, initialPosition, jobStats);
super(threadPool, initialState, initialPosition, jobStats, EventHook.NOOP);
this.job = job;
this.compositeBuilder = createCompositeBuilder(job.getConfig());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,16 @@ public Collection<?> createComponents(PluginServices services) {
var clusterStateListener = new TransformClusterStateListener(clusterService, client);
var transformNode = new TransformNode(clusterStateListener);

transformServices.set(new TransformServices(configManager, checkpointService, auditor, scheduler, transformNode));
transformServices.set(
new TransformServices(
configManager,
checkpointService,
auditor,
scheduler,
transformNode,
services.telemetryProvider().getTracer()
)
);

var transformMeterRegistry = TransformMeterRegistry.create(services.telemetryProvider().getMeterRegistry());
transformConfigAutoMigration.set(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

package org.elasticsearch.xpack.transform;

import org.elasticsearch.telemetry.tracing.Tracer;
import org.elasticsearch.xpack.transform.checkpoint.TransformCheckpointService;
import org.elasticsearch.xpack.transform.notifications.TransformAuditor;
import org.elasticsearch.xpack.transform.persistence.TransformConfigManager;
Expand All @@ -25,19 +26,22 @@ public record TransformServices(
TransformCheckpointService checkpointService,
TransformAuditor auditor,
TransformScheduler scheduler,
TransformNode transformNode
TransformNode transformNode,
Tracer tracer
) {
public TransformServices(
TransformConfigManager configManager,
TransformCheckpointService checkpointService,
TransformAuditor auditor,
TransformScheduler scheduler,
TransformNode transformNode
TransformNode transformNode,
Tracer tracer
) {
this.configManager = Objects.requireNonNull(configManager);
this.checkpointService = Objects.requireNonNull(checkpointService);
this.auditor = Objects.requireNonNull(auditor);
this.scheduler = Objects.requireNonNull(scheduler);
this.transformNode = Objects.requireNonNull(transformNode);
this.tracer = Objects.requireNonNull(tracer);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.transform.telemetry;

import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.telemetry.tracing.Traceable;
import org.elasticsearch.telemetry.tracing.Tracer;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.indexing.AsyncTwoPhaseIndexer;
import org.elasticsearch.xpack.transform.transforms.TransformTask;

import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

public record TransformTraceEvents(
Tracer tracer,
Map<String, Object> attributes,
Supplier<ThreadContext> threadContext,
String traceName,
String spanPrefix,
AtomicLong checkpoint,
AtomicReference<ThreadContext.StoredContext> previousContext
) implements AsyncTwoPhaseIndexer.EventHook {

@Override
public void onEvent(Event event) {
switch (event) {
// TODO, probably just swap back to using Tracer directly in AsyncTwoPhase, but we need some way to declare the spanId, name,
// and attributes
case START -> {
tracer.startTrace(threadContext.get(), spanId(checkpoint.get()), traceName, attributes);
}
case FINISH -> {
tracer.stopTrace(spanId(checkpoint.getAndIncrement()));
}
}
}

private Traceable spanId(long suffix) {
return () -> spanPrefix + "-" + suffix;
}

@Override
public void onError(Throwable t) {
tracer.addError(spanId(checkpoint.get()), t);
}

public static TransformTraceEvents create(Tracer tracer, ThreadPool threadPool, TransformTask task, long checkpoint) {
return new TransformTraceEvents(
tracer,
Map.of(
Tracer.AttributeKeys.TASK_ID,
task.getId(),
Tracer.AttributeKeys.PARENT_TASK_ID,
task.getParentTaskId().toString(),
"Transform",
task.getTransformId()
),
threadPool::getThreadContext,
"transform/" + task.getTransformId(),
String.valueOf(task.getId()),
new AtomicLong(checkpoint),
new AtomicReference<>()
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,8 @@ class ClientTransformIndexer extends TransformIndexer {
TransformCheckpoint nextCheckpoint,
SeqNoPrimaryTermAndIndex seqNoPrimaryTermAndIndex,
TransformContext context,
boolean shouldStopAtCheckpoint
boolean shouldStopAtCheckpoint,
EventHook eventHook
) {
super(
ExceptionsHelper.requireNonNull(threadPool, "threadPool"),
Expand All @@ -125,7 +126,8 @@ class ClientTransformIndexer extends TransformIndexer {
transformProgress,
lastCheckpoint,
nextCheckpoint,
context
context,
eventHook
);
this.client = ExceptionsHelper.requireNonNull(client, "client");
this.clusterService = clusterService;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.indexing.AsyncTwoPhaseIndexer;
import org.elasticsearch.xpack.core.indexing.IndexerState;
import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpoint;
import org.elasticsearch.xpack.core.transform.transforms.TransformConfig;
Expand Down Expand Up @@ -39,6 +40,7 @@ class ClientTransformIndexerBuilder {
private TransformCheckpoint nextCheckpoint;
private SeqNoPrimaryTermAndIndex seqNoPrimaryTermAndIndex;
private boolean shouldStopAtCheckpoint;
private AsyncTwoPhaseIndexer.EventHook eventHook = AsyncTwoPhaseIndexer.EventHook.NOOP;

ClientTransformIndexerBuilder() {
this.initialStats = new TransformIndexerStats();
Expand Down Expand Up @@ -68,7 +70,8 @@ ClientTransformIndexer build(ThreadPool threadPool, TransformContext context) {
TransformCheckpoint.isNullOrEmpty(nextCheckpoint) ? TransformCheckpoint.EMPTY : nextCheckpoint,
seqNoPrimaryTermAndIndex,
context,
shouldStopAtCheckpoint
shouldStopAtCheckpoint,
eventHook
);
}

Expand Down Expand Up @@ -146,4 +149,9 @@ ClientTransformIndexerBuilder setSeqNoPrimaryTermAndIndex(SeqNoPrimaryTermAndInd
return this;
}

ClientTransformIndexerBuilder setEventHook(AsyncTwoPhaseIndexer.EventHook eventHook) {
this.eventHook = eventHook != null ? eventHook : this.eventHook;
return this;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ public interface Listener {
private volatile boolean shouldRecreateDestinationIndex = false;
private volatile AuthorizationState authState;
private volatile int pageSize = 0;
private AtomicInteger skippedCheckpoints = new AtomicInteger(0);
private AtomicInteger totalTimeWasGreaterThanFrequency = new AtomicInteger(0);
private volatile Instant lastTotalTimeRecorded;

/**
* If the destination index is blocked (e.g. during a reindex), the Transform will fail to write to it.
Expand Down Expand Up @@ -273,6 +276,36 @@ boolean doesNotHaveFailures() {
return getFailureCount() == 0 && getStatePersistenceFailureCount() == 0 && getStartUpFailureCount() == 0;
}

int skippedCheckpoints() {
return skippedCheckpoints.get();
}

void checkpointSkipped() {
skippedCheckpoints.incrementAndGet();
}

void resetSkippedCheckpoint() {
skippedCheckpoints.set(0);
}

int numberOfTimesTotalTimeWasGreaterThanFrequency() {
return totalTimeWasGreaterThanFrequency.get();
}

void totalTimeWasGreaterThanFrequency() {
totalTimeWasGreaterThanFrequency.incrementAndGet();
lastTotalTimeRecorded = Instant.now();
}

void totalTimeWasLessThanFrequency() {
totalTimeWasGreaterThanFrequency.decrementAndGet();
lastTotalTimeRecorded = Instant.now();
}

Instant lastTotalTimeRecorded() {
return lastTotalTimeRecorded;
}

void shutdown() {
taskListener.shutdown();
}
Expand Down
Loading