1010import io .kestra .core .queues .TestQueueFactory ;
1111import io .kestra .core .repositories .ExecutionRepositoryInterface ;
1212import io .kestra .core .runners .TestRunner ;
13+ import io .kestra .core .utils .Await ;
1314import io .kestra .core .utils .ListUtils ;
1415import io .kestra .core .utils .TestsUtils ;
1516import io .micronaut .inject .qualifiers .Qualifiers ;
1920import org .junit .jupiter .api .extension .ExtensionContext ;
2021import org .junit .platform .commons .support .AnnotationSupport ;
2122
23+ import java .time .Duration ;
2224import java .util .Collections ;
2325import java .util .ConcurrentModificationException ;
2426import java .util .List ;
2527import java .util .Optional ;
28+ import java .util .concurrent .TimeoutException ;
2629
2730@ Slf4j
2831public class KestraTestExtension extends MicronautJunit5Extension {
@@ -100,12 +103,12 @@ public void afterTestExecution(ExtensionContext context) throws Exception {
100103
101104
102105 private void retryingExecutionKill (List <Execution > testExecutions , ExecutionRepositoryInterface executionRepository , QueueInterface <ExecutionKilled > killQueue , int retriesLeft ) throws InterruptedException {
106+ List <Execution > runningExecutions = ListUtils .distinctByKey (
107+ testExecutions .stream ().flatMap (launchedExecution -> executionRepository .findById (launchedExecution .getTenantId (), launchedExecution .getId ()).stream ()).toList (),
108+ Execution ::getId
109+ ).stream ().filter (inRepository -> !inRepository .getState ().isTerminated ()).toList ();
103110 try {
104- ListUtils .distinctByKey (
105- testExecutions .stream ().flatMap (launchedExecution -> executionRepository .findById (launchedExecution .getTenantId (), launchedExecution .getId ()).stream ()).toList (),
106- Execution ::getId
107- ).stream ().filter (inRepository -> inRepository .getState ().isRunning () || inRepository .getState ().isPaused () || inRepository .getState ().isQueued ())
108- .forEach (inRepository -> {
111+ runningExecutions .forEach (inRepository -> {
109112 log .warn ("Execution {} is still running after test execution, killing it" , inRepository .getId ());
110113 try {
111114 killQueue .emit (ExecutionKilledExecution .builder ()
@@ -128,5 +131,21 @@ private void retryingExecutionKill(List<Execution> testExecutions, ExecutionRepo
128131 Thread .sleep (100 );
129132 retryingExecutionKill (testExecutions , executionRepository , killQueue , retriesLeft - 1 );
130133 }
134+
135+ try {
136+ Await .until (() -> runningExecutions .stream ()
137+ .map (execution -> executionRepository .findById (execution .getTenantId (), execution .getId ()))
138+ .allMatch (maybeExecution -> maybeExecution .map (inRepository -> {
139+ boolean terminated = inRepository .getState ().isTerminated ();
140+ if (!terminated ) {
141+ log .warn ("Execution {} has a pending KILL request but is still in state {} " , inRepository .getId (), inRepository .getState ().getCurrent ());
142+ }
143+ return terminated ;
144+ })
145+ .orElse (true ))
146+ , Duration .ofMillis (50 ), Duration .ofSeconds (10 ));
147+ } catch (TimeoutException e ) {
148+ log .warn ("Some executions remained in KILLING " , e );
149+ }
131150 }
132151}
0 commit comments