Skip to content

Commit 6d44744

Browse files
authored
Listener docs cleanup (#128)
* Listener docs cleanup Adding docs dor concurrency Cleaning up docs on using Acknowledgment API * Addressing PR review comments
1 parent 09deb95 commit 6d44744

File tree

1 file changed

+200
-26
lines changed

1 file changed

+200
-26
lines changed

spring-pulsar-docs/src/main/asciidoc/pulsar.adoc

Lines changed: 200 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -212,13 +212,13 @@ In addition, in order to enable batch consumption at the `PulsarListener` level,
212212
Based on the actual type that the `List` holds, the framework tries to infer the schema to use.
213213
If the `List` contains a complex type, then the `schemaType` still needs to be provided on `PulsarListener`.
214214

215-
The following also should work in which we use the `Messages` holder type provided by the Pulsar Java client.
215+
The following also should work in which we use the `Message` envelope provided by the Pulsar Java client.
216216

217217
====
218218
[source, java]
219219
----
220220
@PulsarListener(subscriptionName = "hello-batch-subscription", topics = "hello-batch", schemaType = SchemaType.JSON, batch = true)
221-
public void listen4(Messages<Foo> messages) {
221+
public void listen4(List<Message<Foo>> messages) {
222222
System.out.println("records received :" + messages.size());
223223
messages.forEach((message) -> System.out.println("record : " + message));
224224
}
@@ -244,6 +244,10 @@ Note that the properties used are direct Pulsar consumer properties.
244244
[[pulsar-message-listener-container]]
245245
==== Pulsar Message Listener Container
246246

247+
Now that we saw the basic interactions on the consumer side through `PulsarListener`, let us now dive into the inner workings of how `PulsarListener` interacts with the underlying Pulsar consumer.
248+
Keep in mind that, for end-user applications, in most scenarios, we recommend using `PulsarListener` annotation directly for consuming from a Pulsar topic when using Spring for Apache Pulsar, as that model covers a broad set of application use cases.
249+
However, it is important to understand how `PulsarListener` works internally and this section will go through those details.
250+
247251
As briefly mentioned above, the message listener container is at the heart of message consumption when using Spring for Apache Pulsar.
248252
`PulsarListener` uses the message listener container infrastructure behind the scenes to create and manage the Pulsar consumer.
249253
Spring for Apache Pulsar provides the contract for this message listener container through `PulsarMessageListenerContainer`.
@@ -336,6 +340,47 @@ public ConcurrentPulsarMessageListenerContainer(PulsarConsumerFactory<? super T>
336340
Concurrency of more than `1` is only allowed on non-exclusive subscriptions (`failover`, `shared` and `key-shared`).
337341
You can only have the default `1` for concurrency when you have an exclusive subscription mode.
338342

343+
Here is an example of enabling `concurrency` through the `PulsarListener` annotation for a `failover` subscription.
344+
345+
====
346+
[source, java]
347+
----
348+
@PulsarListener(topics = "my-topic", subscriptionName = "subscription-1",
349+
subscriptionType = SubscriptionType.Failover, concurrency = "3")
350+
void listen(String message, Consumer<String> consumer) {
351+
...
352+
System.out.println("Current Thread: " + Thread.currentThread().getName());
353+
System.out.println("Current Consumer: " + consumer.getConsumerName());
354+
}
355+
----
356+
====
357+
358+
In the above listener, it is assumed that the topic `my-topic` has 3 partitions.
359+
If it is a non-partitioned topic, then having concurrency set to `3`, will not do anything, you will simply get two idle consumers in addition to the main active one.
360+
If the topic has more than 3 partitions, then messages will be load-balanced across the consumers that the container creates.
361+
If you run this `PulsarListener`, you will see that messages from different partitions will be consumed through different consumers as implied by the thread name and consumer names printouts in the example code above.
362+
363+
**Note: When using the `Failover` subscription this way on partitioned topics, Pulsar guarantees message ordering.**
364+
365+
Here is another example of `PulsarListener`, but with `Shared` subscription and `concurrency` enabled.
366+
367+
====
368+
[source, java]
369+
----
370+
@PulsarListener(topics = "my-topic", subscriptionName = "subscription-1",
371+
subscriptionType = SubscriptionType.Shared, concurrency = "5")
372+
void listen(String message) {
373+
...
374+
}
375+
----
376+
====
377+
378+
In the example above, the `PulsarListener` creates 5 different consumers (once again, we are assuming that the topic has 5 partitions).
379+
380+
**Keep in mind that, in this version, there is no message ordering as `Shared` subscriptions do not guarantee any message ordering in Pulsar**
381+
382+
If you need message ordering and still want a shared subscription types, then you need to use the `Key_Shared` subscription type.
383+
339384
==== Consuming the Records
340385

341386
In this section, we are going to see how the message listener container enables both single record and batch based message consumption.
@@ -396,31 +441,159 @@ MANUAL;
396441
```
397442

398443
`BATCH` acknowledgment mode is the default, but you can change it on the message listener container.
444+
In the following sections, we will see how acknowledgment works when using both single and batch versions of `PulsarListener` and how they translate to the backing message listener container (and of course ultimately to the Pulsar consumer).
445+
446+
==== Automatic Message Ack in Single Record Mode
399447

400-
==== Message Ack in Single Record Mode
448+
Let us revisit our basic single message based `PulsarListener`.
401449

402-
When consuming single records using `PulsarRecordMessageListener` and the default ack mode of `BATCH` is used, the framework waits for all the record received from the `batchReceive` call to process successfully and then call the `acknowledge` method on the Pulsar Consumer.
450+
====
451+
[source, java]
452+
----
453+
@PulsarListener(subscriptionName = "hello-pulsar-subscription", topics = "hello-pulsar")
454+
public void listen(String message) {
455+
System.out.println("Message Received: " + message);
456+
}
457+
----
458+
====
459+
460+
It is natural to wonder, how acknowledgment works when using `PulsarListener`, espcially if you are familiar with Pulsar consumer directly.
461+
The answer comes down to the message listener container as that is the central place in Spring for Apache Pulsar which coordinates all the consumer related activities.
462+
463+
Assuming you are not overriding the default behavior, this is what happens behind the scenes when using the above `PulsarListener`.
464+
465+
. First, the Listener container receives messages as batch from the Pulsar consumer.
466+
. The received messages are handed down to `PulsarListener` one message at a time
467+
. When all the records are handed down to the listener method and successfully processed, the container will acknowledge all the messages from the original bach receive.
468+
469+
This is the normal flow. If any record from the original batch received, throws an exception, Spring for Apache Pulsar will track them separately.
470+
When all the records from the batch are processed, then Spring for Apache Pulsar will acknowledge all the succesful messages and negatively acknowledge (nack) all the failed messages.
471+
In other words, when consuming single records using `PulsarRecordMessageListener` and the default ack mode of `BATCH` is used, the framework waits for all the record received from the `batchReceive` call to process successfully and then call the `acknowledge` method on the Pulsar Consumer.
403472
If any particular record throws an exception when invoking the handler method, Spring for Apache Pulsar tracks those records and separately call `negativeAcknowledge` on those records after the entire batch is processed.
404473

405474
If the application wants the acknowledgment or negative acknowledgment to occur per record, then the `RECORD` ack mode can be enabled.
406-
In that case, after handling each record the message is acknowledged if no error or negatively acknowledged if there was an error.
475+
In that case, after handling each record, the message is acknowledged if no error and negatively acknowledged if there was an error.
476+
Here is an example of enabling `RECORD` ack mode on Pulsar Listener.
477+
478+
====
479+
[source, java]
480+
----
481+
@PulsarListener(subscriptionName = "hello-pulsar-subscription", topics = "hello-pulsar", ackMode = AckMode.RECORD)
482+
public void listen(String message) {
483+
System.out.println("Message Received: " + message);
484+
}
485+
----
486+
====
487+
488+
You can also set the listener property, `spring.pulsar.listner.ack-mode` to set the ack mode application wide.
489+
When doing this, you do not need to set this on the `PulsarListener` annotation.
490+
In that case, all the `PulsarListener` methods in the application acquires that property.
491+
492+
==== Manual Message Ack in Single Record Mode
493+
494+
There are situations in which you might not want the framework to do any acknowledgments, but rather do that directly from the application itself.
495+
Spring for Apache Pulsar provides a couple of ways to enable manual message acknowledgments.
496+
Let us look at a few examples.
497+
498+
====
499+
[source, java]
500+
----
501+
@PulsarListener(subscriptionName = "hello-pulsar-subscription", topics = "hello-pulsar", ackMode = AckMode.MANUAL)
502+
public void listen(Message<String> message, Acknowledgment acknowledgment) {
503+
System.out.println("Message Received: " + message.getValue());
504+
acknowledgment.acknowledge();
505+
}
506+
----
507+
====
508+
509+
Few things merit explanation here - First we are enabling manual ack mode by setting `ackMode` on `PulsarListener`.
510+
When enabling manual ack mode, Spring for Apache Pulsar allows the application to inject an `Acknowledgment` object as you can see in the above `PulsarListener` method.
511+
The framework achieves this by selecting a compatible message listener container - `PulsarAcknowledgingMessageListener` for single record based consumption which gives you access to an `Acknowledgment` object.
512+
513+
The `Acknowledgment` object provides the following API methods.
514+
515+
====
516+
[source, java]
517+
----
518+
void acknowledge();
519+
520+
void acknowledge(MessageId messageId);
521+
522+
void acknowledge(List<MessageId> messageIds);
523+
524+
void nack();
525+
526+
void nack(MessageId messageId);
527+
----
528+
====
529+
530+
You can inject this `Acknowledgment` object to your `PulsarListener` while using `MANUAL` ack mode and then call one of the corresponding methods above.
531+
532+
In the above `PulsarListener` example, we are calling a parameter-less `acknowledge` method.
533+
This is because the framework knows which `Message` it is operating under currently.
534+
When calling `acknowledge()`, you do not need to receive the payload with the `Message` enveloper`, but rather simply using the target type - `String` in this example.
535+
You can also call a different variant of `acknowledge` by providing the message id - `acknowledge.acknowledge(message.getMessageId());`
536+
When using `acknowledge(messageId)`, you must receive the payload using the `Message<?>` envelope.
537+
538+
Similar to what is possible for acknowledging, the `Acknowledgment` API also provides options for negatively acknowledging - see the nack methods above.
539+
540+
You can also call `acknowledge` directly on the Pulsar consumer as below.
541+
542+
====
543+
[source, java]
544+
----
545+
@PulsarListener(subscriptionName = "hello-pulsar-subscription", topics = "hello-pulsar", ackMode = AckMode.MANUAL)
546+
public void listen(Message<String> message, Consumer<String> consumer) {
547+
System.out.println("Message Received: " + message.getValue());
548+
try {
549+
consumer.acknowledge(message);
550+
}
551+
catch (Exception e) {
552+
....
553+
}
554+
}
555+
----
556+
====
557+
558+
As you can see, when calling `acknowledge` directly on the underlying consumer, then you need to do error handling by yourself.
559+
Using the `Acknowledgment` does not require that as the framework can do that for you.
560+
Therefore, it is recommended to use the `Acknowledgment` object approach when using manual acknowledgment.
561+
562+
When using manual acknowledgment, it is important to understand that the framework completely stay from any acknowledgment at all.
563+
Hence, it is extremely important for the end-users to think through the right acknowledgment strategies when designing applications.
407564

408565
==== Message Ack in Batch Consumption
409566

410567
When records are consumed in batches (See the section above), then if the default ack mode of `BATCH` is used, then when the entire batch is processed successfully, it will be acknowledged.
411568
If any records throw an exception, then the entire batch is negatively acknowledged.
569+
Note that this may not be the same batch that was batched on the producer side, rather this is the batch that returned from calling `batchReceive` on the consumer
570+
571+
Let us look at the following batch listener:
572+
573+
====
574+
[source, java]
575+
----
576+
@PulsarListener(subscriptionName = "hello-pulsar-subscription", topics = "hello-pulsar", batch = true)
577+
public void batchListen(List<Foo> messages) {
578+
for (Foo foo : messages) {
579+
...
580+
}
581+
}
582+
----
583+
====
584+
585+
When all the messages in the incoming collection (`messages` in this example) are processed, the framework will acknowledge all of them.
586+
412587
When consuming in batch mode, `RECORD` is not an allowed ack mode.
413-
This might cause an issue as application does not want the entire batch to be re-delivered again.
588+
This might cause an issue as application may not want the entire batch to be re-delivered again.
414589
For such situations, you need to use the `MANUAL` acknowledgement mode.
415590

416-
==== Manual Acknowledgment
591+
==== Manual Messge Acknowledgment in Batch Consumption
417592

418-
When `MANUAL` ack mode is set on the message listener container, then the framework will not do any acknowledgment - positive or negative.
593+
As seen in the previous section, when `MANUAL` ack mode is set on the message listener container, then the framework will not do any acknowledgment - positive or negative.
419594
It is entirely up to the application to take care of such concerns.
420-
When `MANUAL` ack mode is set, Spring for Apache Pulsar selects a compatible message listener container - `PulsarAcknowledgingMessageListener` when in record consumption and `PulsarBatchAcknowledgingMessageListener` for batch consumption.
421-
These interfaces provide you access to an `Acknowledgment` object.
422-
The `Acknowledgment` object provides the following API methods.
423-
595+
When `MANUAL` ack mode is set, Spring for Apache Pulsar selects a compatible message listener container - `PulsarBatchAcknowledgingMessageListener` for batch consumption which gives you access to an `Acknowledgment` object.
596+
Once again, the following are the methods availble in the `Acknowledgment` API.
424597
====
425598
[source, java]
426599
----
@@ -436,22 +609,27 @@ void nack(MessageId messageId);
436609
----
437610
====
438611

439-
You can inject this `Acknowledgment` object to your `PulsarListener` while using `MANUAL` ack mode and then call the corresponding method.
440-
Here is a basic example for a record based listener.
612+
You can inject this `Acknowledgment` object to your `PulsarListener` while using `MANUAL` ack mode.
613+
Here is a basic example for a batch based listener.
441614

442615
====
443616
[source, java]
444617
----
445618
@PulsarListener(subscriptionName = "hello-pulsar-subscription", topics = "hello-pulsar")
446-
public void listen(String message, Acknowlegement acknowledgment) {
447-
System.out.println("Message Received: " + message);
448-
acknowledgment.acknowledge();
619+
public void listen(List<Message<String>> messgaes, Acknowlegement acknowledgment) {
620+
for (Message<String> message : messages) {
621+
try {
622+
...
623+
acknowledgment.acknowledge(message.getMessageId());
624+
}
625+
catch (Exception e) {
626+
acknowledgment.nack(message.getMessageId());
627+
}
628+
}
449629
}
450630
----
451631
====
452632

453-
You can also call `acknowledgment.nack()` to negatively acknowledge in which case the record will be re-delivered.
454-
455633
When using a batch listener, the message listener container cannot know which record it is currently operating upon.
456634
Therefore, in order to manually acknowledge, you need to use one of the overloaded `acknowledge` method that takes a `MessageId` or a `List<MessageId>`.
457635
You can also negatively acknowledge with the `MessageId` for the batch listener.
@@ -588,20 +766,16 @@ public void listen(org.apache.pulsar.client.api.Message<String> message) {
588766
System.out.println("Data Received: " + message.getValue());
589767
}
590768
----
591-
====
592769
593-
==== Accessing the Pulsar Messages Object
594-
595-
When consuming messages in batch mode using `PulsarListener`, instead of receiving them as a `List, you can receive them as Pulsar Messages type.
596-
Here is an example.
770+
====
771+
or in batch receiver:
597772

598773
====
599774
[source, java]
600775
----
601776
@PulsarListener(subscriptionName = "batch-subscription", topics = "hello-pulsar", batch = "true")
602-
public void listen(org.apache.pulsar.client.api.Messages<String> messages) {
777+
public void listen(List<org.apache.pulsar.client.api.Message<String>> messages) {
603778
// Iterate on the messages
604-
// Each iteration gives access to a org.apache.pulsar.client.api.Message object
605779
}
606780
----
607781
====

0 commit comments

Comments
 (0)