Skip to content

Commit b519d61

Browse files
authored
Merge pull request #108 from mvallim/feature/support-java-21-virtual-threads
Auto-created pull request into `develop` from `feature/support-java-21-virtual-threads`
2 parents 97da59d + b5b7e05 commit b519d61

File tree

23 files changed

+514
-87
lines changed

23 files changed

+514
-87
lines changed

.github/workflows/ci-maven.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ jobs:
88
runs-on: ubuntu-latest
99
strategy:
1010
matrix:
11-
java-version: [8, 11, 17]
11+
java-version: [8, 11, 17, 21]
1212
steps:
1313
- name: Checkout repository
1414
uses: actions/checkout@v4
@@ -34,7 +34,7 @@ jobs:
3434
runs-on: ubuntu-latest
3535
strategy:
3636
matrix:
37-
java-version: [8, 11, 17]
37+
java-version: [8, 11, 17, 21]
3838
steps:
3939
- name: Checkout repository
4040
uses: actions/checkout@v4

README.md

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ Combine multiple requests to optimally utilise the network.
1616

1717
Article [Martin Fowler](https://martinfowler.com) [Request Batch](https://martinfowler.com/articles/patterns-of-distributed-systems/request-batch.html)
1818

19-
_**Compatible JDK 8, 11 and 17**_
19+
_**Compatible JDK 8, 11, 17 and 21**_
2020

2121
_**Compatible AWS JDK v1 >= 1.12**_
2222

@@ -38,7 +38,7 @@ You can pull it from the central Maven repositories:
3838
<dependency>
3939
<groupId>com.github.mvallim</groupId>
4040
<artifactId>amazon-sns-java-messaging-lib-v1</artifactId>
41-
<version>1.0.8</version>
41+
<version>1.1.0</version>
4242
</dependency>
4343
```
4444

@@ -48,7 +48,7 @@ You can pull it from the central Maven repositories:
4848
<dependency>
4949
<groupId>com.github.mvallim</groupId>
5050
<artifactId>amazon-sns-java-messaging-lib-v2</artifactId>
51-
<version>1.0.8</version>
51+
<version>1.1.0</version>
5252
</dependency>
5353
```
5454

@@ -70,13 +70,13 @@ If you want to try a snapshot version, add the following repository:
7070
### For AWS SDK v1
7171

7272
```groovy
73-
implementation 'com.github.mvallim:amazon-sns-java-messaging-lib-v1:1.0.8'
73+
implementation 'com.github.mvallim:amazon-sns-java-messaging-lib-v1:1.1.0'
7474
```
7575

7676
### For AWS SDK v2
7777

7878
```groovy
79-
implementation 'com.github.mvallim:amazon-sns-java-messaging-lib-v2:1.0.8'
79+
implementation 'com.github.mvallim:amazon-sns-java-messaging-lib-v2:1.1.0'
8080
```
8181

8282
If you want to try a snapshot version, add the following repository:

amazon-sns-java-messaging-lib-template/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
<parent>
88
<groupId>com.github.mvallim</groupId>
99
<artifactId>amazon-sns-java-messaging-lib</artifactId>
10-
<version>1.0.8-SNAPSHOT</version>
10+
<version>1.1.0-SNAPSHOT</version>
1111
<relativePath>../pom.xml</relativePath>
1212
</parent>
1313

amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/core/AmazonSnsThreadPoolExecutor.java renamed to amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/concurrent/AmazonSnsThreadPoolExecutor.java

Lines changed: 22 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
* limitations under the License.
1515
*/
1616

17-
package com.amazon.sns.messaging.lib.core;
17+
package com.amazon.sns.messaging.lib.concurrent;
1818

1919
import java.util.Objects;
2020
import java.util.concurrent.SynchronousQueue;
@@ -23,49 +23,53 @@
2323
import java.util.concurrent.atomic.AtomicInteger;
2424

2525
public class AmazonSnsThreadPoolExecutor extends ThreadPoolExecutor {
26-
26+
2727
private final AtomicInteger activeTaskCount = new AtomicInteger();
28-
28+
2929
private final AtomicInteger failedTaskCount = new AtomicInteger();
30-
30+
3131
private final AtomicInteger succeededTaskCount = new AtomicInteger();
32-
32+
3333
public AmazonSnsThreadPoolExecutor(final int maximumPoolSize) {
34-
super(0, maximumPoolSize, 60, TimeUnit.SECONDS, new SynchronousQueue<>(), new BlockingSubmissionPolicy(30000));
34+
super(0, maximumPoolSize, 60, TimeUnit.SECONDS, new SynchronousQueue<>(), ThreadFactoryProvider.getThreadFactory(), new BlockingSubmissionPolicy(30000));
3535
}
36-
36+
3737
public int getActiveTaskCount() {
38-
return activeTaskCount.get();
38+
return this.activeTaskCount.get();
3939
}
40-
40+
4141
public int getFailedTaskCount() {
42-
return failedTaskCount.get();
42+
return this.failedTaskCount.get();
4343
}
44-
44+
4545
public int getSucceededTaskCount() {
46-
return succeededTaskCount.get();
46+
return this.succeededTaskCount.get();
4747
}
48-
48+
49+
public int getQueueSize() {
50+
return getQueue().size();
51+
}
52+
4953
@Override
5054
protected void beforeExecute(final Thread thread, final Runnable runnable) {
5155
try {
5256
super.beforeExecute(thread, runnable);
5357
} finally {
54-
activeTaskCount.incrementAndGet();
58+
this.activeTaskCount.incrementAndGet();
5559
}
5660
}
57-
61+
5862
@Override
5963
protected void afterExecute(final Runnable runnable, final Throwable throwable) {
6064
try {
6165
super.afterExecute(runnable, throwable);
6266
} finally {
6367
if (Objects.nonNull(throwable)) {
64-
failedTaskCount.incrementAndGet();
68+
this.failedTaskCount.incrementAndGet();
6569
} else {
66-
succeededTaskCount.incrementAndGet();
70+
this.succeededTaskCount.incrementAndGet();
6771
}
68-
activeTaskCount.decrementAndGet();
72+
this.activeTaskCount.decrementAndGet();
6973
}
7074
}
7175

amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/core/BlockingSubmissionPolicy.java renamed to amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/concurrent/BlockingSubmissionPolicy.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
* limitations under the License.
1515
*/
1616

17-
package com.amazon.sns.messaging.lib.core;
17+
package com.amazon.sns.messaging.lib.concurrent;
1818

1919
import java.util.concurrent.BlockingQueue;
2020
import java.util.concurrent.RejectedExecutionException;
@@ -24,14 +24,14 @@
2424

2525
import lombok.SneakyThrows;
2626

27-
class BlockingSubmissionPolicy implements RejectedExecutionHandler {
28-
27+
public class BlockingSubmissionPolicy implements RejectedExecutionHandler {
28+
2929
private final long timeout;
30-
30+
3131
public BlockingSubmissionPolicy(final long timeout) {
3232
this.timeout = timeout;
3333
}
34-
34+
3535
@Override
3636
@SneakyThrows
3737
public void rejectedExecution(final Runnable runnable, final ThreadPoolExecutor executor) {
@@ -40,5 +40,5 @@ public void rejectedExecution(final Runnable runnable, final ThreadPoolExecutor
4040
throw new RejectedExecutionException("Timeout");
4141
}
4242
}
43-
43+
4444
}
Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
/*
2+
* Copyright 2024 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.amazon.sns.messaging.lib.concurrent;
18+
19+
import java.lang.reflect.Method;
20+
import java.util.concurrent.ExecutorService;
21+
import java.util.concurrent.Executors;
22+
import java.util.function.Supplier;
23+
24+
import org.slf4j.Logger;
25+
import org.slf4j.LoggerFactory;
26+
27+
import lombok.AccessLevel;
28+
import lombok.NoArgsConstructor;
29+
import lombok.SneakyThrows;
30+
31+
@NoArgsConstructor(access = AccessLevel.PRIVATE)
32+
public final class ExecutorsProvider {
33+
34+
private static final Logger LOGGER = LoggerFactory.getLogger(ExecutorsProvider.class);
35+
36+
private static Supplier<ExecutorService> supplierExecutorService;
37+
38+
static {
39+
if (ExecutorsProvider.getJavaVersion() >= 21) {
40+
ExecutorsProvider.supplierExecutorService = ExecutorsProvider::getVirtualExecutorService;
41+
ExecutorsProvider.LOGGER.info("Java version is {}, using virtual thread executor", ExecutorsProvider.getJavaVersion());
42+
} else {
43+
ExecutorsProvider.supplierExecutorService = ExecutorsProvider::getDefaultExecutorService;
44+
ExecutorsProvider.LOGGER.info("Java version is {}, using default thread executor", ExecutorsProvider.getJavaVersion());
45+
}
46+
}
47+
48+
public static ExecutorService getExecutorService() {
49+
return ExecutorsProvider.supplierExecutorService.get();
50+
}
51+
52+
@SneakyThrows
53+
private static ExecutorService getDefaultExecutorService() {
54+
return Executors.newSingleThreadExecutor();
55+
}
56+
57+
@SneakyThrows
58+
private static ExecutorService getVirtualExecutorService() {
59+
final Class<?> clazzThread = Executors.class;
60+
final Method ofVirtualMethod = clazzThread.getMethod("newVirtualThreadPerTaskExecutor");
61+
return ExecutorService.class.cast(ofVirtualMethod.invoke(null));
62+
}
63+
64+
private static int getJavaVersion() {
65+
String version = System.getProperty("java.version");
66+
67+
if (version.startsWith("1.")) {
68+
version = version.substring(2);
69+
}
70+
71+
final int dotPos = version.indexOf('.');
72+
final int dashPos = version.indexOf('-');
73+
final int endIndex = dotPos > -1 ? dotPos : dashPos > -1 ? dashPos : 1;
74+
75+
return Integer.parseInt(version.substring(0, endIndex));
76+
}
77+
78+
}

amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/concurrent/RingBufferBlockingQueue.java

Lines changed: 18 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@
3232
import lombok.Setter;
3333
import lombok.SneakyThrows;
3434

35-
@SuppressWarnings({ "java:S2274" })
3635
public class RingBufferBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E> {
3736

3837
private static final int DEFAULT_CAPACITY = 2048;
@@ -55,50 +54,54 @@ public class RingBufferBlockingQueue<E> extends AbstractQueue<E> implements Bloc
5554

5655
public RingBufferBlockingQueue(final int capacity) {
5756
this.capacity = capacity;
58-
buffer = new AtomicReferenceArray<>(capacity);
59-
reentrantLock = new ReentrantLock(true);
60-
waitingConsumer = reentrantLock.newCondition();
61-
waitingProducer = reentrantLock.newCondition();
62-
IntStream.range(0, capacity).forEach(idx -> buffer.set(idx, new Entry<>()));
57+
this.buffer = new AtomicReferenceArray<>(capacity);
58+
this.reentrantLock = new ReentrantLock(true);
59+
this.waitingConsumer = this.reentrantLock.newCondition();
60+
this.waitingProducer = this.reentrantLock.newCondition();
61+
IntStream.range(0, capacity).forEach(idx -> this.buffer.set(idx, new Entry<>()));
6362
}
6463

6564
public RingBufferBlockingQueue() {
66-
this(DEFAULT_CAPACITY);
65+
this(RingBufferBlockingQueue.DEFAULT_CAPACITY);
6766
}
6867

6968
private long avoidSequenceOverflow(final long sequence) {
7069
return (sequence < Long.MAX_VALUE ? sequence : wrap(sequence));
7170
}
7271

7372
private int wrap(final long sequence) {
74-
return Math.toIntExact(sequence % capacity);
73+
return Math.toIntExact(sequence % this.capacity);
7574
}
76-
75+
76+
public int capacity() {
77+
return this.capacity;
78+
}
79+
7780
@Override
7881
public int size() {
79-
return size.get();
82+
return this.size.get();
8083
}
8184

8285
@Override
8386
public boolean isEmpty() {
84-
return size.get() == 0;
87+
return this.size.get() == 0;
8588
}
8689

8790
public boolean isFull() {
88-
return size.get() >= capacity;
91+
return this.size.get() >= this.capacity;
8992
}
9093

9194
public long writeSequence() {
92-
return writeSequence.get();
95+
return this.writeSequence.get();
9396
}
9497

9598
public long readSequence() {
96-
return readSequence.get();
99+
return this.readSequence.get();
97100
}
98101

99102
@Override
100103
public E peek() {
101-
return isEmpty() ? null : buffer.get(wrap(readSequence.get())).getValue();
104+
return isEmpty() ? null : this.buffer.get(wrap(this.readSequence.get())).getValue();
102105
}
103106

104107
@Override

0 commit comments

Comments
 (0)