Skip to content

Commit 63a91d0

Browse files
committed
Start support for filter expressions
WIP
1 parent 84ccada commit 63a91d0

File tree

10 files changed

+748
-248
lines changed

10 files changed

+748
-248
lines changed

pom.xml

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@
5050
<junit.jupiter.version>5.11.2</junit.jupiter.version>
5151
<assertj.version>3.26.3</assertj.version>
5252
<mockito.version>5.14.1</mockito.version>
53+
<jqwik.version>1.9.1</jqwik.version>
5354
<amqp-client.version>5.20.0</amqp-client.version>
5455
<micrometer-tracing-test.version>1.3.4</micrometer-tracing-test.version>
5556
<micrometer-docs-generator.version>1.0.4</micrometer-docs-generator.version>
@@ -184,6 +185,13 @@
184185
<scope>test</scope>
185186
</dependency>
186187

188+
<dependency>
189+
<groupId>net.jqwik</groupId>
190+
<artifactId>jqwik</artifactId>
191+
<version>${jqwik.version}</version>
192+
<scope>test</scope>
193+
</dependency>
194+
187195
<dependency>
188196
<groupId>eu.rekawek.toxiproxy</groupId>
189197
<artifactId>toxiproxy-java</artifactId>

src/main/java/com/rabbitmq/client/amqp/ConsumerBuilder.java

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,10 @@
1717
1818
package com.rabbitmq.client.amqp;
1919

20+
import java.math.BigDecimal;
2021
import java.time.Instant;
22+
import java.util.UUID;
23+
import org.apache.qpid.protonj2.types.*;
2124

2225
/** API to configure and create a {@link Consumer}. */
2326
public interface ConsumerBuilder {
@@ -158,6 +161,8 @@ interface StreamOptions {
158161
*/
159162
StreamOptions filterMatchUnfiltered(boolean matchUnfiltered);
160163

164+
StreamFilterOptions filter();
165+
161166
/**
162167
* Return the consumer builder.
163168
*
@@ -176,6 +181,80 @@ enum StreamOffsetSpecification {
176181
NEXT
177182
}
178183

184+
interface FilterOptions<T> {
185+
186+
T messageId(Object id);
187+
188+
T messageId(String id);
189+
190+
T messageId(long id);
191+
192+
T messageId(byte[] id);
193+
194+
T messageId(UUID id);
195+
196+
T correlationId(Object correlationId);
197+
198+
T correlationId(String correlationId);
199+
200+
T correlationId(long correlationId);
201+
202+
T correlationId(byte[] correlationId);
203+
204+
T correlationId(UUID correlationId);
205+
206+
T userId(byte[] userId);
207+
208+
T to(String to);
209+
210+
T subject(String subject);
211+
212+
T property(String key, boolean value);
213+
214+
T property(String key, byte value);
215+
216+
T property(String key, short value);
217+
218+
T property(String key, int value);
219+
220+
T property(String key, long value);
221+
222+
T propertyUnsigned(String key, byte value);
223+
224+
T propertyUnsigned(String key, short value);
225+
226+
T propertyUnsigned(String key, int value);
227+
228+
T propertyUnsigned(String key, long value);
229+
230+
T property(String key, float value);
231+
232+
T property(String key, double value);
233+
234+
T propertyDecimal32(String key, BigDecimal value);
235+
236+
T propertyDecimal64(String key, BigDecimal value);
237+
238+
T propertyDecimal128(String key, BigDecimal value);
239+
240+
T property(String key, char value);
241+
242+
T propertyTimestamp(String key, long value);
243+
244+
T property(String key, UUID value);
245+
246+
T property(String key, byte[] value);
247+
248+
T property(String key, String value);
249+
250+
T propertySymbol(String key, String value);
251+
}
252+
253+
interface StreamFilterOptions extends FilterOptions<StreamFilterOptions> {
254+
255+
StreamOptions stream();
256+
}
257+
179258
/**
180259
* Callback to modify a consumer subscription before the link creation.
181260
*

src/main/java/com/rabbitmq/client/amqp/impl/AmqpConsumer.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import org.apache.qpid.protonj2.engine.impl.ProtonLinkCreditState;
4242
import org.apache.qpid.protonj2.engine.impl.ProtonReceiver;
4343
import org.apache.qpid.protonj2.engine.impl.ProtonSessionIncomingWindow;
44+
import org.apache.qpid.protonj2.types.DescribedType;
4445
import org.slf4j.Logger;
4546
import org.slf4j.LoggerFactory;
4647

@@ -58,7 +59,7 @@ final class AmqpConsumer extends ResourceBase implements Consumer {
5859
private final Long id;
5960
private final String address;
6061
private final String queue;
61-
private final Map<String, Object> filters;
62+
private final Map<String, DescribedType> filters;
6263
private final Map<String, Object> linkProperties;
6364
private final ConsumerBuilder.SubscriptionListener subscriptionListener;
6465
private final AmqpConnection connection;
@@ -161,7 +162,7 @@ private ClientReceiver createNativeReceiver(
161162
Session nativeSession,
162163
String address,
163164
Map<String, Object> properties,
164-
Map<String, Object> filters,
165+
Map<String, DescribedType> filters,
165166
SubscriptionListener subscriptionListener) {
166167
try {
167168
filters = new LinkedHashMap<>(filters);
@@ -175,7 +176,7 @@ private ClientReceiver createNativeReceiver(
175176
.creditWindow(0)
176177
.properties(properties);
177178
if (!filters.isEmpty()) {
178-
receiverOptions.sourceOptions().filters(filters);
179+
receiverOptions.sourceOptions().filters(Map.copyOf(filters));
179180
}
180181
return (ClientReceiver)
181182
ExceptionUtils.wrapGet(nativeSession.openReceiver(address, receiverOptions).openFuture());

0 commit comments

Comments
 (0)