Skip to content

Commit d6c4491

Browse files
committed
[Fix #722] Close executor service if created
Signed-off-by: fjtirado <[email protected]>
1 parent 9f7b967 commit d6c4491

File tree

9 files changed

+111
-22
lines changed

9 files changed

+111
-22
lines changed

fluent/agentic-langchain4j/src/main/java/io/serverlessworkflow/fluent/agentic/langchain4j/ParallelAgentServiceImpl.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
import dev.langchain4j.agentic.internal.AgentExecutor;
1919
import dev.langchain4j.agentic.workflow.ParallelAgentService;
20+
import io.serverlessworkflow.impl.ExecutorServiceHolder;
2021
import java.util.List;
2122
import java.util.concurrent.ExecutorService;
2223

@@ -33,7 +34,7 @@ public static <T> ParallelAgentService<T> builder(Class<T> agentServiceClass) {
3334

3435
@Override
3536
public ParallelAgentService<T> executorService(ExecutorService executorService) {
36-
this.workflowExecBuilder.withExecutorFactory(() -> executorService);
37+
this.workflowExecBuilder.withExecutorFactory(new ExecutorServiceHolder(executorService));
3738
return this;
3839
}
3940

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
/*
2+
* Copyright 2020-Present The Serverless Workflow Specification Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.serverlessworkflow.impl;
17+
18+
import java.util.concurrent.ExecutorService;
19+
20+
public abstract class AbstractExecutorServiceHolder implements ExecutorServiceFactory {
21+
22+
protected ExecutorService service;
23+
24+
@Override
25+
public void close() {
26+
if (service != null && !service.isShutdown()) {
27+
service.shutdown();
28+
}
29+
}
30+
}

impl/core/src/main/java/io/serverlessworkflow/impl/DefaultExecutorServiceFactory.java

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -17,23 +17,23 @@
1717

1818
import java.util.concurrent.ExecutorService;
1919
import java.util.concurrent.Executors;
20+
import java.util.concurrent.locks.Lock;
21+
import java.util.concurrent.locks.ReentrantLock;
2022

21-
public class DefaultExecutorServiceFactory implements ExecutorServiceFactory {
23+
public class DefaultExecutorServiceFactory extends AbstractExecutorServiceHolder {
2224

23-
private static final ExecutorServiceFactory instance = new DefaultExecutorServiceFactory();
24-
25-
public static ExecutorServiceFactory instance() {
26-
return instance;
27-
}
28-
29-
private static class ExecutorServiceHolder {
30-
private static ExecutorService instance = Executors.newCachedThreadPool();
31-
}
25+
private Lock serviceLock = new ReentrantLock();
3226

3327
@Override
3428
public ExecutorService get() {
35-
return ExecutorServiceHolder.instance;
29+
try {
30+
serviceLock.lock();
31+
if (service == null) {
32+
service = Executors.newCachedThreadPool();
33+
}
34+
} finally {
35+
serviceLock.unlock();
36+
}
37+
return service;
3638
}
37-
38-
private DefaultExecutorServiceFactory() {}
3939
}

impl/core/src/main/java/io/serverlessworkflow/impl/ExecutorServiceFactory.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,5 +18,6 @@
1818
import java.util.concurrent.ExecutorService;
1919
import java.util.function.Supplier;
2020

21-
@FunctionalInterface
22-
public interface ExecutorServiceFactory extends Supplier<ExecutorService> {}
21+
public interface ExecutorServiceFactory extends Supplier<ExecutorService>, AutoCloseable {
22+
void close();
23+
}
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
/*
2+
* Copyright 2020-Present The Serverless Workflow Specification Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.serverlessworkflow.impl;
17+
18+
import java.util.concurrent.ExecutorService;
19+
20+
public class ExecutorServiceHolder extends AbstractExecutorServiceHolder {
21+
22+
public ExecutorServiceHolder(ExecutorService service) {
23+
this.service = service;
24+
}
25+
26+
@Override
27+
public ExecutorService get() {
28+
return service;
29+
}
30+
}

impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowApplication.java

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -39,11 +39,14 @@
3939
import java.util.ServiceLoader.Provider;
4040
import java.util.concurrent.ConcurrentHashMap;
4141
import java.util.concurrent.ExecutorService;
42-
import java.util.concurrent.Executors;
4342
import java.util.stream.Collectors;
43+
import org.slf4j.Logger;
44+
import org.slf4j.LoggerFactory;
4445

4546
public class WorkflowApplication implements AutoCloseable {
4647

48+
private static final Logger logger = LoggerFactory.getLogger(WorkflowApplication.class);
49+
4750
private final TaskExecutorFactory taskFactory;
4851
private final ExpressionFactory exprFactory;
4952
private final ResourceLoaderFactory resourceLoaderFactory;
@@ -137,7 +140,7 @@ public SchemaValidator getValidator(SchemaInline inline) {
137140
private SchemaValidatorFactory schemaValidatorFactory;
138141
private WorkflowPositionFactory positionFactory = () -> new QueueWorkflowPosition();
139142
private WorkflowIdFactory idFactory = () -> UlidCreator.getMonotonicUlid().toString();
140-
private ExecutorServiceFactory executorFactory = () -> Executors.newCachedThreadPool();
143+
private ExecutorServiceFactory executorFactory = new DefaultExecutorServiceFactory();
141144
private EventConsumer<?, ?> eventConsumer = InMemoryEvents.get();
142145
private EventPublisher eventPublisher = InMemoryEvents.get();
143146
private RuntimeDescriptorFactory descriptorFactory =
@@ -236,12 +239,23 @@ public WorkflowDefinition workflowDefinition(Workflow workflow) {
236239

237240
@Override
238241
public void close() {
242+
safeClose(executorFactory);
243+
safeClose(eventPublisher);
244+
safeClose(eventConsumer);
239245
for (WorkflowDefinition definition : definitions.values()) {
240-
definition.close();
246+
safeClose(definition);
241247
}
242248
definitions.clear();
243249
}
244250

251+
private void safeClose(AutoCloseable closeable) {
252+
try {
253+
closeable.close();
254+
} catch (Exception ex) {
255+
logger.warn("Error closing resource {}", closeable.getClass().getName(), ex);
256+
}
257+
}
258+
245259
public WorkflowPositionFactory positionFactory() {
246260
return positionFactory;
247261
}

impl/core/src/main/java/io/serverlessworkflow/impl/events/EventConsumer.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,8 @@
2121
import java.util.Collection;
2222
import java.util.function.Consumer;
2323

24-
public interface EventConsumer<T extends EventRegistration, V extends EventRegistrationBuilder> {
24+
public interface EventConsumer<T extends EventRegistration, V extends EventRegistrationBuilder>
25+
extends AutoCloseable {
2526

2627
V listen(EventFilter filter, WorkflowApplication workflowApplication);
2728

@@ -30,4 +31,6 @@ public interface EventConsumer<T extends EventRegistration, V extends EventRegis
3031
T register(V builder, Consumer<CloudEvent> consumer);
3132

3233
void unregister(T register);
34+
35+
void close();
3336
}

impl/core/src/main/java/io/serverlessworkflow/impl/events/EventPublisher.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
import io.cloudevents.CloudEvent;
1919
import java.util.concurrent.CompletableFuture;
2020

21-
public interface EventPublisher {
21+
public interface EventPublisher extends AutoCloseable {
2222
CompletableFuture<Void> publish(CloudEvent event);
23+
24+
void close();
2325
}

impl/core/src/main/java/io/serverlessworkflow/impl/events/InMemoryEvents.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
import io.cloudevents.CloudEvent;
1919
import io.serverlessworkflow.impl.DefaultExecutorServiceFactory;
20+
import io.serverlessworkflow.impl.ExecutorServiceFactory;
2021
import java.util.Map;
2122
import java.util.concurrent.CompletableFuture;
2223
import java.util.concurrent.ConcurrentHashMap;
@@ -37,6 +38,8 @@ public static InMemoryEvents get() {
3738
return instance;
3839
}
3940

41+
private ExecutorServiceFactory serviceFactory = new DefaultExecutorServiceFactory();
42+
4043
private Map<String, Consumer<CloudEvent>> topicMap = new ConcurrentHashMap<>();
4144

4245
private AtomicReference<Consumer<CloudEvent>> allConsumerRef = new AtomicReference<>();
@@ -64,7 +67,7 @@ public CompletableFuture<Void> publish(CloudEvent ce) {
6467
consumer.accept(ce);
6568
}
6669
},
67-
DefaultExecutorServiceFactory.instance().get());
70+
serviceFactory.get());
6871
}
6972

7073
@Override
@@ -76,4 +79,9 @@ protected void registerToAll(Consumer<CloudEvent> consumer) {
7679
protected void unregisterFromAll() {
7780
allConsumerRef.set(null);
7881
}
82+
83+
@Override
84+
public void close() {
85+
serviceFactory.close();
86+
}
7987
}

0 commit comments

Comments
 (0)