Skip to content
This repository was archived by the owner on May 14, 2025. It is now read-only.

Commit 44a25af

Browse files
cppwfsjvalkeal
authored andcommitted
User can now select transaction Isolation for CTR
resolves #4680
1 parent c3ba57f commit 44a25af

File tree

10 files changed

+58
-14
lines changed

10 files changed

+58
-14
lines changed

result.txt

130 Bytes
Binary file not shown.

spring-cloud-dataflow-composed-task-runner/src/main/java/org/springframework/cloud/dataflow/composedtaskrunner/ComposedBatchConfigurer.java

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2017-2020 the original author or authors.
2+
* Copyright 2017-2021 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -35,14 +35,15 @@
3535
import org.springframework.boot.autoconfigure.batch.BasicBatchConfigurer;
3636
import org.springframework.boot.autoconfigure.batch.BatchProperties;
3737
import org.springframework.boot.autoconfigure.transaction.TransactionManagerCustomizers;
38+
import org.springframework.cloud.dataflow.composedtaskrunner.properties.ComposedTaskProperties;
3839
import org.springframework.cloud.dataflow.composedtaskrunner.support.ComposedTaskException;
3940
import org.springframework.cloud.dataflow.composedtaskrunner.support.SqlServerSequenceMaxValueIncrementer;
4041
import org.springframework.jdbc.support.MetaDataAccessException;
4142
import org.springframework.jdbc.support.incrementer.DataFieldMaxValueIncrementer;
4243
import org.springframework.util.StringUtils;
4344

4445
/**
45-
* A BatchConfigurer for CTR that will establish the transaction isolation lavel to READ_COMMITTED.
46+
* A BatchConfigurer for CTR that will establish the transaction isolation level to ISOLATION_REPEATABLE_READ by default.
4647
*
4748
* @author Glenn Renfro
4849
*/
@@ -54,6 +55,8 @@ public class ComposedBatchConfigurer extends BasicBatchConfigurer {
5455

5556
private Map<String, DataFieldMaxValueIncrementer> incrementerMap;
5657

58+
private ComposedTaskProperties composedTaskProperties;
59+
5760
/**
5861
* Create a new {@link BasicBatchConfigurer} instance.
5962
*
@@ -62,10 +65,12 @@ public class ComposedBatchConfigurer extends BasicBatchConfigurer {
6265
* @param transactionManagerCustomizers transaction manager customizers (or
6366
* {@code null})
6467
*/
65-
protected ComposedBatchConfigurer(BatchProperties properties, DataSource dataSource, TransactionManagerCustomizers transactionManagerCustomizers) {
68+
protected ComposedBatchConfigurer(BatchProperties properties, DataSource dataSource,
69+
TransactionManagerCustomizers transactionManagerCustomizers, ComposedTaskProperties composedTaskProperties) {
6670
super(properties, dataSource, transactionManagerCustomizers);
6771
this.incrementerDataSource = dataSource;
6872
incrementerMap = new HashMap<>();
73+
this.composedTaskProperties = composedTaskProperties;
6974
}
7075

7176
protected JobRepository createJobRepository() {
@@ -85,7 +90,7 @@ public DataFieldMaxValueIncrementer getIncrementer(String incrementerType, Strin
8590
factory.setIncrementerFactory(incrementerFactory);
8691
factory.setDataSource(this.incrementerDataSource);
8792
factory.setTransactionManager(this.getTransactionManager());
88-
factory.setIsolationLevelForCreate("ISOLATION_REPEATABLE_READ");
93+
factory.setIsolationLevelForCreate(this.composedTaskProperties.getTransactionIsolationLevel());
8994
try {
9095
factory.afterPropertiesSet();
9196
return factory.getObject();

spring-cloud-dataflow-composed-task-runner/src/main/java/org/springframework/cloud/dataflow/composedtaskrunner/ComposedTaskRunnerConfiguration.java

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2017-2020 the original author or authors.
2+
* Copyright 2017-2021 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -21,7 +21,6 @@
2121
import org.springframework.batch.core.StepExecutionListener;
2222
import org.springframework.batch.core.configuration.annotation.BatchConfigurer;
2323
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
24-
import org.springframework.beans.factory.annotation.Autowired;
2524
import org.springframework.boot.autoconfigure.batch.BatchProperties;
2625
import org.springframework.boot.autoconfigure.transaction.TransactionManagerCustomizers;
2726
import org.springframework.boot.context.properties.EnableConfigurationProperties;
@@ -46,22 +45,19 @@
4645
@Import(org.springframework.cloud.dataflow.composedtaskrunner.StepBeanDefinitionRegistrar.class)
4746
public class ComposedTaskRunnerConfiguration {
4847

49-
@Autowired
50-
private ComposedTaskProperties properties;
51-
5248
@Bean
5349
public StepExecutionListener composedTaskStepExecutionListener(TaskExplorer taskExplorer){
5450
return new org.springframework.cloud.dataflow.composedtaskrunner.ComposedTaskStepExecutionListener(taskExplorer);
5551
}
5652

5753
@Bean
58-
public org.springframework.cloud.dataflow.composedtaskrunner.ComposedRunnerJobFactory composedTaskJob() {
54+
public org.springframework.cloud.dataflow.composedtaskrunner.ComposedRunnerJobFactory composedTaskJob(ComposedTaskProperties properties) {
5955

60-
return new org.springframework.cloud.dataflow.composedtaskrunner.ComposedRunnerJobFactory(this.properties);
56+
return new org.springframework.cloud.dataflow.composedtaskrunner.ComposedRunnerJobFactory(properties);
6157
}
6258

6359
@Bean
64-
public TaskExecutor taskExecutor() {
60+
public TaskExecutor taskExecutor(ComposedTaskProperties properties) {
6561
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
6662
taskExecutor.setCorePoolSize(properties.getSplitThreadCorePoolSize());
6763
taskExecutor.setMaxPoolSize(properties.getSplitThreadMaxPoolSize());
@@ -75,7 +71,10 @@ public TaskExecutor taskExecutor() {
7571
}
7672

7773
@Bean
78-
public BatchConfigurer getComposedBatchConfigurer(BatchProperties properties, DataSource dataSource, TransactionManagerCustomizers transactionManagerCustomizers) {
79-
return new org.springframework.cloud.dataflow.composedtaskrunner.ComposedBatchConfigurer(properties, dataSource, transactionManagerCustomizers);
74+
public BatchConfigurer getComposedBatchConfigurer(BatchProperties properties,
75+
DataSource dataSource, TransactionManagerCustomizers transactionManagerCustomizers,
76+
ComposedTaskProperties composedTaskProperties) {
77+
return new org.springframework.cloud.dataflow.composedtaskrunner.ComposedBatchConfigurer(properties,
78+
dataSource, transactionManagerCustomizers, composedTaskProperties);
8079
}
8180
}

spring-cloud-dataflow-composed-task-runner/src/main/java/org/springframework/cloud/dataflow/composedtaskrunner/properties/ComposedTaskProperties.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -199,6 +199,12 @@ public class ComposedTaskProperties {
199199
*/
200200
private boolean skipTlsCertificateVerification = false;
201201

202+
/**
203+
* Establish the transaction isolation level for the Composed Task Runner.
204+
* Default is ISOLATION_REPEATABLE_READ.
205+
*/
206+
private String transactionIsolationLevel = "ISOLATION_REPEATABLE_READ";
207+
202208
public ComposedTaskProperties() {
203209
try {
204210
this.dataflowServerUri = new URI("http://localhost:9393");
@@ -407,4 +413,12 @@ public boolean isUuidInstanceEnabled() {
407413
public void setUuidInstanceEnabled(boolean uuIdInstanceEnabled) {
408414
this.uuidInstanceEnabled = uuIdInstanceEnabled;
409415
}
416+
417+
public String getTransactionIsolationLevel() {
418+
return transactionIsolationLevel;
419+
}
420+
421+
public void setTransactionIsolationLevel(String transactionIsolationLevel) {
422+
this.transactionIsolationLevel = transactionIsolationLevel;
423+
}
410424
}

spring-cloud-dataflow-composed-task-runner/src/main/resources/META-INF/spring-configuration-metadata.json

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,12 @@
113113
"description": "The platform property that will be used for each task in the workflow when it is launched.",
114114
"sourceType": "org.springframework.cloud.dataflow.composedtaskrunner.properties.ComposedTaskProperties"
115115
},
116+
{
117+
"name": "transaction-isolation-level",
118+
"type": "java.lang.String",
119+
"description": "Establish the transaction isolation level for the Composed Task Runner. Default is ISOLATION_REPEATABLE_READ",
120+
"sourceType": "org.springframework.cloud.dataflow.composedtaskrunner.properties.ComposedTaskProperties"
121+
},
116122
{
117123
"name": "skip-tls-certificate-verification",
118124
"type": "java.lang.Boolean",

spring-cloud-dataflow-composed-task-runner/src/test/java/org/springframework/cloud/dataflow/composedtaskrunner/ComposedTaskRunnerConfigurationNoPropertiesTests.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import org.springframework.boot.autoconfigure.jdbc.EmbeddedDataSourceConfiguration;
3434
import org.springframework.cloud.common.security.CommonSecurityAutoConfiguration;
3535
import org.springframework.cloud.dataflow.composedtaskrunner.configuration.DataFlowTestConfiguration;
36+
import org.springframework.cloud.dataflow.composedtaskrunner.properties.ComposedTaskProperties;
3637
import org.springframework.cloud.dataflow.rest.client.TaskOperations;
3738
import org.springframework.context.ApplicationContext;
3839
import org.springframework.test.annotation.DirtiesContext;
@@ -42,6 +43,7 @@
4243
import org.springframework.test.util.ReflectionTestUtils;
4344
import org.springframework.util.Assert;
4445

46+
import static org.assertj.core.api.Assertions.assertThat;
4547
import static org.mockito.Mockito.mock;
4648
import static org.mockito.Mockito.verify;
4749

@@ -66,6 +68,9 @@ public class ComposedTaskRunnerConfigurationNoPropertiesTests {
6668
@Autowired
6769
private ApplicationContext context;
6870

71+
@Autowired
72+
private ComposedTaskProperties composedTaskProperties;
73+
6974
@Test
7075
@DirtiesContext
7176
public void testComposedConfiguration() throws Exception {
@@ -75,6 +80,7 @@ public void testComposedConfiguration() throws Exception {
7580
TaskOperations taskOperations = mock(TaskOperations.class);
7681
ReflectionTestUtils.setField(ctrStep.getTasklet(), "taskOperations", taskOperations);
7782
job.execute(jobExecution);
83+
assertThat(composedTaskProperties.getTransactionIsolationLevel()).isEqualTo("ISOLATION_REPEATABLE_READ");
7884

7985
Assert.notNull(job.getJobParametersIncrementer(), "JobParametersIncrementer must not be null.");
8086
verify(taskOperations).launch("AAA", new HashMap<>(0), new ArrayList<>(0));

spring-cloud-dataflow-composed-task-runner/src/test/java/org/springframework/cloud/dataflow/composedtaskrunner/ComposedTaskRunnerConfigurationWithPropertiesTests.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@
5959
@TestPropertySource(properties = {"graph=ComposedTest-AAA && ComposedTest-BBB && ComposedTest-CCC","max-wait-time=1010",
6060
"composed-task-properties=" + ComposedTaskRunnerConfigurationWithPropertiesTests.COMPOSED_TASK_PROPS ,
6161
"interval-time-between-checks=1100", "composed-task-arguments=--baz=boo --AAA.foo=bar BBB.que=qui",
62+
"transaction-isolation-level=ISOLATION_READ_COMMITTED",
6263
"dataflow-server-uri=https://bar", "spring.cloud.task.name=ComposedTest"})
6364
@EnableAutoConfiguration(exclude = { CommonSecurityAutoConfiguration.class})
6465
public class ComposedTaskRunnerConfigurationWithPropertiesTests {
@@ -99,6 +100,8 @@ public void testComposedConfiguration() throws Exception {
99100
assertThat(composedTaskProperties.getMaxWaitTime()).isEqualTo(1010);
100101
assertThat(composedTaskProperties.getIntervalTimeBetweenChecks()).isEqualTo(1100);
101102
assertThat(composedTaskProperties.getDataflowServerUri().toASCIIString()).isEqualTo("https://bar");
103+
assertThat(composedTaskProperties.getTransactionIsolationLevel()).isEqualTo("ISOLATION_READ_COMMITTED");
104+
102105
List<String> args = new ArrayList<>(1);
103106
args.add("--baz=boo --foo=bar");
104107
Assert.notNull(job.getJobParametersIncrementer(), "JobParametersIncrementer must not be null.");

spring-cloud-dataflow-docs/src/main/asciidoc/tasks.adoc

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -578,6 +578,11 @@ Set this option to `true` when running multiple instances of the same composed t
578578
The amount of time, in milliseconds, that the `ComposedTaskRunner` waits between checks of the database to see if a task has completed. (Integer, default: `10000`).
579579
`ComposedTaskRunner` uses the datastore to determine the status of each child tasks. This interval indicates to `ComposedTaskRunner` how often it should check the status its child tasks.
580580

581+
* `transaction-isolation-level`
582+
Establish the transaction isolation level for the Composed Task Runner.
583+
A list of available transaction isolation levels can be found https://docs.spring.io/spring-framework/docs/current/javadoc-api/org/springframework/transaction/TransactionDefinition.html[here].
584+
Default is `ISOLATION_REPEATABLE_READ`.
585+
581586
* `max-wait-time`
582587
The maximum amount of time, in milliseconds, that an individual step can run before the execution of the Composed task is failed (Integer, default: 0).
583588
Determines the maximum time each child task is allowed to run before the CTR ends with a failure. The default of `0` indicates no timeout.

ssbj.properties

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
task.ssbjp=maven://org.springframework.cloud:spring-cloud-dataflow-single-step-batch-job:2.9.0-SNAPSHOT
2+
task.ssbjp.metadata=maven://org.springframework.cloud:spring-cloud-dataflow-single-step-batch-job:jar:metadata:2.9.0-SNAPSHOT
3+

test.txt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
abc
2+
def
3+
ghi

0 commit comments

Comments
 (0)