Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,10 @@ private List<C> createBlocking(List<R> resources) {
return created;
}

public void close() {
executor.shutdown();
}

@SneakyThrows
private Map<R, CreationResult<C>> executeBatch(List<R> batch) {
log.debug("Executing batch, size: {}", batch.size());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -58,10 +59,53 @@ public class RabbitMqBenchmarkDriver implements BenchmarkDriver {
* back to secondary brokers.
*/
private final Map<String, Connection> connections = new ConcurrentHashMap<>();
private ResourceCreator<ProducerInfo, BenchmarkProducer> producerResourceCreator;
private ResourceCreator<ConsumerInfo, BenchmarkConsumer> consumerResourceCreator;

@Override
public void initialize(File configurationFile, StatsLogger statsLogger) throws IOException {
config = mapper.readValue(configurationFile, RabbitMqConfig.class);
producerResourceCreator = new ResourceCreator<>(
"producer",
config.producerCreationBatchSize,
config.producerCreationDelay,
ps -> ps.stream().collect(toMap(p -> p, p -> createProducer(p.getTopic()))),
fc -> {
try {
return new CreationResult<>(fc.get(), true);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
} catch (ExecutionException e) {
log.debug(e.getMessage());
return new CreationResult<>(null, false);
}
});
consumerResourceCreator = new ResourceCreator<>(
"consumer",
config.consumerCreationBatchSize,
config.consumerCreationDelay,
cs ->
cs.stream()
.collect(
toMap(
c -> c,
c ->
createConsumer(
c.getTopic(),
c.getSubscriptionName(),
c.getConsumerCallback()))),
fc -> {
try {
return new CreationResult<>(fc.get(), true);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
} catch (ExecutionException e) {
log.debug(e.getMessage());
return new CreationResult<>(null, false);
}
});
}

@Override
Expand All @@ -78,6 +122,12 @@ public void close() {
}
it.remove();
}
if (producerResourceCreator != null) {
producerResourceCreator.close();
}
if (consumerResourceCreator != null) {
consumerResourceCreator.close();
}
}

@Override
Expand Down Expand Up @@ -126,53 +176,12 @@ public CompletableFuture<BenchmarkProducer> createProducer(String topic) {

@Override
public CompletableFuture<List<BenchmarkProducer>> createProducers(List<ProducerInfo> producers) {
return new ResourceCreator<ProducerInfo, BenchmarkProducer>(
"producer",
config.producerCreationBatchSize,
config.producerCreationDelay,
ps -> ps.stream().collect(toMap(p -> p, p -> createProducer(p.getTopic()))),
fc -> {
try {
return new CreationResult<>(fc.get(), true);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
} catch (ExecutionException e) {
log.debug(e.getMessage());
return new CreationResult<>(null, false);
}
})
.create(producers);
return producerResourceCreator.create(producers);
}

@Override
public CompletableFuture<List<BenchmarkConsumer>> createConsumers(List<ConsumerInfo> consumers) {
return new ResourceCreator<ConsumerInfo, BenchmarkConsumer>(
"consumer",
config.consumerCreationBatchSize,
config.consumerCreationDelay,
cs ->
cs.stream()
.collect(
toMap(
c -> c,
c ->
createConsumer(
c.getTopic(),
c.getSubscriptionName(),
c.getConsumerCallback()))),
fc -> {
try {
return new CreationResult<>(fc.get(), true);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
} catch (ExecutionException e) {
log.debug(e.getMessage());
return new CreationResult<>(null, false);
}
})
.create(consumers);
return consumerResourceCreator.create(consumers);
}

@Override
Expand Down Expand Up @@ -228,12 +237,20 @@ private Connection getOrCreateConnection(String primaryBrokerUri) {
try {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setAutomaticRecoveryEnabled(true);
String userInfo = newURI(primaryBrokerUri).getUserInfo();
URI uri = newURI(primaryBrokerUri);
String userInfo = uri.getUserInfo();
if (userInfo != null) {
String[] userInfoElems = userInfo.split(":");
connectionFactory.setUsername(userInfoElems[0]);
connectionFactory.setPassword(userInfoElems[1]);
}
String path = uri.getPath();
if (!StringUtils.isBlank(path)) {
if (path.startsWith("/")) {
path = path.substring(1);
}
connectionFactory.setVirtualHost(path);
}
return connectionFactory.newConnection(addresses);
} catch (Exception e) {
throw new RuntimeException("Couldn't establish connection to: " + primaryBrokerUri, e);
Expand Down