
Commit 75b216c

Fix sequence with TSID implementation in MongoSequenceIncrementer

Signed-off-by: Hyun Jong Park <[email protected]>

1 parent 088487b · commit 75b216c

8 files changed: +206 -34 lines changed

spring-batch-core/src/main/java/org/springframework/batch/core/configuration/support/MongoDefaultBatchConfiguration.java

Lines changed: 3 additions & 3 deletions

@@ -159,7 +159,7 @@ protected JobKeyGenerator getJobKeyGenerator() {
      * @since 6.0
      */
     protected DataFieldMaxValueIncrementer getJobInstanceIncrementer() {
-        return new MongoSequenceIncrementer(getMongoOperations(), "BATCH_JOB_INSTANCE_SEQ");
+        return new MongoSequenceIncrementer();
     }

     /**
@@ -168,7 +168,7 @@ protected DataFieldMaxValueIncrementer getJobInstanceIncrementer() {
      * @since 6.0
     */
     protected DataFieldMaxValueIncrementer getJobExecutionIncrementer() {
-        return new MongoSequenceIncrementer(getMongoOperations(), "BATCH_JOB_EXECUTION_SEQ");
+        return new MongoSequenceIncrementer();
     }

     /**
@@ -177,7 +177,7 @@ protected DataFieldMaxValueIncrementer getJobExecutionIncrementer() {
      * @since 6.0
      */
     protected DataFieldMaxValueIncrementer getStepExecutionIncrementer() {
-        return new MongoSequenceIncrementer(getMongoOperations(), "BATCH_STEP_EXECUTION_SEQ");
+        return new MongoSequenceIncrementer();
     }

 }
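
Because the no-argument constructor derives its node ID from System.nanoTime(), two JVMs writing to the same job repository could in principle pick the same node ID. If that matters for a deployment, the protected getters above can be overridden to pin an explicit node ID per instance. A minimal sketch, not part of this commit; it assumes MongoDefaultBatchConfiguration can be extended as a regular @Configuration class and uses a hypothetical "batch.node-id" system property:

    import org.springframework.batch.core.configuration.support.MongoDefaultBatchConfiguration;
    import org.springframework.batch.core.repository.dao.mongodb.MongoSequenceIncrementer;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.jdbc.support.incrementer.DataFieldMaxValueIncrementer;

    @Configuration
    class FixedNodeBatchConfiguration extends MongoDefaultBatchConfiguration {

        // Each JVM instance gets its own node ID (0-1023) via a hypothetical system property.
        private final int nodeId = Integer.getInteger("batch.node-id", 0);

        @Override
        protected DataFieldMaxValueIncrementer getJobInstanceIncrementer() {
            return new MongoSequenceIncrementer(nodeId);
        }

        @Override
        protected DataFieldMaxValueIncrementer getJobExecutionIncrementer() {
            return new MongoSequenceIncrementer(nodeId);
        }

        @Override
        protected DataFieldMaxValueIncrementer getStepExecutionIncrementer() {
            return new MongoSequenceIncrementer(nodeId);
        }

    }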

spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/mongodb/MongoJobExecutionDao.java

Lines changed: 1 addition & 3 deletions

@@ -50,8 +50,6 @@ public class MongoJobExecutionDao implements JobExecutionDao {

     private static final String JOB_EXECUTIONS_COLLECTION_NAME = "BATCH_JOB_EXECUTION";

-    private static final String JOB_EXECUTIONS_SEQUENCE_NAME = "BATCH_JOB_EXECUTION_SEQ";
-
     private final MongoOperations mongoOperations;

     private final JobExecutionConverter jobExecutionConverter = new JobExecutionConverter();
@@ -62,7 +60,7 @@ public class MongoJobExecutionDao implements JobExecutionDao {

     public MongoJobExecutionDao(MongoOperations mongoOperations) {
         this.mongoOperations = mongoOperations;
-        this.jobExecutionIncrementer = new MongoSequenceIncrementer(mongoOperations, JOB_EXECUTIONS_SEQUENCE_NAME);
+        this.jobExecutionIncrementer = new MongoSequenceIncrementer();
     }

     public void setJobExecutionIncrementer(DataFieldMaxValueIncrementer jobExecutionIncrementer) {
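
With the new no-argument constructor, the DAO no longer passes MongoOperations or a sequence name to its incrementer; the setJobExecutionIncrementer setter shown in the context above still allows swapping in an instance with an explicit node ID. A short sketch, not part of the commit, assuming mongoOperations is already configured:

    // Replace the nanoTime-derived default with a fixed node ID (42 here is arbitrary).
    MongoJobExecutionDao jobExecutionDao = new MongoJobExecutionDao(mongoOperations);
    jobExecutionDao.setJobExecutionIncrementer(new MongoSequenceIncrementer(42));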

spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/mongodb/MongoJobInstanceDao.java

Lines changed: 1 addition & 3 deletions

@@ -44,8 +44,6 @@ public class MongoJobInstanceDao implements JobInstanceDao {

     private static final String COLLECTION_NAME = "BATCH_JOB_INSTANCE";

-    private static final String SEQUENCE_NAME = "BATCH_JOB_INSTANCE_SEQ";
-
     private final MongoOperations mongoOperations;

     private DataFieldMaxValueIncrementer jobInstanceIncrementer;
@@ -57,7 +55,7 @@ public class MongoJobInstanceDao implements JobInstanceDao {
     public MongoJobInstanceDao(MongoOperations mongoOperations) {
         Assert.notNull(mongoOperations, "mongoOperations must not be null.");
         this.mongoOperations = mongoOperations;
-        this.jobInstanceIncrementer = new MongoSequenceIncrementer(mongoOperations, SEQUENCE_NAME);
+        this.jobInstanceIncrementer = new MongoSequenceIncrementer();
     }

     public void setJobKeyGenerator(JobKeyGenerator jobKeyGenerator) {

spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/mongodb/MongoSequenceIncrementer.java

Lines changed: 51 additions & 17 deletions

@@ -15,16 +15,10 @@
  */
 package org.springframework.batch.core.repository.dao.mongodb;

-import com.mongodb.client.model.FindOneAndUpdateOptions;
-import com.mongodb.client.model.ReturnDocument;
-import org.bson.Document;
-
 import org.springframework.dao.DataAccessException;
-import org.springframework.data.mongodb.core.MongoOperations;
 import org.springframework.jdbc.support.incrementer.DataFieldMaxValueIncrementer;

-// Based on https://www.mongodb.com/blog/post/generating-globally-unique-identifiers-for-use-with-mongodb
-// Section: Use a single counter document to generate unique identifiers one at a time
+import java.util.concurrent.atomic.AtomicInteger;

 /**
  * @author Mahmoud Ben Hassine
@@ -33,22 +27,33 @@
  */
 public class MongoSequenceIncrementer implements DataFieldMaxValueIncrementer {

-    private final MongoOperations mongoTemplate;
+    private static final int NODE_BITS = 10;
+    private static final int SEQUENCE_BITS = 12;
+    private static final int NODE_SHIFT = SEQUENCE_BITS;
+    private static final int TIMESTAMP_SHIFT = NODE_BITS + SEQUENCE_BITS;
+    private static final int SEQUENCE_MASK = (1 << SEQUENCE_BITS) - 1;
+    private static final int NODE_MASK = (1 << NODE_BITS) - 1;
+
+    private static final long TSID_EPOCH = 1577836800000L;
+
+    private final int nodeId;
+    private final AtomicInteger sequence = new AtomicInteger(0);
+    private volatile long lastTimestamp = -1L;

-    private final String sequenceName;
+    public MongoSequenceIncrementer() {
+        this.nodeId = (int) (System.nanoTime() & NODE_MASK);
+    }

-    public MongoSequenceIncrementer(MongoOperations mongoTemplate, String sequenceName) {
-        this.mongoTemplate = mongoTemplate;
-        this.sequenceName = sequenceName;
+    public MongoSequenceIncrementer(int nodeId) {
+        if (nodeId < 0 || nodeId > NODE_MASK) {
+            throw new IllegalArgumentException("Node ID must be between 0 and " + NODE_MASK);
+        }
+        this.nodeId = nodeId;
     }

     @Override
     public long nextLongValue() throws DataAccessException {
-        return mongoTemplate.execute("BATCH_SEQUENCES",
-                collection -> collection
-                    .findOneAndUpdate(new Document("_id", sequenceName), new Document("$inc", new Document("count", 1)),
-                            new FindOneAndUpdateOptions().returnDocument(ReturnDocument.AFTER))
-                    .getLong("count"));
+        return generateTsid();
     }

     @Override
@@ -61,4 +66,33 @@ public String nextStringValue() throws DataAccessException {
         throw new UnsupportedOperationException();
     }

+    private synchronized long generateTsid() {
+        long timestamp = System.currentTimeMillis() - TSID_EPOCH;
+
+        if (timestamp < lastTimestamp) {
+            timestamp = lastTimestamp;
+        }
+
+        if (timestamp == lastTimestamp) {
+            int seq = sequence.incrementAndGet() & SEQUENCE_MASK;
+            if (seq == 0) {
+                timestamp = waitNextMillis(lastTimestamp);
+                lastTimestamp = timestamp;
+            }
+            return (timestamp << TIMESTAMP_SHIFT) | ((long) nodeId << NODE_SHIFT) | seq;
+        } else {
+            sequence.set(0);
+            lastTimestamp = timestamp;
+            return (timestamp << TIMESTAMP_SHIFT) | ((long) nodeId << NODE_SHIFT);
+        }
+    }
+
+    private long waitNextMillis(long lastTimestamp) {
+        long timestamp = System.currentTimeMillis() - TSID_EPOCH;
+        while (timestamp <= lastTimestamp) {
+            timestamp = System.currentTimeMillis() - TSID_EPOCH;
+        }
+        return timestamp;
+    }
+
 }
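
For reference, each generated long packs three fields: milliseconds since a custom epoch of 2020-01-01T00:00:00Z (1577836800000L) in the high bits, a 10-bit node ID, and a 12-bit per-millisecond sequence in the low bits. A minimal sketch of decoding a value back into those fields; the decoder below is not part of the commit, only the bit layout is:

    import java.time.Instant;

    import org.springframework.batch.core.repository.dao.mongodb.MongoSequenceIncrementer;

    public class TsidLayoutExample {

        private static final long TSID_EPOCH = 1577836800000L; // 2020-01-01T00:00:00Z

        public static void main(String[] args) {
            long id = new MongoSequenceIncrementer(7).nextLongValue(); // node ID 7 is arbitrary

            long sequence = id & 0xFFF;        // lowest 12 bits
            long nodeId = (id >>> 12) & 0x3FF; // next 10 bits
            long millis = id >>> 22;           // remaining high bits

            System.out.println("created  = " + Instant.ofEpochMilli(millis + TSID_EPOCH));
            System.out.println("node id  = " + nodeId);   // prints 7
            System.out.println("sequence = " + sequence); // 0 for the first ID in a given millisecond
        }

    }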

spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/mongodb/MongoStepExecutionDao.java

Lines changed: 1 addition & 3 deletions

@@ -43,8 +43,6 @@ public class MongoStepExecutionDao implements StepExecutionDao {

     private static final String STEP_EXECUTIONS_COLLECTION_NAME = "BATCH_STEP_EXECUTION";

-    private static final String STEP_EXECUTIONS_SEQUENCE_NAME = "BATCH_STEP_EXECUTION_SEQ";
-
     private static final String JOB_EXECUTIONS_COLLECTION_NAME = "BATCH_JOB_EXECUTION";

     private final StepExecutionConverter stepExecutionConverter = new StepExecutionConverter();
@@ -59,7 +57,7 @@ public class MongoStepExecutionDao implements StepExecutionDao {

     public MongoStepExecutionDao(MongoOperations mongoOperations) {
         this.mongoOperations = mongoOperations;
-        this.stepExecutionIncrementer = new MongoSequenceIncrementer(mongoOperations, STEP_EXECUTIONS_SEQUENCE_NAME);
+        this.stepExecutionIncrementer = new MongoSequenceIncrementer();
     }

     public void setStepExecutionIncrementer(DataFieldMaxValueIncrementer stepExecutionIncrementer) {

spring-batch-core/src/main/java/org/springframework/batch/core/repository/support/MongoJobRepositoryFactoryBean.java

Lines changed: 3 additions & 5 deletions

@@ -108,15 +108,13 @@ public void afterPropertiesSet() throws Exception {
         super.afterPropertiesSet();
         Assert.notNull(this.mongoOperations, "MongoOperations must not be null.");
         if (this.jobInstanceIncrementer == null) {
-            this.jobInstanceIncrementer = new MongoSequenceIncrementer(this.mongoOperations, "BATCH_JOB_INSTANCE_SEQ");
+            this.jobInstanceIncrementer = new MongoSequenceIncrementer();
         }
         if (this.jobExecutionIncrementer == null) {
-            this.jobExecutionIncrementer = new MongoSequenceIncrementer(this.mongoOperations,
-                    "BATCH_JOB_EXECUTION_SEQ");
+            this.jobExecutionIncrementer = new MongoSequenceIncrementer();
         }
         if (this.stepExecutionIncrementer == null) {
-            this.stepExecutionIncrementer = new MongoSequenceIncrementer(this.mongoOperations,
-                    "BATCH_STEP_EXECUTION_SEQ");
+            this.stepExecutionIncrementer = new MongoSequenceIncrementer();
         }
     }

spring-batch-core/src/test/java/org/springframework/batch/core/repository/dao/mongodb/MongoSequenceIncrementerTests.java

Lines changed: 101 additions & 0 deletions

@@ -0,0 +1,101 @@
+/*
+ * Copyright 2024-2025 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.springframework.batch.core.repository.dao.mongodb;
+
+import org.junit.jupiter.api.Test;
+import org.springframework.dao.DataAccessException;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+/**
+ * Tests for {@link MongoSequenceIncrementer}.
+ */
+public class MongoSequenceIncrementerTests {
+
+    @Test
+    void testTimeOrdering() throws DataAccessException {
+        MongoSequenceIncrementer incrementer = new MongoSequenceIncrementer();
+        List<Long> ids = new ArrayList<>();
+
+        for (int i = 0; i < 10; i++) {
+            ids.add(incrementer.nextLongValue());
+        }
+
+        List<Long> sorted = new ArrayList<>(ids);
+        Collections.sort(sorted);
+        assertEquals(sorted, ids, "IDs should be in time order");
+    }
+
+    @Test
+    void testConcurrency() throws InterruptedException {
+        MongoSequenceIncrementer incrementer = new MongoSequenceIncrementer();
+        Set<Long> ids = Collections.synchronizedSet(new HashSet<>());
+        int threadCount = 10;
+        int idsPerThread = 100;
+        ExecutorService executor = Executors.newFixedThreadPool(threadCount);
+        CountDownLatch latch = new CountDownLatch(threadCount);
+
+        for (int i = 0; i < threadCount; i++) {
+            executor.submit(() -> {
+                try {
+                    for (int j = 0; j < idsPerThread; j++) {
+                        ids.add(incrementer.nextLongValue());
+                    }
+                }
+                catch (DataAccessException e) {
+                    fail("Should not throw DataAccessException: " + e.getMessage());
+                }
+                finally {
+                    latch.countDown();
+                }
+            });
+        }
+
+        latch.await(10, TimeUnit.SECONDS);
+        executor.shutdown();
+
+        assertEquals(threadCount * idsPerThread, ids.size(),
+                "All IDs generated from multiple threads should be unique");
+    }
+
+    @Test
+    void testNodeIdSeparation() throws DataAccessException {
+        MongoSequenceIncrementer incrementer1 = new MongoSequenceIncrementer(1);
+        MongoSequenceIncrementer incrementer2 = new MongoSequenceIncrementer(2);
+
+        long id1 = incrementer1.nextLongValue();
+        long id2 = incrementer2.nextLongValue();
+
+        assertNotEquals(id1, id2, "IDs from different nodes should be different");
+
+        long nodeId1 = (id1 >> 12) & 0x3FF;
+        long nodeId2 = (id2 >> 12) & 0x3FF;
+
+        assertEquals(1, nodeId1, "First ID should have node ID 1");
+        assertEquals(2, nodeId2, "Second ID should have node ID 2");
+    }
+
+}

spring-batch-core/src/test/java/org/springframework/batch/core/repository/support/MongoDBJobRepositoryIntegrationTests.java

Lines changed: 45 additions & 0 deletions

@@ -86,6 +86,51 @@ void testJobExecution(@Autowired JobOperator jobOperator, @Autowired Job job) th
         dump(stepExecutionsCollection, "step execution = ");
     }

+    @Test
+    void testParallelJobExecution(@Autowired JobOperator jobOperator, @Autowired Job job) throws Exception {
+        int parallelJobs = 10;
+        Thread[] threads = new Thread[parallelJobs];
+        JobExecution[] executions = new JobExecution[parallelJobs];
+
+        for (int i = 0; i < parallelJobs; i++) {
+            final int idx = i;
+            threads[i] = new Thread(() -> {
+                JobParameters jobParameters = new JobParametersBuilder()
+                    .addString("name", "foo" + idx)
+                    .addLocalDateTime("runtime", LocalDateTime.now())
+                    .toJobParameters();
+                try {
+                    executions[idx] = jobOperator.start(job, jobParameters);
+                } catch (Exception e) {
+                    throw new RuntimeException(e);
+                }
+            });
+            threads[i].start();
+        }
+
+        for (Thread t : threads) {
+            t.join();
+        }
+
+        for (JobExecution exec : executions) {
+            Assertions.assertNotNull(exec);
+            Assertions.assertEquals(ExitStatus.COMPLETED, exec.getExitStatus());
+        }
+
+        MongoCollection<Document> jobInstancesCollection = mongoTemplate.getCollection("BATCH_JOB_INSTANCE");
+        MongoCollection<Document> jobExecutionsCollection = mongoTemplate.getCollection("BATCH_JOB_EXECUTION");
+        MongoCollection<Document> stepExecutionsCollection = mongoTemplate.getCollection("BATCH_STEP_EXECUTION");
+
+        Assertions.assertEquals(parallelJobs, jobInstancesCollection.countDocuments());
+        Assertions.assertEquals(parallelJobs, jobExecutionsCollection.countDocuments());
+        Assertions.assertEquals(parallelJobs * 2, stepExecutionsCollection.countDocuments());
+
+        // dump results for inspection
+        dump(jobInstancesCollection, "job instance = ");
+        dump(jobExecutionsCollection, "job execution = ");
+        dump(stepExecutionsCollection, "step execution = ");
+    }
+
     private static void dump(MongoCollection<Document> collection, String prefix) {
         for (Document document : collection.find()) {
             System.out.println(prefix + document.toJson());
