Skip to content

Commit 2a26c41

Browse files
committed
feat(jdbc): avoid using the WorkerTaskResult queue when possible
Instead, directly process the result (add the taskrun) to avoid one rountrip inside the queue.
1 parent 1e06b9f commit 2a26c41

File tree

1 file changed

+10
-5
lines changed

1 file changed

+10
-5
lines changed

jdbc/src/main/java/io/kestra/jdbc/runner/JdbcExecutor.java

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,7 @@
33
import com.fasterxml.jackson.databind.ObjectMapper;
44
import io.kestra.core.contexts.KestraContext;
55
import io.kestra.core.exceptions.DeserializationException;
6-
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
76
import io.kestra.core.exceptions.InternalException;
8-
import io.kestra.core.exceptions.KestraRuntimeException;
97
import io.kestra.core.metrics.MetricRegistry;
108
import io.kestra.core.models.executions.*;
119
import io.kestra.core.models.executions.statistics.ExecutionCount;
@@ -543,14 +541,15 @@ private void executionQueue(Either<Execution, DeserializationException> either)
543541

544542
// worker task
545543
if (!executor.getWorkerTasks().isEmpty()) {
544+
List<WorkerTaskResult> workerTaskResults = new ArrayList<>();
546545
executor
547546
.getWorkerTasks()
548547
.stream()
549548
.filter(workerTask -> this.deduplicateWorkerTask(execution, executorState, workerTask.getTaskRun()))
550549
.forEach(throwConsumer(workerTask -> {
551550
try {
552551
if (!TruthUtils.isTruthy(workerTask.getRunContext().render(workerTask.getTask().getRunIf()))) {
553-
workerTaskResultQueue.emit(new WorkerTaskResult(workerTask.getTaskRun().withState(State.Type.SKIPPED)));
552+
workerTaskResults.add(new WorkerTaskResult(workerTask.getTaskRun().withState(State.Type.SKIPPED)));
554553
} else {
555554
if (workerTask.getTask().isSendToWorkerTask()) {
556555
Optional<WorkerGroup> maybeWorkerGroup = workerGroupService.resolveGroupFromJob(workerTask);
@@ -559,14 +558,20 @@ private void executionQueue(Either<Execution, DeserializationException> either)
559558
workerJobQueue.emit(workerGroupKey, workerTask);
560559
}
561560
if (workerTask.getTask().isFlowable()) {
562-
workerTaskResultQueue.emit(new WorkerTaskResult(workerTask.getTaskRun().withState(State.Type.RUNNING)));
561+
workerTaskResults.add(new WorkerTaskResult(workerTask.getTaskRun().withState(State.Type.RUNNING)));
563562
}
564563
}
565564
} catch (Exception e) {
566-
workerTaskResultQueue.emit(new WorkerTaskResult(workerTask.getTaskRun().withState(State.Type.FAILED)));
565+
workerTaskResults.add(new WorkerTaskResult(workerTask.getTaskRun().withState(State.Type.FAILED)));
567566
workerTask.getRunContext().logger().error("Failed to evaluate the runIf condition for task {}. Cause: {}", workerTask.getTask().getId(), e.getMessage(), e);
568567
}
569568
}));
569+
570+
try {
571+
executorService.addWorkerTaskResults(executor, flow, workerTaskResults);
572+
} catch (InternalException e) {
573+
log.error("Unable to add a worker task result to the execution", e);
574+
}
570575
}
571576

572577
// subflow execution results

0 commit comments

Comments
 (0)