Skip to content

Commit 5533aeb

Browse files
committed
实现并发查询核心功能
1 parent 6ac2892 commit 5533aeb

File tree

18 files changed

+284
-19
lines changed

18 files changed

+284
-19
lines changed

README.md

Lines changed: 125 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,130 @@
22

33
## 背景
44

5+
后台接口在调用一些接口时, 为了写代码方便, 多数时候我们习惯串行调用, 即使这些接口调用并无依赖性.
6+
7+
此框架目的旨在保持开发的简便性, 并同时支持并发和数据复用
8+
59
## 原理
610

7-
## 作者
11+
1. CountDownLatch + Future
12+
2. 目标参数依赖中, 需要查询接口获取的数据, 封装为一个任务交给线程池处理, 处理完成之后, 再统一目标方法
13+
14+
## 使用方法
15+
16+
- DataBeanProvider 定义数据model提供者
17+
- DataBeanConsumer 定义方法将要消费的model
18+
- InvokeParameter 指定用户手动传入的参数
19+
- DataBeanAggregateQueryFacade 查询指定的model
20+
21+
## 示例
22+
23+
开发一个用户汇总数据接口, 包括用户的用户基础信息和博客列表
24+
25+
**1. 定义提供基础数据的"原子"服务**
26+
27+
使用DataBeanProvider定义要提供的数据, 使用InvokeParameter指定查询时要传递的参数
28+
29+
博客列表服务, 需要参数userId
30+
31+
```java
32+
@Service
33+
public class PostServiceImpl implements PostService {
34+
@DataBeanProvider(id = "posts")
35+
@Override
36+
public List<Post> getPosts(@InvokeParameter("userId") Long userId) {
37+
try {
38+
Thread.sleep(1000L);
39+
} catch (InterruptedException e) {
40+
//
41+
}
42+
Post post = new Post();
43+
post.setTitle("spring data aggregate example");
44+
post.setContent("No active profile set, falling back to default profiles");
45+
return Collections.singletonList(post);
46+
}
47+
}
48+
49+
```
50+
51+
用户基础信息查询服务, 需要参数userId
52+
53+
```java
54+
@Service
55+
public class UserServiceImpl implements UserService {
56+
57+
@DataBeanProvider(id = "user")
58+
@Override
59+
public User get(@InvokeParameter("userId") Long id) {
60+
/* */
61+
try {
62+
Thread.sleep(100L);
63+
} catch (InterruptedException e) {
64+
//
65+
}
66+
/* mock a user*/
67+
User user = new User();
68+
user.setId(id);
69+
user.setEmail("[email protected]");
70+
user.setUsername("lvyahui8");
71+
return user;
72+
}
73+
}
74+
```
75+
76+
**2. 定义并实现聚合层**
77+
78+
组合DataBeanProvider\DataBeanConsumer\InvokeParameter实现汇聚功能
79+
80+
```java
81+
@Component
82+
public class UserAggregate {
83+
@DataBeanProvider(id="userWithPosts")
84+
public User userWithPosts(
85+
@DataBeanConsumer(id = "user") User user,
86+
@DataBeanConsumer(id = "posts") List<Post> posts) {
87+
user.setPosts(posts);
88+
return user;
89+
}
90+
}
91+
```
92+
93+
**3. 调用聚合层接口**
94+
95+
注解了DataBeanProvider方法的接口都不能直接调用, 而需要通过门面类DataBeanAggregateQueryFacade访问.
96+
97+
指定要查询的DataBeanProvider.id, 查询参数, 返回值类型即可
98+
99+
```java
100+
@SpringBootApplication
101+
public class ExampleApplication {
102+
public static void main(String[] args) throws Exception {
103+
ConfigurableApplicationContext context = SpringApplication.run(ExampleApplication.class);
104+
DataBeanAggregateQueryFacade queryFacade = context.getBean(DataBeanAggregateQueryFacade.class);
105+
User user = queryFacade.get("userWithPosts", Collections.singletonMap("userId",1L), User.class);
106+
Assert.notNull(user,"user not null");
107+
Assert.notNull(user.getPosts(),"user posts not null");
108+
System.out.println(user);
109+
}
110+
}
111+
```
112+
113+
**运行结果**
114+
115+
可以看到, user 和posts是由异步线程进行的查询, 而userWithPosts是主调线程
116+
117+
其中
118+
119+
- 基础user信息查询时间 1000ms
120+
- 用户博客列表查询时间 1000ms
121+
- **总的查询时间 1005ms**
122+
123+
```
124+
2019-06-03 23:56:52.254 INFO 9088 --- [aggregateTask-2] f.s.a.s.DataBeanAgregateQueryServiceImpl : query id: posts, costTime: 1000ms, model: List, params: 1
125+
2019-06-03 23:56:52.254 INFO 9088 --- [aggregateTask-1] f.s.a.s.DataBeanAgregateQueryServiceImpl : query id: user, costTime: 1000ms, model: User, params: 1
126+
2019-06-03 23:56:52.255 INFO 9088 --- [ main] f.s.a.s.DataBeanAgregateQueryServiceImpl : query id: userWithPosts, costTime: 1005ms, model: User, params: User...........
127+
```
128+
129+
## 作者
130+
131+

pom.xml

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,14 @@
6262
<artifactId>spring-boot-data-aggregate-example</artifactId>
6363
<version>${project.version}</version>
6464
</dependency>
65+
66+
<!-- https://mvnrepository.com/artifact/org.apache.commons/commons-lang3 -->
67+
<dependency>
68+
<groupId>org.apache.commons</groupId>
69+
<artifactId>commons-lang3</artifactId>
70+
<version>3.9</version>
71+
</dependency>
72+
6573
</dependencies>
6674
</dependencyManagement>
6775

spring-boot-data-aggregate-autoconfigure/src/main/java/org/feego/spring/annotation/DataBeanProvider.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,5 +13,5 @@
1313
@Documented
1414
public @interface DataBeanProvider {
1515
String id();
16-
long timeout() default 1000;
16+
long timeout() default 10000;
1717
}
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,16 @@
11
package org.feego.spring.annotation;
22

3+
import java.lang.annotation.*;
4+
35
/**
46
* 数据聚合时需要的输入参数
57
*
68
79
* @since 2019/6/2 16:32
810
*/
11+
@Target({ElementType.PARAMETER})
12+
@Retention(RetentionPolicy.RUNTIME)
13+
@Documented
914
public @interface InvokeParameter {
1015
String value();
1116
}

spring-boot-data-aggregate-autoconfigure/src/main/java/org/feego/spring/autoconfigure/BeanAggregateAutoConfiguration.java

Lines changed: 36 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,13 @@
33
import lombok.extern.slf4j.Slf4j;
44
import org.feego.spring.aggregate.facade.DataBeanAggregateQueryFacade;
55
import org.feego.spring.aggregate.facade.impl.DataBeanAggregateQueryFacadeImpl;
6-
import org.feego.spring.aggregate.model.DataProvider;
6+
import org.feego.spring.aggregate.model.*;
77
import org.feego.spring.aggregate.repository.DataProviderRepository;
88
import org.feego.spring.aggregate.repository.impl.DataProviderRepositoryImpl;
99
import org.feego.spring.aggregate.service.DataBeanAgregateQueryServiceImpl;
10+
import org.feego.spring.annotation.DataBeanConsumer;
1011
import org.feego.spring.annotation.DataBeanProvider;
12+
import org.feego.spring.annotation.InvokeParameter;
1113
import org.reflections.Reflections;
1214
import org.reflections.scanners.MethodAnnotationsScanner;
1315
import org.springframework.beans.BeansException;
@@ -21,6 +23,9 @@
2123
import org.springframework.scheduling.concurrent.CustomizableThreadFactory;
2224

2325
import java.lang.reflect.Method;
26+
import java.lang.reflect.Parameter;
27+
import java.util.ArrayList;
28+
import java.util.List;
2429
import java.util.Set;
2530
import java.util.concurrent.ExecutorService;
2631
import java.util.concurrent.LinkedBlockingDeque;
@@ -65,7 +70,7 @@ public ExecutorService aggregateExecutorService() {
6570
properties.getThreadNumber() < 4 ? 4 : properties.getThreadNumber() ,
6671
2L, TimeUnit.HOURS,
6772
new LinkedBlockingDeque<>(properties.getQueueSize()),
68-
new CustomizableThreadFactory("aggregateThread-"));
73+
new CustomizableThreadFactory("aggregateTask-"));
6974
}
7075

7176
private void scanProviders(DataProviderRepository repository) {
@@ -79,6 +84,35 @@ private void scanProviders(DataProviderRepository repository) {
7984
provider.setId(beanProvider.id());
8085
provider.setMethod(method);
8186
provider.setTimeout(beanProvider.timeout());
87+
Parameter[] parameters = provider.getMethod().getParameters();
88+
List<MethodArg> methodArgs = new ArrayList<>(method.getParameterCount());
89+
provider.setDepends(new ArrayList<>(method.getParameterCount()));
90+
provider.setParams(new ArrayList<>(method.getParameterCount()));
91+
for (Parameter parameter : parameters) {
92+
MethodArg methodArg = new MethodArg();
93+
DataBeanConsumer bean = parameter.getAnnotation(DataBeanConsumer.class);
94+
InvokeParameter invokeParameter = parameter.getAnnotation(InvokeParameter.class);
95+
if(bean != null) {
96+
methodArg.setAnnotionKey(bean.id());
97+
methodArg.setDenpendType(DenpendType.OTHER_MODEL);
98+
DataDepend dataDepend = new DataDepend();
99+
dataDepend.setClazz(parameter.getType());
100+
dataDepend.setId(bean.id());
101+
provider.getDepends().add(dataDepend);
102+
} else if (invokeParameter != null){
103+
methodArg.setAnnotionKey(invokeParameter.value());
104+
methodArg.setDenpendType(DenpendType.INVOKE_PARAM);
105+
InvokeParam param = new InvokeParam();
106+
param.setKey(param.getKey());
107+
provider.getParams().add(param);
108+
} else {
109+
throw new IllegalArgumentException(
110+
"paramter must ananotion by InvokeParameter or DataBeanConsumer");
111+
}
112+
methodArg.setParameter(parameter);
113+
methodArgs.add(methodArg);
114+
}
115+
provider.setMethodArgs(methodArgs);
82116
repository.put(beanProvider.id(),provider);
83117
}
84118
}

spring-boot-data-aggregate-core/pom.xml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,10 @@
2626
<groupId>org.springframework.boot</groupId>
2727
<artifactId>spring-boot-starter</artifactId>
2828
</dependency>
29+
<dependency>
30+
<groupId>org.apache.commons</groupId>
31+
<artifactId>commons-lang3</artifactId>
32+
</dependency>
2933
</dependencies>
3034

3135
<build>
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,12 @@
11
package org.feego.spring.aggregate.facade;
22

3+
import java.lang.reflect.InvocationTargetException;
34
import java.util.Map;
45

56
/**
67
78
* @since 2019/6/1 0:22
89
*/
910
public interface DataBeanAggregateQueryFacade {
10-
<T> T get(String id, Map<String,Object> invokeParams, Class<T> clazz);
11+
<T> T get(String id, Map<String,Object> invokeParams, Class<T> clazz) throws InterruptedException, IllegalAccessException, InvocationTargetException;
1112
}

spring-boot-data-aggregate-core/src/main/java/org/feego/spring/aggregate/facade/impl/DataBeanAggregateQueryFacadeImpl.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import org.feego.spring.aggregate.service.DataBeanAgregateQueryService;
55
import org.springframework.util.Assert;
66

7+
import java.lang.reflect.InvocationTargetException;
78
import java.util.Collections;
89
import java.util.Map;
910

@@ -20,7 +21,7 @@ public DataBeanAggregateQueryFacadeImpl(DataBeanAgregateQueryService dataBeanAgr
2021
}
2122

2223
@Override
23-
public <T> T get(String id, Map<String,Object> invokeParams, Class<T> clazz) {
24+
public <T> T get(String id, Map<String,Object> invokeParams, Class<T> clazz) throws InterruptedException, IllegalAccessException, InvocationTargetException {
2425
Assert.notNull(id,"id must be not null!");
2526
Assert.notNull(clazz,"clazz must be not null !");
2627
if(invokeParams == null) {

spring-boot-data-aggregate-core/src/main/java/org/feego/spring/aggregate/model/DataProvider.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,4 +16,5 @@ public class DataProvider {
1616
private Long timeout;
1717
private List<DataDepend> depends;
1818
private List<InvokeParam> params;
19+
private List<MethodArg> methodArgs;
1920
}
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
package org.feego.spring.aggregate.model;
2+
3+
/**
4+
5+
* @since 2019/6/3 22:44
6+
*/
7+
public enum DenpendType {
8+
INVOKE_PARAM,
9+
OTHER_MODEL
10+
}

0 commit comments

Comments
 (0)