Skip to content

Commit e9a3104

Browse files
committed
refactor: 优化事务工具
1 parent 5cb7fe4 commit e9a3104

File tree

5 files changed

+129
-10
lines changed

5 files changed

+129
-10
lines changed

hsweb-commons/hsweb-commons-crud/src/main/java/org/hswebframework/web/crud/configuration/R2dbcSqlExecutorConfiguration.java

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,16 +5,17 @@
55
import org.hswebframework.ezorm.rdb.executor.reactive.ReactiveSqlExecutor;
66
import org.hswebframework.ezorm.rdb.executor.reactive.ReactiveSyncSqlExecutor;
77
import org.hswebframework.web.crud.sql.DefaultR2dbcExecutor;
8+
import org.hswebframework.web.crud.utils.TransactionUtils;
9+
import org.springframework.beans.factory.SmartInitializingSingleton;
810
import org.springframework.boot.autoconfigure.AutoConfiguration;
911
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
1012
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
11-
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
1213
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
1314
import org.springframework.context.annotation.Bean;
14-
import org.springframework.context.annotation.Configuration;
15+
import org.springframework.transaction.ReactiveTransactionManager;
1516

1617
@AutoConfiguration
17-
@AutoConfigureAfter(name = "org.springframework.boot.autoconfigure.r2dbc.R2dbcAutoConfiguration")
18+
@AutoConfigureAfter(name = "org.springframeworrk.boot.autoconfigure.r2dbc.R2dbcAutoConfiguration")
1819
@ConditionalOnBean(ConnectionFactory.class)
1920
public class R2dbcSqlExecutorConfiguration {
2021
@Bean
@@ -31,4 +32,10 @@ public ReactiveSqlExecutor reactiveSqlExecutor(EasyormProperties properties) {
3132
public SyncSqlExecutor syncSqlExecutor(ReactiveSqlExecutor reactiveSqlExecutor) {
3233
return ReactiveSyncSqlExecutor.of(reactiveSqlExecutor);
3334
}
35+
36+
@Bean
37+
public SmartInitializingSingleton transactionUtilsSetup(ReactiveTransactionManager transactionManager){
38+
TransactionUtils.setup(transactionManager);
39+
return ()->{};
40+
}
3441
}

hsweb-commons/hsweb-commons-crud/src/main/java/org/hswebframework/web/crud/utils/TransactionUtils.java

Lines changed: 43 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,29 +3,70 @@
33
import lombok.NonNull;
44
import lombok.extern.slf4j.Slf4j;
55
import org.springframework.transaction.NoTransactionException;
6-
import org.springframework.transaction.reactive.TransactionContextManager;
6+
import org.springframework.transaction.ReactiveTransactionManager;
7+
import org.springframework.transaction.TransactionDefinition;
8+
import org.springframework.transaction.TransactionManager;
79
import org.springframework.transaction.reactive.TransactionSynchronization;
810
import org.springframework.transaction.reactive.TransactionSynchronizationManager;
11+
import org.springframework.transaction.reactive.TransactionalOperator;
12+
import org.springframework.transaction.support.DefaultTransactionDefinition;
13+
import reactor.core.publisher.Flux;
914
import reactor.core.publisher.Mono;
1015

1116
import java.util.function.Function;
1217

18+
import static org.springframework.transaction.TransactionDefinition.PROPAGATION_REQUIRES_NEW;
19+
1320
@Slf4j
1421
public class TransactionUtils {
1522

23+
static TransactionManager transactionManager;
24+
25+
public static void setup(TransactionManager transactionManager) {
26+
TransactionUtils.transactionManager = transactionManager;
27+
}
28+
29+
public static <T> Mono<T> tryRunInTransaction(Mono<T> task, TransactionDefinition definition) {
30+
if (transactionManager instanceof ReactiveTransactionManager tx) {
31+
TransactionalOperator requiresNew =
32+
TransactionalOperator.create(
33+
tx,
34+
definition);
35+
return requiresNew.transactional(task);
36+
}
37+
return task;
38+
}
39+
40+
public static <T> Flux<T> tryRunInTransaction(Flux<T> task, TransactionDefinition definition) {
41+
if (transactionManager instanceof ReactiveTransactionManager tx) {
42+
TransactionalOperator requiresNew =
43+
TransactionalOperator.create(
44+
tx,
45+
definition);
46+
return requiresNew.transactional(task);
47+
}
48+
return task;
49+
}
50+
1651
public static Mono<Void> afterCommit(Mono<Void> task) {
1752
return TransactionUtils.registerSynchronization(
1853
new TransactionSynchronization() {
1954
@Override
2055
@NonNull
2156
public Mono<Void> afterCommit() {
22-
return task;
57+
return tryRunInTransaction(task, new DefaultTransactionDefinition(PROPAGATION_REQUIRES_NEW));
2358
}
2459
},
2560
TransactionSynchronization::afterCommit
2661
);
2762
}
2863

64+
/**
65+
* @param synchronization TransactionSynchronization
66+
* @param whenNoTransaction TransactionSynchronization
67+
* @return TransactionSynchronization
68+
* @see TransactionUtils#tryRunInTransaction(Flux, TransactionDefinition)
69+
*/
2970
public static Mono<Void> registerSynchronization(TransactionSynchronization synchronization,
3071
Function<TransactionSynchronization, Mono<Void>> whenNoTransaction) {
3172
return TransactionSynchronizationManager

hsweb-commons/hsweb-commons-crud/src/test/java/org/hswebframework/web/crud/CrudTests.java

Lines changed: 67 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,27 +1,45 @@
11
package org.hswebframework.web.crud;
22

3+
import lombok.SneakyThrows;
34
import org.hswebframework.web.crud.entity.CustomTestEntity;
45
import org.hswebframework.web.crud.entity.TestEntity;
56
import org.hswebframework.web.crud.events.EntityBeforeModifyEvent;
67
import org.hswebframework.web.crud.service.CustomTestCustom;
78
import org.hswebframework.web.crud.service.TestEntityService;
9+
import org.hswebframework.web.crud.utils.TransactionUtils;
810
import org.junit.Assert;
911
import org.junit.Test;
1012
import org.junit.runner.RunWith;
1113
import org.springframework.beans.factory.annotation.Autowired;
1214
import org.springframework.boot.test.context.SpringBootTest;
1315
import org.springframework.context.event.EventListener;
1416
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
17+
import org.springframework.transaction.reactive.TransactionalOperator;
18+
import reactor.core.Disposable;
19+
import reactor.core.Disposables;
20+
import reactor.core.publisher.Flux;
1521
import reactor.core.publisher.Mono;
22+
import reactor.core.scheduler.Schedulers;
1623
import reactor.test.StepVerifier;
24+
import reactor.util.context.Context;
1725

18-
@SpringBootTest(classes = {TestApplication.class,TestEntityService.class, CustomTestCustom.class})
26+
import java.util.concurrent.CountDownLatch;
27+
import java.util.concurrent.TimeUnit;
28+
29+
@SpringBootTest(classes = {TestApplication.class, TestEntityService.class, CustomTestCustom.class},
30+
properties = {
31+
"spring.r2dbc.pool.enabled=true",
32+
"spring.r2dbc.pool.max-size=32",
33+
"logging.level.org.springframework.r2dbc.connection=debug"
34+
})
1935
@RunWith(SpringJUnit4ClassRunner.class)
2036
public class CrudTests {
2137

2238
@Autowired
2339
private TestEntityService service;
2440

41+
@Autowired
42+
private TransactionalOperator transactionalOperator;
2543

2644
@Test
2745
public void test() {
@@ -53,4 +71,52 @@ public void test() {
5371
.expectNext(1)
5472
.verifyComplete();
5573
}
74+
75+
@Test
76+
@SneakyThrows
77+
public void testMultiThread() {
78+
Flux.range(0, 100)
79+
.map(e -> {
80+
CustomTestEntity entity = new CustomTestEntity();
81+
entity.setExt("xxx-" + e);
82+
entity.setAge(1);
83+
entity.setName("mt-" + e);
84+
return entity;
85+
})
86+
.cast(TestEntity.class)
87+
.as(service::save)
88+
.block();
89+
90+
Disposable.Swap disposable = Disposables.swap();
91+
92+
CountDownLatch latch = new CountDownLatch(50);
93+
disposable.update(
94+
service
95+
.createQuery()
96+
.like(CustomTestEntity::getName, "mt-%")
97+
.fetch()
98+
.flatMap(e -> service
99+
.updateById(e.getId(), e)
100+
.flatMap(i -> TransactionUtils
101+
.afterCommit(Mono.deferContextual((c) -> service
102+
.updateById(e.getId(), e)
103+
.doOnNext(x -> {
104+
latch.countDown();
105+
if (latch.getCount() <= 0) {
106+
disposable.dispose();
107+
}
108+
})
109+
//.as(transactionalOperator::transactional)
110+
.subscribeOn(Schedulers.boundedElastic())
111+
.then())))
112+
// .subscribeOn(Schedulers.boundedElastic())
113+
)
114+
// .as(transactionalOperator::transactional)
115+
.subscribe()
116+
);
117+
Assert.assertTrue(latch.await(20, TimeUnit.SECONDS));
118+
119+
Thread.sleep(2000);
120+
121+
}
56122
}

hsweb-commons/hsweb-commons-crud/src/test/resources/application.yml

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,11 @@ logging:
33
org.hswebframework: debug
44
org.springframework.transaction: debug
55
org.springframework.data.r2dbc.connectionfactory: debug
6-
#spring:
7-
# r2dbc:
6+
"org.springframework.transaction.reactive": debug
7+
spring:
8+
r2dbc:
9+
pool:
10+
max-acquire-time: 1s
811
#
912
easyorm:
1013
default-schema: PUBLIC

hsweb-core/src/main/java/org/hswebframework/web/bean/FastBeanCopier.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package org.hswebframework.web.bean;
22

33
import com.google.common.collect.Maps;
4+
import io.netty.util.internal.ConcurrentSet;
45
import lombok.AllArgsConstructor;
56
import lombok.Getter;
67
import lombok.SneakyThrows;
@@ -24,6 +25,7 @@
2425
import java.lang.reflect.Field;
2526
import java.util.*;
2627
import java.util.concurrent.ConcurrentHashMap;
28+
import java.util.concurrent.ConcurrentMap;
2729
import java.util.function.BiFunction;
2830
import java.util.function.Function;
2931
import java.util.function.Supplier;
@@ -550,7 +552,7 @@ public void setBeanFactory(BeanFactory beanFactory) {
550552

551553
public Collection<?> newCollection(Class<?> targetClass) {
552554

553-
if (targetClass == List.class) {
555+
if (targetClass == List.class || targetClass == Collection.class) {
554556
return new ArrayList<>();
555557
} else if (targetClass == ConcurrentHashMap.KeySetView.class) {
556558
return ConcurrentHashMap.newKeySet();
@@ -560,7 +562,7 @@ public Collection<?> newCollection(Class<?> targetClass) {
560562
return new LinkedList<>();
561563
} else {
562564
try {
563-
return (Collection<?>) targetClass.newInstance();
565+
return (Collection<?>) targetClass.getConstructor().newInstance();
564566
} catch (Exception e) {
565567
throw new UnsupportedOperationException("不支持的类型:" + targetClass, e);
566568
}

0 commit comments

Comments
 (0)