Skip to content

Commit aa4f8a9

Browse files
authored
Add Actor, external runner and fix flows publisher issues (#81)
1 parent 2cdeb73 commit aa4f8a9

File tree

15 files changed

+530
-13
lines changed

15 files changed

+530
-13
lines changed

flows/pom.xml

Lines changed: 28 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,29 @@
1313
<version>0.3.1</version>
1414
<packaging>jar</packaging>
1515

16+
<dependencyManagement>
17+
<dependencies>
18+
<dependency>
19+
<groupId>org.apache.pekko</groupId>
20+
<artifactId>pekko-bom_3</artifactId>
21+
<version>1.1.2</version>
22+
<type>pom</type>
23+
<scope>import</scope>
24+
</dependency>
25+
</dependencies>
26+
</dependencyManagement>
27+
1628
<dependencies>
29+
<dependency>
30+
<groupId>com.softwaremill.jox</groupId>
31+
<artifactId>channels</artifactId>
32+
<version>0.3.1</version>
33+
</dependency>
34+
<dependency>
35+
<groupId>com.softwaremill.jox</groupId>
36+
<artifactId>structured</artifactId>
37+
<version>0.3.1</version>
38+
</dependency>
1739
<dependency>
1840
<groupId>org.junit.jupiter</groupId>
1941
<artifactId>junit-jupiter</artifactId>
@@ -55,14 +77,14 @@
5577
</exclusions>
5678
</dependency>
5779
<dependency>
58-
<groupId>com.softwaremill.jox</groupId>
59-
<artifactId>channels</artifactId>
60-
<version>0.3.1</version>
80+
<groupId>org.apache.pekko</groupId>
81+
<artifactId>pekko-stream_3</artifactId>
82+
<scope>test</scope>
6183
</dependency>
6284
<dependency>
63-
<groupId>com.softwaremill.jox</groupId>
64-
<artifactId>structured</artifactId>
65-
<version>0.3.1</version>
85+
<groupId>org.apache.pekko</groupId>
86+
<artifactId>pekko-stream-testkit_3</artifactId>
87+
<scope>test</scope>
6688
</dependency>
6789
</dependencies>
6890
</project>

flows/src/main/java/com/softwaremill/jox/flows/Flows.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import com.softwaremill.jox.Source;
3434
import com.softwaremill.jox.structured.Fork;
3535
import com.softwaremill.jox.structured.Scopes;
36+
import com.softwaremill.jox.structured.ThrowingConsumer;
3637

3738
public final class Flows {
3839

flows/src/main/java/com/softwaremill/jox/flows/FromFlowPublisher.java

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -14,16 +14,17 @@
1414
import com.softwaremill.jox.ChannelDone;
1515
import com.softwaremill.jox.ChannelError;
1616
import com.softwaremill.jox.Sink;
17+
import com.softwaremill.jox.structured.ExternalRunner;
1718
import com.softwaremill.jox.structured.Scope;
1819
import com.softwaremill.jox.structured.UnsupervisedScope;
1920

2021
class FromFlowPublisher<T> implements Flow.Publisher<T> {
2122

22-
private final Scope scope;
23+
private final ExternalRunner externalRunner;
2324
private final FlowStage<T> last;
2425

2526
FromFlowPublisher(Scope scope, FlowStage<T> last) {
26-
this.scope = scope;
27+
this.externalRunner = scope.externalRunner();
2728
this.last = last;
2829
}
2930

@@ -39,10 +40,12 @@ public void subscribe(Flow.Subscriber<? super T> subscriber) {
3940
// we cannot block `subscribe` (see https://github.com/reactive-streams/reactive-streams-jvm/issues/393),
4041
// hence running in a fork; however, the reactive library might run .subscribe on a different thread, that's
4142
// why we need to use the external runner functionality
42-
scope.fork(() -> {
43-
runToSubscriber(subscriber);
44-
return null;
45-
});
43+
externalRunner.runAsync(scope ->
44+
scope.fork(() -> {
45+
runToSubscriber(subscriber);
46+
return null;
47+
})
48+
);
4649
}
4750

4851
private void runToSubscriber(Flow.Subscriber<? super T> subscriber) throws ExecutionException, InterruptedException {
Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
package com.softwaremill.jox.flows;
2+
3+
import static org.junit.jupiter.api.Assertions.assertEquals;
4+
5+
import java.time.Duration;
6+
import java.util.List;
7+
import java.util.concurrent.ExecutionException;
8+
9+
import com.softwaremill.jox.structured.Scopes;
10+
import org.apache.pekko.actor.ActorSystem;
11+
import org.apache.pekko.stream.javadsl.AsPublisher;
12+
import org.apache.pekko.stream.javadsl.Sink;
13+
import org.apache.pekko.stream.javadsl.Source;
14+
import org.junit.jupiter.api.AfterEach;
15+
import org.junit.jupiter.api.BeforeEach;
16+
import org.junit.jupiter.api.Test;
17+
import org.reactivestreams.FlowAdapters;
18+
import org.reactivestreams.Publisher;
19+
20+
public class FlowPekkoStreamTest {
21+
22+
private ActorSystem system;
23+
24+
@BeforeEach
25+
void setUp() {
26+
system = ActorSystem.create("test");
27+
}
28+
29+
@AfterEach
30+
void cleanUp() {
31+
system.terminate();
32+
}
33+
34+
@Test
35+
void test() throws ExecutionException, InterruptedException {
36+
Scopes.supervised(scope -> {
37+
var flow = Flows.fromIterable(List.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
38+
.map(i -> i * 2)
39+
.filter(i -> i % 3 == 0);
40+
var result = Source
41+
.fromPublisher(FlowAdapters.toPublisher(flow.toPublisher(scope)))
42+
.map(i -> i * 2)
43+
.runWith(Sink.seq(), system)
44+
.toCompletableFuture()
45+
.get();
46+
47+
assertEquals(List.of(12, 24, 36), result);
48+
return null;
49+
});
50+
}
51+
52+
@Test
53+
public void testSimpleFlow() throws ExecutionException, InterruptedException {
54+
Scopes.supervised(scope -> {
55+
var flow = Flows.fromIterable(List.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
56+
.map(i -> i * 2)
57+
.filter(i -> i % 3 == 0);
58+
var result = Source
59+
.fromPublisher(FlowAdapters.toPublisher(flow.toPublisher(scope)))
60+
.map(i -> i * 2)
61+
.runWith(Sink.seq(), system)
62+
.toCompletableFuture()
63+
.get();
64+
65+
assertEquals(List.of(12, 24, 36), result);
66+
return null;
67+
});
68+
}
69+
70+
@Test
71+
public void testConcurrentFlow() throws ExecutionException, InterruptedException {
72+
Scopes.supervised(scope -> {
73+
var flow = Flows.tick(Duration.ofMillis(100), "x")
74+
.merge(Flows.tick(Duration.ofMillis(200), "y"), false, false)
75+
.take(5);
76+
var result = Source
77+
.fromPublisher(FlowAdapters.toPublisher(flow.toPublisher(scope)))
78+
.map(s -> s + s)
79+
.runWith(Sink.seq(), system)
80+
.toCompletableFuture()
81+
.get();
82+
83+
result = result.stream().sorted().toList();
84+
assertEquals(List.of("xx", "xx", "xx", "yy", "yy"), result);
85+
return null;
86+
});
87+
}
88+
89+
@Test
90+
public void testFlowFromSimplePublisher() throws Exception {
91+
Publisher<Integer> publisher = Source
92+
.fromIterator(() -> List.of(1, 2, 3).iterator())
93+
.map(i -> i * 2)
94+
.runWith(Sink.asPublisher(AsPublisher.WITHOUT_FANOUT), system);
95+
96+
var result = Flows.fromPublisher(FlowAdapters.toFlowPublisher(publisher))
97+
.map(i -> i * 10)
98+
.runToList();
99+
100+
assertEquals(List.of(20, 40, 60), result);
101+
}
102+
103+
@Test
104+
public void testFlowFromConcurrentPublisher() throws Exception {
105+
Publisher<String> publisher = Source
106+
.tick(Duration.ZERO, Duration.ofMillis(100), "x")
107+
.merge(Source.tick(Duration.ZERO, Duration.ofMillis(200), "y"))
108+
.take(5)
109+
.runWith(Sink.asPublisher(AsPublisher.WITHOUT_FANOUT), system);
110+
111+
var result = Flows.fromPublisher(FlowAdapters.toFlowPublisher(publisher))
112+
.map(s -> s + s)
113+
.runToList();
114+
115+
result.sort(String::compareTo);
116+
assertEquals(List.of("xx", "xx", "xx", "yy", "yy"), result);
117+
}
118+
}

structured/pom.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,11 @@
1414
<packaging>jar</packaging>
1515

1616
<dependencies>
17+
<dependency>
18+
<groupId>com.softwaremill.jox</groupId>
19+
<artifactId>channels</artifactId>
20+
<version>0.3.1</version>
21+
</dependency>
1722
<dependency>
1823
<groupId>org.junit.jupiter</groupId>
1924
<artifactId>junit-jupiter</artifactId>
Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,123 @@
1+
package com.softwaremill.jox.structured;
2+
3+
import static com.softwaremill.jox.structured.Scopes.unsupervised;
4+
5+
import java.util.concurrent.Callable;
6+
import java.util.concurrent.CompletableFuture;
7+
import java.util.concurrent.ExecutionException;
8+
import java.util.function.Consumer;
9+
10+
import com.softwaremill.jox.Channel;
11+
import com.softwaremill.jox.Sink;
12+
13+
public class ActorRef<T> {
14+
15+
private final Sink<ThrowingConsumer<T>> c;
16+
17+
public ActorRef(Sink<ThrowingConsumer<T>> c) {
18+
this.c = c;
19+
}
20+
21+
/**
22+
* Send an invocation to the actor and await for the result.
23+
* <p>
24+
* The `f` function should be an invocation of a method on `T` and should not directly or indirectly return the `T` value, as this might
25+
* expose the actor's internal mutable state to other threads.
26+
* <p>
27+
* Any non-fatal exceptions thrown by `f` will be propagated to the caller and the actor will continue processing other invocations.
28+
* Fatal exceptions will be propagated to the actor's enclosing scope, and the actor will close.
29+
*/
30+
public <U> U ask(ThrowingFunction<T, U> f) throws Exception {
31+
CompletableFuture<U> cf = new CompletableFuture<>();
32+
c.send(t -> {
33+
try {
34+
cf.complete(f.apply(t));
35+
} catch (Throwable e) {
36+
if (e instanceof RuntimeException) {
37+
cf.completeExceptionally(e);
38+
} else {
39+
cf.completeExceptionally(e);
40+
throw e;
41+
}
42+
}
43+
});
44+
try {
45+
return cf.get();
46+
} catch (ExecutionException e) {
47+
throw (Exception) e.getCause();
48+
}
49+
}
50+
51+
/**
52+
* Send an invocation to the actor that should be processed in the background (fire-and-forget). Might block until there's enough space
53+
* in the actor's mailbox (incoming channel).
54+
* <p>
55+
* Any exceptions thrown by `f` will be propagated to the actor's enclosing scope, and the actor will close.
56+
*/
57+
public void tell(ThrowingConsumer<T> f) throws InterruptedException {
58+
c.send(f);
59+
}
60+
61+
/**
62+
* The same as {@link ActorRef#create(Scope, Object, Consumer)} but with empty close action.
63+
*/
64+
public static <T> ActorRef<T> create(Scope scope, T logic) {
65+
return create(scope, logic, null);
66+
}
67+
68+
/**
69+
* Creates a new actor ref, that is a fork in the current concurrency scope, which protects a mutable resource (`logic`) and executes
70+
* invocations on it serially, one after another. It is guaranteed that `logic` will be accessed by at most one thread at a time. The
71+
* methods of `logic: T` define the actor's interface (the messages that can be "sent to the actor").
72+
* <p>
73+
* Invocations can be scheduled using the returned `ActorRef`. When an invocation is an `ActorRef.ask`, any non-fatal exceptions are
74+
* propagated to the caller, and the actor continues. Fatal exceptions, or exceptions that occur during `ActorRef.tell` invocations,
75+
* cause the actor's channel to be closed with an error, and are propagated to the enclosing scope.
76+
* <p>
77+
* The actor's mailbox (incoming channel) will have a capacity as specified by the {@link Channel#BUFFER_SIZE} in scope or {@link Channel#DEFAULT_BUFFER_SIZE} is used.
78+
*/
79+
public static <T> ActorRef<T> create(Scope scope, T logic, Consumer<T> close) {
80+
Channel<ThrowingConsumer<T>> c = Channel.withScopedBufferSize();
81+
ActorRef<T> ref = new ActorRef<>(c);
82+
scope.fork(() -> {
83+
try {
84+
while (true) {
85+
ThrowingConsumer<T> m = c.receive();
86+
try {
87+
m.accept(logic);
88+
} catch (Throwable t) {
89+
c.error(t);
90+
throw t;
91+
}
92+
}
93+
} finally {
94+
if (close != null) {
95+
uninterruptible(() -> {
96+
close.accept(logic);
97+
return null;
98+
});
99+
}
100+
}
101+
});
102+
return ref;
103+
}
104+
105+
private static void uninterruptible(Callable<Void> f) throws ExecutionException, InterruptedException {
106+
unsupervised(scope -> {
107+
Fork<Void> t = scope.forkUnsupervised(f);
108+
109+
ThrowingRunnable joinDespiteInterrupted = () -> {
110+
while (true) {
111+
try {
112+
t.join();
113+
break;
114+
} catch (InterruptedException e) {
115+
// Continue the loop to retry joining
116+
}
117+
}
118+
};
119+
joinDespiteInterrupted.run();
120+
return null;
121+
});
122+
}
123+
}
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
package com.softwaremill.jox.structured;
2+
3+
public record ExternalRunner(ActorRef<ExternalScheduler> scheduler) {
4+
/** Allows to runs the given function asynchronously, in the scope of the concurrency scope in which this runner was created.
5+
* <p>
6+
* `f` should return promptly, not to obstruct execution of other scheduled functions. Typically, it should start a background fork.
7+
*/
8+
public void runAsync(ThrowingConsumer<Scope> f) {
9+
SneakyThrows.sneakyThrows(() ->
10+
scheduler.ask(s -> {
11+
s.run(f);
12+
return null;
13+
})
14+
);
15+
}
16+
}

0 commit comments

Comments
 (0)