Skip to content

Commit 51aab96

Browse files
committed
Warn when checkpoint duration is longer than frequency
1 parent cee43fd commit 51aab96

File tree

3 files changed

+78
-2
lines changed

3 files changed

+78
-2
lines changed

x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformContext.java

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,9 @@ public interface Listener {
4848
private volatile boolean shouldRecreateDestinationIndex = false;
4949
private volatile AuthorizationState authState;
5050
private volatile int pageSize = 0;
51+
private AtomicInteger skippedCheckpoints = new AtomicInteger(0);
52+
private AtomicInteger totalTimeWasGreaterThanFrequency = new AtomicInteger(0);
53+
private volatile Instant lastTotalTimeRecorded;
5154

5255
/**
5356
* If the destination index is blocked (e.g. during a reindex), the Transform will fail to write to it.
@@ -273,6 +276,36 @@ boolean doesNotHaveFailures() {
273276
return getFailureCount() == 0 && getStatePersistenceFailureCount() == 0 && getStartUpFailureCount() == 0;
274277
}
275278

279+
int skippedCheckpoints() {
280+
return skippedCheckpoints.get();
281+
}
282+
283+
void checkpointSkipped() {
284+
skippedCheckpoints.incrementAndGet();
285+
}
286+
287+
void resetSkippedCheckpoint() {
288+
skippedCheckpoints.set(0);
289+
}
290+
291+
int numberOfTimesTotalTimeWasGreaterThanFrequency() {
292+
return totalTimeWasGreaterThanFrequency.get();
293+
}
294+
295+
void totalTimeWasGreaterThanFrequency() {
296+
totalTimeWasGreaterThanFrequency.incrementAndGet();
297+
lastTotalTimeRecorded = Instant.now();
298+
}
299+
300+
void totalTimeWasLessThanFrequency() {
301+
totalTimeWasGreaterThanFrequency.decrementAndGet();
302+
lastTotalTimeRecorded = Instant.now();
303+
}
304+
305+
Instant lastTotalTimeRecorded() {
306+
return lastTotalTimeRecorded;
307+
}
308+
276309
void shutdown() {
277310
taskListener.shutdown();
278311
}

x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformHealthChecker.java

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,9 @@ public enum IssueType {
3939
TRANSFORM_TASK_FAILED("Transform task state is [failed]"),
4040
TRANSFORM_INDEXER_FAILED("Transform indexer failed"),
4141
TRANSFORM_INTERNAL_STATE_UPDATE_FAILED("Task encountered failures updating internal state"),
42-
TRANSFORM_STARTUP_FAILED("Transform task is automatically retrying its startup process");
42+
TRANSFORM_STARTUP_FAILED("Transform task is automatically retrying its startup process"),
43+
TRANSFORM_IS_SLOW("Transform task is taking longer than its configured frequency. Data processing will be delayed."),
44+
TRANSFORM_WAS_SLOW("Transform task was recently taking longer than its configured frequency, but it is recovering.");
4345

4446
private final String issue;
4547

@@ -163,6 +165,30 @@ public static TransformHealth checkTransform(TransformTask transformTask, @Nulla
163165
);
164166
}
165167

168+
if (transformContext.numberOfTimesTotalTimeWasGreaterThanFrequency() > 0) {
169+
if (HealthStatus.RED.equals(maxStatus) == false) {
170+
maxStatus = HealthStatus.YELLOW;
171+
}
172+
var checkpoints = transformContext.skippedCheckpoints();
173+
/*if (checkpoints > 0) {
174+
issues.add(
175+
IssueType.TRANSFORM_IS_SLOW.newIssue(
176+
"Transform has skipped " + checkpoints + " checkpoints.",
177+
transformContext.numberOfTimesTotalTimeWasGreaterThanFrequency(),
178+
transformContext.lastTotalTimeRecorded()
179+
)
180+
);
181+
} else {
182+
issues.add(
183+
IssueType.TRANSFORM_WAS_SLOW.newIssue(
184+
null,
185+
transformContext.numberOfTimesTotalTimeWasGreaterThanFrequency(),
186+
transformContext.lastTotalTimeRecorded()
187+
)
188+
);
189+
}*/
190+
}
191+
166192
return new TransformHealth(maxStatus, issues);
167193
}
168194

x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -605,6 +605,14 @@ private void finalizeCheckpoint(ActionListener<Void> listener) {
605605

606606
long durationMs = System.currentTimeMillis() - lastCheckpoint.getTimestamp();
607607
getStats().incrementCheckpointExponentialAverages(durationMs < 0 ? 0 : durationMs, docsIndexed, docsProcessed);
608+
609+
if (getConfig().getFrequency() != null) {
610+
if (durationMs > getConfig().getFrequency().getMillis()) {
611+
context.totalTimeWasGreaterThanFrequency();
612+
} else {
613+
context.totalTimeWasLessThanFrequency();
614+
}
615+
}
608616
}
609617
if (shouldAuditOnFinish(checkpoint)) {
610618
auditor.info(getJobId(), "Finished indexing for transform checkpoint [" + checkpoint + "].");
@@ -659,10 +667,19 @@ public boolean maybeTriggerAsyncJob(long now) {
659667
IndexerState indexerState = getState();
660668
if (IndexerState.INDEXING.equals(indexerState) || IndexerState.STOPPING.equals(indexerState)) {
661669
logger.debug("[{}] indexer for transform has state [{}]. Ignoring trigger.", getJobId(), indexerState);
670+
if (IndexerState.INDEXING.equals(indexerState)) {
671+
context.checkpointSkipped();
672+
}
662673
return false;
663674
}
664675

665-
return super.maybeTriggerAsyncJob(now);
676+
var triggered = super.maybeTriggerAsyncJob(now);
677+
if (triggered) {
678+
context.resetSkippedCheckpoint();
679+
} else {
680+
context.checkpointSkipped();
681+
}
682+
return triggered;
666683
}
667684
}
668685

0 commit comments

Comments
 (0)