Skip to content

Commit 88695f4

Browse files
authored
Merge pull request #1858 from steve-community/refactor-Async-methods-with-custom-executor
Refactor: Async methods to use custom executor
2 parents 12f4cea + 28d5ca7 commit 88695f4

11 files changed

+57
-155
lines changed

src/main/java/de/rwth/idsg/steve/config/BeanConfiguration.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -132,28 +132,29 @@ public DSLContext dslContext(DataSource dataSource,
132132
return DSL.using(conf);
133133
}
134134

135-
@Bean(destroyMethod = "close")
136-
public DelegatingTaskScheduler asyncTaskScheduler() {
135+
@Bean
136+
public ThreadPoolTaskScheduler taskScheduler() {
137137
ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
138138
scheduler.setPoolSize(5);
139139
scheduler.setThreadNamePrefix("SteVe-TaskScheduler-");
140140
scheduler.setWaitForTasksToCompleteOnShutdown(true);
141141
scheduler.setAwaitTerminationSeconds(30);
142142
scheduler.initialize();
143143

144-
return new DelegatingTaskScheduler(scheduler);
144+
return scheduler;
145145
}
146146

147-
@Bean(destroyMethod = "close")
148-
public DelegatingTaskExecutor asyncTaskExecutor() {
147+
@Bean
148+
@Primary
149+
public ThreadPoolTaskExecutor taskExecutor() {
149150
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
150151
executor.setCorePoolSize(5);
151152
executor.setThreadNamePrefix("SteVe-TaskExecutor-");
152153
executor.setWaitForTasksToCompleteOnShutdown(true);
153154
executor.setAwaitTerminationSeconds(30);
154155
executor.initialize();
155156

156-
return new DelegatingTaskExecutor(executor);
157+
return executor;
157158
}
158159

159160
/**

src/main/java/de/rwth/idsg/steve/config/DelegatingTaskExecutor.java

Lines changed: 0 additions & 48 deletions
This file was deleted.

src/main/java/de/rwth/idsg/steve/config/DelegatingTaskScheduler.java

Lines changed: 0 additions & 51 deletions
This file was deleted.

src/main/java/de/rwth/idsg/steve/ocpp/soap/MessageHeaderInterceptor.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
*/
1919
package de.rwth.idsg.steve.ocpp.soap;
2020

21-
import de.rwth.idsg.steve.config.DelegatingTaskExecutor;
2221
import de.rwth.idsg.steve.ocpp.OcppProtocol;
2322
import de.rwth.idsg.steve.repository.OcppServerRepository;
2423
import de.rwth.idsg.steve.repository.impl.ChargePointRepositoryImpl;
@@ -37,6 +36,7 @@
3736
import org.apache.cxf.ws.addressing.AddressingProperties;
3837
import org.apache.cxf.ws.addressing.ContextUtils;
3938
import org.apache.cxf.ws.addressing.EndpointReferenceType;
39+
import org.springframework.core.task.TaskExecutor;
4040
import org.springframework.stereotype.Component;
4141

4242
import javax.xml.namespace.QName;
@@ -61,18 +61,18 @@ public class MessageHeaderInterceptor extends AbstractPhaseInterceptor<Message>
6161

6262
private final OcppServerRepository ocppServerRepository;
6363
private final ChargePointRegistrationService chargePointRegistrationService;
64-
private final DelegatingTaskExecutor asyncTaskExecutor;
64+
private final TaskExecutor taskExecutor;
6565

6666
private static final String BOOT_OPERATION_NAME = "BootNotification";
6767
private static final String CHARGEBOX_ID_HEADER = "ChargeBoxIdentity";
6868

6969
public MessageHeaderInterceptor(OcppServerRepository ocppServerRepository,
7070
ChargePointRegistrationService chargePointRegistrationService,
71-
DelegatingTaskExecutor asyncTaskExecutor) {
71+
TaskExecutor taskExecutor) {
7272
super(Phase.PRE_INVOKE);
7373
this.ocppServerRepository = ocppServerRepository;
7474
this.chargePointRegistrationService = chargePointRegistrationService;
75-
this.asyncTaskExecutor = asyncTaskExecutor;
75+
this.taskExecutor = taskExecutor;
7676
}
7777

7878
@Override
@@ -97,7 +97,7 @@ public void handleMessage(Message message) throws Fault {
9797
// 2. update endpoint
9898
// -------------------------------------------------------------------------
9999

100-
asyncTaskExecutor.execute(() -> {
100+
taskExecutor.execute(() -> {
101101
try {
102102
String endpointAddress = getEndpointAddress(message);
103103
if (endpointAddress != null) {

src/main/java/de/rwth/idsg/steve/ocpp/ws/AbstractWebSocketEndpoint.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,23 +19,23 @@
1919
package de.rwth.idsg.steve.ocpp.ws;
2020

2121
import com.google.common.base.Strings;
22-
import de.rwth.idsg.steve.config.DelegatingTaskScheduler;
2322
import de.rwth.idsg.steve.config.SteveProperties;
2423
import de.rwth.idsg.steve.config.WebSocketConfiguration;
2524
import de.rwth.idsg.steve.ocpp.OcppTransport;
2625
import de.rwth.idsg.steve.ocpp.OcppVersion;
2726
import de.rwth.idsg.steve.ocpp.ws.data.CommunicationContext;
2827
import de.rwth.idsg.steve.ocpp.ws.data.SessionContext;
29-
import de.rwth.idsg.steve.ocpp.ws.pipeline.OcppCallHandler;
3028
import de.rwth.idsg.steve.ocpp.ws.pipeline.Deserializer;
3129
import de.rwth.idsg.steve.ocpp.ws.pipeline.IncomingPipeline;
30+
import de.rwth.idsg.steve.ocpp.ws.pipeline.OcppCallHandler;
3231
import de.rwth.idsg.steve.repository.OcppServerRepository;
3332
import de.rwth.idsg.steve.service.notification.OcppStationWebSocketConnected;
3433
import de.rwth.idsg.steve.service.notification.OcppStationWebSocketDisconnected;
3534
import org.joda.time.DateTime;
3635
import org.slf4j.Logger;
3736
import org.slf4j.LoggerFactory;
3837
import org.springframework.context.ApplicationEventPublisher;
38+
import org.springframework.scheduling.TaskScheduler;
3939
import org.springframework.web.socket.BinaryMessage;
4040
import org.springframework.web.socket.CloseStatus;
4141
import org.springframework.web.socket.PongMessage;
@@ -61,7 +61,7 @@ public abstract class AbstractWebSocketEndpoint extends ConcurrentWebSocketHandl
6161

6262
public static final String CHARGEBOX_ID_KEY = "CHARGEBOX_ID_KEY";
6363

64-
private final DelegatingTaskScheduler asyncTaskScheduler;
64+
private final TaskScheduler taskScheduler;
6565
private final OcppServerRepository ocppServerRepository;
6666
private final FutureResponseContextStore futureResponseContextStore;
6767
private final IncomingPipeline pipeline;
@@ -72,13 +72,13 @@ public abstract class AbstractWebSocketEndpoint extends ConcurrentWebSocketHandl
7272
private final List<Consumer<String>> disconnectedCallbackList = new ArrayList<>();
7373
private final Object sessionContextLock = new Object();
7474

75-
public AbstractWebSocketEndpoint(DelegatingTaskScheduler asyncTaskScheduler,
75+
public AbstractWebSocketEndpoint(TaskScheduler taskScheduler,
7676
OcppServerRepository ocppServerRepository,
7777
FutureResponseContextStore futureResponseContextStore,
7878
ApplicationEventPublisher applicationEventPublisher,
7979
SteveProperties steveProperties,
8080
AbstractTypeStore typeStore) {
81-
this.asyncTaskScheduler = asyncTaskScheduler;
81+
this.taskScheduler = taskScheduler;
8282
this.ocppServerRepository = ocppServerRepository;
8383
this.futureResponseContextStore = futureResponseContextStore;
8484
this.pipeline = new IncomingPipeline(new Deserializer(futureResponseContextStore, typeStore), this);
@@ -148,7 +148,7 @@ public void onOpen(WebSocketSession session) throws Exception {
148148

149149
// Just to keep the connection alive, such that the servers do not close
150150
// the connection because of a idle timeout, we ping-pong at fixed intervals.
151-
ScheduledFuture pingSchedule = asyncTaskScheduler.scheduleAtFixedRate(
151+
ScheduledFuture pingSchedule = taskScheduler.scheduleAtFixedRate(
152152
new PingTask(chargeBoxId, session),
153153
Instant.now().plus(WebSocketConfiguration.PING_INTERVAL),
154154
WebSocketConfiguration.PING_INTERVAL

src/main/java/de/rwth/idsg/steve/ocpp/ws/ocpp12/Ocpp12WebSocketEndpoint.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020

2121
import de.rwth.idsg.ocpp.jaxb.RequestType;
2222
import de.rwth.idsg.ocpp.jaxb.ResponseType;
23-
import de.rwth.idsg.steve.config.DelegatingTaskScheduler;
2423
import de.rwth.idsg.steve.config.SteveProperties;
2524
import de.rwth.idsg.steve.ocpp.OcppProtocol;
2625
import de.rwth.idsg.steve.ocpp.OcppVersion;
@@ -38,6 +37,7 @@
3837
import ocpp.cs._2010._08.StatusNotificationRequest;
3938
import ocpp.cs._2010._08.StopTransactionRequest;
4039
import org.springframework.context.ApplicationEventPublisher;
40+
import org.springframework.scheduling.TaskScheduler;
4141
import org.springframework.stereotype.Component;
4242

4343
/**
@@ -49,13 +49,13 @@ public class Ocpp12WebSocketEndpoint extends AbstractWebSocketEndpoint {
4949

5050
private final CentralSystemService12_SoapServer server;
5151

52-
public Ocpp12WebSocketEndpoint(DelegatingTaskScheduler asyncTaskScheduler,
52+
public Ocpp12WebSocketEndpoint(TaskScheduler taskScheduler,
5353
OcppServerRepository ocppServerRepository,
5454
FutureResponseContextStore futureResponseContextStore,
5555
ApplicationEventPublisher applicationEventPublisher,
5656
CentralSystemService12_SoapServer server,
5757
SteveProperties steveProperties) {
58-
super(asyncTaskScheduler, ocppServerRepository, futureResponseContextStore, applicationEventPublisher, steveProperties, Ocpp12TypeStore.INSTANCE);
58+
super(taskScheduler, ocppServerRepository, futureResponseContextStore, applicationEventPublisher, steveProperties, Ocpp12TypeStore.INSTANCE);
5959
this.server = server;
6060
}
6161

src/main/java/de/rwth/idsg/steve/ocpp/ws/ocpp15/Ocpp15WebSocketEndpoint.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020

2121
import de.rwth.idsg.ocpp.jaxb.RequestType;
2222
import de.rwth.idsg.ocpp.jaxb.ResponseType;
23-
import de.rwth.idsg.steve.config.DelegatingTaskScheduler;
2423
import de.rwth.idsg.steve.config.SteveProperties;
2524
import de.rwth.idsg.steve.ocpp.OcppProtocol;
2625
import de.rwth.idsg.steve.ocpp.OcppVersion;
@@ -39,6 +38,7 @@
3938
import ocpp.cs._2012._06.StatusNotificationRequest;
4039
import ocpp.cs._2012._06.StopTransactionRequest;
4140
import org.springframework.context.ApplicationEventPublisher;
41+
import org.springframework.scheduling.TaskScheduler;
4242
import org.springframework.stereotype.Component;
4343

4444
/**
@@ -50,13 +50,13 @@ public class Ocpp15WebSocketEndpoint extends AbstractWebSocketEndpoint {
5050

5151
private final CentralSystemService15_SoapServer server;
5252

53-
public Ocpp15WebSocketEndpoint(DelegatingTaskScheduler asyncTaskScheduler,
53+
public Ocpp15WebSocketEndpoint(TaskScheduler taskScheduler,
5454
OcppServerRepository ocppServerRepository,
5555
FutureResponseContextStore futureResponseContextStore,
5656
ApplicationEventPublisher applicationEventPublisher,
5757
CentralSystemService15_SoapServer server,
5858
SteveProperties steveProperties) {
59-
super(asyncTaskScheduler, ocppServerRepository, futureResponseContextStore, applicationEventPublisher, steveProperties, Ocpp15TypeStore.INSTANCE);
59+
super(taskScheduler, ocppServerRepository, futureResponseContextStore, applicationEventPublisher, steveProperties, Ocpp15TypeStore.INSTANCE);
6060
this.server = server;
6161
}
6262

src/main/java/de/rwth/idsg/steve/ocpp/ws/ocpp16/Ocpp16WebSocketEndpoint.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020

2121
import de.rwth.idsg.ocpp.jaxb.RequestType;
2222
import de.rwth.idsg.ocpp.jaxb.ResponseType;
23-
import de.rwth.idsg.steve.config.DelegatingTaskScheduler;
2423
import de.rwth.idsg.steve.config.SteveProperties;
2524
import de.rwth.idsg.steve.ocpp.OcppProtocol;
2625
import de.rwth.idsg.steve.ocpp.OcppVersion;
@@ -39,6 +38,7 @@
3938
import ocpp.cs._2015._10.StatusNotificationRequest;
4039
import ocpp.cs._2015._10.StopTransactionRequest;
4140
import org.springframework.context.ApplicationEventPublisher;
41+
import org.springframework.scheduling.TaskScheduler;
4242
import org.springframework.stereotype.Component;
4343

4444
/**
@@ -50,13 +50,13 @@ public class Ocpp16WebSocketEndpoint extends AbstractWebSocketEndpoint {
5050

5151
private final CentralSystemService16_SoapServer server;
5252

53-
public Ocpp16WebSocketEndpoint(DelegatingTaskScheduler asyncTaskScheduler,
53+
public Ocpp16WebSocketEndpoint(TaskScheduler taskScheduler,
5454
OcppServerRepository ocppServerRepository,
5555
FutureResponseContextStore futureResponseContextStore,
5656
ApplicationEventPublisher applicationEventPublisher,
5757
CentralSystemService16_SoapServer server,
5858
SteveProperties steveProperties) {
59-
super(asyncTaskScheduler, ocppServerRepository, futureResponseContextStore, applicationEventPublisher, steveProperties, Ocpp16TypeStore.INSTANCE);
59+
super(taskScheduler, ocppServerRepository, futureResponseContextStore, applicationEventPublisher, steveProperties, Ocpp16TypeStore.INSTANCE);
6060
this.server = server;
6161
}
6262

src/main/java/de/rwth/idsg/steve/service/BackgroundService.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,10 @@
1818
*/
1919
package de.rwth.idsg.steve.service;
2020

21-
import de.rwth.idsg.steve.config.DelegatingTaskExecutor;
2221
import de.rwth.idsg.steve.repository.dto.ChargePointSelect;
2322
import lombok.AccessLevel;
2423
import lombok.RequiredArgsConstructor;
24+
import org.springframework.core.task.TaskExecutor;
2525

2626
import java.util.List;
2727
import java.util.function.Consumer;
@@ -33,10 +33,10 @@
3333
@RequiredArgsConstructor
3434
public class BackgroundService {
3535

36-
private final DelegatingTaskExecutor asyncTaskExecutor;
36+
private final TaskExecutor taskExecutor;
3737

38-
public static BackgroundService with(DelegatingTaskExecutor asyncTaskExecutor) {
39-
return new BackgroundService(asyncTaskExecutor);
38+
public static BackgroundService with(TaskExecutor taskExecutor) {
39+
return new BackgroundService(taskExecutor);
4040
}
4141

4242
public Runner forFirst(List<ChargePointSelect> list) {
@@ -57,7 +57,7 @@ private class BackgroundSingleRunner implements Runner {
5757

5858
@Override
5959
public void execute(Consumer<ChargePointSelect> consumer) {
60-
asyncTaskExecutor.execute(() -> consumer.accept(cps));
60+
taskExecutor.execute(() -> consumer.accept(cps));
6161
}
6262
}
6363

@@ -67,7 +67,7 @@ private class BackgroundListRunner implements Runner {
6767

6868
@Override
6969
public void execute(Consumer<ChargePointSelect> consumer) {
70-
asyncTaskExecutor.execute(() -> list.forEach(consumer));
70+
taskExecutor.execute(() -> list.forEach(consumer));
7171
}
7272
}
7373
}

0 commit comments

Comments
 (0)