Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/cd-deploy.yml
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ jobs:
if: ${{ inputs.environment == 'release' }}
name: Set output variables
run: |
RELEASE_VERSION=$(mvn -q -Dexec.executable="echo" -Dexec.args='${project.version}' --non-recursive exec:exec | grep -e '^[^\[]' | grep -Eo '[0-9]+\.[0-9]+\.[0-9]+')
RELEASE_VERSION=$(mvn help:evaluate -Dexpression=project.version -q -DforceStdout | grep -e '^[^\[]' | grep -Eo '[0-9]+\.[0-9]+\.[0-9]+')
RELEASE_TAG=v${RELEASE_VERSION}

echo "release-version=$RELEASE_VERSION" >> $GITHUB_OUTPUT
Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/cd-integration.yml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ jobs:
- id: environment
name: Set output environment passed to the reusable workflow
run: |
VERSION=$(mvn help:evaluate -Dexpression=project.version -q -DforceStdout | cut -d '-' -f 1)
VERSION=$(mvn help:evaluate -Dexpression=project.version -q -DforceStdout | grep -e '^[^\[]' | grep -Eo '[0-9]+\.[0-9]+\.[0-9]+')
echo "version=$VERSION" >> $GITHUB_OUTPUT
echo "target-branch=release/$VERSION" >> $GITHUB_OUTPUT

Expand All @@ -55,4 +55,4 @@ jobs:
type: Snapshot
labels: automatic,snapshot
source-branch: master
target-branch: ${{ needs.variables.outputs.target-branch }}
target-branch: ${{ needs.variables.outputs.target-branch }}
14 changes: 0 additions & 14 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,26 +20,12 @@ jobs:
outputs:
type: ${{ steps.environment.outputs.type }}
labels: ${{ steps.environment.outputs.labels }}
target-branch: ${{ steps.environment.outputs.target-branch }}
steps:
- name: Checkout repository
uses: actions/checkout@v4

- name: Set up JDK
uses: actions/setup-java@v4
with:
java-version: 17
distribution: "corretto"
cache: "maven"

- id: environment
name: Set output environment passed to the reusable workflow
run: |
VERSION=$(mvn help:evaluate -Dexpression=project.version -q -DforceStdout)
echo "version=$VERSION" >> $GITHUB_OUTPUT
echo "type=$TYPE" >> $GITHUB_OUTPUT
echo "labels=$LABELS" >> $GITHUB_OUTPUT
echo "target-branch=release/$VERSION" >> $GITHUB_OUTPUT

ci:
needs: variables
Expand Down
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -94,3 +94,4 @@ logs/
pom.xml.releaseBackup
.attach_*
release.properties
**/pom.xml.versionsBackup
8 changes: 4 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ You can pull it from the central Maven repositories:
<dependency>
<groupId>com.github.mvallim</groupId>
<artifactId>amazon-sns-java-messaging-lib-v1</artifactId>
<version>1.0.7</version>
<version>1.0.8</version>
</dependency>
```

Expand All @@ -48,7 +48,7 @@ You can pull it from the central Maven repositories:
<dependency>
<groupId>com.github.mvallim</groupId>
<artifactId>amazon-sns-java-messaging-lib-v2</artifactId>
<version>1.0.7</version>
<version>1.0.8</version>
</dependency>
```

Expand All @@ -70,13 +70,13 @@ If you want to try a snapshot version, add the following repository:
### For AWS SDK v1

```groovy
implementation 'com.github.mvallim:amazon-sns-java-messaging-lib-v1:1.0.7'
implementation 'com.github.mvallim:amazon-sns-java-messaging-lib-v1:1.0.8'
```

### For AWS SDK v2

```groovy
implementation 'com.github.mvallim:amazon-sns-java-messaging-lib-v2:1.0.7'
implementation 'com.github.mvallim:amazon-sns-java-messaging-lib-v2:1.0.8'
```

If you want to try a snapshot version, add the following repository:
Expand Down
6 changes: 4 additions & 2 deletions amazon-sns-java-messaging-lib-template/pom.xml
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">

<modelVersion>4.0.0</modelVersion>

<parent>
<groupId>com.github.mvallim</groupId>
<artifactId>amazon-sns-java-messaging-lib</artifactId>
<version>1.0.7-SNAPSHOT</version>
<version>1.0.8-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,110 +17,110 @@
package com.amazon.sns.messaging.lib.concurrent;

import java.util.AbstractQueue;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReferenceArray;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.IntStream;

import lombok.Getter;
import lombok.Setter;
import lombok.SneakyThrows;

@SuppressWarnings({ "java:S2274", "unchecked" })
@SuppressWarnings({ "java:S2274" })
public class RingBufferBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E> {

private static final int DEFAULT_CAPACITY = 2048;

private final Entry<E>[] buffer;
private final AtomicReferenceArray<Entry<E>> buffer;

private final int capacity;

private final AtomicInteger writeSequence = new AtomicInteger(-1);
private final AtomicLong writeSequence = new AtomicLong(-1);

private final AtomicInteger readSequence = new AtomicInteger(0);
private final AtomicLong readSequence = new AtomicLong(0);

private final AtomicInteger size = new AtomicInteger(0);

private final ReentrantLock reentrantLock;

private final Condition notEmpty;
private final Condition waitingConsumer;

private final Condition notFull;
private final Condition waitingProducer;

public RingBufferBlockingQueue(final int capacity) {
this.capacity = capacity;
this.buffer = new Entry[capacity];
Arrays.setAll(buffer, p -> new Entry<>());
buffer = new AtomicReferenceArray<>(capacity);
reentrantLock = new ReentrantLock(true);
notEmpty = reentrantLock.newCondition();
notFull = reentrantLock.newCondition();
waitingConsumer = reentrantLock.newCondition();
waitingProducer = reentrantLock.newCondition();
IntStream.range(0, capacity).forEach(idx -> buffer.set(idx, new Entry<>()));
}

public RingBufferBlockingQueue() {
this(DEFAULT_CAPACITY);
}

private void enqueue(final E element) throws InterruptedException {
while (isFull()) {
notFull.await();
}

final int nextWriteSeq = writeSequence.get() + 1;
buffer[wrap(nextWriteSeq)].setValue(element);
writeSequence.incrementAndGet();
notEmpty.signal();
private long avoidSequenceOverflow(final long sequence) {
return (sequence < Long.MAX_VALUE ? sequence : wrap(sequence));
}

private E dequeue() throws InterruptedException {
while (isEmpty()) {
notEmpty.await();
}

final E nextValue = buffer[wrap(readSequence.get())].getValue();
readSequence.incrementAndGet();
notFull.signal();
return nextValue;
}

private int wrap(final int sequence) {
return sequence % capacity;
private int wrap(final long sequence) {
return Math.toIntExact(sequence % capacity);
}

@Override
public int size() {
return (writeSequence.get() - readSequence.get()) + 1;
return size.get();
}

@Override
public boolean isEmpty() {
return writeSequence.get() < readSequence.get();
return size.get() == 0;
}

public boolean isFull() {
return size() >= capacity;
return size.get() >= capacity;
}

public int writeSequence() {
public long writeSequence() {
return writeSequence.get();
}

public int readSequence() {
public long readSequence() {
return readSequence.get();
}

@Override
public E peek() {
return isEmpty() ? null : buffer[wrap(readSequence.get())].getValue();
return isEmpty() ? null : buffer.get(wrap(readSequence.get())).getValue();
}

@Override
@SneakyThrows
public void put(final E element) {
try {
reentrantLock.lock();
enqueue(element);

while (isFull()) {
waitingProducer.await();
}

final long prevWriteSeq = writeSequence.get();
final long nextWriteSeq = avoidSequenceOverflow(prevWriteSeq) + 1;

buffer.get(wrap(nextWriteSeq)).setValue(element);

writeSequence.compareAndSet(prevWriteSeq, nextWriteSeq);

size.incrementAndGet();

waitingConsumer.signal();
} finally {
reentrantLock.unlock();
}
Expand All @@ -131,7 +131,25 @@ public void put(final E element) {
public E take() {
try {
reentrantLock.lock();
return dequeue();

while (isEmpty()) {
waitingConsumer.await();
}

final long prevReadSeq = readSequence.get();
final long nextReadSeq = avoidSequenceOverflow(prevReadSeq) + 1;

final E nextValue = buffer.get(wrap(prevReadSeq)).getValue();

buffer.get(wrap(prevReadSeq)).setValue(null);

readSequence.compareAndSet(prevReadSeq, nextReadSeq);

size.decrementAndGet();

waitingProducer.signal();

return nextValue;
} finally {
reentrantLock.unlock();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.hasSize;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.Mockito.spy;

import java.util.Collections;
import java.util.LinkedList;
Expand All @@ -42,17 +41,18 @@ class RingBufferBlockingQueueTest {

@Test
void testSuccess() {
final ExecutorService producer = Executors.newSingleThreadExecutor();
final ExecutorService producer = Executors.newSingleThreadScheduledExecutor();

final ScheduledExecutorService consumer = Executors.newSingleThreadScheduledExecutor();

final List<RequestEntry<Integer>> requestEntriesOut = new LinkedList<>();

final RingBufferBlockingQueue<RequestEntry<Integer>> ringBlockingQueue = new RingBufferBlockingQueue<>(5120);
final RingBufferBlockingQueue<RequestEntry<Integer>> ringBlockingQueue = new RingBufferBlockingQueue<>();

producer.submit(() -> {
IntStream.range(0, 100_000).forEach(value -> {
ringBlockingQueue.put(RequestEntry.<Integer>builder().withValue(value).build());
System.err.println("write: " + ringBlockingQueue.writeSequence());
});
});

Expand All @@ -61,19 +61,23 @@ void testSuccess() {
final List<RequestEntry<Integer>> requestEntries = new LinkedList<>();

while ((requestEntries.size() < 10) && Objects.nonNull(ringBlockingQueue.peek())) {
requestEntries.add(ringBlockingQueue.take());
final RequestEntry<Integer> take = ringBlockingQueue.take();
System.err.println("read: " + ringBlockingQueue.readSequence());
requestEntries.add(take);
}

requestEntriesOut.addAll(requestEntries);
}
}, 0, 100L, TimeUnit.MILLISECONDS);

await().atMost(1, TimeUnit.MINUTES).until(() -> ringBlockingQueue.writeSequence() == 99_999);
producer.shutdownNow();
await().pollInterval(5, TimeUnit.SECONDS).pollDelay(200, TimeUnit.MILLISECONDS).until(() -> {
return (ringBlockingQueue.writeSequence() == 99_999) && (ringBlockingQueue.readSequence() == 100_000);
});

await().atMost(1, TimeUnit.MINUTES).until(() -> ringBlockingQueue.readSequence() == 100_000);
consumer.shutdownNow();
producer.shutdown();
consumer.shutdown();

assertThat(ringBlockingQueue.size(), is(0));
assertThat(ringBlockingQueue.isEmpty(), is(true));

assertThat(requestEntriesOut, hasSize(100_000));
Expand All @@ -85,8 +89,8 @@ void testSuccess() {
}

@Test
void testSuccessWhenIsEmpty() throws InterruptedException {
final RingBufferBlockingQueue<RequestEntry<Integer>> ringBlockingQueue = spy(new RingBufferBlockingQueue<>());
void testSuccessWhenIsEmpty() {
final RingBufferBlockingQueue<RequestEntry<Integer>> ringBlockingQueue = new RingBufferBlockingQueue<>();

final ExecutorService producer = Executors.newSingleThreadExecutor();

Expand All @@ -97,7 +101,7 @@ void testSuccessWhenIsEmpty() throws InterruptedException {
assertThat(ringBlockingQueue.take().getValue(), is(1));
});

Thread.sleep(2000);
await().pollDelay(2, TimeUnit.SECONDS).until(() -> true);

producer.submit(() -> {
ringBlockingQueue.put(RequestEntry.<Integer>builder().withValue(0).build());
Expand All @@ -114,8 +118,8 @@ void testSuccessWhenIsEmpty() throws InterruptedException {
}

@Test
void testSuccessWhenIsFull() throws InterruptedException {
final RingBufferBlockingQueue<RequestEntry<Integer>> ringBlockingQueue = spy(new RingBufferBlockingQueue<>(1));
void testSuccessWhenIsFull() {
final RingBufferBlockingQueue<RequestEntry<Integer>> ringBlockingQueue = new RingBufferBlockingQueue<>(1);

final ExecutorService producer = Executors.newSingleThreadExecutor();

Expand All @@ -126,7 +130,7 @@ void testSuccessWhenIsFull() throws InterruptedException {
ringBlockingQueue.put(RequestEntry.<Integer>builder().withValue(1).build());
});

Thread.sleep(2000);
await().pollDelay(2, TimeUnit.SECONDS).until(() -> true);

consumer.submit(() -> {
assertThat(ringBlockingQueue.take().getValue(), is(0));
Expand Down
Loading