
Commit ec08b08

More documentation editing (super streams and advanced topics sections)
1 parent 48d38ae commit ec08b08

2 files changed: +66 −63 lines changed


src/docs/asciidoc/advanced-topics.adoc

Lines changed: 42 additions & 36 deletions
@@ -6,8 +6,7 @@
 
 WARNING: Filtering requires *RabbitMQ 3.13* or more.
 
-RabbitMQ Stream provides a server-side filtering feature that avoids reading all the messages of a stream and filtering only on the client side.
-This helps to save network bandwidth when a consuming application needs only a subset of messages, e.g. the messages from a given geographical region.
+RabbitMQ Stream's server-side filtering saves network bandwidth by filtering messages on the server, so clients receive only a subset of the messages in a stream.
 
 The filtering feature works as follows:
 
@@ -16,32 +15,31 @@ The filtering feature works as follows:
 ** define one or several filter values
 ** define some client-side filtering logic
 
-Why does the consumer need to define some client-side filtering logic?
-Because the server-side filtering is probabilistic: messages that do not match the filter value(s) can still be sent to the consumer.
-The server uses a https://en.wikipedia.org/wiki/Bloom_filter[Bloom filter], _a space-efficient probabilistic data structure_, where false positives are possible.
-Despite this, the filtering saves some bandwidth, which is its primary goal.
+Why is client-side filtering logic still needed?
+Server-side filtering is probabilistic — it may still send messages that don't match your filter values.
+The server uses a https://en.wikipedia.org/wiki/Bloom_filter[Bloom filter] (a space-efficient probabilistic data structure) where false positives are possible.
+Despite this limitation, filtering significantly reduces network bandwidth.
 
 ==== Filtering on the Publishing Side
 
-Filtering on the publishing side consists in defining some logic to extract the filter value from a message.
+Publishers must define logic to extract filter values from messages.
 The following snippet shows how to extract the filter value from an application property:
 
 .Declaring a producer with logic to extract a filter value from each message
 [source,java,indent=0]
 --------
 include::{test-examples}/FilteringUsage.java[tag=producer-simple]
 --------
-<1> Get filter value from `state` application property
+<1> Extract filter value from `state` application property
 
-Note the filter value can be null: the message is then published in a regular way.
-It is called in this context an _unfiltered_ message.
+Filter values can be null, resulting in _unfiltered_ messages that are published normally.
 
 ==== Filtering on the Consuming Side
 
 A consumer needs to set up one or several filter values and some filtering logic to enable filtering.
 The filtering logic must be consistent with the filter values.
 In the next snippet, the consumer wants to process only messages from the state of California.
-It sets a filter value to `california` and a predicate that accepts a message only if the `state` application properties is `california`:
+It sets a filter value to `california` and a predicate that accepts a message only if the `state` application property is `california`:
 
 .Declaring a consumer with a filter value and filtering logic
 [source,java,indent=0]
@@ -55,7 +53,7 @@ The filter logic is a `Predicate<Message>`.
 It must return `true` if a message is accepted, following the same semantics as `java.util.stream.Stream#filter(Predicate)`.
 
 As stated above, not all messages must have an associated filter value.
-Many applications may not need some filtering, so they can publish messages the regular way.
+Many applications may not need filtering, so they can publish messages the regular way.
 So a stream can contain messages with and without an associated filter value.
 
 By default, messages without a filter value (a.k.a _unfiltered_ messages) are not sent to a consumer that enabled filtering.
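The `FilteringUsage.java` snippets referenced by the `include::` directives above are not part of this diff. As a rough, illustrative sketch of the publishing-side extraction and consuming-side predicate described in these hunks (the `filterValue`, `filter()`, `values()`, and `postFilter()` builder methods are assumed from the stream Java client's fluent API), it could look like this:

.Illustrative sketch: producer filter value extraction and consumer filtering (not part of the diff)
[source,java,indent=0]
--------
import com.rabbitmq.stream.Consumer;
import com.rabbitmq.stream.Environment;
import com.rabbitmq.stream.Producer;

public class FilteringSketch {

    public static void main(String[] args) {
        Environment environment = Environment.builder().build();

        // publishing side: extract the filter value from the "state" application property
        Producer producer = environment.producerBuilder()
            .stream("invoices")
            .filterValue(msg -> (String) msg.getApplicationProperties().get("state")) // assumed builder method
            .build();

        // consuming side: request "california" chunks from the broker and re-check on the client,
        // because Bloom-filter false positives can let non-matching messages through
        Consumer consumer = environment.consumerBuilder()
            .stream("invoices")
            .filter()
                .values("california")
                .postFilter(msg -> "california".equals(msg.getApplicationProperties().get("state")))
            .builder()
            .messageHandler((context, message) -> {
                // only messages accepted by the post-filter reach this handler
            })
            .build();
    }
}
--------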
@@ -72,27 +70,33 @@ include::{test-examples}/FilteringUsage.java[tag=consumer-match-unfiltered]
 <2> Request messages without a filter value as well
 <3> Let both types of messages pass
 
-In the example above, the filtering logic has been adapted to let pass `california` messages _and_ messages without a state set as well.
+In the example above, the filtering logic allows both `california` messages _and_ messages without a state set as well.
 
 ==== Considerations on Filtering
 
-As stated previously, the server can send messages that do not match the filter value(s) set by consumers.
-This is why application developers must be very careful with the filtering logic they define to avoid processing unwanted messages.
+Since the server may send non-matching messages due to the probabilistic nature of Bloom filters, the client-side filtering logic must be robust to avoid processing unwanted messages.
 
-What are good candidates for filter values?
-Unique identifiers are _not_: if you know a given message property will be unique in a stream, do not use it as a filter value.
-A defined set of values shared across the messages is a good candidate: geographical locations (e.g. countries, states), document types in a stream that stores document information (e.g. payslip, invoice, order), categories of products (e.g. book, luggage, toy).
+**Good filter value candidates:**
 
-Cardinality of filter values can be from a few to a few thousands.
-Extreme cardinality (a couple or dozens of thousands) can make filtering less efficient.
+* Shared categorical values: geographical locations (countries, states), document types (payslip, invoice, order), product categories (book, luggage, toy)
+* Values with reasonable cardinality (few to few thousand distinct values)
+
+**Poor filter value candidates:**
+
+* Unique identifiers (message IDs, timestamps, UUIDs)
+* Values with extreme cardinality (tens of thousands of distinct values)
 
 === OAuth 2 Support
 
-The client can authenticate against an OAuth 2 server like https://github.com/cloudfoundry/uaa[UAA].
-It uses the https://tools.ietf.org/html/rfc6749#section-4.4[OAuth 2 Client Credentials flow].
-The https://www.rabbitmq.com/docs/oauth2[OAuth 2 plugin] must be enabled on the server side and configured to use the same OAuth 2 server as the client.
+The client supports OAuth 2 authentication using the https://tools.ietf.org/html/rfc6749#section-4.4[OAuth 2 Client Credentials flow].
+Both the client and RabbitMQ server must be configured to use the same OAuth 2 server.
+
+**Prerequisites:**
+
+* https://www.rabbitmq.com/docs/oauth2[OAuth 2 plugin] enabled on RabbitMQ
+* OAuth 2 server (e.g. https://github.com/cloudfoundry/uaa[UAA]) configured and accessible
 
-How to retrieve the OAuth 2 token is configured at the environment level:
+Token retrieval is configured at the environment level:
 
 .Configuring OAuth 2 token retrieval
 [source,java,indent=0]
@@ -102,28 +106,30 @@ include::{test-examples}/EnvironmentUsage.java[tag=oauth2]
 <1> Access the OAuth 2 configuration
 <2> Set the token endpoint URI
 <3> Authenticate the client application
-<4> Set the grant type
+<4> Use Client Credentials grant type for service-to-service authentication
 <5> Set optional parameters (depends on the OAuth 2 server)
 <6> Set the SSL context (e.g. to verify and trust the identity of the OAuth 2 server)
 
-The environment retrieves tokens and uses them to create stream connections.
-It also takes care of refreshing the tokens before they expire and of re-authenticating existing connections so the broker does not close them when their token expires.
+The environment handles token management automatically:
 
-The environment uses the same token for all the connections it maintains.
+* Retrieves tokens for stream connections
+* Refreshes tokens before expiration
+* Re-authenticates existing connections to prevent broker disconnections
+* Uses the same token for all maintained connections
 
 === Using Native `epoll`
 
-The stream Java client uses the https://netty.io/[Netty] network framework and its Java NIO transport implementation by default.
-This should be a reasonable default for most applications.
+The stream Java client uses https://netty.io/[Netty]'s Java NIO transport by default, which works well for most applications.
 
-Netty also allows using https://netty.io/wiki/native-transports.html[JNI transports].
-They are less portable than Java NIO, but they can be more performant for some workloads (even though the RabbitMQ team has not seen any significant improvement in their own tests).
+For specialized performance requirements, Netty supports https://netty.io/wiki/native-transports.html[JNI-based transports].
+These are less portable but may offer better performance for specific workloads.
+Note: The RabbitMQ team has not observed significant improvements in their testing.
 
-The https://en.wikipedia.org/wiki/Epoll[Linux `epoll` transport] is a popular choice, so we'll see how to configure with the stream Java client.
-Other JNI transports can be configured in the same way.
+This example shows how to configure the popular https://en.wikipedia.org/wiki/Epoll[Linux `epoll` transport].
+Other JNI transports follow the same configuration pattern.
 
-The native transport dependency must be added to the dependency manager.
-We must pull the native binaries compiled for our OS and architecture, in our example Linux x86-64, so we are using the `linux-x86_64` classifier.
+Add the native transport dependency matching your OS and architecture.
+This example uses Linux x86-64 with the `linux-x86_64` classifier.
 Here is the declaration for Maven:
 
 .Declaring the Linux x86-64 native `epoll` transport dependency with Maven
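The Maven declaration itself sits in unchanged lines of the source file and is not shown in this hunk (the artifact is `io.netty:netty-transport-native-epoll` with the `linux-x86_64` classifier). As a hedged sketch of wiring the epoll transport into the environment (the `netty()`, `eventLoopGroup()`, `bootstrapCustomizer()`, and `environmentBuilder()` methods are assumed from the client's builder API; the `io.netty.channel.epoll` classes are Netty's own):

.Illustrative sketch: configuring the Netty epoll transport (not part of the diff)
[source,java,indent=0]
--------
import com.rabbitmq.stream.Environment;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.epoll.EpollSocketChannel;

public class EpollSketch {

    public static void main(String[] args) {
        // event loop group backed by the native epoll transport
        EventLoopGroup epollGroup = new EpollEventLoopGroup();

        Environment environment = Environment.builder()
            .netty()                                                           // assumed Netty configuration entry point
                .eventLoopGroup(epollGroup)                                    // use the epoll event loop group
                .bootstrapCustomizer(b -> b.channel(EpollSocketChannel.class)) // assumed hook to select the epoll channel class
            .environmentBuilder()
            .build();
    }
}
--------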

src/docs/asciidoc/super-streams.adoc

Lines changed: 24 additions & 27 deletions
@@ -5,11 +5,11 @@
 
 WARNING: Super Streams require *RabbitMQ 3.11* or more.
 
-A super stream is a logical stream made of several individual streams.
-In essence, a super stream is a partitioned stream that brings scalability compared to a single stream.
+A super stream is a logical stream composed of multiple individual streams.
+It provides scalability through partitioning, distributing data across several streams instead of using a single stream.
 
-The stream Java client uses the same programming model for super streams as with individual streams, that is the `Producer`, `Consumer`, `Message`, etc API are still valid when super streams are in use.
-Application code should not be impacted whether it uses individual or super streams.
+The stream Java client maintains the same programming model for super streams as individual streams.
+The `Producer`, `Consumer`, `Message`, and other APIs remain unchanged when using super streams, so your application code requires minimal modifications.
 
 Consuming applications can use super streams and <<api.adoc#single-active-consumer, single active consumer>> at the same time.
 The 2 features combined make sure only one consumer instance consumes from an individual stream at a time.
@@ -19,7 +19,7 @@ In this configuration, super streams provide scalability and single active consu
 .Super streams do not deprecate streams
 ====
 Super streams are a https://en.wikipedia.org/wiki/Partition_(database)[partitioning] solution.
-They are not meant to replace individual streams, they sit on top of them to handle some use cases in a better way.
+They are not meant to replace individual streams; they sit on top of them to handle some use cases more effectively.
 If the stream data is likely to be large – hundreds of gigabytes or even terabytes, size remains relative – and even presents an obvious partition key (e.g. country), a super stream can be appropriate.
 It can help to cope with the data size and to take advantage of data locality for some processing use cases.
 Remember that partitioning always comes with complexity though, even if the implementation of super streams strives to make it as transparent as possible for the application developer.
@@ -28,9 +28,9 @@ Remember that partitioning always comes with complexity though, even if the impl
 
 ==== Topology
 
-A super stream is made of several individual streams, so it can be considered a logical entity rather than an actual physical entity.
-The topology of a super stream is based on the https://www.rabbitmq.com/tutorials/amqp-concepts.html[AMQP 0.9.1 model], that is exchange, queues, and bindings between them.
-This does not mean AMQP resources are used to transport or store stream messages, it means that they are used to _describe_ the super stream topology, that is the streams it is made of.
+The topology of a super stream follows the https://www.rabbitmq.com/tutorials/amqp-concepts.html[AMQP 0.9.1 model]: exchanges, queues, and bindings.
+AMQP resources are not used to transport or store stream messages.
+Instead, they describe the super stream topology and define which streams compose the super stream.
 
 Let's take the example of an `invoices` super stream made of 3 streams (i.e. partitions):
 
@@ -79,9 +79,9 @@ Here is how to create an `invoices` super stream with 5 partitions:
 include::{test-examples}/SuperStreamUsage.java[tag=creation-partitions]
 --------
 
-The super stream partitions will be `invoices-0`, `invoices-1`, ..., `invoices-5`.
-We use this kind of topology when routing keys of outbound messages are hashed to pick the partition to publish them to.
-This way, if the routing key is the customer ID of the invoice, all the invoices for a given customer end up in the same partition, and they can be processed in the publishing order.
+The super stream partitions will be `invoices-0`, `invoices-1`, ..., `invoices-4`.
+This topology works by hashing routing keys to determine the target partition for each message.
+For example, if the routing key is a customer ID, all invoices for the same customer will be routed to the same partition, ensuring they are processed in publishing order.
 
 It is also possible to specify binding keys when creating a super stream:
 
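The `creation-partitions` snippet comes from `SuperStreamUsage.java` and is not shown in this diff. A minimal sketch of the creation call described above, assuming `superStream()` and `partitions()` on the stream creator (the binding-key variant mentioned next would use a `bindingKeys(...)` setter in the same spot):

.Illustrative sketch: creating a super stream with 5 partitions (not part of the diff)
[source,java,indent=0]
--------
import com.rabbitmq.stream.Environment;

public class SuperStreamCreationSketch {

    public static void main(String[] args) {
        Environment environment = Environment.builder().build();

        // creates the invoices super stream with partitions invoices-0 .. invoices-4
        environment.streamCreator()
            .name("invoices")
            .superStream()      // assumed entry point for super stream settings
                .partitions(5)  // number of partition streams
            .creator()
            .create();
    }
}
--------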
@@ -122,10 +122,10 @@ include::{test-examples}/SuperStreamUsage.java[tag=producer-simple]
 <3> Create the producer instance
 <4> Close the producer when it's no longer necessary
 
-Note that even though the `invoices` super stream is not an actual stream, its name must be used to declare the producer.
-Internally the client will figure out the streams that compose the super stream.
-The application code must provide the logic to extract a routing key from a message as a `Function<Message, String>`.
-The client will hash the routing key to determine the stream to send the message to (using partition list and a modulo operation).
+Although the `invoices` super stream is not a physical stream, you must use its name when declaring the producer.
+The client automatically discovers the individual streams that compose the super stream.
+Your application code must provide logic to extract a routing key from each message using a `Function<Message, String>`.
+The client hashes this routing key to determine the target stream using the partition list and a modulo operation.
 
 The client uses 32-bit https://en.wikipedia.org/wiki/MurmurHash[MurmurHash3] by default to hash the routing key.
 This hash function provides good uniformity, performance, and portability, making it a good default choice, but it is possible to specify a custom hash function:
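As a sketch of the routing-key extraction and hash-based partition selection described in this hunk (the `superStream()`, `routing()`, and `producerBuilder()` methods are assumed from the client's fluent API; the `customer-id` application property is a hypothetical example):

.Illustrative sketch: super stream producer with routing key extraction (not part of the diff)
[source,java,indent=0]
--------
import com.rabbitmq.stream.Environment;
import com.rabbitmq.stream.Producer;

public class SuperStreamProducerSketch {

    public static void main(String[] args) {
        Environment environment = Environment.builder().build();

        // declare the producer against the super stream name, not a partition name
        Producer producer = environment.producerBuilder()
            .superStream("invoices")                 // assumed builder method for super streams
            .routing(message ->                      // routing key extraction logic
                (String) message.getApplicationProperties().get("customer-id"))
            .producerBuilder()                       // back to the main builder
            .build();

        // the client hashes the routing key and applies a modulo on the partition count,
        // e.g. hash("customer-42") % 3 picks one of invoices-0, invoices-1, invoices-2
    }
}
--------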
@@ -214,16 +214,14 @@ include::{test-examples}/SuperStreamUsage.java[tag=producer-custom-routing-strat
 <1> No need to set the routing key extraction logic
 <2> Set the custom routing strategy
 
-
-
 ===== Deduplication
 
 Deduplication for a super stream producer works the same way as with a <<api.adoc#outbound-message-deduplication, single stream producer>>.
-The publishing ID values are spread across the streams but this does affect the mechanism.
+The publishing ID values are spread across the streams, but this does not affect the mechanism.
 
 ==== Consuming From a Super Stream
 
-A super stream consumer is a composite consumer: it will look up the super stream partitions and create a consumer for each or them.
+A super stream consumer is a composite consumer: it looks up the super stream partitions and creates a consumer for each of them.
 The programming model is the same as with regular consumers for the application developer: their main job is to provide the application code to process messages, that is a `MessageHandler` instance.
 The configuration is different though and this section covers its subtleties.
 But let's focus on the behavior of a super stream consumer first.
@@ -259,19 +257,18 @@ include::{test-examples}/SuperStreamUsage.java[tag=consumer-simple]
 <2> Close the consumer when it is no longer necessary
 
 That's all.
-The super stream consumer will take of the details (partitions lookup, coordination of the single consumers, etc).
+The super stream consumer will take care of the details (partition lookup, coordination of individual consumers, etc.).
 
 ===== Offset Tracking
 
-The semantic of offset tracking for a super stream consumer are roughly the same as for an individual stream consumer.
+The semantics of offset tracking for a super stream consumer are roughly the same as for an individual stream consumer.
 There are still some subtle differences, so a good understanding of <<api.adoc#consumer-offset-tracking, offset tracking>> in general and of the <<api.adoc#consumer-automatic-offset-tracking,automatic>> and <<api.adoc#consumer-manual-offset-tracking,manual>> offset tracking strategies is recommended.
 
 Here are the main differences for the automatic/manual offset tracking strategies between single and super stream consuming:
 
 * *automatic offset tracking*: internally, _the client divides the `messageCountBeforeStorage` setting by the number of partitions for each individual consumer_.
-Imagine a 3-partition super stream, `messageCountBeforeStorage` set to 10,000, and 10,000 messages coming in, perfectly balanced across the partitions (that is about 3,333 messages for each partition).
-In this case, the automatic offset tracking strategy will not kick in, because the expected count message has not been reached on any partition.
-Making the client divide `messageCountBeforeStorage` by the number of partitions can be considered "more accurate" if the message are well balanced across the partitions.
+Consider a 3-partition super stream with `messageCountBeforeStorage` set to 10,000. If 10,000 messages arrive evenly distributed (approximately 3,333 per partition), automatic offset tracking will not trigger because no individual partition reaches the threshold.
+Dividing `messageCountBeforeStorage` by the partition count provides more accurate tracking when messages are evenly distributed across partitions.
 A good rule of thumb is to then multiply the expected per-stream `messageCountBeforeStorage` by the number of partitions, to avoid storing offsets too often. So the default being 10,000, it can be set to 30,000 for a 3-partition super stream.
 * *manual offset tracking*: the `MessageHandler.Context#storeOffset()` method must be used, the `Consumer#store(long)` will fail, because an offset value has a meaning only in one stream, not in other streams.
 A call to `MessageHandler.Context#storeOffset()` will store the current message offset in _its_ stream, but also the offset of the last dispatched message for the other streams of the super stream.
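A short sketch of the rule of thumb above for automatic offset tracking, assuming the `autoTrackingStrategy()` and `messageCountBeforeStorage()` builder methods documented for single-stream consumers also apply to a super stream consumer:

.Illustrative sketch: adjusting automatic offset tracking for a 3-partition super stream (not part of the diff)
[source,java,indent=0]
--------
import com.rabbitmq.stream.Consumer;
import com.rabbitmq.stream.Environment;

public class SuperStreamOffsetTrackingSketch {

    public static void main(String[] args) {
        Environment environment = Environment.builder().build();

        // 3-partition super stream: multiply the per-stream default (10,000) by 3
        Consumer consumer = environment.consumerBuilder()
            .superStream("invoices")               // assumed builder method for super streams
            .name("invoices-processing")           // consumer name, required for offset tracking
            .autoTrackingStrategy()
                .messageCountBeforeStorage(30_000) // divided by the partition count internally
            .builder()
            .messageHandler((context, message) -> {
                // message processing
            })
            .build();
    }
}
--------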
@@ -286,9 +283,9 @@ As <<super-stream-consumer-in-practice, stated previously>>, super stream consum
 Let's take an example with a 3-partition super stream:
 
 * You have an application that creates a super stream consumer instance with single active consumer enabled.
-* You start 3 instances of this application. An instance in this case is a JVM process, which can be in a Docker container, a virtual machine, or a bare-metal server.
-* As the super stream has 3 partitions, each application instance will create a super stream consumer that maintains internally 3 consumer instances.
-That is 9 Java instances of consumer overall.
+* You start 3 instances of this application. Each instance is a JVM process running in a Docker container, virtual machine, or on bare-metal hardware.
+* Since the super stream has 3 partitions, each application instance creates a super stream consumer that maintains 3 internal consumer instances.
+This results in 9 consumer instances total.
 Such a super stream consumer is a _composite consumer_.
 * The broker and the different application instances coordinate so that only 1 consumer instance for a given partition receives messages at a time.
 So among these 9 consumer instances, only 3 are actually _active_, the other ones are idle or _inactive_.
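For the scenario above, a sketch of what each of the 3 application instances could declare, assuming the `singleActiveConsumer()` builder method; all instances use the same consumer name so the broker can coordinate them:

.Illustrative sketch: super stream consumer with single active consumer enabled (not part of the diff)
[source,java,indent=0]
--------
import com.rabbitmq.stream.Consumer;
import com.rabbitmq.stream.Environment;

public class SingleActiveConsumerSketch {

    public static void main(String[] args) {
        Environment environment = Environment.builder().build();

        // each application instance declares the same named consumer on the super stream;
        // the broker ensures only one instance per partition is active at a time
        Consumer consumer = environment.consumerBuilder()
            .superStream("invoices")      // assumed builder method for super streams
            .name("invoices-processing")  // same name across all application instances
            .singleActiveConsumer()       // enable single active consumer
            .messageHandler((context, message) -> {
                // message processing
            })
            .build();
    }
}
--------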
