Skip to content

Commit 2abc811

Browse files
authored
Make channel constructor private, add explicit factory methods (#95)
1 parent c7fb8b9 commit 2abc811

File tree

21 files changed

+132
-126
lines changed

21 files changed

+132
-126
lines changed

README.md

Lines changed: 11 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ class Demo1 {
7070
public static void main(String[] args) throws InterruptedException {
7171
// creates a rendezvous channel
7272
// (a sender & receiver must meet to pass a value: as if the buffer had size 0)
73-
var ch = new Channel<Integer>();
73+
var ch = Channel.<Integer>newRendezvousChannel();
7474

7575
Thread.ofVirtual().start(() -> {
7676
try {
@@ -100,8 +100,7 @@ import com.softwaremill.jox.Channel;
100100

101101
class Demo2 {
102102
public static void main(String[] args) throws InterruptedException {
103-
// creates a buffered channel (buffer of size 3)
104-
var ch = new Channel<Integer>(3);
103+
var ch = Channel.<Integer>newBufferedChannel(3);
105104

106105
// send()-s won't block
107106
ch.send(1);
@@ -158,8 +157,7 @@ import com.softwaremill.jox.Channel;
158157

159158
class Demo3 {
160159
public static void main(String[] args) throws InterruptedException {
161-
// creates a buffered channel (buffer of size 3)
162-
var ch = new Channel<Integer>(3);
160+
var ch = Channel.<Integer>newBufferedChannel(3);
163161

164162
// send()-s won't block
165163
ch.send(1);
@@ -186,9 +184,9 @@ import static com.softwaremill.jox.Select.select;
186184
class Demo4 {
187185
public static void main(String[] args) throws InterruptedException {
188186
// creates a buffered channel (buffer of size 3)
189-
var ch1 = new Channel<Integer>(3);
190-
var ch2 = new Channel<Integer>(3);
191-
var ch3 = new Channel<Integer>(3);
187+
var ch1 = Channel.<Integer>newBufferedChannel(3);
188+
var ch2 = Channel.<Integer>newBufferedChannel(3);
189+
var ch3 = Channel.<Integer>newBufferedChannel(3);
192190

193191
// send a value to two channels
194192
ch2.send(29);
@@ -218,8 +216,8 @@ import static com.softwaremill.jox.Select.select;
218216

219217
class Demo5 {
220218
public static void main(String[] args) throws InterruptedException {
221-
var ch1 = new Channel<Integer>(1);
222-
var ch2 = new Channel<Integer>(1);
219+
var ch1 = Channel.<Integer>newBufferedChannel(1);
220+
var ch2 = Channel.<Integer>newBufferedChannel(1);
223221

224222
ch1.send(12); // buffer is now full
225223

@@ -242,8 +240,8 @@ import static com.softwaremill.jox.Select.select;
242240

243241
class Demo6 {
244242
public static void main(String[] args) throws InterruptedException {
245-
var ch1 = new Channel<Integer>(3);
246-
var ch2 = new Channel<Integer>(3);
243+
var ch1 = Channel.<Integer>newBufferedChannel(3);
244+
var ch2 = Channel.<Integer>newBufferedChannel(3);
247245

248246
var received = select(ch1.receiveClause(), ch2.receiveClause(), defaultClause(52));
249247

@@ -613,7 +611,7 @@ import com.softwaremill.jox.structured.Scopes;
613611
public class Demo {
614612

615613
public static void main(String[] args) throws ExecutionException, InterruptedException {
616-
Channel<Integer> ch = new Channel<>(16);
614+
Channel<Integer> ch = Channel.<Integer>newBufferedDefaultChannel();
617615
Scopes.supervised(scope -> {
618616
scope.fork(() -> {
619617
ch.send(1);

bench/bench-java/src/main/java/com/softwaremill/jox/BufferedBenchmark.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ public void arrayBlockingQueue() throws InterruptedException {
5555
@Benchmark
5656
@OperationsPerInvocation(OPERATIONS_PER_INVOCATION)
5757
public void channel() throws InterruptedException {
58-
var ch = new Channel<>(capacity);
58+
var ch = Channel.newBufferedChannel(capacity);
5959
var t1 = Thread.startVirtualThread(() -> {
6060
for (int i = 0; i < OPERATIONS_PER_INVOCATION; i++) {
6161
try {

bench/bench-java/src/main/java/com/softwaremill/jox/ChainedBenchmark.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ public void channelChain() throws InterruptedException {
3232
int elements = OPERATIONS_PER_INVOCATION / chainLength;
3333
Channel<Integer>[] channels = new Channel[chainLength];
3434
for (int i = 0; i < chainLength; i++) {
35-
channels[i] = new Channel<>(capacity);
35+
channels[i] = (capacity == 0) ? Channel.newRendezvousChannel() : Channel.newBufferedChannel(capacity);
3636
}
3737

3838
Thread[] threads = new Thread[chainLength + 1];

bench/bench-java/src/main/java/com/softwaremill/jox/ParallelBenchmark.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ public void parallelChannels() throws InterruptedException {
3232
var latch = new CountDownLatch(parallelism);
3333

3434
for (int t = 0; t < parallelism; t++) {
35-
var ch = new Channel<Integer>(capacity);
35+
var ch = (capacity == 0) ? Channel.<Integer>newRendezvousChannel() : Channel.<Integer>newBufferedChannel(capacity);
3636
// sender
3737
Thread.startVirtualThread(() -> {
3838
for (int i = 0; i < elementsPerChannel; i++) {

bench/bench-java/src/main/java/com/softwaremill/jox/RendezvousBenchmark.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ public void exchanger() throws InterruptedException {
7676
@Benchmark
7777
@OperationsPerInvocation(OPERATIONS_PER_INVOCATION)
7878
public void channel() throws InterruptedException {
79-
var ch = new Channel<>();
79+
var ch = Channel.newRendezvousChannel();
8080
var t1 = Thread.startVirtualThread(() -> {
8181
for (int i = 0; i < OPERATIONS_PER_INVOCATION; i++) {
8282
try {

bench/bench-java/src/main/java/com/softwaremill/jox/SelectBenchmark.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ public class SelectBenchmark {
2020
@Benchmark
2121
@OperationsPerInvocation(OPERATIONS_PER_INVOCATION)
2222
public void selectWithSingleClause() throws InterruptedException {
23-
var ch = new Channel<Integer>();
23+
var ch = Channel.newRendezvousChannel();
2424
var t1 = Thread.startVirtualThread(() -> {
2525
for (int i = 0; i < OPERATIONS_PER_INVOCATION; i++) {
2626
try {
@@ -48,8 +48,8 @@ public void selectWithSingleClause() throws InterruptedException {
4848
@Benchmark
4949
@OperationsPerInvocation(OPERATIONS_PER_INVOCATION)
5050
public void selectWithTwoClauses() throws InterruptedException {
51-
var ch1 = new Channel<Integer>();
52-
var ch2 = new Channel<Integer>();
51+
var ch1 = Channel.newRendezvousChannel();
52+
var ch2 = Channel.newRendezvousChannel();
5353
var t1 = Thread.startVirtualThread(() -> {
5454
for (int i = 0; i < OPERATIONS_PER_INVOCATION / 2; i++) {
5555
try {

channels/src/main/java/com/softwaremill/jox/Channel.java

Lines changed: 19 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,9 @@
2424
* - buffered channels, where a given number of sent values might be buffered, before subsequent `send`s block
2525
* - unlimited channels, where an unlimited number of values might be buffered, hence `send` never blocks
2626
* <p>
27-
* The no-argument {@link Channel} constructor creates a rendezvous channel, while a buffered channel can be created
28-
* by providing a positive integer to the constructor. A rendezvous channel behaves like a buffered channel with
29-
* buffer size 0. An unlimited channel can be created using {@link Channel#newUnlimitedChannel()}.
27+
* To create a channel, use {@link Channel#newRendezvousChannel()}, {@link Channel#newBufferedChannel(int)} and
28+
* {@link Channel#newUnlimitedChannel()} methods. Additionally, {@link Channel#newBufferedDefaultChannel()} creates
29+
* a buffered channel with a "default" capacity of 16, which should be a good starting point for most use-cases.
3030
* <p>
3131
* In a rendezvous channel, senders and receivers block, until a matching party arrives (unless one is already waiting).
3232
* Similarly, buffered channels block if the buffer is full (in case of senders), or in case of receivers, if the
@@ -152,17 +152,10 @@ operations on these (previous) segments, and we'll end up wanting to remove such
152152
}
153153
}
154154

155-
/**
156-
* Creates a rendezvous channel.
157-
*/
158-
public Channel() {
159-
this(0);
160-
}
161-
162155
/**
163156
* Creates a buffered channel (when capacity is positive), or a rendezvous channel if the capacity is 0.
164157
*/
165-
public Channel(int capacity) {
158+
private Channel(int capacity) {
166159
if (capacity < UNLIMITED_CAPACITY) {
167160
throw new IllegalArgumentException("Capacity must be 0 (rendezvous), positive (buffered) or -1 (unlimited channels).");
168161
}
@@ -205,6 +198,21 @@ private void processInitialBuffer() {
205198
}
206199
}
207200

201+
public static <T> Channel<T> newRendezvousChannel() {
202+
return new Channel<>(0);
203+
}
204+
205+
public static <T> Channel<T> newBufferedChannel(int capacity) {
206+
return new Channel<>(capacity);
207+
}
208+
209+
/**
210+
* Creates a new buffered channel, with the default buffer size (16).
211+
*/
212+
public static <T> Channel<T> newBufferedDefaultChannel() {
213+
return new Channel<>(DEFAULT_BUFFER_SIZE);
214+
}
215+
208216
public static <T> Channel<T> newUnlimitedChannel() {
209217
return new Channel<>(UNLIMITED_CAPACITY);
210218
}

channels/src/test/java/com/softwaremill/jox/ChannelBufferedTest.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ public class ChannelBufferedTest {
1717
@Timeout(1)
1818
void testSimpleSendReceiveBuffer1() throws InterruptedException {
1919
// given
20-
Channel<String> channel = new Channel<>(1);
20+
Channel<String> channel = Channel.newBufferedChannel(1);
2121

2222
// when
2323
channel.send("x"); // should not block
@@ -31,7 +31,7 @@ void testSimpleSendReceiveBuffer1() throws InterruptedException {
3131
@Timeout(1)
3232
void testSimpleSendReceiveBuffer2() throws InterruptedException {
3333
// given
34-
Channel<String> channel = new Channel<>(2);
34+
Channel<String> channel = Channel.newBufferedChannel(2);
3535

3636
// when
3737
channel.send("x"); // should not block
@@ -48,7 +48,7 @@ void testSimpleSendReceiveBuffer2() throws InterruptedException {
4848
@Timeout(2)
4949
void testBufferCapacityStaysTheSameAfterSendsReceives() throws ExecutionException, InterruptedException {
5050
// given
51-
Channel<Integer> channel = new Channel<>(2);
51+
Channel<Integer> channel = Channel.newBufferedChannel(2);
5252

5353
// when
5454
scoped(scope -> {
@@ -78,7 +78,7 @@ void testBufferCapacityStaysTheSameAfterSendsReceives() throws ExecutionExceptio
7878
@Timeout(1)
7979
void shouldReceiveFromAChannelUntilDone() throws InterruptedException {
8080
// given
81-
Channel<Integer> c = new Channel<>(3);
81+
Channel<Integer> c = Channel.newBufferedChannel(3);
8282
c.send(1);
8383
c.send(2);
8484
c.done();
@@ -98,7 +98,7 @@ void shouldReceiveFromAChannelUntilDone() throws InterruptedException {
9898
@Timeout(1)
9999
void shouldNotReceiveFromAChannelInCaseOfAnError() throws InterruptedException {
100100
// given
101-
Channel<Integer> c = new Channel<>(3);
101+
Channel<Integer> c = Channel.newBufferedChannel(3);
102102
c.send(1);
103103
c.send(2);
104104
c.error(new RuntimeException());
@@ -114,8 +114,8 @@ void shouldNotReceiveFromAChannelInCaseOfAnError() throws InterruptedException {
114114

115115
@Test
116116
void shouldProcessCellsInitially() {
117-
assertTrue(new Channel<String>(1).toString().contains("notProcessed=31"));
118-
assertTrue(new Channel<String>(31).toString().contains("notProcessed=1"));
119-
assertTrue(new Channel<String>(32).toString().contains("notProcessed=0"));
117+
assertTrue(Channel.<String>newBufferedChannel(1).toString().contains("notProcessed=31"));
118+
assertTrue(Channel.<String>newBufferedChannel(31).toString().contains("notProcessed=1"));
119+
assertTrue(Channel.<String>newBufferedChannel(32).toString().contains("notProcessed=0"));
120120
}
121121
}

channels/src/test/java/com/softwaremill/jox/ChannelClosedTest.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ public class ChannelClosedTest {
1212
@Test
1313
void testClosed_noValues_whenError() throws InterruptedException {
1414
// given
15-
Channel<Integer> c = new Channel<>();
15+
Channel<Integer> c = Channel.newRendezvousChannel();
1616
RuntimeException reason = new RuntimeException();
1717

1818
// when
@@ -27,7 +27,7 @@ void testClosed_noValues_whenError() throws InterruptedException {
2727
@Test
2828
void testClosed_noValues_whenDone() throws InterruptedException {
2929
// given
30-
Channel<Integer> c = new Channel<>();
30+
Channel<Integer> c = Channel.newRendezvousChannel();
3131

3232
// when
3333
c.done();
@@ -41,7 +41,7 @@ void testClosed_noValues_whenDone() throws InterruptedException {
4141
@Test
4242
void testClosed_hasSuspendedValues_whenDone() throws InterruptedException, ExecutionException {
4343
// given
44-
Channel<Integer> c = new Channel<>();
44+
Channel<Integer> c = Channel.newRendezvousChannel();
4545

4646
// when
4747
scoped(scope -> {
@@ -65,7 +65,7 @@ void testClosed_hasSuspendedValues_whenDone() throws InterruptedException, Execu
6565
@Test
6666
void testClosed_hasBufferedValues_whenDone() throws InterruptedException {
6767
// given
68-
Channel<Integer> c = new Channel<>(5);
68+
Channel<Integer> c = Channel.newBufferedChannel(5);
6969

7070
// when
7171
c.send(1);
@@ -80,7 +80,7 @@ void testClosed_hasBufferedValues_whenDone() throws InterruptedException {
8080
@Test
8181
void testClosed_hasValues_whenError() throws InterruptedException {
8282
// given
83-
Channel<Integer> c = new Channel<>(5);
83+
Channel<Integer> c = Channel.newBufferedChannel(5);
8484

8585
// when
8686
c.send(1);

channels/src/test/java/com/softwaremill/jox/ChannelInterruptionTest.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ public class ChannelInterruptionTest {
1616
@Test
1717
void testSendReceiveAfterSendInterrupt() throws Exception {
1818
// given
19-
Channel<String> channel = new Channel<>();
19+
Channel<String> channel = Channel.newRendezvousChannel();
2020

2121
// when
2222
scoped(scope -> {
@@ -34,7 +34,7 @@ void testSendReceiveAfterSendInterrupt() throws Exception {
3434
@Test
3535
void testSendReceiveAfterReceiveInterrupt() throws Exception {
3636
// given
37-
Channel<String> channel = new Channel<>();
37+
Channel<String> channel = Channel.newRendezvousChannel();
3838

3939
// when
4040
scoped(scope -> {
@@ -55,7 +55,7 @@ void testRaceInterruptAndSend() throws Exception {
5555
scoped(scope -> {
5656
for (int i = 0; i < 100; i++) {
5757
// given
58-
Channel<String> channel = new Channel<>();
58+
Channel<String> channel = Channel.newRendezvousChannel();
5959

6060
var t1 = forkCancelable(scope, () -> channel.send("x"));
6161
var t2 = fork(scope, channel::receive);
@@ -76,7 +76,7 @@ void testRaceInterruptAndSend() throws Exception {
7676
@Test
7777
void testReceiveManyInterruptsReceive() throws ExecutionException, InterruptedException {
7878
scoped(scope -> {
79-
Channel<String> channel = new Channel<>();
79+
Channel<String> channel = Channel.newRendezvousChannel();
8080
Set<String> received = ConcurrentHashMap.newKeySet();
8181

8282
// starting with a single receive
@@ -115,7 +115,7 @@ void testReceiveManyInterruptsReceive() throws ExecutionException, InterruptedEx
115115

116116
@Test
117117
void testManyInterruptedReceivesShouldNotLeakMemory() throws InterruptedException, ExecutionException {
118-
var ch = new Channel<String>();
118+
var ch = Channel.newRendezvousChannel();
119119

120120
scoped(scope -> {
121121
var forks = new Fork[300];
@@ -138,7 +138,7 @@ void testManyInterruptedReceivesShouldNotLeakMemory() throws InterruptedExceptio
138138

139139
@Test
140140
void testManyInterruptedSendsShouldNotLeakMemory() throws InterruptedException, ExecutionException {
141-
var ch = new Channel<String>(1);
141+
var ch = Channel.<String>newBufferedChannel(1);
142142
ch.send("x");
143143

144144
scoped(scope -> {

0 commit comments

Comments
 (0)