Skip to content

Commit 6422fa8

Browse files
committed
server cluster synchro 核心逻辑异步化
1 parent 953af7b commit 6422fa8

File tree

6 files changed

+102
-19
lines changed

6 files changed

+102
-19
lines changed

spring-cloud-gray-server-cluster/src/main/java/cn/springcloud/gray/server/clustering/synchro/SimpleSynchDataAcceptor.java

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,10 @@
77
import java.time.ZoneId;
88
import java.time.format.DateTimeFormatter;
99
import java.util.*;
10+
import java.util.concurrent.ExecutorService;
11+
import java.util.concurrent.LinkedBlockingQueue;
12+
import java.util.concurrent.ThreadPoolExecutor;
13+
import java.util.concurrent.TimeUnit;
1014

1115
/**
1216
* @author saleson
@@ -17,8 +21,16 @@ public class SimpleSynchDataAcceptor implements SynchDataAcceptor {
1721

1822
private List<SynchDataListener> synchDataListeners;
1923
private Map<String, List<SynchDataListener>> dataTypeListenerCache = new HashMap<>();
24+
private ExecutorService executorService;
2025

2126
public SimpleSynchDataAcceptor(List<SynchDataListener> synchDataListeners) {
27+
this(new ThreadPoolExecutor(10, 10,
28+
0L, TimeUnit.MILLISECONDS,
29+
new LinkedBlockingQueue<Runnable>(10)), synchDataListeners);
30+
}
31+
32+
public SimpleSynchDataAcceptor(ExecutorService executorService, List<SynchDataListener> synchDataListeners) {
33+
this.executorService = executorService;
2234
this.synchDataListeners = synchDataListeners;
2335
refreshDataTypeListenerCache();
2436
}
@@ -31,10 +43,11 @@ public void accept(SynchData synchData) {
3143
if (Objects.isNull(listeners)) {
3244
return;
3345
}
34-
//todo 计划改为异步
35-
for (SynchDataListener synchDataListener : listeners) {
36-
synchDataListener.listen(synchData);
37-
}
46+
executorService.execute(() -> {
47+
for (SynchDataListener synchDataListener : listeners) {
48+
synchDataListener.listen(synchData);
49+
}
50+
});
3851
}
3952

4053
private void refreshDataTypeListenerCache() {

spring-cloud-gray-server-plugins/spring-cloud-gray-server-cluster-http-synchro-plugin/src/main/java/cn/springcloud/gray/server/clustering/synchro/http/HttpServerSynchronizer.java

Lines changed: 23 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,10 @@
1313
import java.io.IOException;
1414
import java.io.ObjectOutputStream;
1515
import java.util.Objects;
16+
import java.util.concurrent.ExecutorService;
17+
import java.util.concurrent.LinkedBlockingQueue;
18+
import java.util.concurrent.ThreadPoolExecutor;
19+
import java.util.concurrent.TimeUnit;
1620

1721
/**
1822
* @author saleson
@@ -24,20 +28,27 @@ public class HttpServerSynchronizer implements ServerSynchronizer {
2428
private int peerNodeSynchronizeRetryTimes;
2529
private RestTemplate rest;
2630
private ServerCluster serverCluster;
27-
31+
private ExecutorService executorService;
2832

2933
public HttpServerSynchronizer(ServerCluster serverCluster) {
30-
this(new RestTemplate(), serverCluster);
34+
this(serverCluster, new ThreadPoolExecutor(10, 10,
35+
0L, TimeUnit.MILLISECONDS,
36+
new LinkedBlockingQueue<Runnable>(10)));
37+
}
38+
39+
public HttpServerSynchronizer(ServerCluster serverCluster, ExecutorService executorService) {
40+
this(new RestTemplate(), serverCluster, executorService);
3141
}
3242

33-
public HttpServerSynchronizer(RestTemplate rest, ServerCluster serverCluster) {
34-
this(3, rest, serverCluster);
43+
public HttpServerSynchronizer(RestTemplate rest, ServerCluster serverCluster, ExecutorService executorService) {
44+
this(3, rest, serverCluster, executorService);
3545
}
3646

37-
public HttpServerSynchronizer(int peerNodeSynchronizeRetryTimes, RestTemplate rest, ServerCluster serverCluster) {
47+
public HttpServerSynchronizer(int peerNodeSynchronizeRetryTimes, RestTemplate rest, ServerCluster serverCluster, ExecutorService executorService) {
3848
this.peerNodeSynchronizeRetryTimes = peerNodeSynchronizeRetryTimes;
3949
this.rest = rest;
4050
this.serverCluster = serverCluster;
51+
this.executorService = executorService;
4152
}
4253

4354
@Override
@@ -46,13 +57,14 @@ public void broadcast(SynchData synchData) {
4657
if (ArrayUtils.isEmpty(peerNodes)) {
4758
return;
4859
}
49-
//todo 计划改为异步
50-
if (StringUtils.isEmpty(synchData.getId())) {
51-
synchData.setId(StringUtils.defaultString(synchData.getDataType()) +
52-
cn.springcloud.gray.utils.StringUtils.generateUUID());
53-
}
60+
executorService.execute(() -> {
61+
if (StringUtils.isEmpty(synchData.getId())) {
62+
synchData.setId(StringUtils.defaultString(synchData.getDataType()) +
63+
cn.springcloud.gray.utils.StringUtils.generateUUID());
64+
}
5465

55-
broadcastSynchData(synchData, peerNodes);
66+
broadcastSynchData(synchData, peerNodes);
67+
});
5668
}
5769

5870
private void broadcastSynchData(SynchData synchData, String[] peerNodes) {

spring-cloud-gray-server-plugins/spring-cloud-gray-server-cluster-http-synchro-plugin/src/main/java/cn/springcloud/gray/server/clustering/synchro/http/configuration/HttpSynchroAutoConfiguration.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,13 @@
44
import cn.springcloud.gray.server.clustering.synchro.ServerSynchronizer;
55
import cn.springcloud.gray.server.clustering.synchro.http.HttpServerSynchronizer;
66
import cn.springcloud.gray.server.clustering.synchro.http.ServerSynchDataAcceptEndpoint;
7+
import org.springframework.beans.factory.annotation.Qualifier;
78
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
89
import org.springframework.context.annotation.Bean;
910
import org.springframework.context.annotation.Configuration;
1011

12+
import java.util.concurrent.ExecutorService;
13+
1114
/**
1215
* @author saleson
1316
* @date 2020-08-16 04:48
@@ -17,8 +20,10 @@ public class HttpSynchroAutoConfiguration {
1720

1821
@Bean
1922
@ConditionalOnMissingBean
20-
public ServerSynchronizer serverSynchronizer(ServerCluster serverCluster) {
21-
return new HttpServerSynchronizer(serverCluster);
23+
public ServerSynchronizer serverSynchronizer(
24+
@Qualifier("synchroExecutorService") ExecutorService synchroExecutorService,
25+
ServerCluster serverCluster) {
26+
return new HttpServerSynchronizer(serverCluster, synchroExecutorService);
2227
}
2328

2429
@Bean

spring-cloud-gray-server/src/main/java/cn/springcloud/gray/server/configuration/SynchroAutoConfiguration.java

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,23 @@
11
package cn.springcloud.gray.server.configuration;
22

3+
import cn.springcloud.gray.concurrent.ExecutorConcurrentStrategy;
34
import cn.springcloud.gray.server.clustering.synchro.GrayEventSynchroListener;
45
import cn.springcloud.gray.server.clustering.synchro.SimpleSynchDataAcceptor;
56
import cn.springcloud.gray.server.clustering.synchro.SynchDataAcceptor;
67
import cn.springcloud.gray.server.clustering.synchro.SynchDataListener;
8+
import cn.springcloud.gray.server.configuration.properties.ClusterProperties;
79
import cn.springlcoud.gray.event.server.GrayEventSender;
10+
import org.springframework.beans.factory.annotation.Autowired;
11+
import org.springframework.beans.factory.annotation.Qualifier;
812
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
913
import org.springframework.context.annotation.Bean;
1014
import org.springframework.context.annotation.Configuration;
1115

1216
import java.util.List;
17+
import java.util.concurrent.ExecutorService;
18+
import java.util.concurrent.LinkedBlockingQueue;
19+
import java.util.concurrent.ThreadPoolExecutor;
20+
import java.util.concurrent.TimeUnit;
1321

1422
/**
1523
* @author saleson
@@ -18,11 +26,27 @@
1826
@Configuration
1927
public class SynchroAutoConfiguration {
2028

29+
@Autowired
30+
private ClusterProperties clusterProperties;
31+
32+
33+
@Bean
34+
@ConditionalOnMissingBean(name = "synchroExecutorService")
35+
public ExecutorService synchroExecutorService() {
36+
ExecutorConcurrentStrategy ecs = clusterProperties.
37+
getSynchro().getExecutorConcurrentStrategy();
38+
return new ThreadPoolExecutor(ecs.getCorePoolSize(), ecs.getMaximumPoolSize(),
39+
0L, TimeUnit.MILLISECONDS,
40+
new LinkedBlockingQueue<Runnable>(ecs.getQueueSize()));
41+
}
42+
2143

2244
@Bean
2345
@ConditionalOnMissingBean
24-
public SynchDataAcceptor synchDataAcceptor(List<SynchDataListener> synchDataListeners) {
25-
return new SimpleSynchDataAcceptor(synchDataListeners);
46+
public SynchDataAcceptor synchDataAcceptor(
47+
@Qualifier("synchroExecutorService") ExecutorService synchroExecutorService,
48+
List<SynchDataListener> synchDataListeners) {
49+
return new SimpleSynchDataAcceptor(synchroExecutorService, synchDataListeners);
2650
}
2751

2852

spring-cloud-gray-server/src/main/java/cn/springcloud/gray/server/configuration/properties/ClusterProperties.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package cn.springcloud.gray.server.configuration.properties;
22

3+
import cn.springcloud.gray.concurrent.ExecutorConcurrentStrategy;
34
import cn.springcloud.gray.server.clustering.PeerNode;
45
import lombok.Data;
56
import org.springframework.boot.context.properties.ConfigurationProperties;
@@ -15,4 +16,12 @@
1516
@ConfigurationProperties(prefix = "gray.server.cluster")
1617
public class ClusterProperties {
1718
private List<PeerNode> peerNodes = new ArrayList<>();
19+
private SynchroProperties synchro = new SynchroProperties();
20+
21+
@Data
22+
public class SynchroProperties {
23+
private ExecutorConcurrentStrategy executorConcurrentStrategy = new ExecutorConcurrentStrategy();
24+
25+
}
26+
1827
}
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
package cn.springcloud.gray.concurrent;
2+
3+
import lombok.AllArgsConstructor;
4+
import lombok.Builder;
5+
import lombok.Data;
6+
import lombok.NoArgsConstructor;
7+
8+
/**
9+
* @author saleson
10+
* @date 2020-08-16 16:41
11+
*/
12+
@Data
13+
@AllArgsConstructor
14+
@NoArgsConstructor
15+
@Builder
16+
public class ExecutorConcurrentStrategy {
17+
private int corePoolSize = 10;
18+
private int maximumPoolSize = 10;
19+
private int queueSize = 10;
20+
}

0 commit comments

Comments
 (0)