diff --git a/driver-api/src/main/java/io/openmessaging/benchmark/driver/ResourceCreator.java b/driver-api/src/main/java/io/openmessaging/benchmark/driver/ResourceCreator.java index 3bdfe5372..64ef34561 100644 --- a/driver-api/src/main/java/io/openmessaging/benchmark/driver/ResourceCreator.java +++ b/driver-api/src/main/java/io/openmessaging/benchmark/driver/ResourceCreator.java @@ -83,6 +83,10 @@ private List createBlocking(List resources) { return created; } + public void close() { + executor.shutdown(); + } + @SneakyThrows private Map> executeBatch(List batch) { log.debug("Executing batch, size: {}", batch.size()); diff --git a/driver-rabbitmq/src/main/java/io/openmessaging/benchmark/driver/rabbitmq/RabbitMqBenchmarkDriver.java b/driver-rabbitmq/src/main/java/io/openmessaging/benchmark/driver/rabbitmq/RabbitMqBenchmarkDriver.java index 5d639b545..879a96dd2 100644 --- a/driver-rabbitmq/src/main/java/io/openmessaging/benchmark/driver/rabbitmq/RabbitMqBenchmarkDriver.java +++ b/driver-rabbitmq/src/main/java/io/openmessaging/benchmark/driver/rabbitmq/RabbitMqBenchmarkDriver.java @@ -46,6 +46,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Stream; import org.apache.bookkeeper.stats.StatsLogger; +import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -58,10 +59,53 @@ public class RabbitMqBenchmarkDriver implements BenchmarkDriver { * back to secondary brokers. */ private final Map connections = new ConcurrentHashMap<>(); + private ResourceCreator producerResourceCreator; + private ResourceCreator consumerResourceCreator; @Override public void initialize(File configurationFile, StatsLogger statsLogger) throws IOException { config = mapper.readValue(configurationFile, RabbitMqConfig.class); + producerResourceCreator = new ResourceCreator<>( + "producer", + config.producerCreationBatchSize, + config.producerCreationDelay, + ps -> ps.stream().collect(toMap(p -> p, p -> createProducer(p.getTopic()))), + fc -> { + try { + return new CreationResult<>(fc.get(), true); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } catch (ExecutionException e) { + log.debug(e.getMessage()); + return new CreationResult<>(null, false); + } + }); + consumerResourceCreator = new ResourceCreator<>( + "consumer", + config.consumerCreationBatchSize, + config.consumerCreationDelay, + cs -> + cs.stream() + .collect( + toMap( + c -> c, + c -> + createConsumer( + c.getTopic(), + c.getSubscriptionName(), + c.getConsumerCallback()))), + fc -> { + try { + return new CreationResult<>(fc.get(), true); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } catch (ExecutionException e) { + log.debug(e.getMessage()); + return new CreationResult<>(null, false); + } + }); } @Override @@ -78,6 +122,12 @@ public void close() { } it.remove(); } + if (producerResourceCreator != null) { + producerResourceCreator.close(); + } + if (consumerResourceCreator != null) { + consumerResourceCreator.close(); + } } @Override @@ -126,53 +176,12 @@ public CompletableFuture createProducer(String topic) { @Override public CompletableFuture> createProducers(List producers) { - return new ResourceCreator( - "producer", - config.producerCreationBatchSize, - config.producerCreationDelay, - ps -> ps.stream().collect(toMap(p -> p, p -> createProducer(p.getTopic()))), - fc -> { - try { - return new CreationResult<>(fc.get(), true); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new RuntimeException(e); - } catch (ExecutionException e) { - log.debug(e.getMessage()); - return new CreationResult<>(null, false); - } - }) - .create(producers); + return producerResourceCreator.create(producers); } @Override public CompletableFuture> createConsumers(List consumers) { - return new ResourceCreator( - "consumer", - config.consumerCreationBatchSize, - config.consumerCreationDelay, - cs -> - cs.stream() - .collect( - toMap( - c -> c, - c -> - createConsumer( - c.getTopic(), - c.getSubscriptionName(), - c.getConsumerCallback()))), - fc -> { - try { - return new CreationResult<>(fc.get(), true); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new RuntimeException(e); - } catch (ExecutionException e) { - log.debug(e.getMessage()); - return new CreationResult<>(null, false); - } - }) - .create(consumers); + return consumerResourceCreator.create(consumers); } @Override @@ -228,12 +237,20 @@ private Connection getOrCreateConnection(String primaryBrokerUri) { try { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setAutomaticRecoveryEnabled(true); - String userInfo = newURI(primaryBrokerUri).getUserInfo(); + URI uri = newURI(primaryBrokerUri); + String userInfo = uri.getUserInfo(); if (userInfo != null) { String[] userInfoElems = userInfo.split(":"); connectionFactory.setUsername(userInfoElems[0]); connectionFactory.setPassword(userInfoElems[1]); } + String path = uri.getPath(); + if (!StringUtils.isBlank(path)) { + if (path.startsWith("/")) { + path = path.substring(1); + } + connectionFactory.setVirtualHost(path); + } return connectionFactory.newConnection(addresses); } catch (Exception e) { throw new RuntimeException("Couldn't establish connection to: " + primaryBrokerUri, e);