Skip to content

Commit b28200a

Browse files
garyrussellartembilan
authored andcommitted
GH-1397: Docs for S-I-K Channels
Resolves #1397
1 parent f536406 commit b28200a

File tree

1 file changed

+118
-0
lines changed

1 file changed

+118
-0
lines changed

src/reference/asciidoc/si-kafka.adoc

Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,13 @@ It provides the following components:
1616
* <<si-inbound-pollable>>
1717
* <<si-outbound-gateway>>
1818
* <<si-inbound-gateway>>
19+
* <<si-channels>>
1920

2021
[[new-in-sik]]
22+
==== What's new in Spring Integration for Apache Kafka (version 3.3 - pre-release)
23+
24+
* <<si-channels>>
25+
2126
==== What's new in Spring Integration for Apache Kafka (version 3.2)
2227

2328
* The pollable `KafkaMessageSource` now implements `Pausable` so the consumer can be `paused` and `resumed`.
@@ -614,6 +619,119 @@ See <<container-factory>> and <<si-inbound>> for examples.
614619

615620
See the XML schema for a description of each property.
616621

622+
[[si-channels]]
623+
==== Channels Backed by Kafka Topics
624+
625+
Spring Integration for Apache Kafka version 3.3 (still under development) introduces channels backed by a Kafka topic for persistence.
626+
627+
Each channel requires a `KafkaTemplate` for the sending side and either a listener container factory (for subscribable channels) or a `KafkaMessageSource` for a pollable channel.
628+
629+
===== Java DSL Configuration
630+
631+
====
632+
[source, java]
633+
----
634+
@Bean
635+
public IntegrationFlow flowWithSubscribable(KafkaTemplate<Integer, String> template,
636+
ConcurrentKafkaListenerContainerFactory<Integer, String> containerFactory) {
637+
638+
return IntegrationFlows.from(...)
639+
...
640+
.channel(Kafka.channel(template, containerFactory, "someTopic1").groupId("group1"))
641+
...
642+
.get();
643+
}
644+
645+
@Bean
646+
public IntegrationFlow flowWithPubSub(KafkaTemplate<Integer, String> template,
647+
ConcurrentKafkaListenerContainerFactory<Integer, String> containerFactory) {
648+
649+
return IntegrationFlows.from(...)
650+
...
651+
.publishSubscribeChannel(Kafka.publishSubscribeChannel(
652+
template, containerFactory, "someTopic2").groupId("group2"),
653+
pubsub -> pubsub
654+
.subscribe(subflow -> ...)
655+
.subscribe(subflow -> ...))
656+
.get();
657+
}
658+
659+
@Bean
660+
public IntegrationFlow flowWithPollable(KafkaTemplate<Integer, String> template,
661+
KafkaMessageSource<Integer, String> source) {
662+
663+
return IntegrationFlows.from(...)
664+
...
665+
.channel(Kafka.pollableChannel(template, source, "someTopic3").greoupId("group3"))
666+
.handle(..., e -> e.poller(...))
667+
...
668+
.get();
669+
}
670+
----
671+
====
672+
673+
===== Java Configuration
674+
675+
====
676+
[source, java]
677+
----
678+
/**
679+
* Channel for a single subscriber.
680+
**/
681+
@Bean
682+
SubscribableKafkaChannel pointToPoint(KafkaTemplate<String, String> template,
683+
KafkaListenerContainerFactory<String, String> factory)
684+
685+
SubscribableKafkaChannel channel =
686+
new SubscribableKafkaChannel(template, factory, "topicA");
687+
channel.setGroupId("group1");
688+
return channel;
689+
}
690+
691+
/**
692+
* Channel for multiple subscribers.
693+
**/
694+
@Bean
695+
SubscribableKafkaChannel pubsub(KafkaTemplate<String, String> template,
696+
KafkaListenerContainerFactory<String, String> factory)
697+
698+
SubscribableKafkaChannel channel =
699+
new SubscribableKafkaChannel(template, factory, "topicB", true);
700+
channel.setGroupId("group2");
701+
return channel;
702+
}
703+
704+
/**
705+
* Pollable channel (topic is configured on the source)
706+
**/
707+
@Bean
708+
PollableKafkaChannel pollable(KafkaTemplate<String, String> template,
709+
KafkaMessageSource<String, String> source)
710+
711+
PollableKafkaChannel channel =
712+
new PollableKafkaChannel(template, source);
713+
channel.setGroupId("group3");
714+
return channel;
715+
}
716+
----
717+
====
718+
719+
===== XML Configuration
720+
721+
====
722+
[source, xml]
723+
----
724+
<int-kafka:channel kafka-template="template" id="ptp" topic="ptpTopic" group-id="ptpGroup"
725+
container-factory="containerFactory" />
726+
727+
<int-kafka:pollable-channel kafka-template="template" id="pollable" message-source="source"
728+
group-id = "pollableGroup"/>
729+
730+
<int-kafka:publish-subscribe-channel kafka-template="template" id="pubSub" topic="pubSubTopic"
731+
group-id="pubSubGroup" container-factory="containerFactory" />
732+
----
733+
====
734+
617735
[[message-conversion]]
618736
==== Message Conversion
619737

0 commit comments

Comments
 (0)