[th2-2552] backpressure: added check for queue size limit#184
[th2-2552] backpressure: added check for queue size limit#184andrew-drobynin wants to merge 12 commits intomasterfrom
Conversation
bc59f75 to
85efecf
Compare
85efecf to
79458f3
Compare
| QueueConfiguration::getQueue, | ||
| QueueConfiguration::getVirtualQueueLimit, | ||
| Math::min // TODO is it valid situation if there are several configurations for one queue? |
There was a problem hiding this comment.
Redundant, whole common or each pin (routing key) have a publish limit.
.../java/com/exactpro/th2/common/schema/message/impl/rabbitmq/connection/ConnectionManager.java
Outdated
Show resolved
Hide resolved
…d batchesToCheckVirtualQueueLimit, sum of queue sizes
| private Map<String, QueuesWithVirtualPublishLimit> groupQueuesByRoutingKey() { | ||
| List<BindingInfo> bindings = new ArrayList<>(); | ||
| knownExchanges.forEach(exchange -> bindings.addAll( | ||
| knownExchangesToRoutingKeys.forEach((exchange, routingKeys) -> bindings.addAll( | ||
| client.getBindingsBySource(rabbitMQConfiguration.getVHost(), exchange).stream() | ||
| .filter(it -> it.getDestinationType() == QUEUE && knownRoutingKeys.contains(it.getRoutingKey())) | ||
| .filter(it -> it.getDestinationType() == QUEUE && routingKeys.contains(it.getRoutingKey())) | ||
| .collect(Collectors.toList()) | ||
| )); | ||
| Map<String, QueueInfo> queueNameToInfo = client.getQueues().stream() | ||
| .collect(toMap(QueueInfo::getName, Function.identity())); | ||
| Map<String, List<QueueInfoWithVirtualLimit>> routingKeyToQueues = new HashMap<>(); | ||
| bindings.forEach(bindingInfo -> routingKeyToQueues | ||
| .computeIfAbsent(bindingInfo.getRoutingKey(), s -> new ArrayList<>()) | ||
| .add(new QueueInfoWithVirtualLimit( | ||
| queueNameToInfo.get(bindingInfo.getDestination()), | ||
| queueNameToVirtualQueueLimit.get(bindingInfo.getDestination()) | ||
| )) | ||
| ); | ||
| return routingKeyToQueues; | ||
| Map<String, QueuesWithVirtualPublishLimit> routingKeyToQueuesWithLimit = new HashMap<>(); | ||
| bindings.stream() | ||
| .collect(groupingBy(BindingInfo::getRoutingKey)) | ||
| .forEach((routingKey, bindingsForRoutingKey) -> | ||
| routingKeyToQueuesWithLimit.put( | ||
| routingKey, | ||
| new QueuesWithVirtualPublishLimit( | ||
| bindingsForRoutingKey.stream().map(bindingInfo -> queueNameToInfo.get(bindingInfo.getDestination())).collect(toList()), | ||
| connectionManagerConfiguration.getVirtualPublishLimit() | ||
| ) | ||
| ) | ||
| ); | ||
| return routingKeyToQueuesWithLimit; | ||
| } |
There was a problem hiding this comment.
private Map<String, QueuesWithVirtualPublishLimit> groupQueuesByRoutingKey() {
Map<String, QueueInfo> queueNameToInfo = client.getQueues().stream()
.collect(toMap(QueueInfo::getName, Function.identity()));
Map<String, List<BindingInfo>> bindings = knownExchangesToRoutingKeys.entrySet().stream()
.flatMap(entry -> {
String exchange = entry.getKey();
Set<String> routingKeys = entry.getValue();
return client.getBindingsBySource(rabbitMQConfiguration.getVHost(), exchange).stream()
.filter(it -> it.getDestinationType() == QUEUE && routingKeys.contains(it.getRoutingKey()));
}
).collect(groupingBy(BindingInfo::getRoutingKey));
Map<String, QueuesWithVirtualPublishLimit> routingKeyToQueuesWithLimit = new HashMap<>();
bindings.forEach((routingKey, bindingsForRoutingKey) ->
routingKeyToQueuesWithLimit.put(
routingKey,
new QueuesWithVirtualPublishLimit(
bindingsForRoutingKey.stream().map(bindingInfo -> queueNameToInfo.get(bindingInfo.getDestination())).collect(toList()),
connectionManagerConfiguration.getVirtualPublishLimit()
)
)
);
return routingKeyToQueuesWithLimit;
}
There was a problem hiding this comment.
I think we can combine lockSendingIfSizeLimitExceeded and groupQueuesByRoutingKey into continuous stream or Kotlin sequence
There was a problem hiding this comment.
I didn't want to do it. I wanted to keep all logic with com.rabbitmq.http.client.Client in one place and do only locking in lockSendingIfSizeLimitExceeded().
But I'll try if you want.
There was a problem hiding this comment.
I migrated ConnectionManager to Kotlin. Please see commit fbae68d.
But I don't think we can do it in one stream because we should take all queues for each routing key. I mean we can't do anything before check all queues.
ab78841 to
fbae68d
Compare
| .associateBy({ it.destination }, { queueNameToSize.getValue(it.destination) }) | ||
| val limit = connectionManagerConfiguration.virtualPublishLimit | ||
| val holder = getChannelFor(PinId.forRoutingKey(routingKey)) | ||
| LOGGER.trace { "Size limit lock for routing key '$routingKey': ${holder.sizeLimitLock}" } |
There was a problem hiding this comment.
I think it may be helpful, we'll have something like:
Size limit lock for routing key 'parsed-key': java.util.concurrent.locks.ReentrantLock@5744e6e4[Unlocked]
Size limit lock for routing key 'parsed-key': java.util.concurrent.locks.ReentrantLock@5744e6e4[Locked by thread pool-3-thread-1]
New version of #137