Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/cd-integration.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
name: Build, Gates and Pull Request

on:
pull_request_target:
pull_request:
branches:
- develop
types:
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/cd-release.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
name: Build and Publish Release

on:
pull_request_target:
pull_request:
branches:
- master
types:
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/cd-snapshot.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
name: Build, Publish Snapshot and Pull Request

on:
pull_request_target:
pull_request:
branches:
- release/**
types:
Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/ci-maven.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
java-version: [8, 11, 17]
java-version: [8, 11, 17, 21]
steps:
- name: Checkout repository
uses: actions/checkout@v4
Expand All @@ -34,7 +34,7 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
java-version: [8, 11, 17]
java-version: [8, 11, 17, 21]
steps:
- name: Checkout repository
uses: actions/checkout@v4
Expand Down
10 changes: 5 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ Combine multiple requests to optimally utilise the network.

Article [Martin Fowler](https://martinfowler.com) [Request Batch](https://martinfowler.com/articles/patterns-of-distributed-systems/request-batch.html)

_**Compatible JDK 8, 11 and 17**_
_**Compatible JDK 8, 11, 17 and 21**_

_**Compatible AWS JDK v1 >= 1.12**_

Expand All @@ -38,7 +38,7 @@ You can pull it from the central Maven repositories:
<dependency>
<groupId>com.github.mvallim</groupId>
<artifactId>amazon-sns-java-messaging-lib-v1</artifactId>
<version>1.0.8</version>
<version>1.1.0</version>
</dependency>
```

Expand All @@ -48,7 +48,7 @@ You can pull it from the central Maven repositories:
<dependency>
<groupId>com.github.mvallim</groupId>
<artifactId>amazon-sns-java-messaging-lib-v2</artifactId>
<version>1.0.8</version>
<version>1.1.0</version>
</dependency>
```

Expand All @@ -70,13 +70,13 @@ If you want to try a snapshot version, add the following repository:
### For AWS SDK v1

```groovy
implementation 'com.github.mvallim:amazon-sns-java-messaging-lib-v1:1.0.8'
implementation 'com.github.mvallim:amazon-sns-java-messaging-lib-v1:1.1.0'
```

### For AWS SDK v2

```groovy
implementation 'com.github.mvallim:amazon-sns-java-messaging-lib-v2:1.0.8'
implementation 'com.github.mvallim:amazon-sns-java-messaging-lib-v2:1.1.0'
```

If you want to try a snapshot version, add the following repository:
Expand Down
2 changes: 1 addition & 1 deletion amazon-sns-java-messaging-lib-template/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
<parent>
<groupId>com.github.mvallim</groupId>
<artifactId>amazon-sns-java-messaging-lib</artifactId>
<version>1.0.8-SNAPSHOT</version>
<version>1.1.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* limitations under the License.
*/

package com.amazon.sns.messaging.lib.core;
package com.amazon.sns.messaging.lib.concurrent;

import java.util.Objects;
import java.util.concurrent.SynchronousQueue;
Expand All @@ -23,49 +23,53 @@
import java.util.concurrent.atomic.AtomicInteger;

public class AmazonSnsThreadPoolExecutor extends ThreadPoolExecutor {

private final AtomicInteger activeTaskCount = new AtomicInteger();

private final AtomicInteger failedTaskCount = new AtomicInteger();

private final AtomicInteger succeededTaskCount = new AtomicInteger();

public AmazonSnsThreadPoolExecutor(final int maximumPoolSize) {
super(0, maximumPoolSize, 60, TimeUnit.SECONDS, new SynchronousQueue<>(), new BlockingSubmissionPolicy(30000));
super(0, maximumPoolSize, 60, TimeUnit.SECONDS, new SynchronousQueue<>(), ThreadFactoryProvider.getThreadFactory(), new BlockingSubmissionPolicy(30000));
}

public int getActiveTaskCount() {
return activeTaskCount.get();
return this.activeTaskCount.get();
}

public int getFailedTaskCount() {
return failedTaskCount.get();
return this.failedTaskCount.get();
}

public int getSucceededTaskCount() {
return succeededTaskCount.get();
return this.succeededTaskCount.get();
}


public int getQueueSize() {
return getQueue().size();
}

@Override
protected void beforeExecute(final Thread thread, final Runnable runnable) {
try {
super.beforeExecute(thread, runnable);
} finally {
activeTaskCount.incrementAndGet();
this.activeTaskCount.incrementAndGet();
}
}

@Override
protected void afterExecute(final Runnable runnable, final Throwable throwable) {
try {
super.afterExecute(runnable, throwable);
} finally {
if (Objects.nonNull(throwable)) {
failedTaskCount.incrementAndGet();
this.failedTaskCount.incrementAndGet();
} else {
succeededTaskCount.incrementAndGet();
this.succeededTaskCount.incrementAndGet();
}
activeTaskCount.decrementAndGet();
this.activeTaskCount.decrementAndGet();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* limitations under the License.
*/

package com.amazon.sns.messaging.lib.core;
package com.amazon.sns.messaging.lib.concurrent;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.RejectedExecutionException;
Expand All @@ -24,14 +24,14 @@

import lombok.SneakyThrows;

class BlockingSubmissionPolicy implements RejectedExecutionHandler {

public class BlockingSubmissionPolicy implements RejectedExecutionHandler {
private final long timeout;

public BlockingSubmissionPolicy(final long timeout) {
this.timeout = timeout;
}

@Override
@SneakyThrows
public void rejectedExecution(final Runnable runnable, final ThreadPoolExecutor executor) {
Expand All @@ -40,5 +40,5 @@ public void rejectedExecution(final Runnable runnable, final ThreadPoolExecutor
throw new RejectedExecutionException("Timeout");
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* Copyright 2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.amazon.sns.messaging.lib.concurrent;

import java.lang.reflect.Method;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import lombok.SneakyThrows;

@NoArgsConstructor(access = AccessLevel.PRIVATE)
public final class ExecutorsProvider {

private static final Logger LOGGER = LoggerFactory.getLogger(ExecutorsProvider.class);

private static Supplier<ExecutorService> supplierExecutorService;

static {
if (ExecutorsProvider.getJavaVersion() >= 21) {
ExecutorsProvider.supplierExecutorService = ExecutorsProvider::getVirtualExecutorService;
ExecutorsProvider.LOGGER.info("Java version is {}, using virtual thread executor", ExecutorsProvider.getJavaVersion());
} else {
ExecutorsProvider.supplierExecutorService = ExecutorsProvider::getDefaultExecutorService;
ExecutorsProvider.LOGGER.info("Java version is {}, using default thread executor", ExecutorsProvider.getJavaVersion());
}
}

public static ExecutorService getExecutorService() {
return ExecutorsProvider.supplierExecutorService.get();
}

@SneakyThrows
private static ExecutorService getDefaultExecutorService() {
return Executors.newSingleThreadExecutor();
}

@SneakyThrows
private static ExecutorService getVirtualExecutorService() {
final Class<?> clazzThread = Executors.class;
final Method ofVirtualMethod = clazzThread.getMethod("newVirtualThreadPerTaskExecutor");
return ExecutorService.class.cast(ofVirtualMethod.invoke(null));
}

private static int getJavaVersion() {
String version = System.getProperty("java.version");

if (version.startsWith("1.")) {
version = version.substring(2);
}

final int dotPos = version.indexOf('.');
final int dashPos = version.indexOf('-');
final int endIndex = dotPos > -1 ? dotPos : dashPos > -1 ? dashPos : 1;

return Integer.parseInt(version.substring(0, endIndex));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import lombok.Setter;
import lombok.SneakyThrows;

@SuppressWarnings({ "java:S2274" })
public class RingBufferBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E> {

private static final int DEFAULT_CAPACITY = 2048;
Expand All @@ -55,50 +54,54 @@ public class RingBufferBlockingQueue<E> extends AbstractQueue<E> implements Bloc

public RingBufferBlockingQueue(final int capacity) {
this.capacity = capacity;
buffer = new AtomicReferenceArray<>(capacity);
reentrantLock = new ReentrantLock(true);
waitingConsumer = reentrantLock.newCondition();
waitingProducer = reentrantLock.newCondition();
IntStream.range(0, capacity).forEach(idx -> buffer.set(idx, new Entry<>()));
this.buffer = new AtomicReferenceArray<>(capacity);
this.reentrantLock = new ReentrantLock(true);
this.waitingConsumer = this.reentrantLock.newCondition();
this.waitingProducer = this.reentrantLock.newCondition();
IntStream.range(0, capacity).forEach(idx -> this.buffer.set(idx, new Entry<>()));
}

public RingBufferBlockingQueue() {
this(DEFAULT_CAPACITY);
this(RingBufferBlockingQueue.DEFAULT_CAPACITY);
}

private long avoidSequenceOverflow(final long sequence) {
return (sequence < Long.MAX_VALUE ? sequence : wrap(sequence));
}

private int wrap(final long sequence) {
return Math.toIntExact(sequence % capacity);
return Math.toIntExact(sequence % this.capacity);
}


public int capacity() {
return this.capacity;
}

@Override
public int size() {
return size.get();
return this.size.get();
}

@Override
public boolean isEmpty() {
return size.get() == 0;
return this.size.get() == 0;
}

public boolean isFull() {
return size.get() >= capacity;
return this.size.get() >= this.capacity;
}

public long writeSequence() {
return writeSequence.get();
return this.writeSequence.get();
}

public long readSequence() {
return readSequence.get();
return this.readSequence.get();
}

@Override
public E peek() {
return isEmpty() ? null : buffer.get(wrap(readSequence.get())).getValue();
return isEmpty() ? null : this.buffer.get(wrap(this.readSequence.get())).getValue();
}

@Override
Expand Down
Loading
Loading