Execute fire and forget for long running process #1209
-
|
Hi! I need to implement a method that would trigger fire and forget behavior. The code I wrote are something like this: public Uni<String> startProcess() {
longRunningProcess().subcribe().with(logger::info(message))
return Uni.createFrom().item("Processing");
}
public Uni<String> longRunningProcess() {
//Perform long running task then return with String: "Finished"
}This code seems to work as my expected behavior. Is this code functioning as intended, or is there room for improvement? |
Beta Was this translation helpful? Give feedback.
Replies: 3 comments 2 replies
-
|
In general, when I need to do such kind of things, I use the following pattern: |
Beta Was this translation helpful? Give feedback.
-
|
@cescoffier , your example does not behave the way I would expect for a "fire and forget" as I would not want the main thread to wait on the long running task package _01_basics;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.infrastructure.Infrastructure;
import java.util.concurrent.TimeUnit;
public class _09a_Uni_From_CompletionStage {
static long start = System.currentTimeMillis();
public static void main(String[] args) throws Exception {
log("⚡️ Uni from CompletionStage, fire and forget?");
run1();
boolean finished = Infrastructure.getDefaultWorkerPool().awaitTermination(5, TimeUnit.SECONDS);
log("⚡️ awaitTermination called, "+finished);
}
static void run1() throws Exception {
var cs = longRunningProcess()
.runSubscriptionOn(Infrastructure.getDefaultWorkerPool())
.subscribeAsCompletionStage();
log("run1 begin sleep, longRunningProcess.isDone: "+cs.isDone());
Thread.sleep(2000);
log("run1 end sleep, longRunningProcess.isDone: "+cs.isDone());
while (!cs.isDone()) {
Thread.sleep(1000);
}
log("run1 done waiting, longRunningProcess.result: "+cs.get());
}
static void run2() throws Exception {
var cs = longRunningProcess2()
.runSubscriptionOn(Infrastructure.getDefaultWorkerPool())
.subscribeAsCompletionStage();
log("run2 begin sleep, longRunningProcess2.isDone: "+cs.isDone());
Thread.sleep(2000);
log("run2 end sleep, longRunningProcess2.isDone: "+cs.isDone());
while (!cs.isDone()) {
Thread.sleep(1000);
}
log("run2 done waiting, longRunningProcess2.result: "+cs.get());
}
static Uni<String> longRunningProcess() {
Uni<String> result = Uni.createFrom().item("Finished.longRunningProcess");
log("begin longRunningProcess");
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
}
log("end longRunningProcess");
return result;
}
static Uni<String> longRunningProcess2() {
Uni<String> result = Uni.createFrom().item("Finished.longRunningProcess2");
Infrastructure.getDefaultWorkerPool().submit(() -> {
log("begin longRunningProcess2");
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
}
log("end longRunningProcess2");
});
return result;
}
static void log(String message) {
long timestamp = System.currentTimeMillis();
long elapsed = timestamp - start;
System.out.printf("[%6d](%s): %s\n", elapsed, Thread.currentThread().getName(), message);
}
}Example with run1(): [ 0](main): ⚡️ Uni from CompletionStage, fire and forget?
[ 14](main): begin longRunningProcess
[ 5019](main): end longRunningProcess
[ 5029](main): run1 begin sleep, longRunningProcess.isDone: false
[ 7035](main): run1 end sleep, longRunningProcess.isDone: true
[ 7036](main): run1 done waiting, longRunningProcess.result: Finished.longRunningProcess
[ 12039](main): ⚡️ awaitTermination called, falseExample with run2(): [ 0](main): ⚡️ Uni from CompletionStage, fire and forget?
[ 17](pool-1-thread-1): begin longRunningProcess2
[ 23](main): run2 begin sleep, longRunningProcess2.isDone: false
[ 2029](main): run2 end sleep, longRunningProcess2.isDone: true
[ 2030](main): run2 done waiting, longRunningProcess2.result: Finished.longRunningProcess2
[ 5023](pool-1-thread-1): end longRunningProcess2
[ 7037](main): ⚡️ awaitTermination called, falseOf course in the run2 case, the |
Beta Was this translation helpful? Give feedback.
-
|
Ok, looping back to this. What I was missing was having the first Example with run1(): [ 0](main): ⚡️ Uni from CompletionStage, fire and forget?
[ 19](main): run1 begin sleep, longRunningProcess.isDone: false (1)
[ 20](pool-1-thread-1): begin longRunningProcess (2)
[ 2025](main): run1 end sleep, longRunningProcess.isDone: false (3)
[ 5025](pool-1-thread-1): end longRunningProcess (4)
[ 5026](pool-1-thread-1): After longRunningProcess item (5)
[ 5038](main): run1 done waiting, longRunningProcess.result: Finished.longRunningProcess (6)
[ 10047](main): ⚡️ awaitTermination called, false(1): main thread is sleeping while Example with run2(): [ 0](main): ⚡️ Uni from CompletionStage, fire and forget?
[ 16](pool-1-thread-1): begin longRunningProcess2 (1)
[ 22](main): run2 begin sleep, longRunningProcess2.isDone: false (2)
[ 25](pool-1-thread-2): After longRunningProcess item (3)
[ 2029](main): run2 end sleep, longRunningProcess2.isDone: true (4)
[ 2030](main): run2 done waiting, longRunningProcess2.result: Work not part of Uni chain (5)
[ 5021](pool-1-thread-1): end longRunningProcess2 (6)
[ 7040](main): ⚡️ awaitTermination called, false(1): The The Code: package _01_basics;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.infrastructure.Infrastructure;
import java.util.concurrent.TimeUnit;
public class _09a_Uni_From_CompletionStage {
static long start = System.currentTimeMillis();
public static void main(String[] args) throws Exception {
log("⚡️ Uni from CompletionStage, fire and forget?");
run2();
boolean finished = Infrastructure.getDefaultWorkerPool().awaitTermination(5, TimeUnit.SECONDS);
log("⚡️ awaitTermination called, "+finished);
System.exit(0);
}
static void run1() throws Exception {
var cs = longRunningProcess()
.invoke(() -> log("After longRunningProcess item"))
.runSubscriptionOn(Infrastructure.getDefaultWorkerPool())
.subscribeAsCompletionStage();
// longRunningProcess Uni should be running in background for 5s
log("run1 begin sleep, longRunningProcess.isDone: "+cs.isDone());
Thread.sleep(2000);
// Should still be running, cs.isDone() == false
log("run1 end sleep, longRunningProcess.isDone: "+cs.isDone());
// Wait for CS to finish
while (!cs.isDone()) {
Thread.sleep(1000);
}
// Print Uni result
log("run1 done waiting, longRunningProcess.result: "+cs.get());
}
static void run2() throws Exception {
var cs = longRunningProcess2()
.invoke(() -> log("After longRunningProcess item"))
.runSubscriptionOn(Infrastructure.getDefaultWorkerPool())
.subscribeAsCompletionStage();
// longRunningProcess Uni should be running in background for 5s
log("run2 begin sleep, longRunningProcess2.isDone: "+cs.isDone());
Thread.sleep(2000);
// Should still be running, but now cs.isDone() == true
log("run2 end sleep, longRunningProcess2.isDone: "+cs.isDone());
if (!cs.isDone()) {
log("run2, assertion failure, subscription should have completed");
}
// Print Uni result
log("run2 done waiting, longRunningProcess2.result: "+cs.get());
}
static Uni<String> longRunningProcess() {
return Uni.createFrom()
.item(() -> {
log("begin longRunningProcess");
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
}
log("end longRunningProcess");
return "Finished.longRunningProcess";
});
}
static Uni<String> longRunningProcess2() {
Uni<String> result = Uni.createFrom()
.item("Work not part of Uni chain");
Infrastructure.getDefaultWorkerPool().submit(() -> {
log("begin longRunningProcess2");
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
}
log("end longRunningProcess2");
});
return result;
}
static void log(String message) {
long timestamp = System.currentTimeMillis();
long elapsed = timestamp - start;
System.out.printf("[%6d](%s): %s\n", elapsed, Thread.currentThread().getName(), message);
}
} |
Beta Was this translation helpful? Give feedback.
In general, when I need to do such kind of things, I use the following pattern: