Skip to content

Commit 7433e41

Browse files
authored
Add more Kotlin DSL for sub-flows (#3233)
* Add more Kotlin DSL for sub-flows * Introduce `KotlinSplitterEndpointSpec`, `KotlinEnricherSpec` & `KotlinFilterEndpointSpec` to avoid an explicit use of `IntegrationFlow` usage from Kotlin DSL * Fix `AbstractKotlinRouterSpec` to extend `ConsumerEndpointSpec` for access to more endpoint options as it is with Java DSL for similar specs * Introduce a `KotlinIntegrationFlowDefinition.publishSubscribe()` for a `BroadcastCapableChannel` and `vararg` of `KotlinIntegrationFlowDefinition` builders. This allows us to not introduce a `BroadcastPublishSubscribeSpec` wrapper for Kotlin and let us to remove `publishSubscribeChannel()` DSL methods for `PublishSubscribeSpec` since it is covered with the `publishSubscribe(PublishSubscribeChannel())` and respective set of sub-flows in Kotlin DSL as subscribers * Use new Kotlin specs in the `KotlinIntegrationFlowDefinition` instead of their Java variants * * Mention `kotlin-dsl.adoc` in the ToC
1 parent 6d5690f commit 7433e41

File tree

8 files changed

+276
-48
lines changed

8 files changed

+276
-48
lines changed

spring-integration-core/src/main/kotlin/org/springframework/integration/dsl/AbstractKotlinRouterSpec.kt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,8 @@ import org.springframework.messaging.MessageChannel
2929
* @since 5.3
3030
*/
3131
abstract class AbstractKotlinRouterSpec<S : AbstractRouterSpec<S, R>, R : AbstractMessageRouter>(
32-
open val delegate: AbstractRouterSpec<S, R>) {
32+
open val delegate: AbstractRouterSpec<S, R>)
33+
: ConsumerEndpointSpec<S, R>(delegate.handler) {
3334

3435
fun ignoreSendFailures(ignoreSendFailures: Boolean) {
3536
this.delegate.ignoreSendFailures(ignoreSendFailures)
Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
/*
2+
* Copyright 2020 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.dsl
18+
19+
import org.springframework.integration.transformer.ContentEnricher
20+
import org.springframework.integration.transformer.support.HeaderValueMessageProcessor
21+
import org.springframework.messaging.Message
22+
import org.springframework.messaging.MessageChannel
23+
24+
/**
25+
* An [EnricherSpec] wrapped for Kotlin DSL.
26+
*
27+
* @property delegate the [EnricherSpec] this instance is delegating to.
28+
*
29+
* @author Artem Bilan
30+
*
31+
* @since 5.3
32+
*/
33+
class KotlinEnricherSpec(val delegate: EnricherSpec)
34+
: ConsumerEndpointSpec<EnricherSpec, ContentEnricher>(delegate.handler) {
35+
36+
fun requestChannel(requestChannel: MessageChannel) {
37+
this.delegate.requestChannel(requestChannel)
38+
}
39+
40+
fun requestChannel(requestChannel: String) {
41+
this.delegate.requestChannel(requestChannel)
42+
}
43+
44+
fun replyChannel(replyChannel: MessageChannel) {
45+
this.delegate.replyChannel(replyChannel)
46+
}
47+
48+
fun replyChannel(replyChannel: String) {
49+
this.delegate.replyChannel(replyChannel)
50+
}
51+
52+
fun errorChannel(errorChannel: MessageChannel) {
53+
this.delegate.errorChannel(errorChannel)
54+
}
55+
56+
fun errorChannel(errorChannel: String) {
57+
this.delegate.errorChannel(errorChannel)
58+
}
59+
60+
fun requestTimeout(requestTimeout: Long) {
61+
this.delegate.requestTimeout(requestTimeout)
62+
}
63+
64+
fun replyTimeout(replyTimeout: Long) {
65+
this.delegate.replyTimeout(replyTimeout)
66+
}
67+
68+
fun requestPayloadExpression(requestPayloadExpression: String) {
69+
this.delegate.requestPayloadExpression(requestPayloadExpression)
70+
}
71+
72+
fun <P> requestPayload(function: (Message<P>) -> Any) {
73+
this.delegate.requestPayload(function)
74+
}
75+
76+
fun requestSubFlow(subFlow: KotlinIntegrationFlowDefinition.() -> Unit) {
77+
this.delegate.requestSubFlow { subFlow(KotlinIntegrationFlowDefinition(it)) }
78+
}
79+
80+
fun shouldClonePayload(shouldClonePayload: Boolean) {
81+
this.delegate.shouldClonePayload(shouldClonePayload)
82+
}
83+
84+
fun <V> property(key: String, value: V) {
85+
this.delegate.property(key, value)
86+
}
87+
88+
fun propertyExpression(key: String, expression: String) {
89+
this.delegate.propertyExpression(key, expression)
90+
}
91+
92+
fun <P> propertyFunction(key: String, function: (Message<P>) -> Any) {
93+
this.delegate.propertyFunction(key, function)
94+
}
95+
96+
fun <V> header(name: String, value: V, overwrite: Boolean?) {
97+
this.delegate.header(name, value, overwrite)
98+
}
99+
100+
fun headerExpression(name: String, expression: String, overwrite: Boolean?) {
101+
this.delegate.header(name, expression, overwrite)
102+
}
103+
104+
fun <P> headerFunction(name: String, function: (Message<P>) -> Any, overwrite: Boolean?) {
105+
this.delegate.header(name, function, overwrite)
106+
}
107+
108+
fun <V> header(headerName: String, headerValueMessageProcessor: HeaderValueMessageProcessor<V>) {
109+
this.delegate.header(headerName, headerValueMessageProcessor)
110+
}
111+
112+
}
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
/*
2+
* Copyright 2020 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.dsl
18+
19+
import org.springframework.integration.filter.MessageFilter
20+
import org.springframework.messaging.MessageChannel
21+
22+
/**
23+
* An [FilterEndpointSpec] wrapped for Kotlin DSL.
24+
*
25+
* @property delegate the [FilterEndpointSpec] this instance is delegating to.
26+
*
27+
* @author Artem Bilan
28+
*
29+
* @since 5.3
30+
*/
31+
class KotlinFilterEndpointSpec(val delegate: FilterEndpointSpec)
32+
: ConsumerEndpointSpec<FilterEndpointSpec, MessageFilter>(delegate.handler) {
33+
34+
fun throwExceptionOnRejection(throwExceptionOnRejection: Boolean) {
35+
this.delegate.throwExceptionOnRejection(throwExceptionOnRejection)
36+
}
37+
38+
fun discardChannel(discardChannel: MessageChannel) {
39+
this.delegate.discardChannel(discardChannel)
40+
}
41+
42+
fun discardChannel(discardChannelName: String) {
43+
this.delegate.discardChannel(discardChannelName)
44+
}
45+
46+
fun discardWithinAdvice(discardWithinAdvice: Boolean) {
47+
this.delegate.discardWithinAdvice(discardWithinAdvice)
48+
}
49+
50+
fun discardFlow(subFlow: KotlinIntegrationFlowDefinition.() -> Unit) {
51+
this.delegate.discardFlow { subFlow(KotlinIntegrationFlowDefinition(it)) }
52+
}
53+
54+
}

spring-integration-core/src/main/kotlin/org/springframework/integration/dsl/KotlinIntegrationFlowDefinition.kt

Lines changed: 41 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package org.springframework.integration.dsl
1919
import org.reactivestreams.Publisher
2020
import org.springframework.expression.Expression
2121
import org.springframework.integration.aggregator.AggregatingMessageHandler
22+
import org.springframework.integration.channel.BroadcastCapableChannel
2223
import org.springframework.integration.channel.FluxMessageChannel
2324
import org.springframework.integration.channel.interceptor.WireTap
2425
import org.springframework.integration.core.MessageSelector
@@ -54,7 +55,6 @@ import org.springframework.messaging.MessageChannel
5455
import org.springframework.messaging.MessageHandler
5556
import org.springframework.messaging.MessageHeaders
5657
import reactor.core.publisher.Flux
57-
import java.util.concurrent.Executor
5858
import java.util.function.Consumer
5959

6060
/**
@@ -112,9 +112,9 @@ class KotlinIntegrationFlowDefinition(@PublishedApi internal val delegate: Integ
112112
*/
113113
inline fun <reified P> split(
114114
crossinline function: (P) -> Any,
115-
crossinline configurer: SplitterEndpointSpec<MethodInvokingSplitter>.() -> Unit) {
115+
crossinline configurer: KotlinSplitterEndpointSpec<MethodInvokingSplitter>.() -> Unit) {
116116

117-
this.delegate.split(P::class.java, { function(it) }) { configurer(it) }
117+
this.delegate.split(P::class.java, { function(it) }) { configurer(KotlinSplitterEndpointSpec(it)) }
118118
}
119119

120120
/**
@@ -131,9 +131,9 @@ class KotlinIntegrationFlowDefinition(@PublishedApi internal val delegate: Integ
131131
*/
132132
inline fun <reified P> filter(
133133
crossinline function: (P) -> Boolean,
134-
crossinline configurer: FilterEndpointSpec.() -> Unit) {
134+
crossinline filterConfigurer: KotlinFilterEndpointSpec.() -> Unit) {
135135

136-
this.delegate.filter(P::class.java, { function(it) }) { configurer(it) }
136+
this.delegate.filter(P::class.java, { function(it) }) { filterConfigurer(KotlinFilterEndpointSpec(it)) }
137137
}
138138

139139

@@ -207,22 +207,19 @@ class KotlinIntegrationFlowDefinition(@PublishedApi internal val delegate: Integ
207207
}
208208

209209
/**
210-
* The [org.springframework.integration.channel.PublishSubscribeChannel] `channel()`
210+
* The [org.springframework.integration.channel.BroadcastCapableChannel] `channel()`
211211
* method specific implementation to allow the use of the 'subflow' subscriber capability.
212212
*/
213-
fun publishSubscribeChannel(publishSubscribeChannelConfigurer: PublishSubscribeSpec.() -> Unit) {
214-
this.delegate.publishSubscribeChannel(publishSubscribeChannelConfigurer)
215-
}
216-
217-
/**
218-
* The [org.springframework.integration.channel.PublishSubscribeChannel] `channel()`
219-
* method specific implementation to allow the use of the 'subflow' subscriber capability.
220-
* Use the provided [Executor] for the target subscribers.
221-
*/
222-
fun publishSubscribeChannel(executor: Executor,
223-
publishSubscribeChannelConfigurer: PublishSubscribeSpec.() -> Unit) {
213+
fun publishSubscribe(broadcastCapableChannel: BroadcastCapableChannel,
214+
vararg subscribeSubFlows: KotlinIntegrationFlowDefinition.() -> Unit) {
224215

225-
this.delegate.publishSubscribeChannel(executor, Consumer(publishSubscribeChannelConfigurer))
216+
val publishSubscribeChannelConfigurer =
217+
Consumer<BroadcastPublishSubscribeSpec> { spec ->
218+
subscribeSubFlows.forEach { subFlow ->
219+
spec.subscribe { subFlow(KotlinIntegrationFlowDefinition(it)) }
220+
}
221+
}
222+
this.delegate.publishSubscribeChannel(broadcastCapableChannel, publishSubscribeChannelConfigurer)
226223
}
227224

228225
/**
@@ -331,10 +328,10 @@ class KotlinIntegrationFlowDefinition(@PublishedApi internal val delegate: Integ
331328

332329
/**
333330
* Populate a [MessageFilter] with [MessageSelector] for the provided SpEL expression.
334-
* In addition accept options for the integration endpoint using [FilterEndpointSpec]:
331+
* In addition accept options for the integration endpoint using [KotlinFilterEndpointSpec]:
335332
*/
336-
fun filter(expression: String, endpointConfigurer: FilterEndpointSpec.() -> Unit = {}) {
337-
this.delegate.filter(expression, endpointConfigurer)
333+
fun filter(expression: String, filterConfigurer: KotlinFilterEndpointSpec.() -> Unit = {}) {
334+
this.delegate.filter(expression) { filterConfigurer(KotlinFilterEndpointSpec(it)) }
338335
}
339336

340337
/**
@@ -349,18 +346,19 @@ class KotlinIntegrationFlowDefinition(@PublishedApi internal val delegate: Integ
349346
* Populate a [MessageFilter] with [MethodInvokingSelector] for the
350347
* method of the provided service.
351348
*/
352-
fun filter(service: Any, methodName: String?, endpointConfigurer: FilterEndpointSpec.() -> Unit) {
353-
this.delegate.filter(service, methodName, endpointConfigurer)
349+
fun filter(service: Any, methodName: String?, filterConfigurer: KotlinFilterEndpointSpec.() -> Unit) {
350+
this.delegate.filter(service, methodName) { filterConfigurer(KotlinFilterEndpointSpec(it)) }
354351
}
355352

356353
/**
357354
* Populate a [MessageFilter] with [MethodInvokingSelector]
358355
* for the [MessageProcessor] from
359356
* the provided [MessageProcessorSpec].
360-
* In addition accept options for the integration endpoint using [FilterEndpointSpec].
357+
* In addition accept options for the integration endpoint using [KotlinFilterEndpointSpec].
361358
*/
362-
fun filter(messageProcessorSpec: MessageProcessorSpec<*>, endpointConfigurer: FilterEndpointSpec.() -> Unit = {}) {
363-
this.delegate.filter(messageProcessorSpec, endpointConfigurer)
359+
fun filter(messageProcessorSpec: MessageProcessorSpec<*>,
360+
filterConfigurer: KotlinFilterEndpointSpec.() -> Unit = {}) {
361+
this.delegate.filter(messageProcessorSpec) { filterConfigurer(KotlinFilterEndpointSpec(it)) }
364362
}
365363

366364
/**
@@ -512,8 +510,8 @@ class KotlinIntegrationFlowDefinition(@PublishedApi internal val delegate: Integ
512510
* to the current integration flow position
513511
* with provided options.
514512
*/
515-
fun enrich(enricherConfigurer: EnricherSpec.() -> Unit) {
516-
this.delegate.enrich(enricherConfigurer)
513+
fun enrich(enricherConfigurer: KotlinEnricherSpec.() -> Unit) {
514+
this.delegate.enrich { enricherConfigurer(KotlinEnricherSpec(it)) }
517515
}
518516

519517
/**
@@ -562,9 +560,9 @@ class KotlinIntegrationFlowDefinition(@PublishedApi internal val delegate: Integ
562560
* SpEL expression.
563561
*/
564562
fun split(expression: String,
565-
endpointConfigurer: SplitterEndpointSpec<ExpressionEvaluatingSplitter>.() -> Unit = {}) {
563+
endpointConfigurer: KotlinSplitterEndpointSpec<ExpressionEvaluatingSplitter>.() -> Unit = {}) {
566564

567-
this.delegate.split(expression, endpointConfigurer)
565+
this.delegate.split(expression) { endpointConfigurer(KotlinSplitterEndpointSpec(it)) }
568566
}
569567

570568
/**
@@ -578,12 +576,12 @@ class KotlinIntegrationFlowDefinition(@PublishedApi internal val delegate: Integ
578576
/**
579577
* Populate the [MethodInvokingSplitter] to evaluate the provided
580578
* `method` of the `bean` at runtime.
581-
* In addition accept options for the integration endpoint using [GenericEndpointSpec].
579+
* In addition accept options for the integration endpoint using [KotlinSplitterEndpointSpec].
582580
*/
583581
fun split(service: Any, methodName: String?,
584-
endpointConfigurer: SplitterEndpointSpec<MethodInvokingSplitter>.() -> Unit) {
582+
splitterConfigurer: KotlinSplitterEndpointSpec<MethodInvokingSplitter>.() -> Unit) {
585583

586-
this.delegate.split(service, methodName, endpointConfigurer)
584+
this.delegate.split(service, methodName) { splitterConfigurer(KotlinSplitterEndpointSpec(it)) }
587585
}
588586

589587
/**
@@ -597,43 +595,43 @@ class KotlinIntegrationFlowDefinition(@PublishedApi internal val delegate: Integ
597595
/**
598596
* Populate the [MethodInvokingSplitter] to evaluate the provided
599597
* `method` of the `bean` at runtime.
600-
* In addition accept options for the integration endpoint using [GenericEndpointSpec].
598+
* In addition accept options for the integration endpoint using [KotlinSplitterEndpointSpec].
601599
*/
602600
fun split(beanName: String, methodName: String?,
603-
endpointConfigurer: SplitterEndpointSpec<MethodInvokingSplitter>.() -> Unit) {
601+
splitterConfigurer: KotlinSplitterEndpointSpec<MethodInvokingSplitter>.() -> Unit) {
604602

605-
this.delegate.split(beanName, methodName, endpointConfigurer)
603+
this.delegate.split(beanName, methodName) { splitterConfigurer(KotlinSplitterEndpointSpec(it)) }
606604
}
607605

608606
/**
609607
* Populate the [MethodInvokingSplitter] to evaluate the
610608
* [MessageProcessor] at runtime
611609
* from provided [MessageProcessorSpec].
612-
* In addition accept options for the integration endpoint using [GenericEndpointSpec].
610+
* In addition accept options for the integration endpoint using [KotlinSplitterEndpointSpec].
613611
*/
614612
fun split(messageProcessorSpec: MessageProcessorSpec<*>,
615-
endpointConfigurer: SplitterEndpointSpec<MethodInvokingSplitter>.() -> Unit = {}) {
613+
splitterConfigurer: KotlinSplitterEndpointSpec<MethodInvokingSplitter>.() -> Unit = {}) {
616614

617-
this.delegate.split(messageProcessorSpec, endpointConfigurer)
615+
this.delegate.split(messageProcessorSpec) { splitterConfigurer(KotlinSplitterEndpointSpec(it)) }
618616
}
619617

620618
/**
621619
* Populate the provided [AbstractMessageSplitter] to the current integration flow position.
622620
*/
623621
fun <S : AbstractMessageSplitter> split(splitterMessageHandlerSpec: MessageHandlerSpec<*, S>,
624-
endpointConfigurer: SplitterEndpointSpec<S>.() -> Unit = {}) {
622+
splitterConfigurer: KotlinSplitterEndpointSpec<S>.() -> Unit = {}) {
625623

626-
this.delegate.split(splitterMessageHandlerSpec, endpointConfigurer)
624+
this.delegate.split(splitterMessageHandlerSpec) { splitterConfigurer(KotlinSplitterEndpointSpec(it)) }
627625
}
628626

629627
/**
630628
* Populate the provided [AbstractMessageSplitter] to the current integration
631629
* flow position.
632630
*/
633631
fun <S : AbstractMessageSplitter> split(splitter: S,
634-
endpointConfigurer: SplitterEndpointSpec<S>.() -> Unit = {}) {
632+
splitterConfigurer: KotlinSplitterEndpointSpec<S>.() -> Unit = {}) {
635633

636-
this.delegate.split(splitter, endpointConfigurer)
634+
this.delegate.split(splitter) { splitterConfigurer(KotlinSplitterEndpointSpec(it)) }
637635
}
638636

639637
/**

0 commit comments

Comments
 (0)