diff --git a/.github/workflows/cd-integration.yml b/.github/workflows/cd-integration.yml
index 4cc2456..dd77edb 100644
--- a/.github/workflows/cd-integration.yml
+++ b/.github/workflows/cd-integration.yml
@@ -1,7 +1,7 @@
name: Build, Gates and Pull Request
on:
- pull_request_target:
+ pull_request:
branches:
- develop
types:
diff --git a/.github/workflows/cd-release.yml b/.github/workflows/cd-release.yml
index 0ff6f8c..83f36da 100644
--- a/.github/workflows/cd-release.yml
+++ b/.github/workflows/cd-release.yml
@@ -1,7 +1,7 @@
name: Build and Publish Release
on:
- pull_request_target:
+ pull_request:
branches:
- master
types:
diff --git a/.github/workflows/cd-snapshot.yml b/.github/workflows/cd-snapshot.yml
index 6a4e721..7099eb0 100644
--- a/.github/workflows/cd-snapshot.yml
+++ b/.github/workflows/cd-snapshot.yml
@@ -1,7 +1,7 @@
name: Build, Publish Snapshot and Pull Request
on:
- pull_request_target:
+ pull_request:
branches:
- release/**
types:
diff --git a/.github/workflows/ci-maven.yml b/.github/workflows/ci-maven.yml
index 68c14f6..849cb9d 100644
--- a/.github/workflows/ci-maven.yml
+++ b/.github/workflows/ci-maven.yml
@@ -8,7 +8,7 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
- java-version: [8, 11, 17]
+ java-version: [8, 11, 17, 21]
steps:
- name: Checkout repository
uses: actions/checkout@v4
@@ -34,7 +34,7 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
- java-version: [8, 11, 17]
+ java-version: [8, 11, 17, 21]
steps:
- name: Checkout repository
uses: actions/checkout@v4
diff --git a/README.md b/README.md
index 7be244c..c3fc607 100644
--- a/README.md
+++ b/README.md
@@ -16,7 +16,7 @@ Combine multiple requests to optimally utilise the network.
Article [Martin Fowler](https://martinfowler.com) [Request Batch](https://martinfowler.com/articles/patterns-of-distributed-systems/request-batch.html)
-_**Compatible JDK 8, 11 and 17**_
+_**Compatible JDK 8, 11, 17 and 21**_
_**Compatible AWS JDK v1 >= 1.12**_
@@ -38,7 +38,7 @@ You can pull it from the central Maven repositories:
com.github.mvallim
amazon-sns-java-messaging-lib-v1
- 1.0.8
+ 1.1.0
```
@@ -48,7 +48,7 @@ You can pull it from the central Maven repositories:
com.github.mvallim
amazon-sns-java-messaging-lib-v2
- 1.0.8
+ 1.1.0
```
@@ -70,13 +70,13 @@ If you want to try a snapshot version, add the following repository:
### For AWS SDK v1
```groovy
-implementation 'com.github.mvallim:amazon-sns-java-messaging-lib-v1:1.0.8'
+implementation 'com.github.mvallim:amazon-sns-java-messaging-lib-v1:1.1.0'
```
### For AWS SDK v2
```groovy
-implementation 'com.github.mvallim:amazon-sns-java-messaging-lib-v2:1.0.8'
+implementation 'com.github.mvallim:amazon-sns-java-messaging-lib-v2:1.1.0'
```
If you want to try a snapshot version, add the following repository:
diff --git a/amazon-sns-java-messaging-lib-template/pom.xml b/amazon-sns-java-messaging-lib-template/pom.xml
index 8f1797c..7561e76 100644
--- a/amazon-sns-java-messaging-lib-template/pom.xml
+++ b/amazon-sns-java-messaging-lib-template/pom.xml
@@ -7,7 +7,7 @@
com.github.mvallim
amazon-sns-java-messaging-lib
- 1.0.8-SNAPSHOT
+ 1.1.0-SNAPSHOT
../pom.xml
diff --git a/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/core/AmazonSnsThreadPoolExecutor.java b/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/concurrent/AmazonSnsThreadPoolExecutor.java
similarity index 76%
rename from amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/core/AmazonSnsThreadPoolExecutor.java
rename to amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/concurrent/AmazonSnsThreadPoolExecutor.java
index f3e728f..d503dfe 100644
--- a/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/core/AmazonSnsThreadPoolExecutor.java
+++ b/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/concurrent/AmazonSnsThreadPoolExecutor.java
@@ -14,7 +14,7 @@
* limitations under the License.
*/
-package com.amazon.sns.messaging.lib.core;
+package com.amazon.sns.messaging.lib.concurrent;
import java.util.Objects;
import java.util.concurrent.SynchronousQueue;
@@ -23,49 +23,53 @@
import java.util.concurrent.atomic.AtomicInteger;
public class AmazonSnsThreadPoolExecutor extends ThreadPoolExecutor {
-
+
private final AtomicInteger activeTaskCount = new AtomicInteger();
-
+
private final AtomicInteger failedTaskCount = new AtomicInteger();
-
+
private final AtomicInteger succeededTaskCount = new AtomicInteger();
-
+
public AmazonSnsThreadPoolExecutor(final int maximumPoolSize) {
- super(0, maximumPoolSize, 60, TimeUnit.SECONDS, new SynchronousQueue<>(), new BlockingSubmissionPolicy(30000));
+ super(0, maximumPoolSize, 60, TimeUnit.SECONDS, new SynchronousQueue<>(), ThreadFactoryProvider.getThreadFactory(), new BlockingSubmissionPolicy(30000));
}
-
+
public int getActiveTaskCount() {
- return activeTaskCount.get();
+ return this.activeTaskCount.get();
}
-
+
public int getFailedTaskCount() {
- return failedTaskCount.get();
+ return this.failedTaskCount.get();
}
-
+
public int getSucceededTaskCount() {
- return succeededTaskCount.get();
+ return this.succeededTaskCount.get();
}
-
+
+ public int getQueueSize() {
+ return getQueue().size();
+ }
+
@Override
protected void beforeExecute(final Thread thread, final Runnable runnable) {
try {
super.beforeExecute(thread, runnable);
} finally {
- activeTaskCount.incrementAndGet();
+ this.activeTaskCount.incrementAndGet();
}
}
-
+
@Override
protected void afterExecute(final Runnable runnable, final Throwable throwable) {
try {
super.afterExecute(runnable, throwable);
} finally {
if (Objects.nonNull(throwable)) {
- failedTaskCount.incrementAndGet();
+ this.failedTaskCount.incrementAndGet();
} else {
- succeededTaskCount.incrementAndGet();
+ this.succeededTaskCount.incrementAndGet();
}
- activeTaskCount.decrementAndGet();
+ this.activeTaskCount.decrementAndGet();
}
}
diff --git a/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/core/BlockingSubmissionPolicy.java b/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/concurrent/BlockingSubmissionPolicy.java
similarity index 87%
rename from amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/core/BlockingSubmissionPolicy.java
rename to amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/concurrent/BlockingSubmissionPolicy.java
index 8eda3bf..47fdb03 100644
--- a/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/core/BlockingSubmissionPolicy.java
+++ b/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/concurrent/BlockingSubmissionPolicy.java
@@ -14,7 +14,7 @@
* limitations under the License.
*/
-package com.amazon.sns.messaging.lib.core;
+package com.amazon.sns.messaging.lib.concurrent;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.RejectedExecutionException;
@@ -24,14 +24,14 @@
import lombok.SneakyThrows;
-class BlockingSubmissionPolicy implements RejectedExecutionHandler {
-
+public class BlockingSubmissionPolicy implements RejectedExecutionHandler {
+
private final long timeout;
-
+
public BlockingSubmissionPolicy(final long timeout) {
this.timeout = timeout;
}
-
+
@Override
@SneakyThrows
public void rejectedExecution(final Runnable runnable, final ThreadPoolExecutor executor) {
@@ -40,5 +40,5 @@ public void rejectedExecution(final Runnable runnable, final ThreadPoolExecutor
throw new RejectedExecutionException("Timeout");
}
}
-
+
}
diff --git a/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/concurrent/ExecutorsProvider.java b/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/concurrent/ExecutorsProvider.java
new file mode 100644
index 0000000..7723231
--- /dev/null
+++ b/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/concurrent/ExecutorsProvider.java
@@ -0,0 +1,78 @@
+/*
+ * Copyright 2024 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.amazon.sns.messaging.lib.concurrent;
+
+import java.lang.reflect.Method;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.function.Supplier;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+import lombok.SneakyThrows;
+
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
+public final class ExecutorsProvider {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(ExecutorsProvider.class);
+
+ private static Supplier supplierExecutorService;
+
+ static {
+ if (ExecutorsProvider.getJavaVersion() >= 21) {
+ ExecutorsProvider.supplierExecutorService = ExecutorsProvider::getVirtualExecutorService;
+ ExecutorsProvider.LOGGER.info("Java version is {}, using virtual thread executor", ExecutorsProvider.getJavaVersion());
+ } else {
+ ExecutorsProvider.supplierExecutorService = ExecutorsProvider::getDefaultExecutorService;
+ ExecutorsProvider.LOGGER.info("Java version is {}, using default thread executor", ExecutorsProvider.getJavaVersion());
+ }
+ }
+
+ public static ExecutorService getExecutorService() {
+ return ExecutorsProvider.supplierExecutorService.get();
+ }
+
+ @SneakyThrows
+ private static ExecutorService getDefaultExecutorService() {
+ return Executors.newSingleThreadExecutor();
+ }
+
+ @SneakyThrows
+ private static ExecutorService getVirtualExecutorService() {
+ final Class> clazzThread = Executors.class;
+ final Method ofVirtualMethod = clazzThread.getMethod("newVirtualThreadPerTaskExecutor");
+ return ExecutorService.class.cast(ofVirtualMethod.invoke(null));
+ }
+
+ private static int getJavaVersion() {
+ String version = System.getProperty("java.version");
+
+ if (version.startsWith("1.")) {
+ version = version.substring(2);
+ }
+
+ final int dotPos = version.indexOf('.');
+ final int dashPos = version.indexOf('-');
+ final int endIndex = dotPos > -1 ? dotPos : dashPos > -1 ? dashPos : 1;
+
+ return Integer.parseInt(version.substring(0, endIndex));
+ }
+
+}
diff --git a/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/concurrent/RingBufferBlockingQueue.java b/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/concurrent/RingBufferBlockingQueue.java
index ae0ed55..cd617a1 100644
--- a/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/concurrent/RingBufferBlockingQueue.java
+++ b/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/concurrent/RingBufferBlockingQueue.java
@@ -32,7 +32,6 @@
import lombok.Setter;
import lombok.SneakyThrows;
-@SuppressWarnings({ "java:S2274" })
public class RingBufferBlockingQueue extends AbstractQueue implements BlockingQueue {
private static final int DEFAULT_CAPACITY = 2048;
@@ -55,15 +54,15 @@ public class RingBufferBlockingQueue extends AbstractQueue implements Bloc
public RingBufferBlockingQueue(final int capacity) {
this.capacity = capacity;
- buffer = new AtomicReferenceArray<>(capacity);
- reentrantLock = new ReentrantLock(true);
- waitingConsumer = reentrantLock.newCondition();
- waitingProducer = reentrantLock.newCondition();
- IntStream.range(0, capacity).forEach(idx -> buffer.set(idx, new Entry<>()));
+ this.buffer = new AtomicReferenceArray<>(capacity);
+ this.reentrantLock = new ReentrantLock(true);
+ this.waitingConsumer = this.reentrantLock.newCondition();
+ this.waitingProducer = this.reentrantLock.newCondition();
+ IntStream.range(0, capacity).forEach(idx -> this.buffer.set(idx, new Entry<>()));
}
public RingBufferBlockingQueue() {
- this(DEFAULT_CAPACITY);
+ this(RingBufferBlockingQueue.DEFAULT_CAPACITY);
}
private long avoidSequenceOverflow(final long sequence) {
@@ -71,34 +70,38 @@ private long avoidSequenceOverflow(final long sequence) {
}
private int wrap(final long sequence) {
- return Math.toIntExact(sequence % capacity);
+ return Math.toIntExact(sequence % this.capacity);
}
-
+
+ public int capacity() {
+ return this.capacity;
+ }
+
@Override
public int size() {
- return size.get();
+ return this.size.get();
}
@Override
public boolean isEmpty() {
- return size.get() == 0;
+ return this.size.get() == 0;
}
public boolean isFull() {
- return size.get() >= capacity;
+ return this.size.get() >= this.capacity;
}
public long writeSequence() {
- return writeSequence.get();
+ return this.writeSequence.get();
}
public long readSequence() {
- return readSequence.get();
+ return this.readSequence.get();
}
@Override
public E peek() {
- return isEmpty() ? null : buffer.get(wrap(readSequence.get())).getValue();
+ return isEmpty() ? null : this.buffer.get(wrap(this.readSequence.get())).getValue();
}
@Override
diff --git a/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/concurrent/ThreadFactoryProvider.java b/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/concurrent/ThreadFactoryProvider.java
new file mode 100644
index 0000000..8b21dd7
--- /dev/null
+++ b/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/concurrent/ThreadFactoryProvider.java
@@ -0,0 +1,81 @@
+/*
+ * Copyright 2023 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.amazon.sns.messaging.lib.concurrent;
+
+import java.lang.reflect.Method;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.function.Supplier;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+import lombok.SneakyThrows;
+
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
+public final class ThreadFactoryProvider {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(ThreadFactoryProvider.class);
+
+ private static Supplier supplierThreadFactory;
+
+ static {
+ if (ThreadFactoryProvider.getJavaVersion() >= 21) {
+ ThreadFactoryProvider.supplierThreadFactory = ThreadFactoryProvider::getVirtualThreadFactory;
+ ThreadFactoryProvider.LOGGER.info("Java version is {}, using virtual thread factory", ThreadFactoryProvider.getJavaVersion());
+ } else {
+ ThreadFactoryProvider.supplierThreadFactory = ThreadFactoryProvider::getDefaultThreadFactory;
+ ThreadFactoryProvider.LOGGER.info("Java version is {}, using default thread factory", ThreadFactoryProvider.getJavaVersion());
+ }
+ }
+
+ public static ThreadFactory getThreadFactory() {
+ return ThreadFactoryProvider.supplierThreadFactory.get();
+ }
+
+ @SneakyThrows
+ private static ThreadFactory getDefaultThreadFactory() {
+ return Executors.defaultThreadFactory();
+ }
+
+ @SneakyThrows
+ private static ThreadFactory getVirtualThreadFactory() {
+ final Class> clazzThread = Thread.class;
+ final Class> clazzOfVirtual = Class.forName("java.lang.Thread$Builder$OfVirtual");
+ final Method ofVirtualMethod = clazzThread.getMethod("ofVirtual");
+ final Method factoryMethod = clazzOfVirtual.getMethod("factory");
+ final Object result = ofVirtualMethod.invoke(null);
+ return ThreadFactory.class.cast(factoryMethod.invoke(result));
+ }
+
+ private static int getJavaVersion() {
+ String version = System.getProperty("java.version");
+
+ if (version.startsWith("1.")) {
+ version = version.substring(2);
+ }
+
+ final int dotPos = version.indexOf('.');
+ final int dashPos = version.indexOf('-');
+ final int endIndex = dotPos > -1 ? dotPos : dashPos > -1 ? dashPos : 1;
+
+ return Integer.parseInt(version.substring(0, endIndex));
+ }
+
+}
\ No newline at end of file
diff --git a/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/core/AbstractAmazonSnsConsumer.java b/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/core/AbstractAmazonSnsConsumer.java
index 1ce914b..dac04c0 100644
--- a/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/core/AbstractAmazonSnsConsumer.java
+++ b/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/core/AbstractAmazonSnsConsumer.java
@@ -37,6 +37,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.amazon.sns.messaging.lib.concurrent.ThreadFactoryProvider;
import com.amazon.sns.messaging.lib.core.RequestEntryInternalFactory.RequestEntryInternal;
import com.amazon.sns.messaging.lib.model.PublishRequestBuilder;
import com.amazon.sns.messaging.lib.model.RequestEntry;
@@ -50,11 +51,11 @@ abstract class AbstractAmazonSnsConsumer implements Runnable {
private static final Integer KB = 1024;
- private static final Integer BATCH_SIZE_BYTES_THRESHOLD = 256 * KB;
+ private static final Integer BATCH_SIZE_BYTES_THRESHOLD = 256 * AbstractAmazonSnsConsumer.KB;
private static final Logger LOGGER = LoggerFactory.getLogger(AbstractAmazonSnsConsumer.class);
- private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
+ private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(ThreadFactoryProvider.getThreadFactory());
protected final C amazonSnsClient;
@@ -168,13 +169,13 @@ private void validateMessageSize(final Integer messageSize) {
}
private boolean canAddToBatch(final int batchSizeBytes, final int requestEntriesSize, final RequestEntry request) {
- return (batchSizeBytes < BATCH_SIZE_BYTES_THRESHOLD)
+ return (batchSizeBytes < AbstractAmazonSnsConsumer.BATCH_SIZE_BYTES_THRESHOLD)
&& (requestEntriesSize < topicProperty.getMaxBatchSize())
&& Objects.nonNull(request);
}
private boolean canAddPayload(final int batchSizeBytes) {
- return batchSizeBytes <= BATCH_SIZE_BYTES_THRESHOLD;
+ return batchSizeBytes <= AbstractAmazonSnsConsumer.BATCH_SIZE_BYTES_THRESHOLD;
}
@SneakyThrows
diff --git a/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/core/AbstractAmazonSnsTemplate.java b/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/core/AbstractAmazonSnsTemplate.java
index 1a14cb1..afff16a 100644
--- a/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/core/AbstractAmazonSnsTemplate.java
+++ b/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/core/AbstractAmazonSnsTemplate.java
@@ -17,7 +17,11 @@
package com.amazon.sns.messaging.lib.core;
import java.util.concurrent.CompletableFuture;
+import java.util.function.Supplier;
+import com.amazon.sns.messaging.lib.concurrent.AmazonSnsThreadPoolExecutor;
+import com.amazon.sns.messaging.lib.instrument.AmazonSnsThreadPoolExecutorJmx;
+import com.amazon.sns.messaging.lib.instrument.MBeanRegistrar;
import com.amazon.sns.messaging.lib.model.RequestEntry;
import com.amazon.sns.messaging.lib.model.ResponseFailEntry;
import com.amazon.sns.messaging.lib.model.ResponseSuccessEntry;
@@ -35,24 +39,33 @@ abstract class AbstractAmazonSnsTemplate {
private final AbstractAmazonSnsConsumer amazonSnsConsumer;
public ListenableFuture send(final RequestEntry requestEntry) {
- return amazonSnsProducer.send(requestEntry);
+ return this.amazonSnsProducer.send(requestEntry);
}
public void shutdown() {
- amazonSnsProducer.shutdown();
- amazonSnsConsumer.shutdown();
+ this.amazonSnsProducer.shutdown();
+ this.amazonSnsConsumer.shutdown();
}
public CompletableFuture await() {
- return amazonSnsConsumer.await();
+ return this.amazonSnsConsumer.await();
}
+ @SuppressWarnings("java:S1602")
protected static AmazonSnsThreadPoolExecutor getAmazonSnsThreadPoolExecutor(final TopicProperty topicProperty) {
- if (topicProperty.isFifo()) {
- return new AmazonSnsThreadPoolExecutor(1);
- } else {
- return new AmazonSnsThreadPoolExecutor(topicProperty.getMaximumPoolSize());
- }
+ final Supplier supplier = () -> {
+ return topicProperty.isFifo() ? new AmazonSnsThreadPoolExecutor(1) : new AmazonSnsThreadPoolExecutor(topicProperty.getMaximumPoolSize());
+ };
+
+ final AmazonSnsThreadPoolExecutor amazonSnsThreadPoolExecutor = supplier.get();
+
+ final String topicArn = topicProperty.getTopicArn();
+ final String topicName = topicArn.substring(topicArn.lastIndexOf(':') + 1);
+ final String name = String.format("com.amazon.sns.messaging.lib:type=AmazonSnsThreadPoolExecutor,name=%s", topicName);
+
+ MBeanRegistrar.registerMBean(new AmazonSnsThreadPoolExecutorJmx(amazonSnsThreadPoolExecutor), name);
+
+ return amazonSnsThreadPoolExecutor;
}
}
diff --git a/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/instrument/AmazonSnsThreadPoolExecutorJmx.java b/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/instrument/AmazonSnsThreadPoolExecutorJmx.java
new file mode 100644
index 0000000..ce3a2dd
--- /dev/null
+++ b/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/instrument/AmazonSnsThreadPoolExecutorJmx.java
@@ -0,0 +1,53 @@
+/*
+ * Copyright 2025 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.amazon.sns.messaging.lib.instrument;
+
+import com.amazon.sns.messaging.lib.concurrent.AmazonSnsThreadPoolExecutor;
+
+import lombok.RequiredArgsConstructor;
+
+@RequiredArgsConstructor
+public class AmazonSnsThreadPoolExecutorJmx implements AmazonSnsThreadPoolExecutorJmxMBean {
+
+ private final AmazonSnsThreadPoolExecutor amazonSnsThreadPoolExecutor;
+
+ @Override
+ public int getActiveTaskCount() {
+ return this.amazonSnsThreadPoolExecutor.getActiveTaskCount();
+ }
+
+ @Override
+ public int getFailedTaskCount() {
+ return this.amazonSnsThreadPoolExecutor.getFailedTaskCount();
+ }
+
+ @Override
+ public int getSucceededTaskCount() {
+ return this.amazonSnsThreadPoolExecutor.getSucceededTaskCount();
+ }
+
+ @Override
+ public int getQueueSize() {
+ return this.amazonSnsThreadPoolExecutor.getQueueSize();
+ }
+
+ @Override
+ public int getPoolSize() {
+ return this.amazonSnsThreadPoolExecutor.getPoolSize();
+ }
+
+}
diff --git a/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/instrument/AmazonSnsThreadPoolExecutorJmxMBean.java b/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/instrument/AmazonSnsThreadPoolExecutorJmxMBean.java
new file mode 100644
index 0000000..0b5c0e2
--- /dev/null
+++ b/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/instrument/AmazonSnsThreadPoolExecutorJmxMBean.java
@@ -0,0 +1,31 @@
+/*
+ * Copyright 2025 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.amazon.sns.messaging.lib.instrument;
+
+public interface AmazonSnsThreadPoolExecutorJmxMBean {
+
+ int getActiveTaskCount();
+
+ int getFailedTaskCount();
+
+ int getSucceededTaskCount();
+
+ int getQueueSize();
+
+ int getPoolSize();
+
+}
diff --git a/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/instrument/MBeanRegistrar.java b/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/instrument/MBeanRegistrar.java
new file mode 100644
index 0000000..5866aba
--- /dev/null
+++ b/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/instrument/MBeanRegistrar.java
@@ -0,0 +1,53 @@
+/*
+ * Copyright 2024 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.amazon.sns.messaging.lib.instrument;
+
+import java.lang.management.ManagementFactory;
+
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+import lombok.SneakyThrows;
+
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
+public final class MBeanRegistrar {
+
+ private static final MBeanServer BEAN_SERVER = ManagementFactory.getPlatformMBeanServer();
+
+ @SneakyThrows
+ public static void registerMBean(final Object mbean, final String name) {
+ final ObjectName objectName = new ObjectName(name);
+
+ if (!MBeanRegistrar.BEAN_SERVER.isRegistered(objectName)) {
+ MBeanRegistrar.BEAN_SERVER.registerMBean(mbean, objectName);
+ }
+
+ }
+
+ @SneakyThrows
+ public static void unregisterMBean(final String name) {
+ final ObjectName objectName = new ObjectName(name);
+
+ if (MBeanRegistrar.BEAN_SERVER.isRegistered(objectName)) {
+ MBeanRegistrar.BEAN_SERVER.unregisterMBean(objectName);
+ }
+
+ }
+
+}
diff --git a/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/instrument/RingBufferBlockingQueueJmx.java b/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/instrument/RingBufferBlockingQueueJmx.java
new file mode 100644
index 0000000..78255e2
--- /dev/null
+++ b/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/instrument/RingBufferBlockingQueueJmx.java
@@ -0,0 +1,58 @@
+/*
+ * Copyright 2024 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.amazon.sns.messaging.lib.instrument;
+
+import com.amazon.sns.messaging.lib.concurrent.RingBufferBlockingQueue;
+
+import lombok.RequiredArgsConstructor;
+
+@RequiredArgsConstructor
+public class RingBufferBlockingQueueJmx implements RingBufferBlockingQueueJmxMBean {
+
+ private final RingBufferBlockingQueue> ringBufferBlockingQueue;
+
+ @Override
+ public int capacity() {
+ return ringBufferBlockingQueue.capacity();
+ }
+
+ @Override
+ public int size() {
+ return ringBufferBlockingQueue.size();
+ }
+
+ @Override
+ public boolean isEmpty() {
+ return ringBufferBlockingQueue.isEmpty();
+ }
+
+ @Override
+ public boolean isFull() {
+ return ringBufferBlockingQueue.isFull();
+ }
+
+ @Override
+ public long writeSequence() {
+ return ringBufferBlockingQueue.writeSequence();
+ }
+
+ @Override
+ public long readSequence() {
+ return ringBufferBlockingQueue.readSequence();
+ }
+
+}
diff --git a/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/instrument/RingBufferBlockingQueueJmxMBean.java b/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/instrument/RingBufferBlockingQueueJmxMBean.java
new file mode 100644
index 0000000..98af569
--- /dev/null
+++ b/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/instrument/RingBufferBlockingQueueJmxMBean.java
@@ -0,0 +1,33 @@
+/*
+ * Copyright 2024 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.amazon.sns.messaging.lib.instrument;
+
+public interface RingBufferBlockingQueueJmxMBean {
+
+ int capacity();
+
+ int size();
+
+ boolean isEmpty();
+
+ boolean isFull();
+
+ long writeSequence();
+
+ long readSequence();
+
+}
diff --git a/amazon-sns-java-messaging-lib-template/src/test/java/com/amazon/sns/messaging/lib/concurrent/RingBufferBlockingQueueTest.java b/amazon-sns-java-messaging-lib-template/src/test/java/com/amazon/sns/messaging/lib/concurrent/RingBufferBlockingQueueTest.java
index 9275031..d94171c 100644
--- a/amazon-sns-java-messaging-lib-template/src/test/java/com/amazon/sns/messaging/lib/concurrent/RingBufferBlockingQueueTest.java
+++ b/amazon-sns-java-messaging-lib-template/src/test/java/com/amazon/sns/messaging/lib/concurrent/RingBufferBlockingQueueTest.java
@@ -41,9 +41,9 @@ class RingBufferBlockingQueueTest {
@Test
void testSuccess() {
- final ExecutorService producer = Executors.newSingleThreadScheduledExecutor();
+ final ExecutorService producer = ExecutorsProvider.getExecutorService();
- final ScheduledExecutorService consumer = Executors.newSingleThreadScheduledExecutor();
+ final ScheduledExecutorService consumer = Executors.newSingleThreadScheduledExecutor(ThreadFactoryProvider.getThreadFactory());
final List> requestEntriesOut = new LinkedList<>();
@@ -52,7 +52,6 @@ void testSuccess() {
producer.submit(() -> {
IntStream.range(0, 100_000).forEach(value -> {
ringBlockingQueue.put(RequestEntry.builder().withValue(value).build());
- System.err.println("write: " + ringBlockingQueue.writeSequence());
});
});
@@ -62,7 +61,6 @@ void testSuccess() {
while ((requestEntries.size() < 10) && Objects.nonNull(ringBlockingQueue.peek())) {
final RequestEntry take = ringBlockingQueue.take();
- System.err.println("read: " + ringBlockingQueue.readSequence());
requestEntries.add(take);
}
@@ -92,9 +90,9 @@ void testSuccess() {
void testSuccessWhenIsEmpty() {
final RingBufferBlockingQueue> ringBlockingQueue = new RingBufferBlockingQueue<>();
- final ExecutorService producer = Executors.newSingleThreadExecutor();
+ final ExecutorService producer = ExecutorsProvider.getExecutorService();
- final ExecutorService consumer = Executors.newSingleThreadExecutor();
+ final ExecutorService consumer = ExecutorsProvider.getExecutorService();
consumer.submit(() -> {
assertThat(ringBlockingQueue.take().getValue(), is(0));
@@ -121,9 +119,9 @@ void testSuccessWhenIsEmpty() {
void testSuccessWhenIsFull() {
final RingBufferBlockingQueue> ringBlockingQueue = new RingBufferBlockingQueue<>(1);
- final ExecutorService producer = Executors.newSingleThreadExecutor();
+ final ExecutorService producer = ExecutorsProvider.getExecutorService();
- final ExecutorService consumer = Executors.newSingleThreadExecutor();
+ final ExecutorService consumer = ExecutorsProvider.getExecutorService();
producer.submit(() -> {
ringBlockingQueue.put(RequestEntry.builder().withValue(0).build());
diff --git a/amazon-sns-java-messaging-lib-template/src/test/java/com/amazon/sns/messaging/lib/core/AmazonSnsThreadPoolExecutorTest.java b/amazon-sns-java-messaging-lib-template/src/test/java/com/amazon/sns/messaging/lib/core/AmazonSnsThreadPoolExecutorTest.java
index a0f7e7f..465f6d3 100644
--- a/amazon-sns-java-messaging-lib-template/src/test/java/com/amazon/sns/messaging/lib/core/AmazonSnsThreadPoolExecutorTest.java
+++ b/amazon-sns-java-messaging-lib-template/src/test/java/com/amazon/sns/messaging/lib/core/AmazonSnsThreadPoolExecutorTest.java
@@ -26,6 +26,8 @@
import org.junit.jupiter.api.Test;
+import com.amazon.sns.messaging.lib.concurrent.AmazonSnsThreadPoolExecutor;
+
// @formatter:off
class AmazonSnsThreadPoolExecutorTest {
diff --git a/amazon-sns-java-messaging-lib-template/src/test/java/com/amazon/sns/messaging/lib/core/ListenableFutureRegistryTest.java b/amazon-sns-java-messaging-lib-template/src/test/java/com/amazon/sns/messaging/lib/core/ListenableFutureRegistryTest.java
index 2816a8a..be0fcdc 100644
--- a/amazon-sns-java-messaging-lib-template/src/test/java/com/amazon/sns/messaging/lib/core/ListenableFutureRegistryTest.java
+++ b/amazon-sns-java-messaging-lib-template/src/test/java/com/amazon/sns/messaging/lib/core/ListenableFutureRegistryTest.java
@@ -1,3 +1,19 @@
+/*
+ * Copyright 2024 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package com.amazon.sns.messaging.lib.core;
import static org.hamcrest.CoreMatchers.notNullValue;
diff --git a/amazon-sns-java-messaging-lib-v1/pom.xml b/amazon-sns-java-messaging-lib-v1/pom.xml
index d0b3911..1c8bf5d 100644
--- a/amazon-sns-java-messaging-lib-v1/pom.xml
+++ b/amazon-sns-java-messaging-lib-v1/pom.xml
@@ -7,7 +7,7 @@
com.github.mvallim
amazon-sns-java-messaging-lib
- 1.0.8-SNAPSHOT
+ 1.1.0-SNAPSHOT
../pom.xml
diff --git a/amazon-sns-java-messaging-lib-v1/src/main/java/com/amazon/sns/messaging/lib/core/AmazonSnsTemplate.java b/amazon-sns-java-messaging-lib-v1/src/main/java/com/amazon/sns/messaging/lib/core/AmazonSnsTemplate.java
index 867a17e..5b10038 100644
--- a/amazon-sns-java-messaging-lib-v1/src/main/java/com/amazon/sns/messaging/lib/core/AmazonSnsTemplate.java
+++ b/amazon-sns-java-messaging-lib-v1/src/main/java/com/amazon/sns/messaging/lib/core/AmazonSnsTemplate.java
@@ -19,9 +19,9 @@
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.Executors;
import java.util.function.UnaryOperator;
+import com.amazon.sns.messaging.lib.concurrent.ExecutorsProvider;
import com.amazon.sns.messaging.lib.concurrent.RingBufferBlockingQueue;
import com.amazon.sns.messaging.lib.model.RequestEntry;
import com.amazon.sns.messaging.lib.model.TopicProperty;
@@ -41,8 +41,8 @@ private AmazonSnsTemplate(
final ObjectMapper objectMapper,
final UnaryOperator publishDecorator) {
super(
- new AmazonSnsProducer<>(pendingRequests, topicRequests, Executors.newSingleThreadExecutor()),
- new AmazonSnsConsumer<>(amazonSnsClient, topicProperty, objectMapper, pendingRequests, topicRequests, getAmazonSnsThreadPoolExecutor(topicProperty), publishDecorator)
+ new AmazonSnsProducer<>(pendingRequests, topicRequests, ExecutorsProvider.getExecutorService()),
+ new AmazonSnsConsumer<>(amazonSnsClient, topicProperty, objectMapper, pendingRequests, topicRequests, AbstractAmazonSnsTemplate.getAmazonSnsThreadPoolExecutor(topicProperty), publishDecorator)
);
}
diff --git a/amazon-sns-java-messaging-lib-v2/pom.xml b/amazon-sns-java-messaging-lib-v2/pom.xml
index 09bd84d..2fbc6f6 100644
--- a/amazon-sns-java-messaging-lib-v2/pom.xml
+++ b/amazon-sns-java-messaging-lib-v2/pom.xml
@@ -7,7 +7,7 @@
com.github.mvallim
amazon-sns-java-messaging-lib
- 1.0.8-SNAPSHOT
+ 1.1.0-SNAPSHOT
../pom.xml
diff --git a/amazon-sns-java-messaging-lib-v2/src/main/java/com/amazon/sns/messaging/lib/core/AmazonSnsTemplate.java b/amazon-sns-java-messaging-lib-v2/src/main/java/com/amazon/sns/messaging/lib/core/AmazonSnsTemplate.java
index d83304e..7466ab9 100644
--- a/amazon-sns-java-messaging-lib-v2/src/main/java/com/amazon/sns/messaging/lib/core/AmazonSnsTemplate.java
+++ b/amazon-sns-java-messaging-lib-v2/src/main/java/com/amazon/sns/messaging/lib/core/AmazonSnsTemplate.java
@@ -19,9 +19,9 @@
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.Executors;
import java.util.function.UnaryOperator;
+import com.amazon.sns.messaging.lib.concurrent.ExecutorsProvider;
import com.amazon.sns.messaging.lib.concurrent.RingBufferBlockingQueue;
import com.amazon.sns.messaging.lib.model.RequestEntry;
import com.amazon.sns.messaging.lib.model.TopicProperty;
@@ -42,8 +42,8 @@ private AmazonSnsTemplate(
final ObjectMapper objectMapper,
final UnaryOperator publishDecorator) {
super(
- new AmazonSnsProducer<>(pendingRequests, topicRequests, Executors.newSingleThreadExecutor()),
- new AmazonSnsConsumer<>(amazonSnsClient, topicProperty, objectMapper, pendingRequests, topicRequests, getAmazonSnsThreadPoolExecutor(topicProperty), publishDecorator)
+ new AmazonSnsProducer<>(pendingRequests, topicRequests, ExecutorsProvider.getExecutorService()),
+ new AmazonSnsConsumer<>(amazonSnsClient, topicProperty, objectMapper, pendingRequests, topicRequests, AbstractAmazonSnsTemplate.getAmazonSnsThreadPoolExecutor(topicProperty), publishDecorator)
);
}
diff --git a/pom.xml b/pom.xml
index a800b56..f1dd9ff 100644
--- a/pom.xml
+++ b/pom.xml
@@ -6,7 +6,7 @@
com.github.mvallim
amazon-sns-java-messaging-lib
- 1.0.8-SNAPSHOT
+ 1.1.0-SNAPSHOT
pom
${project.artifactId}
@@ -20,9 +20,10 @@
UTF-8
${java.version}
${java.version}
+ true
- 1.18.24
+ 1.18.38
1.3.14
4.11.0
5.10.2
@@ -344,8 +345,7 @@
org.apache.maven.plugins
maven-surefire-plugin
- ${jacoco.argLine} -Dfile.encoding=${project.build.sourceEncoding}
- -Xms1024m -Xmx1024m
+ ${jacoco.argLine} -Dfile.encoding=${project.build.sourceEncoding} -Xms1024m -Xmx1024m
@@ -382,7 +382,6 @@
report
- true
../target/site/${project.artifactId}/jacoco.exe
../target/site/${project.artifactId}
@@ -397,7 +396,6 @@
${java.version}
${project.build.sourceEncoding}
true
- true
true
true
@@ -406,11 +404,6 @@
lombok
${lombok.version}
-
- org.openjdk.jmh
- jmh-generator-annprocess
- ${jmh.version}
-
@@ -448,6 +441,16 @@
+
+ 21
+
+ 21
+
+
+ 21
+
+
+
sonar