2626import io .micrometer .prometheusmetrics .PrometheusConfig ;
2727import io .micrometer .prometheusmetrics .PrometheusMeterRegistry ;
2828
29- import java .nio .charset .StandardCharsets ;
30- import java .time .Duration ;
31-
32- import static com .rabbitmq .client .amqp .Management .ExchangeType .FANOUT ;
33- import static com .rabbitmq .client .amqp .Publisher .Status .ACCEPTED ;
34-
3529class Api {
3630
37- void environment () {
38- // tag::environment-creation[]
39- Environment environment = new AmqpEnvironmentBuilder ()
40- .build ();
41- // end::environment-creation[]
42- }
43-
44- void connection () {
45- Environment environment = null ;
46- // tag::connection-creation[]
47- Connection connection = environment .connectionBuilder ()
48- .build ();
49- // end::connection-creation[]
50- }
51-
5231 void connectionSettings () {
5332 // tag::connection-settings[]
5433 Environment environment = new AmqpEnvironmentBuilder ()
@@ -62,290 +41,31 @@ void connectionSettings() {
6241 // end::connection-settings[]
6342 }
6443
65-
66- void publishing () {
67- Connection connection = null ;
68- // tag::publisher-creation[]
69- Publisher publisher = connection .publisherBuilder ()
70- .exchange ("foo" ).key ("bar" )
71- .build ();
72- // end::publisher-creation[]
73-
74-
75- // tag::message-creation[]
76- Message message = publisher
77- .message ("hello" .getBytes (StandardCharsets .UTF_8 ))
78- .messageId (1L );
79- // end::message-creation[]
80-
81- // tag::message-publishing[]
82- publisher .publish (message , context -> {
83- if (context .status () == ACCEPTED ) {
84- // the broker accepted (confirmed) the message
85- } else {
86- // deal with possible failure
87- }
88- });
89- // end::message-publishing[]
90- }
91-
92- void targetAddressFormatExchangeKey () {
44+ void subscriptionListener () {
9345 Connection connection = null ;
94- // tag::target-address-exchange-key[]
95- Publisher publisher = connection .publisherBuilder ()
96- .exchange ("foo" ).key ("bar" ) // <1>
97- .build ();
98- // end::target-address-exchange-key[]
99- }
100-
101- void targetAddressFormatExchange () {
102- Connection connection = null ;
103- // tag::target-address-exchange[]
104- Publisher publisher = connection .publisherBuilder ()
105- .exchange ("foo" ) // <1>
106- .build ();
107- // end::target-address-exchange[]
108- }
109-
110- void targetAddressFormatQueue () {
111- Connection connection = null ;
112- // tag::target-address-queue[]
113- Publisher publisher = connection .publisherBuilder ()
114- .queue ("some-queue" ) // <1>
115- .build ();
116- // end::target-address-queue[]
117- }
118-
119- void targetAddressNull () {
120- Connection connection = null ;
121- // tag::target-address-null[]
122- Publisher publisher = connection .publisherBuilder ()
123- .build (); // <1>
124-
125- Message message1 = publisher .message ()
126- .toAddress ().exchange ("foo" ).key ("bar" ) // <2>
127- .message ();
128-
129- Message message2 = publisher .message ()
130- .toAddress ().exchange ("foo" ) // <3>
131- .message ();
132-
133- Message message3 = publisher .message ()
134- .toAddress ().queue ("my-queue" ) // <4>
135- .message ();
136- // end::target-address-null[]
137- }
138-
139- void consuming () {
140- Connection connection = null ;
141- // tag::consumer-consume[]
142- Consumer consumer = connection .consumerBuilder ()
143- .queue ("some-queue" )
144- .messageHandler ((context , message ) -> {
145- byte [] body = message .body (); // <1>
146- // ... <2>
147- context .accept (); // <3>
148- })
149- .build ();
150- // end::consumer-consume[]
151-
152- // tag::consumer-graceful-shutdown[]
153- consumer .pause (); // <1>
154- long unsettledMessageCount = consumer .unsettledMessageCount (); // <2>
155- consumer .close (); // <3>
156- // end::consumer-graceful-shutdown[]
157-
158- // tag::consumer-abrupt-shutdown[]
159- consumer .close (); // <1>
160- // end::consumer-abrupt-shutdown[]
161- }
162-
163- void consumingStream () {
164- Connection connection = null ;
165- // tag::consumer-consume-stream[]
166- Consumer consumer = connection .consumerBuilder ()
167- .queue ("some-stream" )
168- .stream () // <1>
169- .offset (ConsumerBuilder .StreamOffsetSpecification .FIRST ) // <2>
170- .builder () // <3>
171- .messageHandler ((context , message ) -> {
172- // message processing
173- })
174- .build ();
175- // end::consumer-consume-stream[]
176- }
177-
178- void consumingStreamFiltering () {
179- Connection connection = null ;
180- // tag::consumer-consume-stream-filtering[]
181- Consumer consumer = connection .consumerBuilder ()
46+ // tag::subscription-listener[]
47+ connection .consumerBuilder ()
18248 .queue ("some-stream" )
183- .stream () // <1>
184- .filterValues ("invoices" , "orders" ) // <2>
185- .filterMatchUnfiltered (true ) // <3>
186- .builder () // <4>
187- .messageHandler ((context , message ) -> {
188- // message processing
189- })
190- .build ();
191- // end::consumer-consume-stream-filtering[]
192- }
193-
194- void management () {
195- Connection connection = null ;
196- // tag::management[]
197- Management management = connection .management ();
198- // end::management[]
199- }
200-
201- void exchanges () {
202- Management management = null ;
203- // tag::fanout-exchange[]
204- management .exchange ()
205- .name ("my-exchange" )
206- .type (FANOUT )
207- .declare ();
208- // end::fanout-exchange[]
209-
210- // tag::delayed-message-exchange[]
211- management .exchange ()
212- .name ("my-exchange" )
213- .type ("x-delayed-message" )
214- .autoDelete (false )
215- .argument ("x-delayed-type" , "direct" )
216- .declare ();
217- // end::delayed-message-exchange[]
218-
219- // tag::delete-exchange[]
220- management .exchangeDeletion ().delete ("my-exchange" );
221- // end::delete-exchange[]
222- }
223-
224- void queues () {
225- Management management = null ;
226- // tag::queue-creation[]
227- management .queue ()
228- .name ("my-queue" )
229- .exclusive (true )
230- .autoDelete (false )
231- .declare ();
232- // end::queue-creation[]
233-
234- // tag::queue-creation-with-arguments[]
235- management
236- .queue ()
237- .name ("my-queue" )
238- .messageTtl (Duration .ofMinutes (10 )) // <1>
239- .maxLengthBytes (ByteCapacity .MB (100 )) // <1>
240- .declare ();
241- // end::queue-creation-with-arguments[]
242-
243- // tag::quorum-queue-creation[]
244- management
245- .queue ()
246- .name ("my-quorum-queue" )
247- .quorum () // <1>
248- .quorumInitialGroupSize (3 )
249- .deliveryLimit (3 )
250- .queue ()
251- .declare ();
252- // end::quorum-queue-creation[]
253-
254- // tag::queue-deletion[]
255- management .queueDeletion ().delete ("my-queue" );
256- // end::queue-deletion[]
257- }
258-
259- void bindings () {
260- Management management = null ;
261- // tag::binding[]
262- management .binding ()
263- .sourceExchange ("my-exchange" )
264- .destinationQueue ("my-queue" )
265- .key ("foo" )
266- .bind ();
267- // end::binding[]
268-
269- // tag::exchange-binding[]
270- management .binding ()
271- .sourceExchange ("my-exchange" )
272- .destinationExchange ("my-other-exchange" )
273- .key ("foo" )
274- .bind ();
275- // end::exchange-binding[]
276-
277- // tag::unbinding[]
278- management .unbind ()
279- .sourceExchange ("my-exchange" )
280- .destinationQueue ("my-queue" )
281- .key ("foo" )
282- .unbind ();
283- // end::unbinding[]
284- }
285-
286- void listeners () {
287- Environment environment = null ;
288- // tag::listener-connection[]
289- Connection connection = environment .connectionBuilder ()
290- .listeners (context -> { // <1>
291- context .previousState (); // <2>
292- context .currentState (); // <3>
293- context .failureCause (); // <4>
294- context .resource (); // <5>
295- }).build ();
296- // end::listener-connection[]
297-
298- // tag::listener-publisher[]
299- Publisher publisher = connection .publisherBuilder ()
300- .listeners (context -> { // <1>
301- // ...
49+ .subscriptionListener (ctx -> { // <1>
50+ long offset = getOffsetFromExternalStore (); // <2>
51+ ctx .streamOptions ().offset (offset + 1 ); // <3>
30252 })
303- .exchange ("foo" ).key ("bar" )
304- .build ();
305- // end::listener-publisher[]
53+ .messageHandler ((ctx , msg ) -> {
54+ // message handling code...
30655
307- // tag::listener-consumer[]
308- Consumer consumer = connection .consumerBuilder ()
309- .listeners (context -> { // <1>
310- // ...
56+ long offset = (long ) msg .annotation ("x-stream-offset" ); // <4>
57+ storeOffsetInExternalStore (offset ); // <5>
31158 })
312- .queue ("my-queue" )
31359 .build ();
314- // end::listener-consumer []
60+ // end::subscription-listener []
31561 }
31662
317- void connectionRecoveryBackOff () {
318- Environment environment = null ;
319- // tag::connection-recovery-back-off[]
320- Connection connection = environment .connectionBuilder ()
321- .recovery () // <1>
322- .backOffDelayPolicy (BackOffDelayPolicy .fixed (Duration .ofSeconds (2 ))) // <2>
323- .connectionBuilder ().build ();
324- // end::connection-recovery-back-off[]
63+ long getOffsetFromExternalStore () {
64+ return 0L ;
32565 }
32666
327- void connectionRecoveryNoTopologyRecovery () {
328- Environment environment = null ;
329- // tag::connection-recovery-no-topology-recovery[]
330- Connection connection = environment .connectionBuilder ()
331- .recovery ()
332- .topology (false ) // <1>
333- .connectionBuilder ()
334- .listeners (context -> {
335- // <2>
336- })
337- .build ();
338- // end::connection-recovery-no-topology-recovery[]
339- }
67+ void storeOffsetInExternalStore (long offset ) {
34068
341- void connectionRecoveryDeactivate () {
342- Environment environment = null ;
343- // tag::connection-recovery-deactivate[]
344- Connection connection = environment .connectionBuilder ()
345- .recovery ()
346- .activated (false ) // <1>
347- .connectionBuilder ().build ();
348- // end::connection-recovery-deactivate[]
34969 }
35070
35171 void metricsCollectorMicrometerPrometheus () {
0 commit comments