Skip to content

Commit fbb3d4e

Browse files
committed
Support filter expressions
References rabbitmq/rabbitmq-server#12415
1 parent 63a91d0 commit fbb3d4e

File tree

6 files changed

+232
-37
lines changed

6 files changed

+232
-37
lines changed

.github/workflows/test-pr.yml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@ jobs:
2424
cache: 'maven'
2525
- name: Start broker
2626
run: ci/start-broker.sh
27+
env:
28+
RABBITMQ_IMAGE: 'pivotalrabbitmq/rabbitmq:amqp-filtex'
2729
- name: Start toxiproxy
2830
run: ci/start-toxiproxy.sh
2931
- name: Display Java version

src/main/java/com/rabbitmq/client/amqp/ConsumerBuilder.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -209,6 +209,22 @@ interface FilterOptions<T> {
209209

210210
T subject(String subject);
211211

212+
T replyTo(String replyTo);
213+
214+
T contentType(String contentType);
215+
216+
T contentEncoding(String contentEncoding);
217+
218+
T absoluteExpiryTime(long absoluteExpiryTime);
219+
220+
T creationTime(long creationTime);
221+
222+
T groupId(String groupId);
223+
224+
T groupSequence(int groupSequence);
225+
226+
T replyToGroupId(String groupId);
227+
212228
T property(String key, boolean value);
213229

214230
T property(String key, byte value);

src/main/java/com/rabbitmq/client/amqp/impl/AmqpConsumerBuilder.java

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -296,6 +296,46 @@ public StreamFilterOptions subject(String subject) {
296296
return propertyFilter("subject", subject);
297297
}
298298

299+
@Override
300+
public StreamFilterOptions replyTo(String replyTo) {
301+
return propertyFilter("reply-to", replyTo);
302+
}
303+
304+
@Override
305+
public StreamFilterOptions contentType(String contentType) {
306+
return propertyFilter("content-type", Symbol.valueOf(contentType));
307+
}
308+
309+
@Override
310+
public StreamFilterOptions contentEncoding(String contentEncoding) {
311+
return propertyFilter("content-encoding", Symbol.valueOf(contentEncoding));
312+
}
313+
314+
@Override
315+
public StreamFilterOptions absoluteExpiryTime(long absoluteExpiryTime) {
316+
return propertyFilter("absolute-expiry-time", new Date(absoluteExpiryTime));
317+
}
318+
319+
@Override
320+
public StreamFilterOptions creationTime(long creationTime) {
321+
return propertyFilter("creation-time", new Date(creationTime));
322+
}
323+
324+
@Override
325+
public StreamFilterOptions groupId(String groupId) {
326+
return propertyFilter("group-id", groupId);
327+
}
328+
329+
@Override
330+
public StreamFilterOptions groupSequence(int groupSequence) {
331+
return propertyFilter("group-sequence", UnsignedInteger.valueOf(groupSequence));
332+
}
333+
334+
@Override
335+
public StreamFilterOptions replyToGroupId(String groupId) {
336+
return propertyFilter("reply-to-group-id", groupId);
337+
}
338+
299339
@Override
300340
public StreamFilterOptions property(String key, boolean value) {
301341
return this.applicationPropertyFilter(key, value);

src/main/java/com/rabbitmq/client/amqp/impl/AmqpMessage.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -241,7 +241,7 @@ public Message groupSequence(int groupSequence) {
241241

242242
@Override
243243
public Message replyToGroupId(String groupId) {
244-
callOnDelegate(m -> replyToGroupId(groupId));
244+
callOnDelegate(m -> m.replyToGroupId(groupId));
245245
return this;
246246
}
247247

src/test/java/com/rabbitmq/client/amqp/impl/Assertions.java

Lines changed: 55 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import com.rabbitmq.client.amqp.Message;
2424
import java.time.Duration;
2525
import java.util.List;
26+
import java.util.Objects;
2627
import java.util.concurrent.CountDownLatch;
2728
import java.util.concurrent.TimeUnit;
2829
import java.util.concurrent.atomic.AtomicReference;
@@ -232,23 +233,15 @@ MessageAssert hasId(long id) {
232233
}
233234

234235
MessageAssert hasId(Object id) {
235-
isNotNull();
236-
if (!actual.messageId().equals(id)) {
237-
fail("Message ID should be '%s' but is '%s'", id, actual.messageId());
238-
}
239-
return this;
236+
return hasField("id", actual.messageId(), id);
240237
}
241238

242239
MessageAssert hasCorrelationId(long correlationId) {
243240
return this.hasCorrelationId(new UnsignedLong(correlationId));
244241
}
245242

246243
MessageAssert hasCorrelationId(Object id) {
247-
isNotNull();
248-
if (!actual.correlationId().equals(id)) {
249-
fail("Correlation ID should be '%s' but is '%s'", id, actual.correlationId());
250-
}
251-
return this;
244+
return hasField("correlation-id", actual.correlationId(), id);
252245
}
253246

254247
MessageAssert hasUserId(byte[] userId) {
@@ -258,18 +251,55 @@ MessageAssert hasUserId(byte[] userId) {
258251
}
259252

260253
MessageAssert hasTo(String to) {
254+
return hasField("to", actual.to(), to);
255+
}
256+
257+
MessageAssert hasSubject(String subject) {
258+
return hasField("subject", actual.subject(), subject);
259+
}
260+
261+
MessageAssert hasReplyTo(String replyTo) {
262+
return hasField("reply-to", actual.replyTo(), replyTo);
263+
}
264+
265+
MessageAssert hasContentType(String contentType) {
266+
return hasField("content-type", actual.contentType(), contentType);
267+
}
268+
269+
MessageAssert hasContentEncoding(String contentEncoding) {
270+
return hasField("content-encoding", actual.contentEncoding(), contentEncoding);
271+
}
272+
273+
MessageAssert hasAbsoluteExpiryTime(long absoluteExpiryTime) {
261274
isNotNull();
262-
if (!actual.to().equals(to)) {
263-
fail("To field should be '%s' but is '%s'", to, actual.to());
264-
}
275+
org.assertj.core.api.Assertions.assertThat(actual.absoluteExpiryTime())
276+
.isEqualTo(absoluteExpiryTime);
265277
return this;
266278
}
267279

268-
MessageAssert hasSubject(String subject) {
280+
MessageAssert hasCreationTime(long creationTime) {
269281
isNotNull();
270-
if (!actual.subject().equals(subject)) {
271-
fail("Subject should be '%s' but is '%s'", subject, actual.subject());
272-
}
282+
org.assertj.core.api.Assertions.assertThat(actual.creationTime()).isEqualTo(creationTime);
283+
return this;
284+
}
285+
286+
MessageAssert hasGroupId(String groupId) {
287+
return hasField("group-id", actual.groupId(), groupId);
288+
}
289+
290+
MessageAssert hasGroupSequence(long groupSequence) {
291+
isNotNull();
292+
org.assertj.core.api.Assertions.assertThat(actual.groupSequence()).isEqualTo(groupSequence);
293+
return this;
294+
}
295+
296+
MessageAssert hasReplyToGroupId(String groupId) {
297+
return hasField("reply-to-group-id", actual.replyToGroupId(), groupId);
298+
}
299+
300+
MessageAssert hasBody(byte[] body) {
301+
isNotNull();
302+
org.assertj.core.api.Assertions.assertThat(actual.body()).isEqualTo(body);
273303
return this;
274304
}
275305

@@ -324,6 +354,14 @@ MessageAssert doesNotHaveAnnotation(String key) {
324354
}
325355
return this;
326356
}
357+
358+
private MessageAssert hasField(String fieldLabel, Object value, Object expected) {
359+
isNotNull();
360+
if (!Objects.equals(value, expected)) {
361+
fail("Field '%s' should be '%s' but is '%s'", fieldLabel, expected, value);
362+
}
363+
return this;
364+
}
327365
}
328366

329367
static class ConnectionAssert extends AbstractObjectAssert<ConnectionAssert, AmqpConnection> {

0 commit comments

Comments
 (0)