diff --git a/.github/workflows/cd-deploy.yml b/.github/workflows/cd-deploy.yml
index a37cb1d..9806012 100644
--- a/.github/workflows/cd-deploy.yml
+++ b/.github/workflows/cd-deploy.yml
@@ -51,7 +51,7 @@ jobs:
if: ${{ inputs.environment == 'release' }}
name: Set output variables
run: |
- RELEASE_VERSION=$(mvn -q -Dexec.executable="echo" -Dexec.args='${project.version}' --non-recursive exec:exec | grep -e '^[^\[]' | grep -Eo '[0-9]+\.[0-9]+\.[0-9]+')
+ RELEASE_VERSION=$(mvn help:evaluate -Dexpression=project.version -q -DforceStdout | grep -e '^[^\[]' | grep -Eo '[0-9]+\.[0-9]+\.[0-9]+')
RELEASE_TAG=v${RELEASE_VERSION}
echo "release-version=$RELEASE_VERSION" >> $GITHUB_OUTPUT
diff --git a/.github/workflows/cd-integration.yml b/.github/workflows/cd-integration.yml
index 547a3ff..4cc2456 100644
--- a/.github/workflows/cd-integration.yml
+++ b/.github/workflows/cd-integration.yml
@@ -32,7 +32,7 @@ jobs:
- id: environment
name: Set output environment passed to the reusable workflow
run: |
- VERSION=$(mvn help:evaluate -Dexpression=project.version -q -DforceStdout | cut -d '-' -f 1)
+ VERSION=$(mvn help:evaluate -Dexpression=project.version -q -DforceStdout | grep -e '^[^\[]' | grep -Eo '[0-9]+\.[0-9]+\.[0-9]+')
echo "version=$VERSION" >> $GITHUB_OUTPUT
echo "target-branch=release/$VERSION" >> $GITHUB_OUTPUT
@@ -55,4 +55,4 @@ jobs:
type: Snapshot
labels: automatic,snapshot
source-branch: master
- target-branch: ${{ needs.variables.outputs.target-branch }}
\ No newline at end of file
+ target-branch: ${{ needs.variables.outputs.target-branch }}
diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 04eadb2..a1cdadf 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -20,26 +20,12 @@ jobs:
outputs:
type: ${{ steps.environment.outputs.type }}
labels: ${{ steps.environment.outputs.labels }}
- target-branch: ${{ steps.environment.outputs.target-branch }}
steps:
- - name: Checkout repository
- uses: actions/checkout@v4
-
- - name: Set up JDK
- uses: actions/setup-java@v4
- with:
- java-version: 17
- distribution: "corretto"
- cache: "maven"
-
- id: environment
name: Set output environment passed to the reusable workflow
run: |
- VERSION=$(mvn help:evaluate -Dexpression=project.version -q -DforceStdout)
- echo "version=$VERSION" >> $GITHUB_OUTPUT
echo "type=$TYPE" >> $GITHUB_OUTPUT
echo "labels=$LABELS" >> $GITHUB_OUTPUT
- echo "target-branch=release/$VERSION" >> $GITHUB_OUTPUT
ci:
needs: variables
diff --git a/.gitignore b/.gitignore
index 82093c9..793d078 100644
--- a/.gitignore
+++ b/.gitignore
@@ -94,3 +94,4 @@ logs/
pom.xml.releaseBackup
.attach_*
release.properties
+**/pom.xml.versionsBackup
diff --git a/README.md b/README.md
index 7afa0af..7be244c 100644
--- a/README.md
+++ b/README.md
@@ -38,7 +38,7 @@ You can pull it from the central Maven repositories:
com.github.mvallim
amazon-sns-java-messaging-lib-v1
- 1.0.7
+ 1.0.8
```
@@ -48,7 +48,7 @@ You can pull it from the central Maven repositories:
com.github.mvallim
amazon-sns-java-messaging-lib-v2
- 1.0.7
+ 1.0.8
```
@@ -70,13 +70,13 @@ If you want to try a snapshot version, add the following repository:
### For AWS SDK v1
```groovy
-implementation 'com.github.mvallim:amazon-sns-java-messaging-lib-v1:1.0.7'
+implementation 'com.github.mvallim:amazon-sns-java-messaging-lib-v1:1.0.8'
```
### For AWS SDK v2
```groovy
-implementation 'com.github.mvallim:amazon-sns-java-messaging-lib-v2:1.0.7'
+implementation 'com.github.mvallim:amazon-sns-java-messaging-lib-v2:1.0.8'
```
If you want to try a snapshot version, add the following repository:
diff --git a/amazon-sns-java-messaging-lib-template/pom.xml b/amazon-sns-java-messaging-lib-template/pom.xml
index 35adad0..8f1797c 100644
--- a/amazon-sns-java-messaging-lib-template/pom.xml
+++ b/amazon-sns-java-messaging-lib-template/pom.xml
@@ -1,11 +1,13 @@
-
+
4.0.0
com.github.mvallim
amazon-sns-java-messaging-lib
- 1.0.7-SNAPSHOT
+ 1.0.8-SNAPSHOT
../pom.xml
diff --git a/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/concurrent/RingBufferBlockingQueue.java b/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/concurrent/RingBufferBlockingQueue.java
index 7f21cca..ae0ed55 100644
--- a/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/concurrent/RingBufferBlockingQueue.java
+++ b/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/concurrent/RingBufferBlockingQueue.java
@@ -17,102 +17,88 @@
package com.amazon.sns.messaging.lib.concurrent;
import java.util.AbstractQueue;
-import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReferenceArray;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
+import java.util.stream.IntStream;
import lombok.Getter;
import lombok.Setter;
import lombok.SneakyThrows;
-@SuppressWarnings({ "java:S2274", "unchecked" })
+@SuppressWarnings({ "java:S2274" })
public class RingBufferBlockingQueue extends AbstractQueue implements BlockingQueue {
private static final int DEFAULT_CAPACITY = 2048;
- private final Entry[] buffer;
+ private final AtomicReferenceArray> buffer;
private final int capacity;
- private final AtomicInteger writeSequence = new AtomicInteger(-1);
+ private final AtomicLong writeSequence = new AtomicLong(-1);
- private final AtomicInteger readSequence = new AtomicInteger(0);
+ private final AtomicLong readSequence = new AtomicLong(0);
+
+ private final AtomicInteger size = new AtomicInteger(0);
private final ReentrantLock reentrantLock;
- private final Condition notEmpty;
+ private final Condition waitingConsumer;
- private final Condition notFull;
+ private final Condition waitingProducer;
public RingBufferBlockingQueue(final int capacity) {
this.capacity = capacity;
- this.buffer = new Entry[capacity];
- Arrays.setAll(buffer, p -> new Entry<>());
+ buffer = new AtomicReferenceArray<>(capacity);
reentrantLock = new ReentrantLock(true);
- notEmpty = reentrantLock.newCondition();
- notFull = reentrantLock.newCondition();
+ waitingConsumer = reentrantLock.newCondition();
+ waitingProducer = reentrantLock.newCondition();
+ IntStream.range(0, capacity).forEach(idx -> buffer.set(idx, new Entry<>()));
}
public RingBufferBlockingQueue() {
this(DEFAULT_CAPACITY);
}
- private void enqueue(final E element) throws InterruptedException {
- while (isFull()) {
- notFull.await();
- }
-
- final int nextWriteSeq = writeSequence.get() + 1;
- buffer[wrap(nextWriteSeq)].setValue(element);
- writeSequence.incrementAndGet();
- notEmpty.signal();
+ private long avoidSequenceOverflow(final long sequence) {
+ return (sequence < Long.MAX_VALUE ? sequence : wrap(sequence));
}
- private E dequeue() throws InterruptedException {
- while (isEmpty()) {
- notEmpty.await();
- }
-
- final E nextValue = buffer[wrap(readSequence.get())].getValue();
- readSequence.incrementAndGet();
- notFull.signal();
- return nextValue;
- }
-
- private int wrap(final int sequence) {
- return sequence % capacity;
+ private int wrap(final long sequence) {
+ return Math.toIntExact(sequence % capacity);
}
@Override
public int size() {
- return (writeSequence.get() - readSequence.get()) + 1;
+ return size.get();
}
@Override
public boolean isEmpty() {
- return writeSequence.get() < readSequence.get();
+ return size.get() == 0;
}
public boolean isFull() {
- return size() >= capacity;
+ return size.get() >= capacity;
}
- public int writeSequence() {
+ public long writeSequence() {
return writeSequence.get();
}
- public int readSequence() {
+ public long readSequence() {
return readSequence.get();
}
@Override
public E peek() {
- return isEmpty() ? null : buffer[wrap(readSequence.get())].getValue();
+ return isEmpty() ? null : buffer.get(wrap(readSequence.get())).getValue();
}
@Override
@@ -120,7 +106,21 @@ public E peek() {
public void put(final E element) {
try {
reentrantLock.lock();
- enqueue(element);
+
+ while (isFull()) {
+ waitingProducer.await();
+ }
+
+ final long prevWriteSeq = writeSequence.get();
+ final long nextWriteSeq = avoidSequenceOverflow(prevWriteSeq) + 1;
+
+ buffer.get(wrap(nextWriteSeq)).setValue(element);
+
+ writeSequence.compareAndSet(prevWriteSeq, nextWriteSeq);
+
+ size.incrementAndGet();
+
+ waitingConsumer.signal();
} finally {
reentrantLock.unlock();
}
@@ -131,7 +131,25 @@ public void put(final E element) {
public E take() {
try {
reentrantLock.lock();
- return dequeue();
+
+ while (isEmpty()) {
+ waitingConsumer.await();
+ }
+
+ final long prevReadSeq = readSequence.get();
+ final long nextReadSeq = avoidSequenceOverflow(prevReadSeq) + 1;
+
+ final E nextValue = buffer.get(wrap(prevReadSeq)).getValue();
+
+ buffer.get(wrap(prevReadSeq)).setValue(null);
+
+ readSequence.compareAndSet(prevReadSeq, nextReadSeq);
+
+ size.decrementAndGet();
+
+ waitingProducer.signal();
+
+ return nextValue;
} finally {
reentrantLock.unlock();
}
diff --git a/amazon-sns-java-messaging-lib-template/src/test/java/com/amazon/sns/messaging/lib/concurrent/RingBufferBlockingQueueTest.java b/amazon-sns-java-messaging-lib-template/src/test/java/com/amazon/sns/messaging/lib/concurrent/RingBufferBlockingQueueTest.java
index cdc97e2..9275031 100644
--- a/amazon-sns-java-messaging-lib-template/src/test/java/com/amazon/sns/messaging/lib/concurrent/RingBufferBlockingQueueTest.java
+++ b/amazon-sns-java-messaging-lib-template/src/test/java/com/amazon/sns/messaging/lib/concurrent/RingBufferBlockingQueueTest.java
@@ -21,7 +21,6 @@
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.hasSize;
import static org.junit.jupiter.api.Assertions.assertThrows;
-import static org.mockito.Mockito.spy;
import java.util.Collections;
import java.util.LinkedList;
@@ -42,17 +41,18 @@ class RingBufferBlockingQueueTest {
@Test
void testSuccess() {
- final ExecutorService producer = Executors.newSingleThreadExecutor();
+ final ExecutorService producer = Executors.newSingleThreadScheduledExecutor();
final ScheduledExecutorService consumer = Executors.newSingleThreadScheduledExecutor();
final List> requestEntriesOut = new LinkedList<>();
- final RingBufferBlockingQueue> ringBlockingQueue = new RingBufferBlockingQueue<>(5120);
+ final RingBufferBlockingQueue> ringBlockingQueue = new RingBufferBlockingQueue<>();
producer.submit(() -> {
IntStream.range(0, 100_000).forEach(value -> {
ringBlockingQueue.put(RequestEntry.builder().withValue(value).build());
+ System.err.println("write: " + ringBlockingQueue.writeSequence());
});
});
@@ -61,19 +61,23 @@ void testSuccess() {
final List> requestEntries = new LinkedList<>();
while ((requestEntries.size() < 10) && Objects.nonNull(ringBlockingQueue.peek())) {
- requestEntries.add(ringBlockingQueue.take());
+ final RequestEntry take = ringBlockingQueue.take();
+ System.err.println("read: " + ringBlockingQueue.readSequence());
+ requestEntries.add(take);
}
requestEntriesOut.addAll(requestEntries);
}
}, 0, 100L, TimeUnit.MILLISECONDS);
- await().atMost(1, TimeUnit.MINUTES).until(() -> ringBlockingQueue.writeSequence() == 99_999);
- producer.shutdownNow();
+ await().pollInterval(5, TimeUnit.SECONDS).pollDelay(200, TimeUnit.MILLISECONDS).until(() -> {
+ return (ringBlockingQueue.writeSequence() == 99_999) && (ringBlockingQueue.readSequence() == 100_000);
+ });
- await().atMost(1, TimeUnit.MINUTES).until(() -> ringBlockingQueue.readSequence() == 100_000);
- consumer.shutdownNow();
+ producer.shutdown();
+ consumer.shutdown();
+ assertThat(ringBlockingQueue.size(), is(0));
assertThat(ringBlockingQueue.isEmpty(), is(true));
assertThat(requestEntriesOut, hasSize(100_000));
@@ -85,8 +89,8 @@ void testSuccess() {
}
@Test
- void testSuccessWhenIsEmpty() throws InterruptedException {
- final RingBufferBlockingQueue> ringBlockingQueue = spy(new RingBufferBlockingQueue<>());
+ void testSuccessWhenIsEmpty() {
+ final RingBufferBlockingQueue> ringBlockingQueue = new RingBufferBlockingQueue<>();
final ExecutorService producer = Executors.newSingleThreadExecutor();
@@ -97,7 +101,7 @@ void testSuccessWhenIsEmpty() throws InterruptedException {
assertThat(ringBlockingQueue.take().getValue(), is(1));
});
- Thread.sleep(2000);
+ await().pollDelay(2, TimeUnit.SECONDS).until(() -> true);
producer.submit(() -> {
ringBlockingQueue.put(RequestEntry.builder().withValue(0).build());
@@ -114,8 +118,8 @@ void testSuccessWhenIsEmpty() throws InterruptedException {
}
@Test
- void testSuccessWhenIsFull() throws InterruptedException {
- final RingBufferBlockingQueue> ringBlockingQueue = spy(new RingBufferBlockingQueue<>(1));
+ void testSuccessWhenIsFull() {
+ final RingBufferBlockingQueue> ringBlockingQueue = new RingBufferBlockingQueue<>(1);
final ExecutorService producer = Executors.newSingleThreadExecutor();
@@ -126,7 +130,7 @@ void testSuccessWhenIsFull() throws InterruptedException {
ringBlockingQueue.put(RequestEntry.builder().withValue(1).build());
});
- Thread.sleep(2000);
+ await().pollDelay(2, TimeUnit.SECONDS).until(() -> true);
consumer.submit(() -> {
assertThat(ringBlockingQueue.take().getValue(), is(0));
diff --git a/amazon-sns-java-messaging-lib-v1/pom.xml b/amazon-sns-java-messaging-lib-v1/pom.xml
index 1e14907..d0b3911 100644
--- a/amazon-sns-java-messaging-lib-v1/pom.xml
+++ b/amazon-sns-java-messaging-lib-v1/pom.xml
@@ -1,22 +1,24 @@
-
-
+
+
4.0.0
-
+
com.github.mvallim
amazon-sns-java-messaging-lib
- 1.0.7-SNAPSHOT
+ 1.0.8-SNAPSHOT
../pom.xml
-
+
amazon-sns-java-messaging-lib-v1
${project.artifactId}
-
+
1.12.661
-
+
@@ -28,7 +30,7 @@
-
+
com.github.mvallim
diff --git a/amazon-sns-java-messaging-lib-v2/pom.xml b/amazon-sns-java-messaging-lib-v2/pom.xml
index 60469be..09bd84d 100644
--- a/amazon-sns-java-messaging-lib-v2/pom.xml
+++ b/amazon-sns-java-messaging-lib-v2/pom.xml
@@ -1,14 +1,16 @@
-
-
+
+
4.0.0
-
+
com.github.mvallim
amazon-sns-java-messaging-lib
- 1.0.7-SNAPSHOT
+ 1.0.8-SNAPSHOT
../pom.xml
-
+
amazon-sns-java-messaging-lib-v2
${project.artifactId}
@@ -16,7 +18,7 @@
2.20.162
-
+
@@ -39,5 +41,5 @@
sns
-
+
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index bca5029..a800b56 100644
--- a/pom.xml
+++ b/pom.xml
@@ -6,7 +6,7 @@
com.github.mvallim
amazon-sns-java-messaging-lib
- 1.0.7-SNAPSHOT
+ 1.0.8-SNAPSHOT
pom
${project.artifactId}