1010import io .kestra .core .queues .TestQueueFactory ;
1111import io .kestra .core .repositories .ExecutionRepositoryInterface ;
1212import io .kestra .core .runners .TestRunner ;
13+ import io .kestra .core .utils .Await ;
1314import io .kestra .core .utils .ListUtils ;
1415import io .kestra .core .utils .TestsUtils ;
1516import io .micronaut .inject .qualifiers .Qualifiers ;
1920import org .junit .jupiter .api .extension .ExtensionContext ;
2021import org .junit .platform .commons .support .AnnotationSupport ;
2122
23+ import java .time .Duration ;
2224import java .util .Collections ;
2325import java .util .ConcurrentModificationException ;
2426import java .util .List ;
2527import java .util .Optional ;
28+ import java .util .concurrent .TimeoutException ;
2629
2730@ Slf4j
2831public class KestraTestExtension extends MicronautJunit5Extension {
@@ -81,9 +84,6 @@ public void afterTestExecution(ExtensionContext context) throws Exception {
8184
8285 TestsUtils .queueConsumersCleanup ();
8386
84- KestraTest kestraTest = context .getTestClass ()
85- .orElseThrow ()
86- .getAnnotation (KestraTest .class );
8787 Optional <TestQueueFactory > testQueueFactory = Optional .of (applicationContext .containsBean (TestQueueFactory .class )).flatMap (contains -> contains ? Optional .of (applicationContext .getBean (TestQueueFactory .class )) : Optional .empty ());
8888 List <Execution > testExecutions = testQueueFactory .map (TestQueueFactory ::getTestExecutions ).orElse (Collections .emptyList ());
8989 if (!testExecutions .isEmpty ()
@@ -100,12 +100,12 @@ public void afterTestExecution(ExtensionContext context) throws Exception {
100100
101101
102102 private void retryingExecutionKill (List <Execution > testExecutions , ExecutionRepositoryInterface executionRepository , QueueInterface <ExecutionKilled > killQueue , int retriesLeft ) throws InterruptedException {
103+ List <Execution > runningExecutions = ListUtils .distinctByKey (
104+ testExecutions .stream ().flatMap (launchedExecution -> executionRepository .findById (launchedExecution .getTenantId (), launchedExecution .getId ()).stream ()).toList (),
105+ Execution ::getId
106+ ).stream ().filter (inRepository -> !inRepository .getState ().isTerminated ()).toList ();
103107 try {
104- ListUtils .distinctByKey (
105- testExecutions .stream ().flatMap (launchedExecution -> executionRepository .findById (launchedExecution .getTenantId (), launchedExecution .getId ()).stream ()).toList (),
106- Execution ::getId
107- ).stream ().filter (inRepository -> inRepository .getState ().isRunning () || inRepository .getState ().isPaused () || inRepository .getState ().isQueued ())
108- .forEach (inRepository -> {
108+ runningExecutions .forEach (inRepository -> {
109109 log .warn ("Execution {} is still running after test execution, killing it" , inRepository .getId ());
110110 try {
111111 killQueue .emit (ExecutionKilledExecution .builder ()
@@ -128,5 +128,21 @@ private void retryingExecutionKill(List<Execution> testExecutions, ExecutionRepo
128128 Thread .sleep (100 );
129129 retryingExecutionKill (testExecutions , executionRepository , killQueue , retriesLeft - 1 );
130130 }
131+
132+ try {
133+ Await .until (() -> runningExecutions .stream ()
134+ .map (execution -> executionRepository .findById (execution .getTenantId (), execution .getId ()))
135+ .allMatch (maybeExecution -> maybeExecution .map (inRepository -> {
136+ boolean terminated = inRepository .getState ().isTerminated ();
137+ if (!terminated ) {
138+ log .warn ("Execution {} has a pending KILL request but is still in state {} " , inRepository .getId (), inRepository .getState ().getCurrent ());
139+ }
140+ return terminated ;
141+ })
142+ .orElse (true ))
143+ , Duration .ofMillis (50 ), Duration .ofSeconds (10 ));
144+ } catch (TimeoutException e ) {
145+ log .warn ("Some executions remained in KILLING " , e );
146+ }
131147 }
132148}
0 commit comments