Skip to content

Commit 0e0f390

Browse files
authored
Complete Consumer verticle stop promise only after closing dependencies (#4181)
Completing the stop promise before the dependencies (dispatcher, client, etc) before will cause the WebClient to be closed. Signed-off-by: Pierangelo Di Pilato <pierdipi@redhat.com>
1 parent 21f8e98 commit 0e0f390

File tree

1 file changed

+5
-5
lines changed
  • data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer

1 file changed

+5
-5
lines changed

data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/ConsumerVerticle.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -65,14 +65,14 @@ public void start(Promise<Void> startPromise) {
6565

6666
@Override
6767
public void stop(Promise<Void> stopPromise) {
68-
logger.info("Stopping consumer {}", consumerVerticleContext.getLoggingKeyValue());
68+
logger.info("Stopping consumer verticle {}", consumerVerticleContext.getLoggingKeyValue());
6969

7070
AsyncCloseable.compose(this.recordDispatcher, this.closeable, this.consumer::close)
7171
.close()
72-
.onComplete(
73-
r -> logger.info("Consumer verticle closed {}", consumerVerticleContext.getLoggingKeyValue()));
74-
75-
stopPromise.tryComplete();
72+
.onComplete(r -> {
73+
stopPromise.tryComplete();
74+
logger.info("Consumer verticle closed {}", consumerVerticleContext.getLoggingKeyValue());
75+
});
7676
}
7777

7878
public void setConsumer(ReactiveKafkaConsumer<Object, CloudEvent> consumer) {

0 commit comments

Comments
 (0)