Skip to content

Commit 13cde8c

Browse files
committed
Sync documentation of main branch
1 parent 9edc393 commit 13cde8c

File tree

5 files changed

+232
-79
lines changed

5 files changed

+232
-79
lines changed

_generated-doc/main/config/quarkus-all-config.adoc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10359,7 +10359,7 @@ ifndef::add-copy-button-to-env-var[]
1035910359
Environment variable: `+++QUARKUS_TEST_ARG_LINE+++`
1036010360
endif::add-copy-button-to-env-var[]
1036110361
--
10362-
|list of string
10362+
|string
1036310363
|
1036410364

1036510365
a|icon:lock[title=Fixed at build time] [[quarkus-core_quarkus-test-env-environment-variable-name]] [.property-path]##link:#quarkus-core_quarkus-test-env-environment-variable-name[`quarkus.test.env."environment-variable-name"`]##

_generated-doc/main/config/quarkus-core_quarkus.test.adoc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -423,7 +423,7 @@ ifndef::add-copy-button-to-env-var[]
423423
Environment variable: `+++QUARKUS_TEST_ARG_LINE+++`
424424
endif::add-copy-button-to-env-var[]
425425
--
426-
|list of string
426+
|string
427427
|
428428

429429
a|icon:lock[title=Fixed at build time] [[quarkus-core_quarkus-test-env-environment-variable-name]] [.property-path]##link:#quarkus-core_quarkus-test-env-environment-variable-name[`quarkus.test.env."environment-variable-name"`]##

_versions/main/guides/kafka.adoc

Lines changed: 103 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -2956,9 +2956,7 @@ NOTE: If you use Hibernate Reactive, look at <<writing-entities-managed-by-hiber
29562956

29572957
Because we write to a database, we must run this method in a transaction.
29582958
Yet, sending the entity to Kafka happens asynchronously.
2959-
The operation returns a `CompletionStage` (or a `Uni` if you use a `MutinyEmitter`) reporting when the operation completes.
2960-
We must be sure that the transaction is still running until the object is written.
2961-
Otherwise, you may access the object outside the transaction, which is not allowed.
2959+
We can achieve this by using `.sendAndAwait()` or `.sendAndForget()` on the `MutinyEmitter`, or `.send().toCompletableFuture().join()` on the `Emitter`.
29622960

29632961
To implement this process, you need the following approach:
29642962

@@ -2973,26 +2971,29 @@ import jakarta.ws.rs.POST;
29732971
import jakarta.ws.rs.Path;
29742972
29752973
import org.eclipse.microprofile.reactive.messaging.Channel;
2976-
import org.eclipse.microprofile.reactive.messaging.Emitter;
2974+
import io.smallrye.reactive.messaging.MutinyEmitter;
29772975
29782976
@Path("/")
29792977
public class ResourceSendingToKafka {
29802978
2981-
@Channel("kafka") Emitter<Fruit> emitter;
2979+
@Channel("kafka") MutinyEmitter<Fruit> emitter;
29822980
29832981
@POST
29842982
@Path("/fruits")
2985-
@Transactional // <1>
2986-
public CompletionStage<Void> storeAndSendToKafka(Fruit fruit) { // <2>
2983+
@Transactional // <1>
2984+
public void storeAndSendToKafka(Fruit fruit) { // <2>
29872985
fruit.persist();
2988-
return emitter.send(new FruitDto(fruit)); // <3>
2986+
emitter.sendAndAwait(new FruitDto(fruit)); // <3>
29892987
}
29902988
}
29912989
----
29922990
<1> As we are writing to the database, make sure we run inside a transaction
2993-
<2> The method receives the fruit instance to persist. It returns a `CompletionStage` which is used for the transaction demarcation. The transaction is committed when the return `CompletionStage` completes. In our case, it's when the message is written to Kafka.
2991+
<2> The method receives the fruit instance to persist.
29942992
<3> Wrap the managed entity inside a Data transfer object and send it to Kafka.
29952993
This makes sure that managed entity is not impacted by the Kafka serialization.
2994+
Then await the completion of the operation before returning.
2995+
2996+
NOTE: You should not return a `CompletionStage` or `Uni` when using `@Transactional`, as all transaction commits will happen on a single thread, which impacts performance.
29962997

29972998
[[writing-entities-managed-by-hibernate-reactive-to-kafka]]
29982999
=== Writing entities managed by Hibernate Reactive to Kafka
@@ -3191,23 +3192,104 @@ public class FruitProducer {
31913192
@Consumes(MediaType.APPLICATION_JSON)
31923193
@Bulkhead(1)
31933194
public Uni<Void> post(Fruit fruit) {
3194-
Context context = Vertx.currentContext(); // <2>
3195-
return sf.withTransaction(session -> // <3>
3196-
kafkaTx.withTransaction(emitter -> // <4>
3197-
session.persist(fruit).invoke(() -> emitter.send(fruit)) // <5>
3198-
).emitOn(context::runOnContext) // <6>
3199-
);
3195+
return sf.withTransaction(session -> // <2>
3196+
kafkaTx.withTransaction(emitter -> // <3>
3197+
session.persist(fruit).invoke(() -> emitter.send(fruit)) // <4>
3198+
));
32003199
}
32013200
}
32023201
----
32033202

32043203
<1> Inject the Hibernate Reactive `SessionFactory`.
3205-
<2> Capture the caller Vert.x context.
3206-
<3> Begin a Hibernate Reactive transaction.
3207-
<4> Begin a Kafka transaction.
3208-
<5> Persist the payload and send the entity to Kafka.
3209-
<6> The Kafka transaction terminates on the Kafka producer sender thread.
3210-
We need to switch to the Vert.x context previously captured in order to terminate the Hibernate Reactive transaction on the same context we started it.
3204+
<2> Begin a Hibernate Reactive transaction.
3205+
<3> Begin a Kafka transaction.
3206+
<4> Persist the payload and send the entity to Kafka.
3207+
3208+
Alternatively, you can use the `@WithTransaction` annotation to start a transaction and commit it when the method returns:
3209+
3210+
[source, java]
3211+
----
3212+
import jakarta.inject.Inject;
3213+
import jakarta.ws.rs.Consumes;
3214+
import jakarta.ws.rs.POST;
3215+
import jakarta.ws.rs.Path;
3216+
import jakarta.ws.rs.core.MediaType;
3217+
3218+
import org.eclipse.microprofile.faulttolerance.Bulkhead;
3219+
import org.eclipse.microprofile.reactive.messaging.Channel;
3220+
3221+
import io.quarkus.hibernate.reactive.panache.common.WithTransaction;
3222+
import io.smallrye.mutiny.Uni;
3223+
import io.smallrye.reactive.messaging.kafka.transactions.KafkaTransactions;
3224+
3225+
@Path("/")
3226+
public class FruitProducer {
3227+
3228+
@Channel("kafka") KafkaTransactions<Fruit> kafkaTx;
3229+
3230+
@POST
3231+
@Path("/fruits")
3232+
@Consumes(MediaType.APPLICATION_JSON)
3233+
@Bulkhead(1)
3234+
@WithTransaction // <1>
3235+
public Uni<Void> post(Fruit fruit) {
3236+
return kafkaTx.withTransaction(emitter -> // <2>
3237+
fruit.persist().invoke(() -> emitter.send(fruit)) // <3>
3238+
);
3239+
}
3240+
}
3241+
----
3242+
3243+
<1> Start a Hibernate Reactive transaction and commit it when the method returns.
3244+
<2> Begin a Kafka transaction.
3245+
<3> Persist the payload and send the entity to Kafka.
3246+
3247+
[[chaining-kafka-transactions-with-hibernate-orm-transactions]]
3248+
=== Chaining Kafka Transactions with Hibernate ORM transactions
3249+
3250+
While `KafkaTransactions` provide a reactive API on top of Mutiny to manage Kafka transactions,
3251+
you can still chain Kafka transactions with blocking Hibernate ORM transactions.
3252+
3253+
[source, java]
3254+
----
3255+
import jakarta.transaction.Transactional;
3256+
import jakarta.ws.rs.POST;
3257+
import jakarta.ws.rs.Path;
3258+
3259+
import org.eclipse.microprofile.reactive.messaging.Channel;
3260+
3261+
import io.quarkus.logging.Log;
3262+
import io.smallrye.mutiny.Uni;
3263+
import io.smallrye.reactive.messaging.kafka.transactions.KafkaTransactions;
3264+
3265+
@Path("/")
3266+
public class FruitProducer {
3267+
3268+
@Channel("kafka") KafkaTransactions<Pet> emitter;
3269+
3270+
@POST
3271+
@Path("/fruits")
3272+
@Consumes(MediaType.APPLICATION_JSON)
3273+
@Bulkhead(1)
3274+
@Transactional // <1>
3275+
public void post(Fruit fruit) {
3276+
emitter.withTransaction(e -> { // <2>
3277+
// if id is attributed by the database, will need to flush to get it
3278+
// fruit.persistAndFlush();
3279+
fruit.persist(); // <3>
3280+
Log.infov("Persisted fruit {0}", p);
3281+
e.send(p); // <4>
3282+
return Uni.createFrom().voidItem();
3283+
}).await().indefinitely(); // <5>
3284+
}
3285+
}
3286+
----
3287+
3288+
<1> Start a Hibernate ORM transaction. The transaction is committed when the method returns.
3289+
<2> Begin a Kafka transaction.
3290+
<3> Persist the payload.
3291+
<4> Send the entity to Kafka inside the Kafka transaction.
3292+
<5> Wait on the returned `Uni` for the Kafka transaction to complete.
32113293

32123294
== Logging
32133295

_versions/main/guides/messaging.adoc

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -356,6 +356,80 @@ public class MyProfileBean {
356356
}
357357
----
358358

359+
==== Pausable Channels
360+
361+
Injected `@Channel` streams are not subscribed to by default, so the flow of messages is controlled by the application code using reactive streams and Mutiny APIs.
362+
But for `@Incoming` methods, the flow of messages is controlled by the runtime.
363+
364+
Pausable channels provide a mechanism to control message flow programmatically.
365+
This is useful in scenarios where producers or consumers need to stop temporarily due to managing the lifecycle or performing maintenance operations.
366+
367+
To use pausable channels, you need to activate it with the configuration property `pausable` set to `true`.
368+
369+
[source, properties]
370+
----
371+
mp.messaging.incoming.my-channel.pausable=true
372+
# optional, by default the channel is NOT paused initially
373+
mp.messaging.outgoing.my-channel.initially-paused=true
374+
----
375+
376+
If a channel is configured to be pausable, you can get the `PausableChannel` by channel name from the `ChannelRegistry` programmatically, and pause or resume the channel as needed:
377+
378+
[source, java]
379+
----
380+
import jakarta.annotation.PostConstruct;
381+
import jakarta.enterprise.context.ApplicationScoped;
382+
import jakarta.inject.Inject;
383+
384+
import org.eclipse.microprofile.reactive.messaging.Incoming;
385+
386+
import io.smallrye.reactive.messaging.ChannelRegistry;
387+
import io.smallrye.reactive.messaging.PausableChannel;
388+
389+
@ApplicationScoped
390+
public class PausableController {
391+
392+
@Inject
393+
ChannelRegistry registry;
394+
395+
@PostConstruct
396+
public void resume() {
397+
// Wait for the application to be ready
398+
// Retrieve the pausable channel
399+
PausableChannel pausable = registry.getPausable("my-channel");
400+
// Pause the processing of the messages
401+
pausable.resume();
402+
}
403+
404+
public void pause() {
405+
// Retrieve the pausable channel
406+
PausableChannel pausable = registry.getPausable("my-channel");
407+
// Pause the processing of the messages
408+
pausable.pause();
409+
}
410+
411+
@Incoming("my-channel")
412+
void process(String message) {
413+
// Process the message
414+
}
415+
416+
}
417+
----
418+
419+
This feature is independent of connectors and can be in theory used with channels backed by any connector.
420+
Note that pausing message consumption applies back-pressure on the underlying consumer which receives messages from the remote broker.
421+
422+
[NOTE]
423+
====
424+
Kafka consumers provide a similar feature to pause and resume the consumption of messages from topic-partitions.
425+
The Quarkus Kafka connector allows xref:kafka.adoc#kafka-bare-clients[access to the underlying client] to pause/resume the consumption.
426+
427+
However, by default, with the `pause-if-no-requests=true` configuration,
428+
the connector handles automatically the back-pressure,
429+
by the pausing and resuming the Kafka consumer based on downstream requests.
430+
It is therefore recommended to use pausable channels with the default `pause-if-no-requests=true` configuration.
431+
====
432+
359433
==== Multiple Outgoings and `@Broadcast`
360434

361435
By default, messages transmitted in a channel are only dispatched to a single consumer.

0 commit comments

Comments
 (0)