Skip to content

Commit 36acb10

Browse files
committed
Stop all producer tasks and close connections
Stopping all the tasks and then close connections (instead of stop a task and then close the corresponding connection) at least stop publishing as soon as possible in case a connection takes some time to close. Publisher connections can take some time to close because of flow control. In this case, even though all publishing tasks are terminated, a significant amount of messages can still seem to be sent probably because of TCP buffers flushing. So this commit can help mitigate this phenomenon, but not completely prevent it. Using Connection#close(int timeout) should not help either TCP buffer flushing would also occur. [#157183370] Fixes #96
1 parent 0dfdf89 commit 36acb10

File tree

1 file changed

+9
-12
lines changed

1 file changed

+9
-12
lines changed

src/main/java/com/rabbitmq/perf/MulticastSet.java

Lines changed: 9 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -182,20 +182,17 @@ public void run(boolean announceStartup)
182182

183183
this.completionHandler.waitForCompletion();
184184

185-
int count = 1; // counting the threads
186-
for (int i = 0; i < producerStates.length; i++) {
187-
producerStates[i].task.cancel(true);
188-
if(count % params.getProducerChannelCount() == 0) {
189-
// this is the end of a group of threads on the same connection,
190-
// closing the connection
191-
try {
192-
producerConnections[count / params.getProducerChannelCount() - 1].close();
193-
} catch (Exception e) {
194-
// don't do anything, we need to close the other connections
195-
}
185+
for (AgentState producerState : producerStates) {
186+
producerState.task.cancel(true);
187+
}
196188

189+
for (Connection producerConnection : producerConnections) {
190+
try {
191+
producerConnection.close();
192+
} catch (Exception e) {
193+
// don't do anything, we need to close the other connections
197194
}
198-
count++;
195+
199196
}
200197

201198
for (Connection consumerConnection : consumerConnections) {

0 commit comments

Comments
 (0)