Skip to content

Commit 0d81193

Browse files
committed
增加任务提交时的切入方式
1 parent e91267c commit 0d81193

File tree

7 files changed

+97
-1
lines changed

7 files changed

+97
-1
lines changed

spring-boot-data-aggregator-autoconfigure/src/main/java/io/github/lvyahui8/spring/autoconfigure/BeanAggregateAutoConfiguration.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,7 @@ public DataBeanAggregateQueryService dataBeanAggregateQueryService (
136136
service.setRuntimeSettings(runtimeSettings);
137137
service.setExecutorService(aggregateExecutorService());
138138
service.setInterceptorChain(aggregateQueryInterceptorChain());
139+
service.setTaskClazz(properties.getAsyncTaskClass());
139140
service.setApplicationContext(applicationContext);
140141
return service;
141142
}

spring-boot-data-aggregator-autoconfigure/src/main/java/io/github/lvyahui8/spring/autoconfigure/BeanAggregateProperties.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
package io.github.lvyahui8.spring.autoconfigure;
22

3+
import io.github.lvyahui8.spring.aggregate.service.AbstractAsyncQueryTask;
4+
import io.github.lvyahui8.spring.aggregate.service.AsyncQueryTaskAdapter;
35
import lombok.Data;
46
import org.springframework.boot.context.properties.ConfigurationProperties;
57

@@ -38,4 +40,8 @@ public class BeanAggregateProperties {
3840
* Ignore exception thrown by asynchronous execution, method returns null value
3941
*/
4042
private boolean ignoreException = false;
43+
/**
44+
* Async task implement
45+
*/
46+
private Class<? extends AbstractAsyncQueryTask> asyncTaskClass = AsyncQueryTaskAdapter.class;
4147
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
package io.github.lvyahui8.spring.aggregate.service;
2+
3+
import lombok.Setter;
4+
5+
import java.util.concurrent.Callable;
6+
7+
/**
8+
9+
* @since 2019/12/25 22:40
10+
*/
11+
public abstract class AbstractAsyncQueryTask<T> implements AsyncQueryTask<T> {
12+
@Setter
13+
private Callable<T> callable;
14+
@Setter
15+
private Thread taskFromThread;
16+
17+
@Override
18+
public T call() throws Exception {
19+
try {
20+
beforeExecute(taskFromThread);
21+
return callable.call();
22+
} finally {
23+
afterExecute(taskFromThread);
24+
}
25+
}
26+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
package io.github.lvyahui8.spring.aggregate.service;
2+
3+
import java.util.concurrent.Callable;
4+
5+
/**
6+
7+
* @since 2019/12/24 23:07
8+
*/
9+
public interface AsyncQueryTask<T> extends Callable<T> {
10+
/**
11+
* 任务提交之前执行. 此方法在提交任务的那个线程中执行
12+
*/
13+
void beforeSubmit();
14+
15+
/**
16+
* 任务开始执行前执行. 此方法在异步线程中执行
17+
* @param taskFrom 提交任务的那个线程
18+
*/
19+
void beforeExecute(Thread taskFrom);
20+
21+
/**
22+
* 任务执行结束后执行. 此方法在异步线程中执行
23+
* 注意, 不管用户的方法抛出何种异常, 此方法都会执行.
24+
* @param taskFrom 提交任务的那个线程
25+
*/
26+
void afterExecute(Thread taskFrom);
27+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
package io.github.lvyahui8.spring.aggregate.service;
2+
3+
/**
4+
5+
* @since 2019/12/25 22:57
6+
*/
7+
public class AsyncQueryTaskAdapter<T> extends AbstractAsyncQueryTask<T> {
8+
9+
@Override
10+
public void beforeSubmit() {
11+
12+
}
13+
14+
@Override
15+
public void beforeExecute(Thread taskFrom) {
16+
17+
}
18+
19+
@Override
20+
public void afterExecute(Thread taskFrom) {
21+
22+
}
23+
}

spring-boot-data-aggregator-core/src/main/java/io/github/lvyahui8/spring/aggregate/service/impl/DataBeanAggregateQueryServiceImpl.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import io.github.lvyahui8.spring.aggregate.interceptor.AggregateQueryInterceptorChain;
77
import io.github.lvyahui8.spring.aggregate.model.*;
88
import io.github.lvyahui8.spring.aggregate.repository.DataProviderRepository;
9+
import io.github.lvyahui8.spring.aggregate.service.AbstractAsyncQueryTask;
910
import io.github.lvyahui8.spring.aggregate.service.DataBeanAggregateQueryService;
1011
import lombok.Setter;
1112
import lombok.extern.slf4j.Slf4j;
@@ -40,6 +41,9 @@ public class DataBeanAggregateQueryServiceImpl implements DataBeanAggregateQuery
4041
@Setter
4142
private AggregateQueryInterceptorChain interceptorChain;
4243

44+
@Setter
45+
private Class<? extends AbstractAsyncQueryTask> taskClazz ;
46+
4347
private AggregationContext initQueryContext(DataProvideDefinition rootProvider, Map<InvokeSignature,Object> queryCache) {
4448
AggregationContext aggregationContext = new AggregationContext();
4549
aggregationContext.setRootThread(Thread.currentThread());
@@ -139,14 +143,22 @@ private Map<String, Object> getDependObjectMap(Map<String, Object> invokeParams,
139143
Map<String,DataConsumeDefinition> consumeDefinitionMap = new HashMap<>(consumeDefinitions.size());
140144
for (DataConsumeDefinition depend : consumeDefinitions) {
141145
consumeDefinitionMap.put(depend.getId(),depend);
142-
Future<?> future = executorService.submit(() -> {
146+
AbstractAsyncQueryTask queryTask = null;
147+
try {
148+
queryTask = taskClazz.newInstance();
149+
} catch (InstantiationException ignored) {
150+
//
151+
}
152+
queryTask.setCallable(() -> {
143153
try {
144154
Object o = innerGet(repository.get(depend.getId()),invokeParams, depend.getClazz(),context,depend);
145155
return depend.getClazz().cast(o);
146156
} finally {
147157
stopDownLatch.countDown();
148158
}
149159
});
160+
queryTask.setTaskFromThread(Thread.currentThread());
161+
Future<?> future = executorService.submit(queryTask);
150162
futureMap.put(depend.getId() + "_" + depend.getOriginalParameterName(),future);
151163
}
152164
stopDownLatch.await(timeout, TimeUnit.MILLISECONDS);
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
spring.main.banner-mode=off
22
io.github.lvyahui8.spring.base-packages=io.github.lvyahui8.spring.example
33
io.github.lvyahui8.spring.ignore-exception=false
4+
io.github.lvyahui8.spring.async-task-class=io.github.lvyahui8.spring.aggregate.service.AsyncQueryTaskAdapter
45
example.logging=true

0 commit comments

Comments
 (0)