Skip to content

Commit 04af57c

Browse files
authored
[Improvement-17646][JdbcRegistry] Using transaction in JdbcRegistryDataManager (#17754)
1 parent ade9d39 commit 04af57c

File tree

7 files changed

+106
-106
lines changed

7 files changed

+106
-106
lines changed

docs/docs/en/guide/installation/registry-plugins/jdbc.md

Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -38,19 +38,13 @@ registry:
3838
heartbeat-refresh-interval: 3s
3939
# Once the client's heartbeat is not refresh in this time, the server will consider the client is offline.
4040
session-timeout: 60s
41-
# The hikari configuration, default will use the same datasource pool as DolphinScheduler.
42-
hikari-config:
43-
jdbc-url: jdbc:mysql://127.0.0.1:3306/dolphinscheduler
44-
username: root
45-
password: root
46-
maximum-pool-size: 5
47-
connection-timeout: 9000
48-
idle-timeout: 600000
4941
```
5042

51-
## Use different database configuration for jdbc registry center
43+
## Set DataSource for worker
5244

53-
You need to set the registry properties in master/worker/api's application.yml
45+
Since worker server doesn't contain datasource, so you need to config datasource for worker.
46+
47+
You need to set the registry hikari-config properties at worker's application.yml
5448

5549
### Use Mysql as registry center
5650

docs/docs/zh/guide/installation/registry-plugins/jdbc.md

Lines changed: 2 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -32,19 +32,11 @@ registry:
3232
heartbeat-refresh-interval: 3s
3333
# Once the client's heartbeat is not refresh in this time, the server will consider the client is offline.
3434
session-timeout: 60s
35-
# The hikari configuration, default will use the same datasource pool as DolphinScheduler.
36-
hikari-config:
37-
jdbc-url: jdbc:mysql://127.0.0.1:3306/dolphinscheduler
38-
username: root
39-
password: root
40-
maximum-pool-size: 5
41-
connection-timeout: 9000
42-
idle-timeout: 600000
4335
```
4436

45-
## 对 JDBC 注册中心使用不同的数据库配置
37+
## 为 worker 配置数据源
4638

47-
需要在 master/worker/api 的 application.yml 中设置属性
39+
由于Worker服务默认不包含数据源,因此你需要在 worker 的 application.yml 中为注册中心设置数据源属性
4840

4941
### 使用 MySQL 作为注册中心
5042

dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/JdbcRegistryAutoConfiguration.java

Lines changed: 28 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,6 @@
1717

1818
package org.apache.dolphinscheduler.plugin.registry.jdbc;
1919

20-
import org.apache.dolphinscheduler.plugin.registry.jdbc.mapper.JdbcRegistryClientHeartbeatMapper;
21-
import org.apache.dolphinscheduler.plugin.registry.jdbc.mapper.JdbcRegistryDataChangeEventMapper;
22-
import org.apache.dolphinscheduler.plugin.registry.jdbc.mapper.JdbcRegistryDataMapper;
23-
import org.apache.dolphinscheduler.plugin.registry.jdbc.mapper.JdbcRegistryLockMapper;
2420
import org.apache.dolphinscheduler.plugin.registry.jdbc.repository.JdbcRegistryClientRepository;
2521
import org.apache.dolphinscheduler.plugin.registry.jdbc.repository.JdbcRegistryDataChangeEventRepository;
2622
import org.apache.dolphinscheduler.plugin.registry.jdbc.repository.JdbcRegistryDataRepository;
@@ -30,6 +26,8 @@
3026

3127
import org.apache.ibatis.session.SqlSessionFactory;
3228

29+
import javax.sql.DataSource;
30+
3331
import lombok.extern.slf4j.Slf4j;
3432

3533
import org.mybatis.spring.SqlSessionTemplate;
@@ -40,6 +38,9 @@
4038
import org.springframework.context.annotation.Bean;
4139
import org.springframework.context.annotation.ComponentScan;
4240
import org.springframework.context.annotation.Configuration;
41+
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
42+
import org.springframework.transaction.PlatformTransactionManager;
43+
import org.springframework.transaction.support.TransactionTemplate;
4344

4445
import com.baomidou.mybatisplus.autoconfigure.MybatisPlusAutoConfiguration;
4546
import com.baomidou.mybatisplus.extension.spring.MybatisSqlSessionFactoryBean;
@@ -62,13 +63,15 @@ public IJdbcRegistryServer jdbcRegistryServer(JdbcRegistryDataRepository jdbcReg
6263
JdbcRegistryLockRepository jdbcRegistryLockRepository,
6364
JdbcRegistryClientRepository jdbcRegistryClientRepository,
6465
JdbcRegistryDataChangeEventRepository jdbcRegistryDataChangeEventRepository,
65-
JdbcRegistryProperties jdbcRegistryProperties) {
66+
JdbcRegistryProperties jdbcRegistryProperties,
67+
TransactionTemplate jdbcTransactionTemplate) {
6668
return new JdbcRegistryServer(
6769
jdbcRegistryDataRepository,
6870
jdbcRegistryLockRepository,
6971
jdbcRegistryClientRepository,
7072
jdbcRegistryDataChangeEventRepository,
71-
jdbcRegistryProperties);
73+
jdbcRegistryProperties,
74+
jdbcTransactionTemplate);
7275
}
7376

7477
@Bean
@@ -81,42 +84,38 @@ public JdbcRegistry jdbcRegistry(JdbcRegistryProperties jdbcRegistryProperties,
8184

8285
@Bean
8386
@ConditionalOnMissingBean
84-
public SqlSessionFactory sqlSessionFactory(JdbcRegistryProperties jdbcRegistryProperties) throws Exception {
85-
log.info("Initialize jdbcRegistrySqlSessionFactory");
86-
MybatisSqlSessionFactoryBean sqlSessionFactoryBean = new MybatisSqlSessionFactoryBean();
87-
sqlSessionFactoryBean.setDataSource(new HikariDataSource(jdbcRegistryProperties.getHikariConfig()));
88-
return sqlSessionFactoryBean.getObject();
87+
public DataSource jdbcRegistryDataSource(JdbcRegistryProperties jdbcRegistryProperties) {
88+
return new HikariDataSource(jdbcRegistryProperties.getHikariConfig());
8989
}
9090

9191
@Bean
9292
@ConditionalOnMissingBean
93-
public SqlSessionTemplate sqlSessionTemplate(SqlSessionFactory jdbcRegistrySqlSessionFactory) {
94-
log.info("Initialize jdbcRegistrySqlSessionTemplate");
95-
return new SqlSessionTemplate(jdbcRegistrySqlSessionFactory);
96-
}
97-
98-
@Bean
99-
public JdbcRegistryDataMapper jdbcRegistryDataMapper(SqlSessionTemplate jdbcRegistrySqlSessionTemplate) {
100-
jdbcRegistrySqlSessionTemplate.getConfiguration().addMapper(JdbcRegistryDataMapper.class);
101-
return jdbcRegistrySqlSessionTemplate.getMapper(JdbcRegistryDataMapper.class);
93+
public PlatformTransactionManager jdbcRegistryTransactionManager(DataSource jdbcRegistryDataSource) {
94+
DataSourceTransactionManager transactionManager = new DataSourceTransactionManager();
95+
transactionManager.setDataSource(jdbcRegistryDataSource);
96+
return transactionManager;
10297
}
10398

10499
@Bean
105-
public JdbcRegistryLockMapper jdbcRegistryLockMapper(SqlSessionTemplate jdbcRegistrySqlSessionTemplate) {
106-
jdbcRegistrySqlSessionTemplate.getConfiguration().addMapper(JdbcRegistryLockMapper.class);
107-
return jdbcRegistrySqlSessionTemplate.getMapper(JdbcRegistryLockMapper.class);
100+
@ConditionalOnMissingBean
101+
public TransactionTemplate jdbcTransactionTemplate(PlatformTransactionManager jdbcRegistryTransactionManager) {
102+
TransactionTemplate transactionTemplate = new TransactionTemplate();
103+
transactionTemplate.setTransactionManager(jdbcRegistryTransactionManager);
104+
return transactionTemplate;
108105
}
109106

110107
@Bean
111-
public JdbcRegistryDataChangeEventMapper jdbcRegistryDataChangeEventMapper(SqlSessionTemplate jdbcRegistrySqlSessionTemplate) {
112-
jdbcRegistrySqlSessionTemplate.getConfiguration().addMapper(JdbcRegistryDataChangeEventMapper.class);
113-
return jdbcRegistrySqlSessionTemplate.getMapper(JdbcRegistryDataChangeEventMapper.class);
108+
@ConditionalOnMissingBean
109+
public SqlSessionFactory jdbcRegistrySqlSessionFactory(DataSource jdbcRegistryDataSource) throws Exception {
110+
MybatisSqlSessionFactoryBean sqlSessionFactoryBean = new MybatisSqlSessionFactoryBean();
111+
sqlSessionFactoryBean.setDataSource(jdbcRegistryDataSource);
112+
return sqlSessionFactoryBean.getObject();
114113
}
115114

116115
@Bean
117-
public JdbcRegistryClientHeartbeatMapper jdbcRegistryClientHeartbeatMapper(SqlSessionTemplate jdbcRegistrySqlSessionTemplate) {
118-
jdbcRegistrySqlSessionTemplate.getConfiguration().addMapper(JdbcRegistryClientHeartbeatMapper.class);
119-
return jdbcRegistrySqlSessionTemplate.getMapper(JdbcRegistryClientHeartbeatMapper.class);
116+
@ConditionalOnMissingBean
117+
public SqlSessionTemplate jdbcRegistrySqlSessionTemplate(SqlSessionFactory jdbcRegistrySqlSessionFactory) {
118+
return new SqlSessionTemplate(jdbcRegistrySqlSessionFactory);
120119
}
121120

122121
}

dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/JdbcRegistryProperties.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -65,13 +65,13 @@ public void validate(Object target, Errors errors) {
6565
JdbcRegistryProperties jdbcRegistryProperties = (JdbcRegistryProperties) target;
6666
if (jdbcRegistryProperties.getHeartbeatRefreshInterval().compareTo(MIN_HEARTBEAT_REFRESH_INTERVAL) < 0) {
6767
errors.rejectValue("heartbeatRefreshInterval", "heartbeatRefreshInterval",
68-
"heartbeatRefreshInterval must be greater than 1s");
68+
"registry.heartbeatRefreshInterval must be greater than 1s");
6969
}
7070

7171
if (jdbcRegistryProperties.getSessionTimeout().toMillis() < 3
7272
* jdbcRegistryProperties.getHeartbeatRefreshInterval().toMillis()) {
7373
errors.rejectValue("sessionTimeout", "sessionTimeout",
74-
"sessionTimeout must be greater than 3 * heartbeatRefreshInterval");
74+
"registry.sessionTimeout must be greater than 3 * heartbeatRefreshInterval");
7575
}
7676
if (StringUtils.isEmpty(jdbcRegistryClientName)) {
7777
jdbcRegistryClientName = NetUtils.getHost() + ":" + serverPort;
@@ -86,7 +86,6 @@ private void print() {
8686
"\n jdbcRegistryClientName -> " + jdbcRegistryClientName +
8787
"\n heartbeatRefreshInterval -> " + heartbeatRefreshInterval +
8888
"\n sessionTimeout -> " + sessionTimeout +
89-
"\n hikariConfig -> " + hikariConfig +
9089
"\n****************************JdbcRegistryProperties**************************************";
9190
log.info(config);
9291
}

dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/server/JdbcRegistryDataManager.java

Lines changed: 62 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,8 @@
4141

4242
import lombok.extern.slf4j.Slf4j;
4343

44+
import org.springframework.transaction.support.TransactionTemplate;
45+
4446
import com.google.common.collect.Lists;
4547

4648
@Slf4j
@@ -57,23 +59,27 @@ public class JdbcRegistryDataManager
5759

5860
private final JdbcRegistryDataChangeEventRepository jdbcRegistryDataChangeEventRepository;
5961

62+
private final TransactionTemplate jdbcRegistryTransactionTemplate;
63+
6064
private final List<RegistryRowChangeListener<JdbcRegistryDataDTO>> registryRowChangeListeners;
6165

6266
private long lastDetectedJdbcRegistryDataChangeEventId = -1;
6367

6468
public JdbcRegistryDataManager(JdbcRegistryProperties registryProperties,
6569
JdbcRegistryDataRepository jdbcRegistryDataRepository,
66-
JdbcRegistryDataChangeEventRepository jdbcRegistryDataChangeEventRepository) {
70+
JdbcRegistryDataChangeEventRepository jdbcRegistryDataChangeEventRepository,
71+
TransactionTemplate jdbcRegistryTransactionTemplate) {
6772
this.registryProperties = registryProperties;
6873
this.jdbcRegistryDataChangeEventRepository = jdbcRegistryDataChangeEventRepository;
6974
this.jdbcRegistryDataRepository = jdbcRegistryDataRepository;
75+
this.jdbcRegistryTransactionTemplate = jdbcRegistryTransactionTemplate;
7076
this.registryRowChangeListeners = new CopyOnWriteArrayList<>();
71-
this.lastDetectedJdbcRegistryDataChangeEventId =
72-
jdbcRegistryDataChangeEventRepository.getMaxJdbcRegistryDataChangeEventId();
7377
}
7478

7579
@Override
7680
public void start() {
81+
this.lastDetectedJdbcRegistryDataChangeEventId =
82+
jdbcRegistryDataChangeEventRepository.getMaxJdbcRegistryDataChangeEventId();
7783
JdbcRegistryThreadFactory.getDefaultSchedulerThreadExecutor().scheduleWithFixedDelay(
7884
this::detectJdbcRegistryDataChangeEvent,
7985
registryProperties.getHeartbeatRefreshInterval().toMillis(),
@@ -162,67 +168,73 @@ public void putJdbcRegistryData(Long clientId, String key, String value, DataTyp
162168
checkNotNull(key);
163169
checkNotNull(dataType);
164170

165-
Optional<JdbcRegistryDataDTO> jdbcRegistryDataOptional = jdbcRegistryDataRepository.selectByKey(key);
166-
if (jdbcRegistryDataOptional.isPresent()) {
167-
JdbcRegistryDataDTO jdbcRegistryData = jdbcRegistryDataOptional.get();
168-
if (!dataType.name().equals(jdbcRegistryData.getDataType())) {
169-
throw new UnsupportedOperationException("The data type: " + jdbcRegistryData.getDataType()
170-
+ " of the key: " + key + " cannot be updated");
171-
}
171+
final Optional<JdbcRegistryDataDTO> jdbcRegistryDataOptional = jdbcRegistryDataRepository.selectByKey(key);
172172

173-
if (DataType.EPHEMERAL.name().equals(jdbcRegistryData.getDataType())) {
174-
if (!jdbcRegistryData.getClientId().equals(clientId)) {
175-
throw new UnsupportedOperationException(
176-
"The EPHEMERAL data: " + key + " can only be updated by its owner: "
177-
+ jdbcRegistryData.getClientId() + " but not: " + clientId);
173+
jdbcRegistryTransactionTemplate.execute(status -> {
174+
if (jdbcRegistryDataOptional.isPresent()) {
175+
JdbcRegistryDataDTO jdbcRegistryData = jdbcRegistryDataOptional.get();
176+
if (!dataType.name().equals(jdbcRegistryData.getDataType())) {
177+
throw new UnsupportedOperationException("The data type: " + jdbcRegistryData.getDataType()
178+
+ " of the key: " + key + " cannot be updated");
178179
}
179-
}
180180

181-
jdbcRegistryData.setDataValue(value);
182-
jdbcRegistryData.setLastUpdateTime(new Date());
183-
jdbcRegistryDataRepository.updateById(jdbcRegistryData);
181+
if (DataType.EPHEMERAL.name().equals(jdbcRegistryData.getDataType())) {
182+
if (!jdbcRegistryData.getClientId().equals(clientId)) {
183+
throw new UnsupportedOperationException(
184+
"The EPHEMERAL data: " + key + " can only be updated by its owner: "
185+
+ jdbcRegistryData.getClientId() + " but not: " + clientId);
186+
}
187+
}
184188

185-
JdbcRegistryDataChangeEventDTO jdbcRegistryDataChangeEvent = JdbcRegistryDataChangeEventDTO.builder()
186-
.jdbcRegistryData(jdbcRegistryData)
187-
.eventType(JdbcRegistryDataChangeEventDTO.EventType.UPDATE)
188-
.createTime(new Date())
189-
.build();
190-
jdbcRegistryDataChangeEventRepository.insert(jdbcRegistryDataChangeEvent);
191-
} else {
192-
JdbcRegistryDataDTO jdbcRegistryDataDTO = JdbcRegistryDataDTO.builder()
193-
.clientId(clientId)
194-
.dataKey(key)
195-
.dataValue(value)
196-
.dataType(dataType.name())
197-
.createTime(new Date())
198-
.lastUpdateTime(new Date())
199-
.build();
200-
jdbcRegistryDataRepository.insert(jdbcRegistryDataDTO);
201-
JdbcRegistryDataChangeEventDTO registryDataChangeEvent = JdbcRegistryDataChangeEventDTO.builder()
202-
.jdbcRegistryData(jdbcRegistryDataDTO)
203-
.eventType(JdbcRegistryDataChangeEventDTO.EventType.ADD)
204-
.createTime(new Date())
205-
.build();
206-
jdbcRegistryDataChangeEventRepository.insert(registryDataChangeEvent);
207-
}
189+
jdbcRegistryData.setDataValue(value);
190+
jdbcRegistryData.setLastUpdateTime(new Date());
191+
jdbcRegistryDataRepository.updateById(jdbcRegistryData);
192+
193+
JdbcRegistryDataChangeEventDTO jdbcRegistryDataChangeEvent = JdbcRegistryDataChangeEventDTO.builder()
194+
.jdbcRegistryData(jdbcRegistryData)
195+
.eventType(JdbcRegistryDataChangeEventDTO.EventType.UPDATE)
196+
.createTime(new Date())
197+
.build();
198+
jdbcRegistryDataChangeEventRepository.insert(jdbcRegistryDataChangeEvent);
199+
} else {
200+
JdbcRegistryDataDTO jdbcRegistryDataDTO = JdbcRegistryDataDTO.builder()
201+
.clientId(clientId)
202+
.dataKey(key)
203+
.dataValue(value)
204+
.dataType(dataType.name())
205+
.createTime(new Date())
206+
.lastUpdateTime(new Date())
207+
.build();
208+
jdbcRegistryDataRepository.insert(jdbcRegistryDataDTO);
209+
JdbcRegistryDataChangeEventDTO registryDataChangeEvent = JdbcRegistryDataChangeEventDTO.builder()
210+
.jdbcRegistryData(jdbcRegistryDataDTO)
211+
.eventType(JdbcRegistryDataChangeEventDTO.EventType.ADD)
212+
.createTime(new Date())
213+
.build();
214+
jdbcRegistryDataChangeEventRepository.insert(registryDataChangeEvent);
215+
}
216+
return null;
217+
});
208218

209219
}
210220

211221
@Override
212222
public void deleteJdbcRegistryDataByKey(String key) {
213223
checkNotNull(key);
214-
// todo: this is not atomic, need to be improved
215224
Optional<JdbcRegistryDataDTO> jdbcRegistryDataOptional = jdbcRegistryDataRepository.selectByKey(key);
216225
if (!jdbcRegistryDataOptional.isPresent()) {
217226
return;
218227
}
219-
jdbcRegistryDataRepository.deleteByKey(key);
220-
final JdbcRegistryDataChangeEventDTO registryDataChangeEvent = JdbcRegistryDataChangeEventDTO.builder()
221-
.jdbcRegistryData(jdbcRegistryDataOptional.get())
222-
.eventType(JdbcRegistryDataChangeEventDTO.EventType.DELETE)
223-
.createTime(new Date())
224-
.build();
225-
jdbcRegistryDataChangeEventRepository.insert(registryDataChangeEvent);
228+
jdbcRegistryTransactionTemplate.execute(status -> {
229+
jdbcRegistryDataRepository.deleteByKey(key);
230+
final JdbcRegistryDataChangeEventDTO registryDataChangeEvent = JdbcRegistryDataChangeEventDTO.builder()
231+
.jdbcRegistryData(jdbcRegistryDataOptional.get())
232+
.eventType(JdbcRegistryDataChangeEventDTO.EventType.DELETE)
233+
.createTime(new Date())
234+
.build();
235+
jdbcRegistryDataChangeEventRepository.insert(registryDataChangeEvent);
236+
return null;
237+
});
226238
}
227239

228240
private void doTriggerJdbcRegistryDataAddedListener(List<JdbcRegistryDataDTO> valuesToAdd) {

dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/server/JdbcRegistryLockManager.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ public void acquireJdbcRegistryLock(Long clientId, String lockKey) {
7979
// The lock is already exist, wait it release.
8080
continue;
8181
}
82-
log.debug("Acquire the lock {} failed try again", lockKey);
82+
log.debug("{} acquire the lock {} failed try again", lockOwner, lockKey);
8383
// acquire failed, wait and try again
8484
ThreadUtils.sleep(jdbcRegistryProperties.getHeartbeatRefreshInterval().toMillis());
8585
}
@@ -123,7 +123,7 @@ public boolean acquireJdbcRegistryLock(Long clientId, String lockKey, long timeo
123123
// The lock is already exist, wait it release.
124124
continue;
125125
}
126-
log.debug("Acquire the lock {} failed try again", lockKey);
126+
log.debug("{} acquire the lock {} failed try again", lockOwner, lockKey);
127127
// acquire failed, wait and try again
128128
ThreadUtils.sleep(jdbcRegistryProperties.getHeartbeatRefreshInterval().toMillis());
129129
}

0 commit comments

Comments
 (0)