Skip to content

Commit 0b69eb2

Browse files
endrju19Andrzej Kobylińskiadamw
authored
Implement non-blocking trySend and tryReceive methods (#262)
## Summary - Implement non-blocking `trySend`/`tryReceive` (and `OrClosed` variants) on `Channel`, `Sink`, and `Source` for integration with non-blocking frameworks (Netty, Vert.x, etc.) - Add optimized implementations in `Channel` using CAS-based cell reservation with pre-check optimization to avoid unnecessary CAS attempts - Update downstream usages in `FromFlowPublisher`, `KafkaDrain`, and `KafkaStage` to use `trySend` instead of blocking `send` - Add `Channel.estimateSize()` for monitoring and observability - Add Fray deterministic concurrency tests and comprehensive unit tests covering all channel types - Add contention caveat to Javadoc and `channels.md` documentation Resolves #187 --------- Co-authored-by: Andrzej Kobyliński <andrzej.kobylinski@extern.corify.de> Co-authored-by: adamw <adam@warski.org>
1 parent 1234a95 commit 0b69eb2

File tree

8 files changed

+817
-64
lines changed

8 files changed

+817
-64
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ target
44
.cursor
55
tmp
66
.vscode
7+
.metals
78

89
# virtual machine crash logs, see http://www.java.com/en/download/help/error_hotspot.xml
910
hs_err_pid*
Lines changed: 150 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,150 @@
1+
package com.softwaremill.jox.fray;
2+
3+
import static com.softwaremill.jox.fray.Config.CHANNEL_SIZE;
4+
5+
import java.util.ArrayList;
6+
7+
import org.junit.jupiter.api.extension.ExtendWith;
8+
import org.pastalab.fray.junit.junit5.FrayTestExtension;
9+
import org.pastalab.fray.junit.junit5.annotations.ConcurrencyTest;
10+
11+
import com.softwaremill.jox.Channel;
12+
import com.softwaremill.jox.ChannelDone;
13+
14+
@ExtendWith(FrayTestExtension.class)
15+
public class FrayTrySendReceiveTest {
16+
17+
// trySend | receive
18+
19+
@ConcurrencyTest
20+
public void trySendReceiveTest() throws InterruptedException {
21+
Channel<Integer> ch = Channel.newBufferedChannel(CHANNEL_SIZE);
22+
23+
Fork<Void> f1 =
24+
Fork.newNoResult(
25+
() -> {
26+
if (!ch.trySend(10)) {
27+
ch.send(10);
28+
}
29+
});
30+
Fork<Integer> f2 = Fork.newWithResult(ch::receive);
31+
32+
Fork.startAll(f1, f2);
33+
f1.join();
34+
35+
assert (f2.join() == 10);
36+
}
37+
38+
// send | tryReceive
39+
40+
@ConcurrencyTest
41+
public void sendTryReceiveTest() throws InterruptedException {
42+
Channel<Integer> ch = Channel.newBufferedChannel(CHANNEL_SIZE);
43+
44+
Fork<Void> f1 = Fork.newNoResult(() -> ch.send(10));
45+
Fork<Integer> f2 =
46+
Fork.newWithResult(
47+
() -> {
48+
Integer result = ch.tryReceive();
49+
if (result != null) return result;
50+
return ch.receive();
51+
});
52+
53+
Fork.startAll(f1, f2);
54+
f1.join();
55+
56+
assert (f2.join() == 10);
57+
}
58+
59+
// trySend | tryReceive
60+
61+
@ConcurrencyTest
62+
public void trySendTryReceiveTest() throws InterruptedException {
63+
Channel<Integer> ch = Channel.newBufferedChannel(CHANNEL_SIZE);
64+
65+
Fork<Boolean> f1 = Fork.newWithResult(() -> ch.trySend(10));
66+
Fork<Integer> f2 = Fork.newWithResult(ch::tryReceive);
67+
68+
Fork.startAll(f1, f2);
69+
boolean sent = f1.join();
70+
Integer received = f2.join();
71+
72+
if (received != null) {
73+
assert sent;
74+
assert (received == 10);
75+
}
76+
}
77+
78+
// multiple trySend | multiple tryReceive
79+
80+
@ConcurrencyTest
81+
public void multiTrySendMultiTryReceiveTest() throws InterruptedException {
82+
Channel<Integer> ch = Channel.newBufferedChannel(CHANNEL_SIZE);
83+
84+
int concurrency = 4;
85+
86+
var sendForks = new ArrayList<Fork<Void>>();
87+
var receiveForks = new ArrayList<Fork<Integer>>();
88+
89+
for (int i = 0; i < concurrency; i++) {
90+
final var finalI = i;
91+
sendForks.add(
92+
Fork.newNoResult(
93+
() -> {
94+
if (!ch.trySend(finalI)) {
95+
ch.send(finalI);
96+
}
97+
}));
98+
receiveForks.add(
99+
Fork.newWithResult(
100+
() -> {
101+
Integer result = ch.tryReceive();
102+
if (result != null) return result;
103+
return ch.receive();
104+
}));
105+
}
106+
107+
Fork.startAll(sendForks.toArray(new Fork<?>[0]));
108+
Fork.startAll(receiveForks.toArray(new Fork<?>[0]));
109+
110+
for (Fork<Void> sendFork : sendForks) {
111+
sendFork.join();
112+
}
113+
114+
var result = 0;
115+
for (Fork<Integer> receiveFork : receiveForks) {
116+
result += receiveFork.join();
117+
}
118+
119+
assert (result == concurrency * (concurrency - 1) / 2);
120+
}
121+
122+
// trySend | close | tryReceive
123+
124+
@ConcurrencyTest
125+
public void trySendCloseTryReceiveTest() throws InterruptedException {
126+
Channel<Integer> ch = Channel.newBufferedChannel(CHANNEL_SIZE);
127+
128+
Fork<Boolean> f1 =
129+
Fork.newWithResult(
130+
() -> {
131+
Object r = ch.trySendOrClosed(10);
132+
return r == null;
133+
});
134+
135+
Fork<Void> f2 = Fork.newNoResult(ch::done);
136+
137+
Fork<Object> f3 = Fork.newWithResult(ch::tryReceiveOrClosed);
138+
139+
Fork.startAll(f1, f2, f3);
140+
141+
boolean sent = f1.join();
142+
f2.join();
143+
Object received = f3.join();
144+
145+
if (received != null && !(received instanceof ChannelDone)) {
146+
assert (received.equals(10));
147+
assert sent;
148+
}
149+
}
150+
}

0 commit comments

Comments
 (0)