diff --git a/.github/workflows/ci-maven.yml b/.github/workflows/ci-maven.yml index 68c14f6..849cb9d 100644 --- a/.github/workflows/ci-maven.yml +++ b/.github/workflows/ci-maven.yml @@ -8,7 +8,7 @@ jobs: runs-on: ubuntu-latest strategy: matrix: - java-version: [8, 11, 17] + java-version: [8, 11, 17, 21] steps: - name: Checkout repository uses: actions/checkout@v4 @@ -34,7 +34,7 @@ jobs: runs-on: ubuntu-latest strategy: matrix: - java-version: [8, 11, 17] + java-version: [8, 11, 17, 21] steps: - name: Checkout repository uses: actions/checkout@v4 diff --git a/README.md b/README.md index 7be244c..c3fc607 100644 --- a/README.md +++ b/README.md @@ -16,7 +16,7 @@ Combine multiple requests to optimally utilise the network. Article [Martin Fowler](https://martinfowler.com) [Request Batch](https://martinfowler.com/articles/patterns-of-distributed-systems/request-batch.html) -_**Compatible JDK 8, 11 and 17**_ +_**Compatible JDK 8, 11, 17 and 21**_ _**Compatible AWS JDK v1 >= 1.12**_ @@ -38,7 +38,7 @@ You can pull it from the central Maven repositories: com.github.mvallim amazon-sns-java-messaging-lib-v1 - 1.0.8 + 1.1.0 ``` @@ -48,7 +48,7 @@ You can pull it from the central Maven repositories: com.github.mvallim amazon-sns-java-messaging-lib-v2 - 1.0.8 + 1.1.0 ``` @@ -70,13 +70,13 @@ If you want to try a snapshot version, add the following repository: ### For AWS SDK v1 ```groovy -implementation 'com.github.mvallim:amazon-sns-java-messaging-lib-v1:1.0.8' +implementation 'com.github.mvallim:amazon-sns-java-messaging-lib-v1:1.1.0' ``` ### For AWS SDK v2 ```groovy -implementation 'com.github.mvallim:amazon-sns-java-messaging-lib-v2:1.0.8' +implementation 'com.github.mvallim:amazon-sns-java-messaging-lib-v2:1.1.0' ``` If you want to try a snapshot version, add the following repository: diff --git a/amazon-sns-java-messaging-lib-template/pom.xml b/amazon-sns-java-messaging-lib-template/pom.xml index 8f1797c..7561e76 100644 --- a/amazon-sns-java-messaging-lib-template/pom.xml +++ b/amazon-sns-java-messaging-lib-template/pom.xml @@ -7,7 +7,7 @@ com.github.mvallim amazon-sns-java-messaging-lib - 1.0.8-SNAPSHOT + 1.1.0-SNAPSHOT ../pom.xml diff --git a/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/core/AmazonSnsThreadPoolExecutor.java b/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/concurrent/AmazonSnsThreadPoolExecutor.java similarity index 76% rename from amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/core/AmazonSnsThreadPoolExecutor.java rename to amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/concurrent/AmazonSnsThreadPoolExecutor.java index f3e728f..d503dfe 100644 --- a/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/core/AmazonSnsThreadPoolExecutor.java +++ b/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/concurrent/AmazonSnsThreadPoolExecutor.java @@ -14,7 +14,7 @@ * limitations under the License. */ -package com.amazon.sns.messaging.lib.core; +package com.amazon.sns.messaging.lib.concurrent; import java.util.Objects; import java.util.concurrent.SynchronousQueue; @@ -23,49 +23,53 @@ import java.util.concurrent.atomic.AtomicInteger; public class AmazonSnsThreadPoolExecutor extends ThreadPoolExecutor { - + private final AtomicInteger activeTaskCount = new AtomicInteger(); - + private final AtomicInteger failedTaskCount = new AtomicInteger(); - + private final AtomicInteger succeededTaskCount = new AtomicInteger(); - + public AmazonSnsThreadPoolExecutor(final int maximumPoolSize) { - super(0, maximumPoolSize, 60, TimeUnit.SECONDS, new SynchronousQueue<>(), new BlockingSubmissionPolicy(30000)); + super(0, maximumPoolSize, 60, TimeUnit.SECONDS, new SynchronousQueue<>(), ThreadFactoryProvider.getThreadFactory(), new BlockingSubmissionPolicy(30000)); } - + public int getActiveTaskCount() { - return activeTaskCount.get(); + return this.activeTaskCount.get(); } - + public int getFailedTaskCount() { - return failedTaskCount.get(); + return this.failedTaskCount.get(); } - + public int getSucceededTaskCount() { - return succeededTaskCount.get(); + return this.succeededTaskCount.get(); } - + + public int getQueueSize() { + return getQueue().size(); + } + @Override protected void beforeExecute(final Thread thread, final Runnable runnable) { try { super.beforeExecute(thread, runnable); } finally { - activeTaskCount.incrementAndGet(); + this.activeTaskCount.incrementAndGet(); } } - + @Override protected void afterExecute(final Runnable runnable, final Throwable throwable) { try { super.afterExecute(runnable, throwable); } finally { if (Objects.nonNull(throwable)) { - failedTaskCount.incrementAndGet(); + this.failedTaskCount.incrementAndGet(); } else { - succeededTaskCount.incrementAndGet(); + this.succeededTaskCount.incrementAndGet(); } - activeTaskCount.decrementAndGet(); + this.activeTaskCount.decrementAndGet(); } } diff --git a/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/core/BlockingSubmissionPolicy.java b/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/concurrent/BlockingSubmissionPolicy.java similarity index 87% rename from amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/core/BlockingSubmissionPolicy.java rename to amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/concurrent/BlockingSubmissionPolicy.java index 8eda3bf..47fdb03 100644 --- a/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/core/BlockingSubmissionPolicy.java +++ b/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/concurrent/BlockingSubmissionPolicy.java @@ -14,7 +14,7 @@ * limitations under the License. */ -package com.amazon.sns.messaging.lib.core; +package com.amazon.sns.messaging.lib.concurrent; import java.util.concurrent.BlockingQueue; import java.util.concurrent.RejectedExecutionException; @@ -24,14 +24,14 @@ import lombok.SneakyThrows; -class BlockingSubmissionPolicy implements RejectedExecutionHandler { - +public class BlockingSubmissionPolicy implements RejectedExecutionHandler { + private final long timeout; - + public BlockingSubmissionPolicy(final long timeout) { this.timeout = timeout; } - + @Override @SneakyThrows public void rejectedExecution(final Runnable runnable, final ThreadPoolExecutor executor) { @@ -40,5 +40,5 @@ public void rejectedExecution(final Runnable runnable, final ThreadPoolExecutor throw new RejectedExecutionException("Timeout"); } } - + } diff --git a/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/concurrent/ExecutorsProvider.java b/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/concurrent/ExecutorsProvider.java new file mode 100644 index 0000000..7723231 --- /dev/null +++ b/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/concurrent/ExecutorsProvider.java @@ -0,0 +1,78 @@ +/* + * Copyright 2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.amazon.sns.messaging.lib.concurrent; + +import java.lang.reflect.Method; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.function.Supplier; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import lombok.AccessLevel; +import lombok.NoArgsConstructor; +import lombok.SneakyThrows; + +@NoArgsConstructor(access = AccessLevel.PRIVATE) +public final class ExecutorsProvider { + + private static final Logger LOGGER = LoggerFactory.getLogger(ExecutorsProvider.class); + + private static Supplier supplierExecutorService; + + static { + if (ExecutorsProvider.getJavaVersion() >= 21) { + ExecutorsProvider.supplierExecutorService = ExecutorsProvider::getVirtualExecutorService; + ExecutorsProvider.LOGGER.info("Java version is {}, using virtual thread executor", ExecutorsProvider.getJavaVersion()); + } else { + ExecutorsProvider.supplierExecutorService = ExecutorsProvider::getDefaultExecutorService; + ExecutorsProvider.LOGGER.info("Java version is {}, using default thread executor", ExecutorsProvider.getJavaVersion()); + } + } + + public static ExecutorService getExecutorService() { + return ExecutorsProvider.supplierExecutorService.get(); + } + + @SneakyThrows + private static ExecutorService getDefaultExecutorService() { + return Executors.newSingleThreadExecutor(); + } + + @SneakyThrows + private static ExecutorService getVirtualExecutorService() { + final Class clazzThread = Executors.class; + final Method ofVirtualMethod = clazzThread.getMethod("newVirtualThreadPerTaskExecutor"); + return ExecutorService.class.cast(ofVirtualMethod.invoke(null)); + } + + private static int getJavaVersion() { + String version = System.getProperty("java.version"); + + if (version.startsWith("1.")) { + version = version.substring(2); + } + + final int dotPos = version.indexOf('.'); + final int dashPos = version.indexOf('-'); + final int endIndex = dotPos > -1 ? dotPos : dashPos > -1 ? dashPos : 1; + + return Integer.parseInt(version.substring(0, endIndex)); + } + +} diff --git a/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/concurrent/RingBufferBlockingQueue.java b/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/concurrent/RingBufferBlockingQueue.java index ae0ed55..cd617a1 100644 --- a/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/concurrent/RingBufferBlockingQueue.java +++ b/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/concurrent/RingBufferBlockingQueue.java @@ -32,7 +32,6 @@ import lombok.Setter; import lombok.SneakyThrows; -@SuppressWarnings({ "java:S2274" }) public class RingBufferBlockingQueue extends AbstractQueue implements BlockingQueue { private static final int DEFAULT_CAPACITY = 2048; @@ -55,15 +54,15 @@ public class RingBufferBlockingQueue extends AbstractQueue implements Bloc public RingBufferBlockingQueue(final int capacity) { this.capacity = capacity; - buffer = new AtomicReferenceArray<>(capacity); - reentrantLock = new ReentrantLock(true); - waitingConsumer = reentrantLock.newCondition(); - waitingProducer = reentrantLock.newCondition(); - IntStream.range(0, capacity).forEach(idx -> buffer.set(idx, new Entry<>())); + this.buffer = new AtomicReferenceArray<>(capacity); + this.reentrantLock = new ReentrantLock(true); + this.waitingConsumer = this.reentrantLock.newCondition(); + this.waitingProducer = this.reentrantLock.newCondition(); + IntStream.range(0, capacity).forEach(idx -> this.buffer.set(idx, new Entry<>())); } public RingBufferBlockingQueue() { - this(DEFAULT_CAPACITY); + this(RingBufferBlockingQueue.DEFAULT_CAPACITY); } private long avoidSequenceOverflow(final long sequence) { @@ -71,34 +70,38 @@ private long avoidSequenceOverflow(final long sequence) { } private int wrap(final long sequence) { - return Math.toIntExact(sequence % capacity); + return Math.toIntExact(sequence % this.capacity); } - + + public int capacity() { + return this.capacity; + } + @Override public int size() { - return size.get(); + return this.size.get(); } @Override public boolean isEmpty() { - return size.get() == 0; + return this.size.get() == 0; } public boolean isFull() { - return size.get() >= capacity; + return this.size.get() >= this.capacity; } public long writeSequence() { - return writeSequence.get(); + return this.writeSequence.get(); } public long readSequence() { - return readSequence.get(); + return this.readSequence.get(); } @Override public E peek() { - return isEmpty() ? null : buffer.get(wrap(readSequence.get())).getValue(); + return isEmpty() ? null : this.buffer.get(wrap(this.readSequence.get())).getValue(); } @Override diff --git a/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/concurrent/ThreadFactoryProvider.java b/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/concurrent/ThreadFactoryProvider.java new file mode 100644 index 0000000..8b21dd7 --- /dev/null +++ b/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/concurrent/ThreadFactoryProvider.java @@ -0,0 +1,81 @@ +/* + * Copyright 2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.amazon.sns.messaging.lib.concurrent; + +import java.lang.reflect.Method; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; +import java.util.function.Supplier; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import lombok.AccessLevel; +import lombok.NoArgsConstructor; +import lombok.SneakyThrows; + +@NoArgsConstructor(access = AccessLevel.PRIVATE) +public final class ThreadFactoryProvider { + + private static final Logger LOGGER = LoggerFactory.getLogger(ThreadFactoryProvider.class); + + private static Supplier supplierThreadFactory; + + static { + if (ThreadFactoryProvider.getJavaVersion() >= 21) { + ThreadFactoryProvider.supplierThreadFactory = ThreadFactoryProvider::getVirtualThreadFactory; + ThreadFactoryProvider.LOGGER.info("Java version is {}, using virtual thread factory", ThreadFactoryProvider.getJavaVersion()); + } else { + ThreadFactoryProvider.supplierThreadFactory = ThreadFactoryProvider::getDefaultThreadFactory; + ThreadFactoryProvider.LOGGER.info("Java version is {}, using default thread factory", ThreadFactoryProvider.getJavaVersion()); + } + } + + public static ThreadFactory getThreadFactory() { + return ThreadFactoryProvider.supplierThreadFactory.get(); + } + + @SneakyThrows + private static ThreadFactory getDefaultThreadFactory() { + return Executors.defaultThreadFactory(); + } + + @SneakyThrows + private static ThreadFactory getVirtualThreadFactory() { + final Class clazzThread = Thread.class; + final Class clazzOfVirtual = Class.forName("java.lang.Thread$Builder$OfVirtual"); + final Method ofVirtualMethod = clazzThread.getMethod("ofVirtual"); + final Method factoryMethod = clazzOfVirtual.getMethod("factory"); + final Object result = ofVirtualMethod.invoke(null); + return ThreadFactory.class.cast(factoryMethod.invoke(result)); + } + + private static int getJavaVersion() { + String version = System.getProperty("java.version"); + + if (version.startsWith("1.")) { + version = version.substring(2); + } + + final int dotPos = version.indexOf('.'); + final int dashPos = version.indexOf('-'); + final int endIndex = dotPos > -1 ? dotPos : dashPos > -1 ? dashPos : 1; + + return Integer.parseInt(version.substring(0, endIndex)); + } + +} \ No newline at end of file diff --git a/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/core/AbstractAmazonSnsConsumer.java b/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/core/AbstractAmazonSnsConsumer.java index 1ce914b..dac04c0 100644 --- a/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/core/AbstractAmazonSnsConsumer.java +++ b/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/core/AbstractAmazonSnsConsumer.java @@ -37,6 +37,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.amazon.sns.messaging.lib.concurrent.ThreadFactoryProvider; import com.amazon.sns.messaging.lib.core.RequestEntryInternalFactory.RequestEntryInternal; import com.amazon.sns.messaging.lib.model.PublishRequestBuilder; import com.amazon.sns.messaging.lib.model.RequestEntry; @@ -50,11 +51,11 @@ abstract class AbstractAmazonSnsConsumer implements Runnable { private static final Integer KB = 1024; - private static final Integer BATCH_SIZE_BYTES_THRESHOLD = 256 * KB; + private static final Integer BATCH_SIZE_BYTES_THRESHOLD = 256 * AbstractAmazonSnsConsumer.KB; private static final Logger LOGGER = LoggerFactory.getLogger(AbstractAmazonSnsConsumer.class); - private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(); + private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(ThreadFactoryProvider.getThreadFactory()); protected final C amazonSnsClient; @@ -168,13 +169,13 @@ private void validateMessageSize(final Integer messageSize) { } private boolean canAddToBatch(final int batchSizeBytes, final int requestEntriesSize, final RequestEntry request) { - return (batchSizeBytes < BATCH_SIZE_BYTES_THRESHOLD) + return (batchSizeBytes < AbstractAmazonSnsConsumer.BATCH_SIZE_BYTES_THRESHOLD) && (requestEntriesSize < topicProperty.getMaxBatchSize()) && Objects.nonNull(request); } private boolean canAddPayload(final int batchSizeBytes) { - return batchSizeBytes <= BATCH_SIZE_BYTES_THRESHOLD; + return batchSizeBytes <= AbstractAmazonSnsConsumer.BATCH_SIZE_BYTES_THRESHOLD; } @SneakyThrows diff --git a/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/core/AbstractAmazonSnsTemplate.java b/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/core/AbstractAmazonSnsTemplate.java index 1a14cb1..afff16a 100644 --- a/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/core/AbstractAmazonSnsTemplate.java +++ b/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/core/AbstractAmazonSnsTemplate.java @@ -17,7 +17,11 @@ package com.amazon.sns.messaging.lib.core; import java.util.concurrent.CompletableFuture; +import java.util.function.Supplier; +import com.amazon.sns.messaging.lib.concurrent.AmazonSnsThreadPoolExecutor; +import com.amazon.sns.messaging.lib.instrument.AmazonSnsThreadPoolExecutorJmx; +import com.amazon.sns.messaging.lib.instrument.MBeanRegistrar; import com.amazon.sns.messaging.lib.model.RequestEntry; import com.amazon.sns.messaging.lib.model.ResponseFailEntry; import com.amazon.sns.messaging.lib.model.ResponseSuccessEntry; @@ -35,24 +39,33 @@ abstract class AbstractAmazonSnsTemplate { private final AbstractAmazonSnsConsumer amazonSnsConsumer; public ListenableFuture send(final RequestEntry requestEntry) { - return amazonSnsProducer.send(requestEntry); + return this.amazonSnsProducer.send(requestEntry); } public void shutdown() { - amazonSnsProducer.shutdown(); - amazonSnsConsumer.shutdown(); + this.amazonSnsProducer.shutdown(); + this.amazonSnsConsumer.shutdown(); } public CompletableFuture await() { - return amazonSnsConsumer.await(); + return this.amazonSnsConsumer.await(); } + @SuppressWarnings("java:S1602") protected static AmazonSnsThreadPoolExecutor getAmazonSnsThreadPoolExecutor(final TopicProperty topicProperty) { - if (topicProperty.isFifo()) { - return new AmazonSnsThreadPoolExecutor(1); - } else { - return new AmazonSnsThreadPoolExecutor(topicProperty.getMaximumPoolSize()); - } + final Supplier supplier = () -> { + return topicProperty.isFifo() ? new AmazonSnsThreadPoolExecutor(1) : new AmazonSnsThreadPoolExecutor(topicProperty.getMaximumPoolSize()); + }; + + final AmazonSnsThreadPoolExecutor amazonSnsThreadPoolExecutor = supplier.get(); + + final String topicArn = topicProperty.getTopicArn(); + final String topicName = topicArn.substring(topicArn.lastIndexOf(':') + 1); + final String name = String.format("com.amazon.sns.messaging.lib:type=AmazonSnsThreadPoolExecutor,name=%s", topicName); + + MBeanRegistrar.registerMBean(new AmazonSnsThreadPoolExecutorJmx(amazonSnsThreadPoolExecutor), name); + + return amazonSnsThreadPoolExecutor; } } diff --git a/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/instrument/AmazonSnsThreadPoolExecutorJmx.java b/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/instrument/AmazonSnsThreadPoolExecutorJmx.java new file mode 100644 index 0000000..ce3a2dd --- /dev/null +++ b/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/instrument/AmazonSnsThreadPoolExecutorJmx.java @@ -0,0 +1,53 @@ +/* + * Copyright 2025 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.amazon.sns.messaging.lib.instrument; + +import com.amazon.sns.messaging.lib.concurrent.AmazonSnsThreadPoolExecutor; + +import lombok.RequiredArgsConstructor; + +@RequiredArgsConstructor +public class AmazonSnsThreadPoolExecutorJmx implements AmazonSnsThreadPoolExecutorJmxMBean { + + private final AmazonSnsThreadPoolExecutor amazonSnsThreadPoolExecutor; + + @Override + public int getActiveTaskCount() { + return this.amazonSnsThreadPoolExecutor.getActiveTaskCount(); + } + + @Override + public int getFailedTaskCount() { + return this.amazonSnsThreadPoolExecutor.getFailedTaskCount(); + } + + @Override + public int getSucceededTaskCount() { + return this.amazonSnsThreadPoolExecutor.getSucceededTaskCount(); + } + + @Override + public int getQueueSize() { + return this.amazonSnsThreadPoolExecutor.getQueueSize(); + } + + @Override + public int getPoolSize() { + return this.amazonSnsThreadPoolExecutor.getPoolSize(); + } + +} diff --git a/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/instrument/AmazonSnsThreadPoolExecutorJmxMBean.java b/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/instrument/AmazonSnsThreadPoolExecutorJmxMBean.java new file mode 100644 index 0000000..0b5c0e2 --- /dev/null +++ b/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/instrument/AmazonSnsThreadPoolExecutorJmxMBean.java @@ -0,0 +1,31 @@ +/* + * Copyright 2025 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.amazon.sns.messaging.lib.instrument; + +public interface AmazonSnsThreadPoolExecutorJmxMBean { + + int getActiveTaskCount(); + + int getFailedTaskCount(); + + int getSucceededTaskCount(); + + int getQueueSize(); + + int getPoolSize(); + +} diff --git a/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/instrument/MBeanRegistrar.java b/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/instrument/MBeanRegistrar.java new file mode 100644 index 0000000..5866aba --- /dev/null +++ b/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/instrument/MBeanRegistrar.java @@ -0,0 +1,53 @@ +/* + * Copyright 2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.amazon.sns.messaging.lib.instrument; + +import java.lang.management.ManagementFactory; + +import javax.management.MBeanServer; +import javax.management.ObjectName; + +import lombok.AccessLevel; +import lombok.NoArgsConstructor; +import lombok.SneakyThrows; + +@NoArgsConstructor(access = AccessLevel.PRIVATE) +public final class MBeanRegistrar { + + private static final MBeanServer BEAN_SERVER = ManagementFactory.getPlatformMBeanServer(); + + @SneakyThrows + public static void registerMBean(final Object mbean, final String name) { + final ObjectName objectName = new ObjectName(name); + + if (!MBeanRegistrar.BEAN_SERVER.isRegistered(objectName)) { + MBeanRegistrar.BEAN_SERVER.registerMBean(mbean, objectName); + } + + } + + @SneakyThrows + public static void unregisterMBean(final String name) { + final ObjectName objectName = new ObjectName(name); + + if (MBeanRegistrar.BEAN_SERVER.isRegistered(objectName)) { + MBeanRegistrar.BEAN_SERVER.unregisterMBean(objectName); + } + + } + +} diff --git a/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/instrument/RingBufferBlockingQueueJmx.java b/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/instrument/RingBufferBlockingQueueJmx.java new file mode 100644 index 0000000..78255e2 --- /dev/null +++ b/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/instrument/RingBufferBlockingQueueJmx.java @@ -0,0 +1,58 @@ +/* + * Copyright 2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.amazon.sns.messaging.lib.instrument; + +import com.amazon.sns.messaging.lib.concurrent.RingBufferBlockingQueue; + +import lombok.RequiredArgsConstructor; + +@RequiredArgsConstructor +public class RingBufferBlockingQueueJmx implements RingBufferBlockingQueueJmxMBean { + + private final RingBufferBlockingQueue ringBufferBlockingQueue; + + @Override + public int capacity() { + return ringBufferBlockingQueue.capacity(); + } + + @Override + public int size() { + return ringBufferBlockingQueue.size(); + } + + @Override + public boolean isEmpty() { + return ringBufferBlockingQueue.isEmpty(); + } + + @Override + public boolean isFull() { + return ringBufferBlockingQueue.isFull(); + } + + @Override + public long writeSequence() { + return ringBufferBlockingQueue.writeSequence(); + } + + @Override + public long readSequence() { + return ringBufferBlockingQueue.readSequence(); + } + +} diff --git a/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/instrument/RingBufferBlockingQueueJmxMBean.java b/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/instrument/RingBufferBlockingQueueJmxMBean.java new file mode 100644 index 0000000..98af569 --- /dev/null +++ b/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/instrument/RingBufferBlockingQueueJmxMBean.java @@ -0,0 +1,33 @@ +/* + * Copyright 2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.amazon.sns.messaging.lib.instrument; + +public interface RingBufferBlockingQueueJmxMBean { + + int capacity(); + + int size(); + + boolean isEmpty(); + + boolean isFull(); + + long writeSequence(); + + long readSequence(); + +} diff --git a/amazon-sns-java-messaging-lib-template/src/test/java/com/amazon/sns/messaging/lib/concurrent/RingBufferBlockingQueueTest.java b/amazon-sns-java-messaging-lib-template/src/test/java/com/amazon/sns/messaging/lib/concurrent/RingBufferBlockingQueueTest.java index 9275031..d94171c 100644 --- a/amazon-sns-java-messaging-lib-template/src/test/java/com/amazon/sns/messaging/lib/concurrent/RingBufferBlockingQueueTest.java +++ b/amazon-sns-java-messaging-lib-template/src/test/java/com/amazon/sns/messaging/lib/concurrent/RingBufferBlockingQueueTest.java @@ -41,9 +41,9 @@ class RingBufferBlockingQueueTest { @Test void testSuccess() { - final ExecutorService producer = Executors.newSingleThreadScheduledExecutor(); + final ExecutorService producer = ExecutorsProvider.getExecutorService(); - final ScheduledExecutorService consumer = Executors.newSingleThreadScheduledExecutor(); + final ScheduledExecutorService consumer = Executors.newSingleThreadScheduledExecutor(ThreadFactoryProvider.getThreadFactory()); final List> requestEntriesOut = new LinkedList<>(); @@ -52,7 +52,6 @@ void testSuccess() { producer.submit(() -> { IntStream.range(0, 100_000).forEach(value -> { ringBlockingQueue.put(RequestEntry.builder().withValue(value).build()); - System.err.println("write: " + ringBlockingQueue.writeSequence()); }); }); @@ -62,7 +61,6 @@ void testSuccess() { while ((requestEntries.size() < 10) && Objects.nonNull(ringBlockingQueue.peek())) { final RequestEntry take = ringBlockingQueue.take(); - System.err.println("read: " + ringBlockingQueue.readSequence()); requestEntries.add(take); } @@ -92,9 +90,9 @@ void testSuccess() { void testSuccessWhenIsEmpty() { final RingBufferBlockingQueue> ringBlockingQueue = new RingBufferBlockingQueue<>(); - final ExecutorService producer = Executors.newSingleThreadExecutor(); + final ExecutorService producer = ExecutorsProvider.getExecutorService(); - final ExecutorService consumer = Executors.newSingleThreadExecutor(); + final ExecutorService consumer = ExecutorsProvider.getExecutorService(); consumer.submit(() -> { assertThat(ringBlockingQueue.take().getValue(), is(0)); @@ -121,9 +119,9 @@ void testSuccessWhenIsEmpty() { void testSuccessWhenIsFull() { final RingBufferBlockingQueue> ringBlockingQueue = new RingBufferBlockingQueue<>(1); - final ExecutorService producer = Executors.newSingleThreadExecutor(); + final ExecutorService producer = ExecutorsProvider.getExecutorService(); - final ExecutorService consumer = Executors.newSingleThreadExecutor(); + final ExecutorService consumer = ExecutorsProvider.getExecutorService(); producer.submit(() -> { ringBlockingQueue.put(RequestEntry.builder().withValue(0).build()); diff --git a/amazon-sns-java-messaging-lib-template/src/test/java/com/amazon/sns/messaging/lib/core/AmazonSnsThreadPoolExecutorTest.java b/amazon-sns-java-messaging-lib-template/src/test/java/com/amazon/sns/messaging/lib/core/AmazonSnsThreadPoolExecutorTest.java index a0f7e7f..465f6d3 100644 --- a/amazon-sns-java-messaging-lib-template/src/test/java/com/amazon/sns/messaging/lib/core/AmazonSnsThreadPoolExecutorTest.java +++ b/amazon-sns-java-messaging-lib-template/src/test/java/com/amazon/sns/messaging/lib/core/AmazonSnsThreadPoolExecutorTest.java @@ -26,6 +26,8 @@ import org.junit.jupiter.api.Test; +import com.amazon.sns.messaging.lib.concurrent.AmazonSnsThreadPoolExecutor; + // @formatter:off class AmazonSnsThreadPoolExecutorTest { diff --git a/amazon-sns-java-messaging-lib-template/src/test/java/com/amazon/sns/messaging/lib/core/ListenableFutureRegistryTest.java b/amazon-sns-java-messaging-lib-template/src/test/java/com/amazon/sns/messaging/lib/core/ListenableFutureRegistryTest.java index 2816a8a..be0fcdc 100644 --- a/amazon-sns-java-messaging-lib-template/src/test/java/com/amazon/sns/messaging/lib/core/ListenableFutureRegistryTest.java +++ b/amazon-sns-java-messaging-lib-template/src/test/java/com/amazon/sns/messaging/lib/core/ListenableFutureRegistryTest.java @@ -1,3 +1,19 @@ +/* + * Copyright 2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package com.amazon.sns.messaging.lib.core; import static org.hamcrest.CoreMatchers.notNullValue; diff --git a/amazon-sns-java-messaging-lib-v1/pom.xml b/amazon-sns-java-messaging-lib-v1/pom.xml index d0b3911..1c8bf5d 100644 --- a/amazon-sns-java-messaging-lib-v1/pom.xml +++ b/amazon-sns-java-messaging-lib-v1/pom.xml @@ -7,7 +7,7 @@ com.github.mvallim amazon-sns-java-messaging-lib - 1.0.8-SNAPSHOT + 1.1.0-SNAPSHOT ../pom.xml diff --git a/amazon-sns-java-messaging-lib-v1/src/main/java/com/amazon/sns/messaging/lib/core/AmazonSnsTemplate.java b/amazon-sns-java-messaging-lib-v1/src/main/java/com/amazon/sns/messaging/lib/core/AmazonSnsTemplate.java index 867a17e..5b10038 100644 --- a/amazon-sns-java-messaging-lib-v1/src/main/java/com/amazon/sns/messaging/lib/core/AmazonSnsTemplate.java +++ b/amazon-sns-java-messaging-lib-v1/src/main/java/com/amazon/sns/messaging/lib/core/AmazonSnsTemplate.java @@ -19,9 +19,9 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.Executors; import java.util.function.UnaryOperator; +import com.amazon.sns.messaging.lib.concurrent.ExecutorsProvider; import com.amazon.sns.messaging.lib.concurrent.RingBufferBlockingQueue; import com.amazon.sns.messaging.lib.model.RequestEntry; import com.amazon.sns.messaging.lib.model.TopicProperty; @@ -41,8 +41,8 @@ private AmazonSnsTemplate( final ObjectMapper objectMapper, final UnaryOperator publishDecorator) { super( - new AmazonSnsProducer<>(pendingRequests, topicRequests, Executors.newSingleThreadExecutor()), - new AmazonSnsConsumer<>(amazonSnsClient, topicProperty, objectMapper, pendingRequests, topicRequests, getAmazonSnsThreadPoolExecutor(topicProperty), publishDecorator) + new AmazonSnsProducer<>(pendingRequests, topicRequests, ExecutorsProvider.getExecutorService()), + new AmazonSnsConsumer<>(amazonSnsClient, topicProperty, objectMapper, pendingRequests, topicRequests, AbstractAmazonSnsTemplate.getAmazonSnsThreadPoolExecutor(topicProperty), publishDecorator) ); } diff --git a/amazon-sns-java-messaging-lib-v2/pom.xml b/amazon-sns-java-messaging-lib-v2/pom.xml index 09bd84d..2fbc6f6 100644 --- a/amazon-sns-java-messaging-lib-v2/pom.xml +++ b/amazon-sns-java-messaging-lib-v2/pom.xml @@ -7,7 +7,7 @@ com.github.mvallim amazon-sns-java-messaging-lib - 1.0.8-SNAPSHOT + 1.1.0-SNAPSHOT ../pom.xml diff --git a/amazon-sns-java-messaging-lib-v2/src/main/java/com/amazon/sns/messaging/lib/core/AmazonSnsTemplate.java b/amazon-sns-java-messaging-lib-v2/src/main/java/com/amazon/sns/messaging/lib/core/AmazonSnsTemplate.java index d83304e..7466ab9 100644 --- a/amazon-sns-java-messaging-lib-v2/src/main/java/com/amazon/sns/messaging/lib/core/AmazonSnsTemplate.java +++ b/amazon-sns-java-messaging-lib-v2/src/main/java/com/amazon/sns/messaging/lib/core/AmazonSnsTemplate.java @@ -19,9 +19,9 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.Executors; import java.util.function.UnaryOperator; +import com.amazon.sns.messaging.lib.concurrent.ExecutorsProvider; import com.amazon.sns.messaging.lib.concurrent.RingBufferBlockingQueue; import com.amazon.sns.messaging.lib.model.RequestEntry; import com.amazon.sns.messaging.lib.model.TopicProperty; @@ -42,8 +42,8 @@ private AmazonSnsTemplate( final ObjectMapper objectMapper, final UnaryOperator publishDecorator) { super( - new AmazonSnsProducer<>(pendingRequests, topicRequests, Executors.newSingleThreadExecutor()), - new AmazonSnsConsumer<>(amazonSnsClient, topicProperty, objectMapper, pendingRequests, topicRequests, getAmazonSnsThreadPoolExecutor(topicProperty), publishDecorator) + new AmazonSnsProducer<>(pendingRequests, topicRequests, ExecutorsProvider.getExecutorService()), + new AmazonSnsConsumer<>(amazonSnsClient, topicProperty, objectMapper, pendingRequests, topicRequests, AbstractAmazonSnsTemplate.getAmazonSnsThreadPoolExecutor(topicProperty), publishDecorator) ); } diff --git a/pom.xml b/pom.xml index a800b56..f1dd9ff 100644 --- a/pom.xml +++ b/pom.xml @@ -6,7 +6,7 @@ com.github.mvallim amazon-sns-java-messaging-lib - 1.0.8-SNAPSHOT + 1.1.0-SNAPSHOT pom ${project.artifactId} @@ -20,9 +20,10 @@ UTF-8 ${java.version} ${java.version} + true - 1.18.24 + 1.18.38 1.3.14 4.11.0 5.10.2 @@ -344,8 +345,7 @@ org.apache.maven.plugins maven-surefire-plugin - ${jacoco.argLine} -Dfile.encoding=${project.build.sourceEncoding} - -Xms1024m -Xmx1024m + ${jacoco.argLine} -Dfile.encoding=${project.build.sourceEncoding} -Xms1024m -Xmx1024m @@ -382,7 +382,6 @@ report - true ../target/site/${project.artifactId}/jacoco.exe ../target/site/${project.artifactId} @@ -397,7 +396,6 @@ ${java.version} ${project.build.sourceEncoding} true - true true true @@ -406,11 +404,6 @@ lombok ${lombok.version} - - org.openjdk.jmh - jmh-generator-annprocess - ${jmh.version} - @@ -448,6 +441,16 @@ + + 21 + + 21 + + + 21 + + + sonar