Skip to content

Commit a211148

Browse files
committed
[Fix #722] Close executor service if created
Signed-off-by: fjtirado <[email protected]>
1 parent 9f7b967 commit a211148

File tree

6 files changed

+60
-16
lines changed

6 files changed

+60
-16
lines changed

impl/core/src/main/java/io/serverlessworkflow/impl/DefaultExecutorServiceFactory.java

Lines changed: 21 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,23 +17,37 @@
1717

1818
import java.util.concurrent.ExecutorService;
1919
import java.util.concurrent.Executors;
20+
import java.util.concurrent.locks.Lock;
21+
import java.util.concurrent.locks.ReentrantLock;
2022

2123
public class DefaultExecutorServiceFactory implements ExecutorServiceFactory {
2224

23-
private static final ExecutorServiceFactory instance = new DefaultExecutorServiceFactory();
25+
private ExecutorService service;
26+
private Lock serviceLock = new ReentrantLock();
2427

2528
public static ExecutorServiceFactory instance() {
26-
return instance;
27-
}
28-
29-
private static class ExecutorServiceHolder {
30-
private static ExecutorService instance = Executors.newCachedThreadPool();
29+
return new DefaultExecutorServiceFactory();
3130
}
3231

3332
@Override
3433
public ExecutorService get() {
35-
return ExecutorServiceHolder.instance;
34+
try {
35+
serviceLock.lock();
36+
if (service == null) {
37+
service = Executors.newCachedThreadPool();
38+
}
39+
} finally {
40+
serviceLock.unlock();
41+
}
42+
return service;
3643
}
3744

3845
private DefaultExecutorServiceFactory() {}
46+
47+
@Override
48+
public void close() {
49+
if (service != null) {
50+
service.shutdown();
51+
}
52+
}
3953
}

impl/core/src/main/java/io/serverlessworkflow/impl/ExecutorServiceFactory.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,5 +18,6 @@
1818
import java.util.concurrent.ExecutorService;
1919
import java.util.function.Supplier;
2020

21-
@FunctionalInterface
22-
public interface ExecutorServiceFactory extends Supplier<ExecutorService> {}
21+
public interface ExecutorServiceFactory extends Supplier<ExecutorService>, AutoCloseable {
22+
void close();
23+
}

impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowApplication.java

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -39,11 +39,14 @@
3939
import java.util.ServiceLoader.Provider;
4040
import java.util.concurrent.ConcurrentHashMap;
4141
import java.util.concurrent.ExecutorService;
42-
import java.util.concurrent.Executors;
4342
import java.util.stream.Collectors;
43+
import org.slf4j.Logger;
44+
import org.slf4j.LoggerFactory;
4445

4546
public class WorkflowApplication implements AutoCloseable {
4647

48+
private static final Logger logger = LoggerFactory.getLogger(WorkflowApplication.class);
49+
4750
private final TaskExecutorFactory taskFactory;
4851
private final ExpressionFactory exprFactory;
4952
private final ResourceLoaderFactory resourceLoaderFactory;
@@ -137,7 +140,7 @@ public SchemaValidator getValidator(SchemaInline inline) {
137140
private SchemaValidatorFactory schemaValidatorFactory;
138141
private WorkflowPositionFactory positionFactory = () -> new QueueWorkflowPosition();
139142
private WorkflowIdFactory idFactory = () -> UlidCreator.getMonotonicUlid().toString();
140-
private ExecutorServiceFactory executorFactory = () -> Executors.newCachedThreadPool();
143+
private ExecutorServiceFactory executorFactory = DefaultExecutorServiceFactory.instance();
141144
private EventConsumer<?, ?> eventConsumer = InMemoryEvents.get();
142145
private EventPublisher eventPublisher = InMemoryEvents.get();
143146
private RuntimeDescriptorFactory descriptorFactory =
@@ -236,12 +239,23 @@ public WorkflowDefinition workflowDefinition(Workflow workflow) {
236239

237240
@Override
238241
public void close() {
242+
safeClose(executorFactory);
243+
safeClose(eventPublisher);
244+
safeClose(eventConsumer);
239245
for (WorkflowDefinition definition : definitions.values()) {
240-
definition.close();
246+
safeClose(definition);
241247
}
242248
definitions.clear();
243249
}
244250

251+
private void safeClose(AutoCloseable closeable) {
252+
try {
253+
closeable.close();
254+
} catch (Exception ex) {
255+
logger.warn("Error closing resource {}", closeable.getClass().getName(), ex);
256+
}
257+
}
258+
245259
public WorkflowPositionFactory positionFactory() {
246260
return positionFactory;
247261
}

impl/core/src/main/java/io/serverlessworkflow/impl/events/EventConsumer.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,8 @@
2121
import java.util.Collection;
2222
import java.util.function.Consumer;
2323

24-
public interface EventConsumer<T extends EventRegistration, V extends EventRegistrationBuilder> {
24+
public interface EventConsumer<T extends EventRegistration, V extends EventRegistrationBuilder>
25+
extends AutoCloseable {
2526

2627
V listen(EventFilter filter, WorkflowApplication workflowApplication);
2728

@@ -30,4 +31,6 @@ public interface EventConsumer<T extends EventRegistration, V extends EventRegis
3031
T register(V builder, Consumer<CloudEvent> consumer);
3132

3233
void unregister(T register);
34+
35+
void close();
3336
}

impl/core/src/main/java/io/serverlessworkflow/impl/events/EventPublisher.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
import io.cloudevents.CloudEvent;
1919
import java.util.concurrent.CompletableFuture;
2020

21-
public interface EventPublisher {
21+
public interface EventPublisher extends AutoCloseable {
2222
CompletableFuture<Void> publish(CloudEvent event);
23+
24+
void close();
2325
}

impl/core/src/main/java/io/serverlessworkflow/impl/events/InMemoryEvents.java

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,11 @@
1616
package io.serverlessworkflow.impl.events;
1717

1818
import io.cloudevents.CloudEvent;
19-
import io.serverlessworkflow.impl.DefaultExecutorServiceFactory;
2019
import java.util.Map;
2120
import java.util.concurrent.CompletableFuture;
2221
import java.util.concurrent.ConcurrentHashMap;
22+
import java.util.concurrent.ExecutorService;
23+
import java.util.concurrent.Executors;
2324
import java.util.concurrent.atomic.AtomicReference;
2425
import java.util.function.Consumer;
2526

@@ -31,6 +32,8 @@ public class InMemoryEvents extends AbstractTypeConsumer implements EventPublish
3132

3233
private static InMemoryEvents instance = new InMemoryEvents();
3334

35+
private ExecutorService service = Executors.newCachedThreadPool();
36+
3437
private InMemoryEvents() {}
3538

3639
public static InMemoryEvents get() {
@@ -64,7 +67,7 @@ public CompletableFuture<Void> publish(CloudEvent ce) {
6467
consumer.accept(ce);
6568
}
6669
},
67-
DefaultExecutorServiceFactory.instance().get());
70+
service);
6871
}
6972

7073
@Override
@@ -76,4 +79,11 @@ protected void registerToAll(Consumer<CloudEvent> consumer) {
7679
protected void unregisterFromAll() {
7780
allConsumerRef.set(null);
7881
}
82+
83+
@Override
84+
public void close() {
85+
if (!service.isShutdown()) {
86+
service.shutdown();
87+
}
88+
}
7989
}

0 commit comments

Comments
 (0)