Skip to content

Commit 806a20f

Browse files
committed
SNA-17. Use version field
1 parent 7008a98 commit 806a20f

File tree

12 files changed

+94
-22
lines changed

12 files changed

+94
-22
lines changed

neo4j-adapter/src/main/java/com/github/valb3r/springbatch/adapters/neo4j/dao/Neo4jExecutionContextDao.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,8 @@ public void saveExecutionContext(JobExecution jobExecution) {
4848
serializeContext(jobExecution.getExecutionContext())
4949
);
5050

51-
jobExecs.save(execution);
51+
val result = jobExecs.save(execution);
52+
jobExecution.setVersion(result.getVersion().intValue());
5253
}
5354

5455
@Override
@@ -59,7 +60,8 @@ public void saveExecutionContext(StepExecution stepExecution) {
5960
serializeContext(stepExecution.getExecutionContext())
6061
);
6162

62-
stepExecs.save(execution);
63+
val result = stepExecs.save(execution);
64+
stepExecution.setVersion(result.getVersion().intValue());
6365
}
6466

6567
@Override

neo4j-adapter/src/main/java/com/github/valb3r/springbatch/adapters/neo4j/dao/Neo4jJobExecutionDao.java

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import org.springframework.batch.core.JobExecution;
88
import org.springframework.batch.core.JobInstance;
99
import org.springframework.batch.core.repository.dao.JobExecutionDao;
10+
import org.springframework.dao.OptimisticLockingFailureException;
1011
import org.springframework.stereotype.Component;
1112
import org.springframework.transaction.annotation.Transactional;
1213

@@ -24,14 +25,30 @@ public class Neo4jJobExecutionDao implements JobExecutionDao {
2425
@Override
2526
@Transactional
2627
public void saveJobExecution(JobExecution jobExecution) {
28+
if (null != jobExecution.getId() && jobExecs.existsById(jobExecution.getId())) {
29+
throw new IllegalStateException("Job execution exists: " + jobExecution.getId());
30+
}
31+
32+
jobExecution.incrementVersion();
2733
val result = jobExecs.save(Neo4jJobExecution.MAP.map(jobExecution, new CycleAvoidingMappingContext()));
2834
jobExecution.setId(result.getId());
2935
}
3036

3137
@Override
3238
@Transactional
3339
public void updateJobExecution(JobExecution jobExecution) {
34-
jobExecs.save(Neo4jJobExecution.MAP.map(jobExecution, new CycleAvoidingMappingContext()));
40+
val exec = jobExecs.findById(jobExecution.getId())
41+
.orElseThrow(() -> new IllegalStateException("Job execution does not exist: " + jobExecution.getId()));
42+
43+
if (!exec.getVersion().equals(jobExecution.getVersion())) {
44+
throw new OptimisticLockingFailureException("Attempt to update job execution id="
45+
+ jobExecution.getId() + " with wrong version (" + jobExecution.getVersion()
46+
+ "), where current version is " + exec.getVersion());
47+
}
48+
49+
jobExecution.incrementVersion();
50+
val result = jobExecs.save(Neo4jJobExecution.MAP.map(jobExecution, new CycleAvoidingMappingContext()));
51+
jobExecution.setVersion(result.getVersion());
3552
}
3653

3754
@Override
@@ -71,6 +88,16 @@ public JobExecution getJobExecution(Long executionId) {
7188
@Override
7289
@Transactional
7390
public void synchronizeStatus(JobExecution jobExecution) {
74-
saveJobExecution(jobExecution);
91+
val exec = jobExecs.findById(jobExecution.getId())
92+
.orElseThrow(() -> new IllegalStateException("Job execution does not exist: " + jobExecution.getId()));
93+
94+
int currentVersion = exec.getVersion();
95+
if (currentVersion == jobExecution.getVersion()) {
96+
return;
97+
}
98+
99+
exec.setStatus(exec.getStatus().upgradeTo(jobExecution.getStatus()));
100+
exec.setVersion(exec.getVersion() + 1);
101+
jobExecs.save(exec);
75102
}
76103
}

neo4j-adapter/src/main/java/com/github/valb3r/springbatch/adapters/neo4j/dao/Neo4jJobInstanceDao.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ public JobInstance createJobInstance(String jobName, JobParameters jobParameters
3333
.jobName(jobName)
3434
.jobKey(keyGenerator.generateKey(jobParameters))
3535
.parameters(jobParameters)
36+
.version(0)
3637
.build()
3738
);
3839

neo4j-adapter/src/main/java/com/github/valb3r/springbatch/adapters/neo4j/dao/Neo4jStepExecutionDao.java

Lines changed: 31 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import org.springframework.batch.core.JobInstance;
99
import org.springframework.batch.core.StepExecution;
1010
import org.springframework.batch.core.repository.dao.StepExecutionDao;
11+
import org.springframework.dao.OptimisticLockingFailureException;
1112
import org.springframework.stereotype.Component;
1213
import org.springframework.transaction.annotation.Transactional;
1314

@@ -24,30 +25,48 @@ public class Neo4jStepExecutionDao implements StepExecutionDao {
2425
@Override
2526
@Transactional
2627
public void saveStepExecution(StepExecution stepExecution) {
28+
if (null != stepExecution.getId() && stepExecs.existsById(stepExecution.getId())) {
29+
throw new IllegalStateException("Step execution exists: " + stepExecution.getId());
30+
}
31+
32+
stepExecution.incrementVersion();
2733
val exec = stepExecs.save(Neo4jStepExecution.MAP.map(stepExecution));
2834
stepExecution.setId(exec.getId());
35+
stepExecution.setVersion(exec.getVersion());
2936
}
3037

3138
@Override
3239
@Transactional
3340
public void saveStepExecutions(Collection<StepExecution> stepExecutions) {
3441
stepExecs.saveAll(
35-
stepExecutions.stream().map(Neo4jStepExecution.MAP::map).collect(Collectors.toList())
42+
stepExecutions.stream().map(Neo4jStepExecution.MAP::map).collect(Collectors.toList())
3643
);
3744
}
3845

3946
@Override
4047
@Transactional
4148
public void updateStepExecution(StepExecution stepExecution) {
42-
saveStepExecution(stepExecution);
49+
val exec = stepExecs.findById(stepExecution.getId())
50+
.orElseThrow(() -> new IllegalStateException("Step execution does not exist: " + stepExecution.getId()));
51+
52+
if (!exec.getVersion().equals(stepExecution.getVersion())) {
53+
throw new OptimisticLockingFailureException("Attempt to update job execution id="
54+
+ stepExecution.getId() + " with wrong version (" + stepExecution.getVersion()
55+
+ "), where current version is " + exec.getVersion());
56+
}
57+
58+
stepExecution.incrementVersion();
59+
val updated = stepExecs.save(Neo4jStepExecution.MAP.map(stepExecution));
60+
stepExecution.setId(updated.getId());
61+
stepExecution.setVersion(updated.getVersion());
4362
}
4463

4564
@Override
4665
@Transactional
4766
public StepExecution getStepExecution(JobExecution jobExecution, Long stepExecutionId) {
4867
return stepExecs.findBy(jobExecution.getId(), stepExecutionId)
49-
.map(it -> Neo4jStepExecution.MAP.map(it, jobExecution))
50-
.orElse(null);
68+
.map(it -> Neo4jStepExecution.MAP.map(it, jobExecution))
69+
.orElse(null);
5170
}
5271

5372
@Override
@@ -66,10 +85,13 @@ public StepExecution getLastStepExecution(JobInstance jobInstance, String stepNa
6685
@Transactional
6786
public void addStepExecutions(JobExecution jobExecution) {
6887
stepExecs.findStepExecutions(jobExecution.getId())
69-
.forEach(it -> new StepExecution(
70-
it.getStepName(),
71-
jobExecution,
72-
it.getId())
73-
);
88+
.forEach(it -> {
89+
val exec = new StepExecution(
90+
it.getStepName(),
91+
jobExecution,
92+
it.getId());
93+
exec.setVersion(it.getVersion());
94+
}
95+
);
7496
}
7597
}

neo4j-adapter/src/main/java/com/github/valb3r/springbatch/adapters/neo4j/ogm/entity/Neo4jJobExecution.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import org.neo4j.ogm.annotation.Id;
1919
import org.neo4j.ogm.annotation.NodeEntity;
2020
import org.neo4j.ogm.annotation.Relationship;
21+
import org.neo4j.ogm.annotation.Version;
2122
import org.neo4j.ogm.annotation.typeconversion.Convert;
2223
import org.springframework.batch.core.BatchStatus;
2324
import org.springframework.batch.core.ExitStatus;
@@ -77,6 +78,8 @@ public class Neo4jJobExecution {
7778
@Convert(ExecutionContextConverter.class)
7879
private Map<String, Object> executionContext;
7980

81+
private Integer version;
82+
8083
@Mapper
8184
public interface FromBatch {
8285

neo4j-adapter/src/main/java/com/github/valb3r/springbatch/adapters/neo4j/ogm/entity/Neo4jJobInstance.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import org.neo4j.ogm.annotation.Id;
1313
import org.neo4j.ogm.annotation.NodeEntity;
1414
import org.neo4j.ogm.annotation.Relationship;
15+
import org.neo4j.ogm.annotation.Version;
1516
import org.neo4j.ogm.annotation.typeconversion.Convert;
1617
import org.springframework.batch.core.JobInstance;
1718
import org.springframework.batch.core.JobParameters;
@@ -49,12 +50,16 @@ public class Neo4jJobInstance {
4950
@CreatedDate
5051
private LocalDateTime createdAt;
5152

53+
private Integer version;
54+
5255
@Mapper
5356
public interface FromBatch {
5457
Neo4jJobInstance map(JobInstance batch);
5558

5659
default JobInstance map(Neo4jJobInstance batch) {
57-
return new JobInstance(batch.getId(), batch.getJobName());
60+
JobInstance instance = new JobInstance(batch.getId(), batch.getJobName());
61+
instance.setVersion(batch.getVersion());
62+
return instance;
5863
}
5964
}
6065
}

neo4j-adapter/src/main/java/com/github/valb3r/springbatch/adapters/neo4j/ogm/entity/Neo4jStepExecution.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import org.neo4j.ogm.annotation.Id;
1515
import org.neo4j.ogm.annotation.NodeEntity;
1616
import org.neo4j.ogm.annotation.Relationship;
17+
import org.neo4j.ogm.annotation.Version;
1718
import org.neo4j.ogm.annotation.typeconversion.Convert;
1819
import com.github.valb3r.springbatch.adapters.neo4j.dao.converters.ExitStatusConverter;
1920
import org.springframework.batch.core.BatchStatus;
@@ -75,6 +76,8 @@ public class Neo4jStepExecution {
7576
@Convert(ExecutionContextConverter.class)
7677
private Map<String, Object> executionContext;
7778

79+
private Integer version;
80+
7881
@Mapper
7982
public interface FromBatch {
8083

neo4j-adapter/src/test/java/com/github/valb3r/springbatch/adapters/dao/flow/parametrized/BaseParametrizedDaoBasedExecutionTest.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ void runOneStepTasklet() {
6262
assertThat(stepExecutionDao.getLastStepExecution(job.getInstance(), STEP_TWO)).isNull();
6363
assertThat(job.getExecution().getStatus()).isEqualTo(BatchStatus.COMPLETED);
6464

65+
assertThat(job.getExecution().getVersion()).isEqualTo(2);
6566
assertThat(job.getExecution().getJobParameters().getDate(TODAY)).isEqualTo(EXPECTED_DATE);
6667
assertThat(job.getExecution().getJobParameters().getDouble(ONE)).isEqualTo(1.0);
6768
assertThat(job.getExecution().getJobParameters().getLong(TWO)).isEqualTo(2L);
@@ -81,6 +82,7 @@ void runTwoStepsTasklet() {
8182
assertThat(stepExecutionDao.getLastStepExecution(job.getInstance(), STEP_TWO)).isNotNull();
8283
assertThat(job.getExecution().getStatus()).isEqualTo(BatchStatus.COMPLETED);
8384

85+
assertThat(job.getExecution().getVersion()).isEqualTo(2);
8486
assertThat(job.getExecution().getJobParameters().getDate(TODAY)).isEqualTo(EXPECTED_DATE);
8587
assertThat(job.getExecution().getJobParameters().getDouble(ONE)).isEqualTo(1.0);
8688
assertThat(job.getExecution().getJobParameters().getLong(TWO)).isEqualTo(2L);
@@ -103,6 +105,7 @@ void runOneStepReaderWriter() {
103105
assertThat(stepExecutionDao.getLastStepExecution(job.getInstance(), STEP_TWO)).isNull();
104106
assertThat(job.getExecution().getStatus()).isEqualTo(BatchStatus.COMPLETED);
105107

108+
assertThat(job.getExecution().getVersion()).isEqualTo(2);
106109
assertThat(job.getExecution().getJobParameters().getDate(TODAY)).isEqualTo(EXPECTED_DATE);
107110
assertThat(job.getExecution().getJobParameters().getDouble(ONE)).isEqualTo(1.0);
108111
assertThat(job.getExecution().getJobParameters().getLong(TWO)).isEqualTo(2L);

neo4j-adapter/src/test/java/com/github/valb3r/springbatch/adapters/dao/flow/simple/BaseSimpleDaoBasedExecutionTest.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ void runOneStepTasklet() {
5757
assertThat(stepExecutionDao.getLastStepExecution(job.getInstance(), STEP_ONE)).isNotNull();
5858
assertThat(stepExecutionDao.getLastStepExecution(job.getInstance(), STEP_TWO)).isNull();
5959
assertThat(job.getExecution().getStatus()).isEqualTo(BatchStatus.COMPLETED);
60+
assertThat(job.getExecution().getVersion()).isEqualTo(2);
6061
}
6162

6263
@Test
@@ -71,6 +72,7 @@ void runTwoStepsTasklet() {
7172
assertThat(stepExecutionDao.getLastStepExecution(job.getInstance(), STEP_ONE)).isNotNull();
7273
assertThat(stepExecutionDao.getLastStepExecution(job.getInstance(), STEP_TWO)).isNotNull();
7374
assertThat(job.getExecution().getStatus()).isEqualTo(BatchStatus.COMPLETED);
75+
assertThat(job.getExecution().getVersion()).isEqualTo(2);
7476
}
7577

7678
@Test
@@ -88,6 +90,7 @@ void runOneStepReaderWriter() {
8890
assertThat(stepExecutionDao.getLastStepExecution(job.getInstance(), STEP_ONE)).isNotNull();
8991
assertThat(stepExecutionDao.getLastStepExecution(job.getInstance(), STEP_TWO)).isNull();
9092
assertThat(job.getExecution().getStatus()).isEqualTo(BatchStatus.COMPLETED);
93+
assertThat(job.getExecution().getVersion()).isEqualTo(2);
9194
}
9295

9396
@Test
@@ -105,5 +108,6 @@ void runTwoStepsReaderWriter() {
105108
assertThat(stepExecutionDao.getLastStepExecution(job.getInstance(), STEP_ONE)).isNotNull();
106109
assertThat(stepExecutionDao.getLastStepExecution(job.getInstance(), STEP_TWO)).isNotNull();
107110
assertThat(job.getExecution().getStatus()).isEqualTo(BatchStatus.COMPLETED);
111+
assertThat(job.getExecution().getVersion()).isEqualTo(2);
108112
}
109113
}

neo4j-adapter/src/test/java/com/github/valb3r/springbatch/adapters/dao/repository/ExecutionContextDaoTest.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -53,15 +53,15 @@ void dropDatabase() {
5353
@Test
5454
void getExecutionContextFromJobExecution() {
5555
var newExec = execution();
56-
jobExecDao.saveJobExecution(newExec);
56+
jobExecDao.updateJobExecution(newExec);
5757

5858
assertThat(ctxDao.getExecutionContext(newExec)).isEqualToComparingFieldByField(execCtx());
5959
}
6060

6161
@Test
6262
void getExecutionContextFromStepExecution() {
6363
var newExec = execution();
64-
jobExecDao.saveJobExecution(newExec);
64+
jobExecDao.updateJobExecution(newExec);
6565
var stepExec = new StepExecution(STEP_NAME, newExec);
6666
stepExec.setExecutionContext(stepExecCtx());
6767
stepExecDao.saveStepExecution(stepExec);
@@ -72,7 +72,7 @@ void getExecutionContextFromStepExecution() {
7272
@Test
7373
void saveExecutionContextForJobExecution() {
7474
var newExec = execution();
75-
jobExecDao.saveJobExecution(newExec);
75+
jobExecDao.updateJobExecution(newExec);
7676

7777
newExec.setExecutionContext(newExecCtx());
7878
ctxDao.saveExecutionContext(newExec);
@@ -83,7 +83,7 @@ void saveExecutionContextForJobExecution() {
8383
@Test
8484
void saveExecutionContextForStepExecution() {
8585
var newExec = execution();
86-
jobExecDao.saveJobExecution(newExec);
86+
jobExecDao.updateJobExecution(newExec);
8787
var stepExec = new StepExecution(STEP_NAME, newExec);
8888
stepExec.setExecutionContext(stepExecCtx());
8989
stepExecDao.saveStepExecution(stepExec);
@@ -97,7 +97,7 @@ void saveExecutionContextForStepExecution() {
9797
@Test
9898
void saveExecutionContexts() {
9999
var newExec = execution();
100-
jobExecDao.saveJobExecution(newExec);
100+
jobExecDao.updateJobExecution(newExec);
101101
var stepExecOne = new StepExecution(STEP_NAME, newExec);
102102
stepExecDao.saveStepExecution(stepExecOne);
103103
var stepExecTwo = new StepExecution(STEP_NAME_OTHER, newExec);
@@ -115,7 +115,7 @@ void saveExecutionContexts() {
115115
void updateExecutionContextForJobExecution() {
116116
var newExec = execution();
117117
newExec.setExecutionContext(execCtx());
118-
jobExecDao.saveJobExecution(newExec);
118+
jobExecDao.updateJobExecution(newExec);
119119

120120
newExec.setExecutionContext(newExecCtx());
121121
ctxDao.updateExecutionContext(newExec);
@@ -126,7 +126,7 @@ void updateExecutionContextForJobExecution() {
126126
@Test
127127
void updateExecutionContextForStepExecution() {
128128
var newExec = execution();
129-
jobExecDao.saveJobExecution(newExec);
129+
jobExecDao.updateJobExecution(newExec);
130130
var stepExec = new StepExecution(STEP_NAME, newExec);
131131
stepExec.setExecutionContext(stepExecCtx());
132132
stepExecDao.saveStepExecution(stepExec);

0 commit comments

Comments
 (0)