|
42 | 42 | import io.quarkus.deployment.logging.LogCleanupFilterBuildItem;
|
43 | 43 | import io.quarkus.hibernate.orm.deployment.spi.AdditionalJpaModelBuildItem;
|
44 | 44 | import io.quarkus.smallrye.reactivemessaging.deployment.ReactiveMessagingDotNames;
|
| 45 | +import io.quarkus.smallrye.reactivemessaging.deployment.items.ChannelDirection; |
45 | 46 | import io.quarkus.smallrye.reactivemessaging.deployment.items.ConnectorManagedChannelBuildItem;
|
46 | 47 | import io.quarkus.smallrye.reactivemessaging.kafka.DatabindProcessingStateCodec;
|
47 | 48 | import io.quarkus.smallrye.reactivemessaging.kafka.HibernateOrmStateStore;
|
@@ -201,27 +202,27 @@ public void defaultChannelConfiguration(
|
201 | 202 |
|
202 | 203 | if (launchMode.getLaunchMode().isDevOrTest()) {
|
203 | 204 | if (!buildTimeConfig.enableGracefulShutdownInDevAndTestMode()) {
|
204 |
| - List<AnnotationInstance> incomings = discoveryState.findRepeatableAnnotationsOnMethods(DotNames.INCOMING); |
205 |
| - List<AnnotationInstance> outgoings = discoveryState.findRepeatableAnnotationsOnMethods(DotNames.OUTGOING); |
206 |
| - List<AnnotationInstance> channels = discoveryState.findAnnotationsOnInjectionPoints(DotNames.CHANNEL); |
207 |
| - List<AnnotationInstance> annotations = new ArrayList<>(); |
208 |
| - annotations.addAll(incomings); |
209 |
| - annotations.addAll(outgoings); |
210 |
| - annotations.addAll(channels); |
211 |
| - for (AnnotationInstance annotation : annotations) { |
212 |
| - String channelName = annotation.value().asString(); |
213 |
| - if (!discoveryState.isKafkaConnector(channelsManagedByConnectors, true, channelName)) { |
214 |
| - continue; |
215 |
| - } |
216 |
| - String key = getChannelPropertyKey(channelName, "graceful-shutdown", true); |
217 |
| - discoveryState.ifNotYetConfigured(key, () -> { |
218 |
| - defaultConfigProducer.produce(new RunTimeConfigurationDefaultBuildItem(key, "false")); |
219 |
| - }); |
220 |
| - } |
| 205 | + disableGracefulShutdown(channelsManagedByConnectors, defaultConfigProducer, discoveryState); |
221 | 206 | }
|
222 | 207 | }
|
223 | 208 | }
|
224 | 209 |
|
| 210 | + void disableGracefulShutdown(List<ConnectorManagedChannelBuildItem> channelsManagedByConnectors, |
| 211 | + BuildProducer<RunTimeConfigurationDefaultBuildItem> defaultConfigProducer, |
| 212 | + DefaultSerdeDiscoveryState discoveryState) { |
| 213 | + for (ConnectorManagedChannelBuildItem managed : channelsManagedByConnectors) { |
| 214 | + String channelName = managed.getName(); |
| 215 | + boolean incoming = managed.getDirection() == ChannelDirection.INCOMING; |
| 216 | + if (!discoveryState.isKafkaConnector(channelsManagedByConnectors, incoming, channelName)) { |
| 217 | + continue; |
| 218 | + } |
| 219 | + String key = getChannelPropertyKey(channelName, incoming ? "graceful-shutdown" : "close-timeout", incoming); |
| 220 | + discoveryState.ifNotYetConfigured(key, () -> { |
| 221 | + defaultConfigProducer.produce(new RunTimeConfigurationDefaultBuildItem(key, incoming ? "false" : "0")); |
| 222 | + }); |
| 223 | + } |
| 224 | + } |
| 225 | + |
225 | 226 | // visible for testing
|
226 | 227 | void discoverDefaultSerdeConfig(DefaultSerdeDiscoveryState discovery,
|
227 | 228 | List<ConnectorManagedChannelBuildItem> channelsManagedByConnectors,
|
|
0 commit comments