Skip to content

Commit bc59f75

Browse files
author
Andrey Drobynin
committed
[th2-2552] backpressure: added check for queue size limit
1 parent 6f8699b commit bc59f75

File tree

11 files changed

+220
-42
lines changed

11 files changed

+220
-42
lines changed

README.md

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
# th2 common library (Java) (3.33.0)
1+
# th2 common library (Java) (3.34.0)
22

33
## Usage
44

@@ -80,6 +80,8 @@ The `CommonFactory` reads a RabbitMQ configuration from the rabbitMQ.json file.
8080
* maxConnectionRecoveryTimeout - this option defines a maximum interval in milliseconds between reconnect attempts, with its default value set to 60000. Common factory increases the reconnect interval values from minConnectionRecoveryTimeout to maxConnectionRecoveryTimeout.
8181
* prefetchCount - this option is the maximum number of messages that the server will deliver, with its value set to 0 if unlimited, the default value is set to 10.
8282
* messageRecursionLimit - an integer number denotes how deep nested protobuf message might be, default = 100
83+
* secondsToCheckVirtualQueueLimit - this option defines an interval in seconds between size check attempts, default = 10
84+
* batchesToCheckVirtualQueueLimit - this option defines the number of batches between size check attempts, default = 10000
8385
8486
```json
8587
{
@@ -95,7 +97,9 @@ The `CommonFactory` reads a RabbitMQ configuration from the rabbitMQ.json file.
9597
"minConnectionRecoveryTimeout": 10000,
9698
"maxConnectionRecoveryTimeout": 60000,
9799
"prefetchCount": 10,
98-
"messageRecursionLimit": 100
100+
"messageRecursionLimit": 100,
101+
"secondsToCheckVirtualQueueLimit": 10,
102+
"batchesToCheckVirtualQueueLimit": 10000
99103
}
100104
```
101105

@@ -117,6 +121,7 @@ The `CommonFactory` reads a message's router configuration from the `mq.json` fi
117121
* filters - pin's message's filters
118122
* metadata - a metadata filters
119123
* message - a message's fields filters
124+
* virtualQueueLimit - MQ router calculates destination queues and compares their current size to this value. The router blocks the current thread to repeat the comparison if the size of any destination queues exceeds the virtual limit
120125

121126
Filters format:
122127
* fieldName - a field's name
@@ -154,7 +159,8 @@ Filters format:
154159
"operation": "WILDCARD"
155160
}
156161
]
157-
}
162+
},
163+
"virtualQueueLimit": 10000
158164
}
159165
}
160166
}
@@ -288,6 +294,12 @@ dependencies {
288294

289295
## Release notes
290296

297+
### 3.34.0
298+
299+
+ Added backpressure support: lock sending if queue virtual size limit is exceeded
300+
+ Added parameter `virtualQueueLimit` to `mq.json`
301+
+ Added parameters `secondsToCheckVirtualQueueLimit` and `batchesToCheckVirtualQueueLimit` to `mq_router.json`
302+
291303
### 3.33.0
292304

293305
+ Added ability to read dictionaries by aliases and as group of all available aliases

build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -171,6 +171,7 @@ dependencies {
171171
implementation "io.grpc:grpc-netty"
172172

173173
implementation "com.rabbitmq:amqp-client"
174+
implementation 'com.rabbitmq:http-client:4.0.0'
174175

175176
implementation "org.jetbrains:annotations"
176177

gradle.properties

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
#
2-
# Copyright 2020-2021 Exactpro (Exactpro Systems Limited)
2+
# Copyright 2020-2022 Exactpro (Exactpro Systems Limited)
33
# Licensed under the Apache License, Version 2.0 (the "License");
44
# you may not use this file except in compliance with the License.
55
# You may obtain a copy of the License at
@@ -13,7 +13,7 @@
1313
# limitations under the License.
1414
#
1515

16-
release_version=3.33.0
16+
release_version=3.34.0
1717

1818
description = 'th2 common library (Java)'
1919

src/main/java/com/exactpro/th2/common/schema/factory/AbstractCommonFactory.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -645,7 +645,12 @@ protected PrometheusConfiguration loadPrometheusConfiguration() {
645645
}
646646

647647
protected ConnectionManager createRabbitMQConnectionManager() {
648-
return new ConnectionManager(getRabbitMqConfiguration(), getConnectionManagerConfiguration(), livenessMonitor::disable);
648+
return new ConnectionManager(
649+
getRabbitMqConfiguration(),
650+
getConnectionManagerConfiguration(),
651+
getMessageRouterConfiguration(),
652+
livenessMonitor::disable
653+
);
649654
}
650655

651656
protected ConnectionManager getRabbitMqConnectionManager() {

src/main/java/com/exactpro/th2/common/schema/message/impl/rabbitmq/AbstractRabbitSender.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2020-2021 Exactpro (Exactpro Systems Limited)
2+
* Copyright 2020-2022 Exactpro (Exactpro Systems Limited)
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -55,6 +55,7 @@ public abstract class AbstractRabbitSender<T> implements MessageSender<T> {
5555
private final AtomicReference<String> exchangeName = new AtomicReference<>();
5656
private final AtomicReference<ConnectionManager> connectionManager = new AtomicReference<>();
5757
private final String th2Type;
58+
private long sentBeforeQueueSizeCheck;
5859

5960
public AbstractRabbitSender(
6061
@NotNull ConnectionManager connectionManager,
@@ -81,15 +82,20 @@ public void send(T value) throws IOException {
8182
requireNonNull(value, "Value for send can not be null");
8283

8384
try {
84-
ConnectionManager connection = this.connectionManager.get();
85+
ConnectionManager connectionManager = this.connectionManager.get();
8586
byte[] bytes = valueToBytes(value);
8687
MESSAGE_SIZE_PUBLISH_BYTES
8788
.labels(th2Pin, th2Type, exchangeName.get(), routingKey.get())
8889
.inc(bytes.length);
8990
MESSAGE_PUBLISH_TOTAL
9091
.labels(th2Pin, th2Type, exchangeName.get(), routingKey.get())
9192
.inc();
92-
connection.basicPublish(exchangeName.get(), routingKey.get(), null, bytes);
93+
sentBeforeQueueSizeCheck++;
94+
if (sentBeforeQueueSizeCheck > connectionManager.getConnectionManagerConfiguration().getBatchesToCheckVirtualQueueLimit()) {
95+
connectionManager.lockSendingIfSizeLimitExceeded(routingKey.get());
96+
sentBeforeQueueSizeCheck = 0;
97+
}
98+
connectionManager.basicPublish(exchangeName.get(), routingKey.get(), null, bytes);
9399

94100
if (LOGGER.isTraceEnabled()) {
95101
LOGGER.trace("Message sent to exchangeName='{}', routing key='{}': '{}'",

0 commit comments

Comments
 (0)