Skip to content

Commit f4682e4

Browse files
author
Roman Lovakov
committed
Correct setting kafka graceful-shutdown and close-timeout properties for dev and test profiles
1 parent b8314af commit f4682e4

File tree

1 file changed

+18
-17
lines changed

1 file changed

+18
-17
lines changed

extensions/smallrye-reactive-messaging-kafka/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/kafka/deployment/SmallRyeReactiveMessagingKafkaProcessor.java

Lines changed: 18 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
import io.quarkus.deployment.logging.LogCleanupFilterBuildItem;
4343
import io.quarkus.hibernate.orm.deployment.spi.AdditionalJpaModelBuildItem;
4444
import io.quarkus.smallrye.reactivemessaging.deployment.ReactiveMessagingDotNames;
45+
import io.quarkus.smallrye.reactivemessaging.deployment.items.ChannelDirection;
4546
import io.quarkus.smallrye.reactivemessaging.deployment.items.ConnectorManagedChannelBuildItem;
4647
import io.quarkus.smallrye.reactivemessaging.kafka.DatabindProcessingStateCodec;
4748
import io.quarkus.smallrye.reactivemessaging.kafka.HibernateOrmStateStore;
@@ -201,27 +202,27 @@ public void defaultChannelConfiguration(
201202

202203
if (launchMode.getLaunchMode().isDevOrTest()) {
203204
if (!buildTimeConfig.enableGracefulShutdownInDevAndTestMode()) {
204-
List<AnnotationInstance> incomings = discoveryState.findRepeatableAnnotationsOnMethods(DotNames.INCOMING);
205-
List<AnnotationInstance> outgoings = discoveryState.findRepeatableAnnotationsOnMethods(DotNames.OUTGOING);
206-
List<AnnotationInstance> channels = discoveryState.findAnnotationsOnInjectionPoints(DotNames.CHANNEL);
207-
List<AnnotationInstance> annotations = new ArrayList<>();
208-
annotations.addAll(incomings);
209-
annotations.addAll(outgoings);
210-
annotations.addAll(channels);
211-
for (AnnotationInstance annotation : annotations) {
212-
String channelName = annotation.value().asString();
213-
if (!discoveryState.isKafkaConnector(channelsManagedByConnectors, true, channelName)) {
214-
continue;
215-
}
216-
String key = getChannelPropertyKey(channelName, "graceful-shutdown", true);
217-
discoveryState.ifNotYetConfigured(key, () -> {
218-
defaultConfigProducer.produce(new RunTimeConfigurationDefaultBuildItem(key, "false"));
219-
});
220-
}
205+
disableGracefulShutdown(channelsManagedByConnectors, defaultConfigProducer, discoveryState);
221206
}
222207
}
223208
}
224209

210+
void disableGracefulShutdown(List<ConnectorManagedChannelBuildItem> channelsManagedByConnectors,
211+
BuildProducer<RunTimeConfigurationDefaultBuildItem> defaultConfigProducer,
212+
DefaultSerdeDiscoveryState discoveryState) {
213+
for (ConnectorManagedChannelBuildItem managed : channelsManagedByConnectors) {
214+
String channelName = managed.getName();
215+
boolean incoming = managed.getDirection() == ChannelDirection.INCOMING;
216+
if (!discoveryState.isKafkaConnector(channelsManagedByConnectors, incoming, channelName)) {
217+
continue;
218+
}
219+
String key = getChannelPropertyKey(channelName, incoming ? "graceful-shutdown" : "close-timeout", incoming);
220+
discoveryState.ifNotYetConfigured(key, () -> {
221+
defaultConfigProducer.produce(new RunTimeConfigurationDefaultBuildItem(key, incoming ? "false" : "0"));
222+
});
223+
}
224+
}
225+
225226
// visible for testing
226227
void discoverDefaultSerdeConfig(DefaultSerdeDiscoveryState discovery,
227228
List<ConnectorManagedChannelBuildItem> channelsManagedByConnectors,

0 commit comments

Comments
 (0)