Skip to content

Commit 9df0075

Browse files
committed
refactor: 优化调度器
1 parent 524ed01 commit 9df0075

File tree

1 file changed

+12
-1
lines changed

1 file changed

+12
-1
lines changed

hsweb-easy-orm-rdb/src/main/java/org/hswebframework/ezorm/rdb/executor/reactive/r2dbc/R2dbcReactiveSqlExecutor.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import org.reactivestreams.Publisher;
1818
import org.slf4j.Logger;
1919
import reactor.core.publisher.*;
20+
import reactor.core.scheduler.Schedulers;
2021

2122
import java.time.LocalDateTime;
2223
import java.time.ZoneOffset;
@@ -84,11 +85,21 @@ protected <T> Flux<T> doExecute(Operation operation,
8485
Function<Result, Publisher<T>> mapper) {
8586
return Flux
8687
.from(this.prepareStatement(connection.createStatement(request.getSql()), request).execute())
87-
.flatMap(mapper)
88+
.flatMap(result -> {
89+
return transformResult(result, mapper);
90+
})
8891
.doOnSubscribe(subscription -> printSql(logger, request))
8992
.doOnError(err -> logger.error("==> Error: {}", request.toNativeSql(), err));
9093
}
9194

95+
protected <T> Publisher<T> transformResult(Result result,Function<Result, Publisher<T>> mapper){
96+
if (Schedulers.isInNonBlockingThread()) {
97+
return mapper.apply(result);
98+
}
99+
return Flux
100+
.from(mapper.apply(result))
101+
.publishOn(Schedulers.parallel());
102+
}
92103
/**
93104
* 使用指定的Connection执行SQL并返回执行结果
94105
*

0 commit comments

Comments
 (0)