Skip to content

Commit 390cc53

Browse files
committed
fix(queue): cleanup and implement fail-fast on consumption exception
1 parent cd7401e commit 390cc53

File tree

5 files changed

+156
-11
lines changed

5 files changed

+156
-11
lines changed

queue/src/main/java/io/kestra/queue/AbstractPollingSubscriber.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -79,8 +79,7 @@ private QueueSubscriber<T> internalSubscribe(QueuePoller queuePoller) {
7979
// we ignore a transaction creation error as it is either Kestra shutting down or some other place that will fail
8080
log.debug("Can't poll on receive", e);
8181
} else {
82-
log.error("Unexpected error while polling messages. Stopping.", e);
83-
this.markEnd();
82+
this.markEnd(e);
8483
}
8584
}
8685
}

queue/src/main/java/io/kestra/queue/AbstractSubscriber.java

Lines changed: 29 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package io.kestra.queue;
22

3+
import io.kestra.core.contexts.KestraContext;
34
import io.kestra.core.exceptions.DeserializationException;
45
import io.kestra.core.metrics.MetricRegistry;
56
import io.kestra.core.queues.QueueSubscriber;
@@ -59,32 +60,32 @@ protected void processMessage(byte[] message, Consumer<Either<T, Deserialization
5960
Either<T, DeserializationException> event = this.queueService.deserialize(this.cls, message);
6061
if (log.isDebugEnabled()) {
6162
if (event.isLeft()) {
62-
log.debug("[{}] receive a message with key {}", cls.getSimpleName(), event.getLeft().key());
63+
log.debug("{} received message with key '{}'", logPrefix, event.getLeft().key());
6364
} else {
64-
log.debug("[{}] receive a message with a deserialization error: {}", cls.getSimpleName(), event.getRight().getMessage());
65+
log.debug("{} received message with deserialization error: {}", logPrefix, event.getRight().getMessage());
6566
}
6667
}
6768

6869
try {
6970
consumer.accept(event);
70-
} catch (RuntimeException e) {
71+
} catch (Exception e) {
7172
if (event.isLeft()) {
7273
log.error(
73-
"[{}] message with id '{}' fail and was resubmitted to active queue",
74-
cls.getSimpleName(),
74+
"{} failed to process message with key '{}'. Message will be redelivered.",
75+
logPrefix,
7576
event.getLeft().key(),
7677
e
7778
);
7879
log.debug(new String(message));
7980
} else {
8081
log.error(
81-
"[{}] message fail and was resubmitted to active queue, it was a deserialization error message",
82-
cls.getSimpleName(),
82+
"{} failed to process message (deserialization error). Message will be redelivered.",
83+
logPrefix,
8384
e
8485
);
8586
log.debug(new String(message));
8687
}
87-
throw e; // TODO check if it would not be better to markEnd()
88+
throw e;
8889
}
8990
});
9091
}
@@ -101,7 +102,7 @@ protected void processBatchMessages(List<byte[]> messages, Consumer<List<Either<
101102
batchTimer.record(() -> {
102103
List<Either<T, DeserializationException>> events = messages.stream().map(message -> this.queueService.deserialize(this.cls, message)).toList();
103104
if (log.isDebugEnabled()) {
104-
log.debug("[{}] receive a batch of {} message", cls.getSimpleName(), events.size());
105+
log.debug("{} received batch of {} messages", logPrefix, events.size());
105106
}
106107

107108
try {
@@ -207,6 +208,25 @@ protected void markEnd() {
207208
this.stopped.countDown();
208209
}
209210

211+
/**
212+
* Marks the subscriber as stopped due to a fatal error, then initiates application shutdown.
213+
* <p>
214+
* This should be called when the subscriber encounters an unrecoverable processing error.
215+
* The message should NOT be acknowledged before calling this method, so it can be redelivered
216+
* to another instance after restart.
217+
*
218+
* @param cause the exception that caused the subscriber to stop
219+
*/
220+
protected void markEnd(Throwable cause) {
221+
log.error("{} fatal error while consuming messages. Initiating application shutdown.", logPrefix, cause);
222+
this.markEnd();
223+
try {
224+
KestraContext.getContext().shutdown();
225+
} catch (Exception e) {
226+
log.warn("{} failed to initiate shutdown.", logPrefix, e);
227+
}
228+
}
229+
210230
/** {@inheritDoc} */
211231
@Override
212232
public void close() {

queue/src/test/java/io/kestra/queue/AbstractDispatchQueueTest.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,13 @@
11
package io.kestra.queue;
22

3+
import io.kestra.core.contexts.KestraContext;
34
import io.kestra.core.queues.*;
45
import io.kestra.core.queues.event.DispatchEvent;
56
import io.kestra.core.utils.IdUtils;
67
import jakarta.inject.Inject;
78
import org.apache.commons.lang3.tuple.Pair;
9+
import org.junit.jupiter.api.AfterEach;
10+
import org.junit.jupiter.api.BeforeEach;
811
import org.junit.jupiter.api.Test;
912

1013
import java.io.IOException;
@@ -27,6 +30,21 @@ public abstract class AbstractDispatchQueueTest extends AbstractQueueTest {
2730
@Inject
2831
private DispatchQueueInterface<TestDispatch> dispatchQueue;
2932

33+
private KestraContext realContext;
34+
protected NoOpShutdownContext noOpShutdownContext;
35+
36+
@BeforeEach
37+
void swapKestraContext() {
38+
realContext = KestraContext.getContext();
39+
noOpShutdownContext = new NoOpShutdownContext(realContext, new AtomicBoolean(false));
40+
KestraContext.setContext(noOpShutdownContext);
41+
}
42+
43+
@AfterEach
44+
void restoreKestraContext() {
45+
KestraContext.setContext(realContext);
46+
}
47+
3048
@Test
3149
void singleConsumer() throws QueueException, InterruptedException, IOException {
3250
CountDownLatch countDownLatch = new CountDownLatch(2);
@@ -129,6 +147,7 @@ void errorProcessing() throws QueueException, InterruptedException {
129147
assertThat(countDownLatch.getCount()).isEqualTo(0L);
130148
assertThat(list).hasSize(1);
131149
assertThat(list.getFirst()).isEqualTo(1);
150+
assertThat(noOpShutdownContext.isShutdownCalled()).as("shutdown() should have been called on processing error").isTrue();
132151

133152
// consume the remaining items from the queue
134153
CountDownLatch remaining = new CountDownLatch(3);

queue/src/test/java/io/kestra/queue/AbstractSubscriberTest.java

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package io.kestra.queue;
22

3+
import io.kestra.core.contexts.KestraContext;
34
import io.kestra.core.exceptions.DeserializationException;
45
import io.kestra.core.metrics.MetricRegistry;
56
import io.kestra.core.queues.QueueSubscriber;
@@ -434,6 +435,55 @@ void shouldNotThrowOnConcurrentPauseAndClose() throws InterruptedException {
434435
loop.join(1000);
435436
}
436437

438+
@Test
439+
void shouldCallShutdownOnMarkEndWithException() {
440+
// Given
441+
var subscriber = createSubscriber();
442+
subscriber.markReady();
443+
var noOpContext = new NoOpShutdownContext(new AtomicBoolean(false));
444+
KestraContext.setContext(noOpContext);
445+
446+
// When
447+
subscriber.markEnd(new RuntimeException("test error"));
448+
449+
// Then
450+
assertThat(subscriber.isActive()).isFalse();
451+
assertThat(noOpContext.isShutdownCalled()).isTrue();
452+
}
453+
454+
@Test
455+
void shouldSetActiveToFalseOnMarkEndWithException() {
456+
// Given
457+
var subscriber = createSubscriber();
458+
subscriber.markReady();
459+
KestraContext.setContext(new NoOpShutdownContext(new AtomicBoolean(false)));
460+
461+
// When
462+
subscriber.markEnd(new RuntimeException("test error"));
463+
464+
// Then
465+
assertThat(subscriber.isActive()).isFalse();
466+
}
467+
468+
@Test
469+
void shouldBeIdempotentOnMarkEndWithExceptionCalledTwice() {
470+
// Given
471+
var subscriber = createSubscriber();
472+
subscriber.markReady();
473+
var noOpContext = new NoOpShutdownContext(new AtomicBoolean(false));
474+
KestraContext.setContext(noOpContext);
475+
476+
// When
477+
subscriber.markEnd(new RuntimeException("first"));
478+
subscriber.markEnd(new RuntimeException("second"));
479+
480+
// Then
481+
assertThat(subscriber.isActive()).isFalse();
482+
// shutdown() is called twice because markEnd(Throwable) always calls it,
483+
// but KestraContext.Initializer.shutdown() itself is idempotent via AtomicBoolean
484+
assertThat(noOpContext.isShutdownCalled()).isTrue();
485+
}
486+
437487
/**
438488
* Minimal concrete implementation for testing AbstractSubscriber.
439489
*/
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
package io.kestra.queue;
2+
3+
import io.kestra.core.contexts.KestraContext;
4+
import io.kestra.core.models.ServerType;
5+
import io.kestra.core.plugins.PluginRegistry;
6+
import io.kestra.core.storages.StorageInterface;
7+
8+
import java.util.Optional;
9+
import java.util.Set;
10+
import java.util.concurrent.atomic.AtomicBoolean;
11+
12+
/**
13+
* A {@link KestraContext} that intercepts {@link #shutdown()} to prevent actual ApplicationContext closure
14+
* during tests, while recording that shutdown was requested.
15+
* <p>
16+
* When a delegate is provided, all methods except {@code shutdown()} are forwarded to it.
17+
* When no delegate is provided (unit tests without a Micronaut context), methods return safe defaults.
18+
*/
19+
public class NoOpShutdownContext extends KestraContext {
20+
private final KestraContext delegate;
21+
private final AtomicBoolean shutdownCalled;
22+
23+
/**
24+
* Creates a context that delegates to an existing context.
25+
* Use this in integration tests where a real {@link KestraContext} is available.
26+
*/
27+
public NoOpShutdownContext(KestraContext delegate, AtomicBoolean shutdownCalled) {
28+
this.delegate = delegate;
29+
this.shutdownCalled = shutdownCalled;
30+
}
31+
32+
/**
33+
* Creates a standalone context with no delegate.
34+
* Use this in unit tests where no Micronaut context is available.
35+
*/
36+
public NoOpShutdownContext(AtomicBoolean shutdownCalled) {
37+
this(null, shutdownCalled);
38+
}
39+
40+
public boolean isShutdownCalled() {
41+
return shutdownCalled.get();
42+
}
43+
44+
@Override
45+
public void shutdown() {
46+
shutdownCalled.set(true);
47+
}
48+
49+
@Override public ServerType getServerType() { return delegate != null ? delegate.getServerType() : null; }
50+
@Override public Optional<Integer> getWorkerMaxNumThreads() { return delegate != null ? delegate.getWorkerMaxNumThreads() : Optional.empty(); }
51+
@Override public Optional<String> getWorkerGroupKey() { return delegate != null ? delegate.getWorkerGroupKey() : Optional.empty(); }
52+
@Override public void injectWorkerConfigs(Integer maxNumThreads, String workerGroupKey) { if (delegate != null) delegate.injectWorkerConfigs(maxNumThreads, workerGroupKey); }
53+
@Override public String getVersion() { return delegate != null ? delegate.getVersion() : "test"; }
54+
@Override public PluginRegistry getPluginRegistry() { return delegate != null ? delegate.getPluginRegistry() : null; }
55+
@Override public StorageInterface getStorageInterface() { return delegate != null ? delegate.getStorageInterface() : null; }
56+
@Override public Set<String> getEnvironments() { return delegate != null ? delegate.getEnvironments() : Set.of(); }
57+
}

0 commit comments

Comments
 (0)