Skip to content

Commit 46231cc

Browse files
committed
Migrate to Camel 4.x
1 parent 83c8f2d commit 46231cc

File tree

2 files changed

+15
-11
lines changed

2 files changed

+15
-11
lines changed

build.sbt

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -101,10 +101,11 @@ libraryDependencies ++= Seq(
101101
"org.apache.commons" % "commons-lang3" % "3.18.0",
102102
"com.sksamuel.avro4s" %% "avro4s-core" % "4.1.2", // 5.x for Scala 3
103103

104-
"org.apache.camel" % "camel-core" % "3.20.2",
105-
"org.apache.camel" % "camel-reactive-streams" % "3.20.2",
106-
"io.projectreactor" % "reactor-core" % "3.7.11",
107-
"io.reactivex.rxjava3" % "rxjava" % "3.1.11",
104+
"org.apache.camel" % "camel-core" % "4.16.0",
105+
"org.apache.camel" % "camel-seda" % "4.16.0",
106+
"org.apache.camel" % "camel-reactive-streams" % "4.16.0",
107+
"io.projectreactor" % "reactor-core" % "3.8.0",
108+
"io.reactivex.rxjava3" % "rxjava" % "3.1.12",
108109

109110
"com.github.blemale" %% "scaffeine" % "5.3.0",
110111
"ch.qos.logback" % "logback-classic" % "1.5.18",

src/main/scala/interop/ReactiveStreamsInterop.scala

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,13 @@ import scala.concurrent.duration.DurationInt
2323
* - RxJava
2424
* - pekko-streams
2525
*
26+
* Remarks:
27+
* - SEDA (Staged Event-Driven Architecture) provides asynchronous in-memory messaging within a single CamelContext
28+
*
2629
* Doc:
2730
* https://doc.akka.io/docs/akka/current/stream/reactive-streams-interop.html
28-
* https://camel.apache.org/components/3.18.x/reactive-streams-component.html
31+
* https://camel.apache.org/components/4.14.x/reactive-streams-component.html
32+
* https://camel.apache.org/components/4.14.x/seda-component.html
2933
* https://projectreactor.io/docs/core/release/reference/
3034
* https://github.com/ReactiveX/RxJava
3135
*/
@@ -40,8 +44,8 @@ object ReactiveStreamsInterop extends App {
4044
val rsCamel: CamelReactiveStreamsService = CamelReactiveStreams.get(camel)
4145
camel.start()
4246

43-
// Consumer endpoint with Camel
44-
val publisher: Publisher[String] = rsCamel.from("vm:words", classOf[String])
47+
// Producer/Publisher from Camel SEDA queue using Reactive Streams
48+
val publisher: Publisher[String] = rsCamel.from("seda:words", classOf[String])
4549

4650
// Slow consumer with Reactor 3
4751
Flux.from(publisher)
@@ -64,15 +68,14 @@ object ReactiveStreamsInterop extends App {
6468
.wireTap(each => logger.info(s"Consumed with pekko-streams: $each"))
6569
.runWith(Sink.ignore)
6670

67-
// Sender endpoint with Camel
68-
val template: FluentProducerTemplate = camel.createFluentProducerTemplate
71+
val producerTemplate: FluentProducerTemplate = camel.createFluentProducerTemplate
6972

7073
Source(1 to 10)
7174
.throttle(1, 1.seconds, 1, ThrottleMode.shaping)
7275
.mapAsync(1) { i =>
73-
template
76+
producerTemplate
7477
.withBody(s"Camel$i")
75-
.to("vm:words")
78+
.to("seda:words")
7679
.send
7780
Future(i)
7881
}.runWith(Sink.ignore)

0 commit comments

Comments
 (0)