Skip to content

Commit 16fd3ee

Browse files
committed
Check filter names in attach response
1 parent fbb3d4e commit 16fd3ee

File tree

7 files changed

+50
-7
lines changed

7 files changed

+50
-7
lines changed

.github/workflows/test-pr.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ jobs:
2525
- name: Start broker
2626
run: ci/start-broker.sh
2727
env:
28-
RABBITMQ_IMAGE: 'pivotalrabbitmq/rabbitmq:amqp-filtex'
28+
RABBITMQ_IMAGE: 'pivotalrabbitmq/rabbitmq:main'
2929
- name: Start toxiproxy
3030
run: ci/start-toxiproxy.sh
3131
- name: Display Java version

src/main/java/com/rabbitmq/client/amqp/impl/AmqpConnection.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@ final class AmqpConnection extends ResourceBase implements Connection {
7272
private final ConnectionSettings.AffinityStrategy affinityStrategy;
7373
private final String name;
7474
private final Lock instanceLock = new ReentrantLock();
75+
private final boolean filterExpressionsSupported;
7576

7677
AmqpConnection(AmqpConnectionBuilder builder) {
7778
super(builder.listeners());
@@ -126,6 +127,8 @@ final class AmqpConnection extends ResourceBase implements Connection {
126127
ConnectionUtils.NO_RETRY_STRATEGY,
127128
this.name());
128129
this.sync(ncw);
130+
this.filterExpressionsSupported =
131+
Utils.supportFilterExpressions(brokerVersion(this.nativeConnection));
129132
LOGGER.debug("Opened connection '{}' on node '{}'.", this.name(), this.connectionNodename());
130133
this.state(OPEN);
131134
this.environment.metricsCollector().openConnection();
@@ -672,6 +675,10 @@ ConnectionUtils.AffinityContext affinity() {
672675
return this.affinity;
673676
}
674677

678+
boolean filterExpressionsSupported() {
679+
return this.filterExpressionsSupported;
680+
}
681+
675682
long id() {
676683
return this.id;
677684
}

src/main/java/com/rabbitmq/client/amqp/impl/AmqpConsumer.java

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -175,11 +175,27 @@ private ClientReceiver createNativeReceiver(
175175
.autoSettle(false)
176176
.creditWindow(0)
177177
.properties(properties);
178+
Map<String, Object> localSourceFilters = Collections.emptyMap();
178179
if (!filters.isEmpty()) {
179-
receiverOptions.sourceOptions().filters(Map.copyOf(filters));
180+
localSourceFilters = Map.copyOf(filters);
181+
receiverOptions.sourceOptions().filters(localSourceFilters);
180182
}
181-
return (ClientReceiver)
182-
ExceptionUtils.wrapGet(nativeSession.openReceiver(address, receiverOptions).openFuture());
183+
ClientReceiver receiver =
184+
(ClientReceiver)
185+
ExceptionUtils.wrapGet(
186+
nativeSession.openReceiver(address, receiverOptions).openFuture());
187+
if (!filters.isEmpty()) {
188+
Map<String, String> remoteSourceFilters = receiver.source().filters();
189+
for (Map.Entry<String, Object> localEntry : localSourceFilters.entrySet()) {
190+
if (!remoteSourceFilters.containsKey(localEntry.getKey())) {
191+
LOGGER.warn(
192+
"Missing filter value in attach response: {} => {}",
193+
localEntry.getKey(),
194+
localEntry.getValue());
195+
}
196+
}
197+
}
198+
return receiver;
183199
} catch (ClientException e) {
184200
throw ExceptionUtils.convert(e, "Error while creating receiver from '%s'", address);
185201
}

src/main/java/com/rabbitmq/client/amqp/impl/AmqpConsumerBuilder.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -136,10 +136,10 @@ public Consumer build() {
136136
private static class DefaultStreamOptions implements StreamOptions {
137137

138138
private final Map<String, DescribedType> filters;
139-
private final ConsumerBuilder builder;
139+
private final AmqpConsumerBuilder builder;
140140
private final StreamFilterOptions filterOptions;
141141

142-
private DefaultStreamOptions(ConsumerBuilder builder, Map<String, DescribedType> filters) {
142+
private DefaultStreamOptions(AmqpConsumerBuilder builder, Map<String, DescribedType> filters) {
143143
this.builder = builder;
144144
this.filters = filters;
145145
this.filterOptions = new DefaultStreamFilterOptions(this, filters);
@@ -190,6 +190,10 @@ public StreamOptions filterMatchUnfiltered(boolean matchUnfiltered) {
190190

191191
@Override
192192
public StreamFilterOptions filter() {
193+
if (!this.builder.connection.filterExpressionsSupported()) {
194+
throw new IllegalArgumentException(
195+
"AMQP filter expressions requires at least RabbitMQ 4.1.0");
196+
}
193197
return this.filterOptions;
194198
}
195199

src/main/java/com/rabbitmq/client/amqp/impl/Utils.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -224,6 +224,14 @@ static boolean is4_0_OrMore(String brokerVersion) {
224224
return versionCompare(currentVersion(brokerVersion), "4.0.0") >= 0;
225225
}
226226

227+
static boolean is4_1_OrMore(String brokerVersion) {
228+
return versionCompare(currentVersion(brokerVersion), "4.1.0") >= 0;
229+
}
230+
231+
static boolean supportFilterExpressions(String brokerVersion) {
232+
return is4_1_OrMore(brokerVersion);
233+
}
234+
227235
static final class ObservationConnectionInfo implements ObservationCollector.ConnectionInfo {
228236

229237
private final String address;

src/test/java/com/rabbitmq/client/amqp/impl/SourceFiltersTest.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import static com.rabbitmq.client.amqp.ConsumerBuilder.StreamOffsetSpecification.*;
2121
import static com.rabbitmq.client.amqp.Management.QueueType.STREAM;
2222
import static com.rabbitmq.client.amqp.impl.Assertions.assertThat;
23+
import static com.rabbitmq.client.amqp.impl.TestConditions.BrokerVersion.RABBITMQ_4_1_0;
2324
import static com.rabbitmq.client.amqp.impl.TestUtils.sync;
2425
import static com.rabbitmq.client.amqp.impl.TestUtils.waitUntilStable;
2526
import static java.nio.charset.StandardCharsets.*;
@@ -28,6 +29,7 @@
2829
import static org.assertj.core.api.Assertions.assertThatThrownBy;
2930

3031
import com.rabbitmq.client.amqp.*;
32+
import com.rabbitmq.client.amqp.impl.TestConditions.BrokerVersionAtLeast;
3133
import com.rabbitmq.client.amqp.impl.TestUtils.Sync;
3234
import java.time.Duration;
3335
import java.time.Instant;
@@ -274,6 +276,7 @@ void streamFiltering() {
274276
}
275277

276278
@Test
279+
@BrokerVersionAtLeast(RABBITMQ_4_1_0)
277280
void filterExpressionApplicationProperties() {
278281
int messageCount = 10;
279282
UUID uuid = UUID.randomUUID();
@@ -326,6 +329,7 @@ void filterExpressionApplicationProperties() {
326329
}
327330

328331
@Test
332+
@BrokerVersionAtLeast(RABBITMQ_4_1_0)
329333
void filterExpressionProperties() {
330334
int messageCount = 10;
331335
byte[] userId = "guest".getBytes(UTF_8);
@@ -407,6 +411,7 @@ void filterExpressionProperties() {
407411
}
408412

409413
@Test
414+
@BrokerVersionAtLeast(RABBITMQ_4_1_0)
410415
void filterExpressionsPropertiesAndApplicationProperties() {
411416
int messageCount = 10;
412417
String subject = stringArbitrary.sample();
@@ -438,6 +443,7 @@ void filterExpressionsPropertiesAndApplicationProperties() {
438443
}
439444

440445
@Test
446+
@BrokerVersionAtLeast(RABBITMQ_4_1_0)
441447
void filterExpressionFilterFewMessagesFromManyToTestFlowControl() {
442448
String groupId = stringArbitrary.sample();
443449
publish(1, m -> m.groupId(groupId));
@@ -449,6 +455,7 @@ void filterExpressionFilterFewMessagesFromManyToTestFlowControl() {
449455
}
450456

451457
@Test
458+
@BrokerVersionAtLeast(RABBITMQ_4_1_0)
452459
void filterExpressionStringModifier() {
453460
publish(1, m -> m.subject("abc 123"));
454461
publish(1, m -> m.subject("foo bar"));

src/test/java/com/rabbitmq/client/amqp/impl/TestConditions.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,8 @@ public final class TestConditions {
3434
private TestConditions() {}
3535

3636
public enum BrokerVersion {
37-
RABBITMQ_4_0_3("4.0.3");
37+
RABBITMQ_4_0_3("4.0.3"),
38+
RABBITMQ_4_1_0("4.1.0");
3839

3940
final String value;
4041

0 commit comments

Comments
 (0)