Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 11 additions & 13 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ class Demo1 {
public static void main(String[] args) throws InterruptedException {
// creates a rendezvous channel
// (a sender & receiver must meet to pass a value: as if the buffer had size 0)
var ch = new Channel<Integer>();
var ch = Channel.<Integer>newRendezvousChannel();

Thread.ofVirtual().start(() -> {
try {
Expand Down Expand Up @@ -100,8 +100,7 @@ import com.softwaremill.jox.Channel;

class Demo2 {
public static void main(String[] args) throws InterruptedException {
// creates a buffered channel (buffer of size 3)
var ch = new Channel<Integer>(3);
var ch = Channel.<Integer>newBufferedChannel(3);

// send()-s won't block
ch.send(1);
Expand Down Expand Up @@ -158,8 +157,7 @@ import com.softwaremill.jox.Channel;

class Demo3 {
public static void main(String[] args) throws InterruptedException {
// creates a buffered channel (buffer of size 3)
var ch = new Channel<Integer>(3);
var ch = Channel.<Integer>newBufferedChannel(3);

// send()-s won't block
ch.send(1);
Expand All @@ -186,9 +184,9 @@ import static com.softwaremill.jox.Select.select;
class Demo4 {
public static void main(String[] args) throws InterruptedException {
// creates a buffered channel (buffer of size 3)
var ch1 = new Channel<Integer>(3);
var ch2 = new Channel<Integer>(3);
var ch3 = new Channel<Integer>(3);
var ch1 = Channel.<Integer>newBufferedChannel(3);
var ch2 = Channel.<Integer>newBufferedChannel(3);
var ch3 = Channel.<Integer>newBufferedChannel(3);

// send a value to two channels
ch2.send(29);
Expand Down Expand Up @@ -218,8 +216,8 @@ import static com.softwaremill.jox.Select.select;

class Demo5 {
public static void main(String[] args) throws InterruptedException {
var ch1 = new Channel<Integer>(1);
var ch2 = new Channel<Integer>(1);
var ch1 = Channel.<Integer>newBufferedChannel(1);
var ch2 = Channel.<Integer>newBufferedChannel(1);

ch1.send(12); // buffer is now full

Expand All @@ -242,8 +240,8 @@ import static com.softwaremill.jox.Select.select;

class Demo6 {
public static void main(String[] args) throws InterruptedException {
var ch1 = new Channel<Integer>(3);
var ch2 = new Channel<Integer>(3);
var ch1 = Channel.<Integer>newBufferedChannel(3);
var ch2 = Channel.<Integer>newBufferedChannel(3);

var received = select(ch1.receiveClause(), ch2.receiveClause(), defaultClause(52));

Expand Down Expand Up @@ -613,7 +611,7 @@ import com.softwaremill.jox.structured.Scopes;
public class Demo {

public static void main(String[] args) throws ExecutionException, InterruptedException {
Channel<Integer> ch = new Channel<>(16);
Channel<Integer> ch = Channel.<Integer>newBufferedDefaultChannel();
Scopes.supervised(scope -> {
scope.fork(() -> {
ch.send(1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public void arrayBlockingQueue() throws InterruptedException {
@Benchmark
@OperationsPerInvocation(OPERATIONS_PER_INVOCATION)
public void channel() throws InterruptedException {
var ch = new Channel<>(capacity);
var ch = Channel.newBufferedChannel(capacity);
var t1 = Thread.startVirtualThread(() -> {
for (int i = 0; i < OPERATIONS_PER_INVOCATION; i++) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public void channelChain() throws InterruptedException {
int elements = OPERATIONS_PER_INVOCATION / chainLength;
Channel<Integer>[] channels = new Channel[chainLength];
for (int i = 0; i < chainLength; i++) {
channels[i] = new Channel<>(capacity);
channels[i] = (capacity == 0) ? Channel.newRendezvousChannel() : Channel.newBufferedChannel(capacity);
}

Thread[] threads = new Thread[chainLength + 1];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public void parallelChannels() throws InterruptedException {
var latch = new CountDownLatch(parallelism);

for (int t = 0; t < parallelism; t++) {
var ch = new Channel<Integer>(capacity);
var ch = (capacity == 0) ? Channel.<Integer>newRendezvousChannel() : Channel.<Integer>newBufferedChannel(capacity);
// sender
Thread.startVirtualThread(() -> {
for (int i = 0; i < elementsPerChannel; i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ public void exchanger() throws InterruptedException {
@Benchmark
@OperationsPerInvocation(OPERATIONS_PER_INVOCATION)
public void channel() throws InterruptedException {
var ch = new Channel<>();
var ch = Channel.newRendezvousChannel();
var t1 = Thread.startVirtualThread(() -> {
for (int i = 0; i < OPERATIONS_PER_INVOCATION; i++) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ public class SelectBenchmark {
@Benchmark
@OperationsPerInvocation(OPERATIONS_PER_INVOCATION)
public void selectWithSingleClause() throws InterruptedException {
var ch = new Channel<Integer>();
var ch = Channel.newRendezvousChannel();
var t1 = Thread.startVirtualThread(() -> {
for (int i = 0; i < OPERATIONS_PER_INVOCATION; i++) {
try {
Expand Down Expand Up @@ -48,8 +48,8 @@ public void selectWithSingleClause() throws InterruptedException {
@Benchmark
@OperationsPerInvocation(OPERATIONS_PER_INVOCATION)
public void selectWithTwoClauses() throws InterruptedException {
var ch1 = new Channel<Integer>();
var ch2 = new Channel<Integer>();
var ch1 = Channel.newRendezvousChannel();
var ch2 = Channel.newRendezvousChannel();
var t1 = Thread.startVirtualThread(() -> {
for (int i = 0; i < OPERATIONS_PER_INVOCATION / 2; i++) {
try {
Expand Down
30 changes: 19 additions & 11 deletions channels/src/main/java/com/softwaremill/jox/Channel.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@
* - buffered channels, where a given number of sent values might be buffered, before subsequent `send`s block
* - unlimited channels, where an unlimited number of values might be buffered, hence `send` never blocks
* <p>
* The no-argument {@link Channel} constructor creates a rendezvous channel, while a buffered channel can be created
* by providing a positive integer to the constructor. A rendezvous channel behaves like a buffered channel with
* buffer size 0. An unlimited channel can be created using {@link Channel#newUnlimitedChannel()}.
* To create a channel, use {@link Channel#newRendezvousChannel()}, {@link Channel#newBufferedChannel(int)} and
* {@link Channel#newUnlimitedChannel()} methods. Additionally, {@link Channel#newBufferedDefaultChannel()} creates
* a buffered channel with a "default" capacity of 16, which should be a good starting point for most use-cases.
* <p>
* In a rendezvous channel, senders and receivers block, until a matching party arrives (unless one is already waiting).
* Similarly, buffered channels block if the buffer is full (in case of senders), or in case of receivers, if the
Expand Down Expand Up @@ -152,17 +152,10 @@ operations on these (previous) segments, and we'll end up wanting to remove such
}
}

/**
* Creates a rendezvous channel.
*/
public Channel() {
this(0);
}

/**
* Creates a buffered channel (when capacity is positive), or a rendezvous channel if the capacity is 0.
*/
public Channel(int capacity) {
private Channel(int capacity) {
if (capacity < UNLIMITED_CAPACITY) {
throw new IllegalArgumentException("Capacity must be 0 (rendezvous), positive (buffered) or -1 (unlimited channels).");
}
Expand Down Expand Up @@ -205,6 +198,21 @@ private void processInitialBuffer() {
}
}

public static <T> Channel<T> newRendezvousChannel() {
return new Channel<>(0);
}

public static <T> Channel<T> newBufferedChannel(int capacity) {
return new Channel<>(capacity);
}

/**
* Creates a new buffered channel, with the default buffer size (16).
*/
public static <T> Channel<T> newBufferedDefaultChannel() {
return new Channel<>(DEFAULT_BUFFER_SIZE);
}

public static <T> Channel<T> newUnlimitedChannel() {
return new Channel<>(UNLIMITED_CAPACITY);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ public class ChannelBufferedTest {
@Timeout(1)
void testSimpleSendReceiveBuffer1() throws InterruptedException {
// given
Channel<String> channel = new Channel<>(1);
Channel<String> channel = Channel.newBufferedChannel(1);

// when
channel.send("x"); // should not block
Expand All @@ -31,7 +31,7 @@ void testSimpleSendReceiveBuffer1() throws InterruptedException {
@Timeout(1)
void testSimpleSendReceiveBuffer2() throws InterruptedException {
// given
Channel<String> channel = new Channel<>(2);
Channel<String> channel = Channel.newBufferedChannel(2);

// when
channel.send("x"); // should not block
Expand All @@ -48,7 +48,7 @@ void testSimpleSendReceiveBuffer2() throws InterruptedException {
@Timeout(2)
void testBufferCapacityStaysTheSameAfterSendsReceives() throws ExecutionException, InterruptedException {
// given
Channel<Integer> channel = new Channel<>(2);
Channel<Integer> channel = Channel.newBufferedChannel(2);

// when
scoped(scope -> {
Expand Down Expand Up @@ -78,7 +78,7 @@ void testBufferCapacityStaysTheSameAfterSendsReceives() throws ExecutionExceptio
@Timeout(1)
void shouldReceiveFromAChannelUntilDone() throws InterruptedException {
// given
Channel<Integer> c = new Channel<>(3);
Channel<Integer> c = Channel.newBufferedChannel(3);
c.send(1);
c.send(2);
c.done();
Expand All @@ -98,7 +98,7 @@ void shouldReceiveFromAChannelUntilDone() throws InterruptedException {
@Timeout(1)
void shouldNotReceiveFromAChannelInCaseOfAnError() throws InterruptedException {
// given
Channel<Integer> c = new Channel<>(3);
Channel<Integer> c = Channel.newBufferedChannel(3);
c.send(1);
c.send(2);
c.error(new RuntimeException());
Expand All @@ -114,8 +114,8 @@ void shouldNotReceiveFromAChannelInCaseOfAnError() throws InterruptedException {

@Test
void shouldProcessCellsInitially() {
assertTrue(new Channel<String>(1).toString().contains("notProcessed=31"));
assertTrue(new Channel<String>(31).toString().contains("notProcessed=1"));
assertTrue(new Channel<String>(32).toString().contains("notProcessed=0"));
assertTrue(Channel.<String>newBufferedChannel(1).toString().contains("notProcessed=31"));
assertTrue(Channel.<String>newBufferedChannel(31).toString().contains("notProcessed=1"));
assertTrue(Channel.<String>newBufferedChannel(32).toString().contains("notProcessed=0"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ public class ChannelClosedTest {
@Test
void testClosed_noValues_whenError() throws InterruptedException {
// given
Channel<Integer> c = new Channel<>();
Channel<Integer> c = Channel.newRendezvousChannel();
RuntimeException reason = new RuntimeException();

// when
Expand All @@ -27,7 +27,7 @@ void testClosed_noValues_whenError() throws InterruptedException {
@Test
void testClosed_noValues_whenDone() throws InterruptedException {
// given
Channel<Integer> c = new Channel<>();
Channel<Integer> c = Channel.newRendezvousChannel();

// when
c.done();
Expand All @@ -41,7 +41,7 @@ void testClosed_noValues_whenDone() throws InterruptedException {
@Test
void testClosed_hasSuspendedValues_whenDone() throws InterruptedException, ExecutionException {
// given
Channel<Integer> c = new Channel<>();
Channel<Integer> c = Channel.newRendezvousChannel();

// when
scoped(scope -> {
Expand All @@ -65,7 +65,7 @@ void testClosed_hasSuspendedValues_whenDone() throws InterruptedException, Execu
@Test
void testClosed_hasBufferedValues_whenDone() throws InterruptedException {
// given
Channel<Integer> c = new Channel<>(5);
Channel<Integer> c = Channel.newBufferedChannel(5);

// when
c.send(1);
Expand All @@ -80,7 +80,7 @@ void testClosed_hasBufferedValues_whenDone() throws InterruptedException {
@Test
void testClosed_hasValues_whenError() throws InterruptedException {
// given
Channel<Integer> c = new Channel<>(5);
Channel<Integer> c = Channel.newBufferedChannel(5);

// when
c.send(1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ public class ChannelInterruptionTest {
@Test
void testSendReceiveAfterSendInterrupt() throws Exception {
// given
Channel<String> channel = new Channel<>();
Channel<String> channel = Channel.newRendezvousChannel();

// when
scoped(scope -> {
Expand All @@ -34,7 +34,7 @@ void testSendReceiveAfterSendInterrupt() throws Exception {
@Test
void testSendReceiveAfterReceiveInterrupt() throws Exception {
// given
Channel<String> channel = new Channel<>();
Channel<String> channel = Channel.newRendezvousChannel();

// when
scoped(scope -> {
Expand All @@ -55,7 +55,7 @@ void testRaceInterruptAndSend() throws Exception {
scoped(scope -> {
for (int i = 0; i < 100; i++) {
// given
Channel<String> channel = new Channel<>();
Channel<String> channel = Channel.newRendezvousChannel();

var t1 = forkCancelable(scope, () -> channel.send("x"));
var t2 = fork(scope, channel::receive);
Expand All @@ -76,7 +76,7 @@ void testRaceInterruptAndSend() throws Exception {
@Test
void testReceiveManyInterruptsReceive() throws ExecutionException, InterruptedException {
scoped(scope -> {
Channel<String> channel = new Channel<>();
Channel<String> channel = Channel.newRendezvousChannel();
Set<String> received = ConcurrentHashMap.newKeySet();

// starting with a single receive
Expand Down Expand Up @@ -115,7 +115,7 @@ void testReceiveManyInterruptsReceive() throws ExecutionException, InterruptedEx

@Test
void testManyInterruptedReceivesShouldNotLeakMemory() throws InterruptedException, ExecutionException {
var ch = new Channel<String>();
var ch = Channel.newRendezvousChannel();

scoped(scope -> {
var forks = new Fork[300];
Expand All @@ -138,7 +138,7 @@ void testManyInterruptedReceivesShouldNotLeakMemory() throws InterruptedExceptio

@Test
void testManyInterruptedSendsShouldNotLeakMemory() throws InterruptedException, ExecutionException {
var ch = new Channel<String>(1);
var ch = Channel.<String>newBufferedChannel(1);
ch.send("x");

scoped(scope -> {
Expand Down
Loading
Loading