Skip to content

Commit d799bc8

Browse files
authored
EVA-4045 Recover and reserve only required uncompleted blocks not all (#95)
1 parent b76eca5 commit d799bc8

File tree

7 files changed

+121
-71
lines changed

7 files changed

+121
-71
lines changed

accession-commons-monotonic-generator-jpa/src/main/java/uk/ac/ebi/ampt2d/commons/accession/generators/monotonic/MonotonicAccessionGenerator.java

Lines changed: 32 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -47,9 +47,9 @@ public class MonotonicAccessionGenerator<MODEL> implements AccessionGenerator<MO
4747
private final String categoryId;
4848
private final ContiguousIdBlockService blockService;
4949
private MonotonicDatabaseService monotonicDatabaseService;
50+
private boolean UNCOMPLETED_BLOCKS_AVAILABLE = true;
5051

5152
private boolean SHUTDOWN = false;
52-
private boolean RUN_RECOVERY = true;
5353

5454
public MonotonicAccessionGenerator(String categoryId,
5555
ContiguousIdBlockService contiguousIdBlockService,
@@ -67,26 +67,23 @@ private static void assertBlockParametersAreInitialized(ContiguousIdBlockService
6767
}
6868
}
6969

70-
private void recoverState(String applicationInstanceId) {
71-
if (RUN_RECOVERY && monotonicDatabaseService != null) {
72-
List<ContiguousIdBlock> uncompletedBlocks = blockService
73-
.reserveUncompletedBlocksForCategoryIdAndApplicationInstanceId(categoryId, applicationInstanceId);
74-
//Insert as available ranges
75-
for (ContiguousIdBlock block : uncompletedBlocks) {
76-
blockManager.addBlock(block);
77-
}
78-
// As we are going through the available ranges and at the same time we are also going to manipulate/update them
79-
// Need to make a copy of the original for iteration to avoid ConcurrentModificationException
80-
MonotonicRangePriorityQueue copyOfAvailableRanges = new MonotonicRangePriorityQueue();
81-
for (MonotonicRange range : getAvailableRanges()) {
82-
copyOfAvailableRanges.offer(range);
83-
}
84-
for (MonotonicRange monotonicRange : copyOfAvailableRanges) {
85-
recoverStateForElements(monotonicDatabaseService.getAccessionsInRanges(Collections.singletonList(monotonicRange)));
86-
}
70+
private boolean recoverAndReserveUncompletedBlock(String applicationInstanceId) {
71+
if (monotonicDatabaseService != null) {
72+
ContiguousIdBlock uncompletedBlock = blockService
73+
.reserveFirstUncompletedBlockForCategoryIdAndApplicationInstanceId(categoryId, applicationInstanceId);
74+
if (uncompletedBlock != null) {
75+
//Insert as available ranges
76+
blockManager.addBlock(uncompletedBlock);
8777

88-
RUN_RECOVERY = false;
78+
recoverStateForElements(monotonicDatabaseService.getAccessionsInRanges(
79+
Collections.singletonList(new MonotonicRange(uncompletedBlock.getLastCommitted() + 1,
80+
uncompletedBlock.getLastValue()))));
81+
82+
return true;
83+
}
8984
}
85+
86+
return false;
9087
}
9188

9289
/**
@@ -103,9 +100,8 @@ private void recoverStateForElements(long[] committedElements) throws AccessionI
103100
public synchronized long[] generateAccessions(int numAccessionsToGenerate, String applicationInstanceId)
104101
throws AccessionCouldNotBeGeneratedException {
105102
checkAccessionGeneratorNotShutDown();
106-
recoverState(applicationInstanceId);
107103
long[] accessions = new long[numAccessionsToGenerate];
108-
reserveNewBlocksUntilSizeIs(numAccessionsToGenerate, applicationInstanceId);
104+
reserveBlocksUntilSizeIs(numAccessionsToGenerate, applicationInstanceId);
109105

110106
int i = 0;
111107
while (i < numAccessionsToGenerate) {
@@ -123,14 +119,26 @@ public synchronized long[] generateAccessions(int numAccessionsToGenerate, Strin
123119
* Ensures that the available ranges queue hold @param totalAccessionsToGenerate or more elements
124120
*
125121
* @param totalAccessionsToGenerate
126-
* @param applicationInstanceId - The id of the application(instance) that is trying to reserve the block
122+
* @param applicationInstanceId - The id of the application(instance) that is trying to reserve the block
127123
*/
128-
private synchronized void reserveNewBlocksUntilSizeIs(int totalAccessionsToGenerate, String applicationInstanceId) {
124+
private synchronized void reserveBlocksUntilSizeIs(int totalAccessionsToGenerate, String applicationInstanceId) {
129125
while (!blockManager.hasAvailableAccessions(totalAccessionsToGenerate)) {
130-
ExponentialBackOff.execute(() -> reserveNewBlock(categoryId, applicationInstanceId), 10, 30);
126+
ExponentialBackOff.execute(() -> reserveBlock(categoryId, applicationInstanceId), 10, 30);
131127
}
132128
}
133129

130+
private synchronized void reserveBlock(String categoryId, String instanceId) {
131+
if (UNCOMPLETED_BLOCKS_AVAILABLE) {
132+
boolean reservedUncompleted = recoverAndReserveUncompletedBlock(instanceId);
133+
if (!reservedUncompleted) {
134+
UNCOMPLETED_BLOCKS_AVAILABLE = false;
135+
}
136+
} else {
137+
reserveNewBlock(categoryId, instanceId);
138+
}
139+
}
140+
141+
134142
private synchronized void reserveNewBlock(String categoryId, String instanceId) {
135143
blockManager.addBlock(blockService.reserveNewBlock(categoryId, instanceId));
136144
}

accession-commons-monotonic-generator-jpa/src/main/java/uk/ac/ebi/ampt2d/commons/accession/persistence/jpa/monotonic/repositories/ContiguousIdBlockRepository.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
*/
1818
package uk.ac.ebi.ampt2d.commons.accession.persistence.jpa.monotonic.repositories;
1919

20+
import org.springframework.data.domain.Pageable;
2021
import org.springframework.data.jpa.repository.Lock;
2122
import org.springframework.data.jpa.repository.Query;
2223
import org.springframework.data.repository.CrudRepository;
@@ -38,7 +39,8 @@ public interface ContiguousIdBlockRepository extends CrudRepository<ContiguousId
3839
// Note that application instances reserving the same new blocks is prevented by the uniqueness constraint in the
3940
// database and subsequent retry in the accession generator.
4041
@Lock(LockModeType.PESSIMISTIC_WRITE)
41-
List<ContiguousIdBlock> findUncompletedAndUnreservedBlocksOrderByLastValueAsc(@Param("categoryId") String categoryId);
42+
List<ContiguousIdBlock> findUncompletedAndUnreservedBlocksOrderByLastValueAsc(@Param("categoryId") String categoryId,
43+
Pageable pageable);
4244

4345
ContiguousIdBlock findFirstByCategoryIdOrderByLastValueDesc(String categoryId);
4446

accession-commons-monotonic-generator-jpa/src/main/java/uk/ac/ebi/ampt2d/commons/accession/persistence/jpa/monotonic/service/ContiguousIdBlockService.java

Lines changed: 17 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import org.slf4j.Logger;
2121
import org.slf4j.LoggerFactory;
22+
import org.springframework.data.domain.PageRequest;
2223
import org.springframework.transaction.annotation.Isolation;
2324
import org.springframework.transaction.annotation.Propagation;
2425
import org.springframework.transaction.annotation.Transactional;
@@ -116,12 +117,12 @@ public ContiguousIdBlock reserveNewBlock(String categoryId, String instanceId) {
116117
ContiguousIdBlock reservedBlock;
117118
if (lastBlock != null) {
118119
reservedBlock = repository.save(lastBlock.nextBlock(instanceId, blockParameters.getBlockSize(),
119-
blockParameters.getNextBlockInterval(),
120-
blockParameters.getBlockStartValue()));
120+
blockParameters.getNextBlockInterval(),
121+
blockParameters.getBlockStartValue()));
121122
} else {
122123
ContiguousIdBlock newBlock = new ContiguousIdBlock(categoryId, instanceId,
123-
blockParameters.getBlockStartValue(),
124-
blockParameters.getBlockSize());
124+
blockParameters.getBlockStartValue(),
125+
blockParameters.getBlockSize());
125126
reservedBlock = repository.save(newBlock);
126127
}
127128
logger.trace("Reserved new block: {}", reservedBlock);
@@ -134,16 +135,20 @@ public BlockParameters getBlockParameters(String categoryId) {
134135
}
135136

136137
@Transactional(isolation = Isolation.SERIALIZABLE)
137-
public List<ContiguousIdBlock> reserveUncompletedBlocksForCategoryIdAndApplicationInstanceId(String categoryId, String applicationInstanceId) {
138-
logger.trace("Inside reserveUncompletedBlocks for instanceId {}", applicationInstanceId);
139-
List<ContiguousIdBlock> blockList = repository.findUncompletedAndUnreservedBlocksOrderByLastValueAsc(categoryId);
140-
blockList.stream().forEach(block -> {
141-
logger.trace("Reserving incomplete and unreserved block {}", block);
138+
public ContiguousIdBlock reserveFirstUncompletedBlockForCategoryIdAndApplicationInstanceId(String categoryId, String applicationInstanceId) {
139+
logger.trace("Inside reserveUncompletedBlock for instanceId {}", applicationInstanceId);
140+
ContiguousIdBlock block = repository.findUncompletedAndUnreservedBlocksOrderByLastValueAsc(categoryId,
141+
PageRequest.of(0, 1)).stream()
142+
.findFirst().orElse(null);
143+
144+
if (block != null) {
142145
block.setApplicationInstanceId(applicationInstanceId);
143146
block.markAsReserved();
144-
});
145-
save(blockList);
146-
return blockList;
147+
148+
save(block);
149+
}
150+
151+
return block;
147152
}
148153

149154
public List<ContiguousIdBlock> allBlocksForCategoryIdReservedBeforeTheGivenTimeFrame(String categoryId,

accession-commons-monotonic-generator-jpa/src/test/java/uk/ac/ebi/ampt2d/commons/accession/core/BasicMonotonicAccessioningWithAlternateRangesTest.java

Lines changed: 27 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,28 @@ public void testUnknownCategory() throws AccessionCouldNotBeGeneratedException {
7575
.getOrCreate(getObjectsForAccessionsInRange(1, 10), INSTANCE_ID);
7676
}
7777

78+
@Test
79+
public void testReserveNewBlock() throws AccessionCouldNotBeGeneratedException {
80+
String categoryId = "eva_2";
81+
String instanceId2 = "test-instance_2";
82+
83+
assertEquals(0, getAllUncompletedBlocksForCategoryId(contiguousIdBlockRepository, categoryId).size());
84+
85+
MonotonicAccessionGenerator generator = getGenerator(categoryId, instanceId2);
86+
generator.generateAccessions(1, INSTANCE_ID);
87+
88+
assertEquals(1, getAllUncompletedBlocksForCategoryId(contiguousIdBlockRepository, categoryId).size());
89+
ContiguousIdBlock uncompletedBlock = getAllUncompletedBlocksForCategoryId(contiguousIdBlockRepository, categoryId).get(0);
90+
assertEquals(0l, uncompletedBlock.getFirstValue());
91+
assertEquals(9l, uncompletedBlock.getLastValue());
92+
assertEquals(-1l, uncompletedBlock.getLastCommitted());
93+
94+
MonotonicRangePriorityQueue availableRanges = generator.getAvailableRanges();
95+
assertEquals(1, availableRanges.size());
96+
assertEquals(1l, availableRanges.peek().getStart());
97+
assertEquals(9l, availableRanges.peek().getEnd());
98+
}
99+
78100
@Test
79101
public void testRecoverState() throws AccessionCouldNotBeGeneratedException {
80102
String categoryId = "eva_2";
@@ -104,7 +126,7 @@ public void testRecoverState() throws AccessionCouldNotBeGeneratedException {
104126

105127
// run recover state
106128
MonotonicAccessionGenerator generator = getGenerator(categoryId, instanceId2);
107-
generator.generateAccessions(0, INSTANCE_ID);
129+
generator.generateAccessions(1, INSTANCE_ID);
108130

109131
// As we have already saved accessions in db from 100 to 124, the status should be
110132
// block-1 (100 to 109) : fully complete
@@ -118,7 +140,7 @@ public void testRecoverState() throws AccessionCouldNotBeGeneratedException {
118140

119141
MonotonicRangePriorityQueue availableRanges = generator.getAvailableRanges();
120142
assertEquals(1, availableRanges.size());
121-
assertEquals(125l, availableRanges.peek().getStart());
143+
assertEquals(126l, availableRanges.peek().getStart());
122144
assertEquals(129l, availableRanges.peek().getEnd());
123145
}
124146

@@ -205,14 +227,11 @@ public void testInitializeBlockManagerInMonotonicAccessionGenerator() throws Acc
205227
unreservedAndNotFullBlocks = blockInDBList.stream()
206228
.filter(b -> b.isNotFull() && b.isNotReserved())
207229
.collect(Collectors.toList());
208-
assertEquals(0, unreservedAndNotFullBlocks.size());
230+
assertEquals(1, unreservedAndNotFullBlocks.size());
209231
List<ContiguousIdBlock> reservedAndNotFullBlocks = blockInDBList.stream()
210232
.filter(b -> b.isNotFull() && b.isReserved())
211233
.collect(Collectors.toList());
212-
assertEquals(1, reservedAndNotFullBlocks.size());
213-
assertEquals(9, reservedAndNotFullBlocks.get(0).getLastValue());
214-
assertEquals(-1, reservedAndNotFullBlocks.get(0).getLastCommitted());
215-
assertEquals(true, reservedAndNotFullBlocks.get(0).isReserved());
234+
assertEquals(0, reservedAndNotFullBlocks.size());
216235
}
217236

218237
private List<TestModel> getObjectsForAccessionsInRange(int startRange, int endRange) {
@@ -221,7 +240,7 @@ private List<TestModel> getObjectsForAccessionsInRange(int startRange, int endRa
221240
}
222241

223242
private BasicAccessioningService<TestModel, String, Long> getAccessioningService(String categoryId,
224-
String instanceId) {
243+
String instanceId) {
225244
return new BasicAccessioningService<>(
226245
getGenerator(categoryId, instanceId),
227246
databaseService,

accession-commons-monotonic-generator-jpa/src/test/java/uk/ac/ebi/ampt2d/commons/accession/core/BasicMonotonicAccessioningWithAlternateRangesTestWithPreFiltering.java

Lines changed: 27 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,28 @@ public void testUnknownCategory() throws AccessionCouldNotBeGeneratedException {
7575
.getOrCreate(getObjectsForAccessionsInRange(1, 10), INSTANCE_ID);
7676
}
7777

78+
@Test
79+
public void testReserveNewBlock() throws AccessionCouldNotBeGeneratedException {
80+
String categoryId = "eva_2";
81+
String instanceId2 = "test-instance_2";
82+
83+
assertEquals(0, getAllUncompletedBlocksForCategoryId(contiguousIdBlockRepository, categoryId).size());
84+
85+
MonotonicAccessionGenerator generator = getGenerator(categoryId, instanceId2);
86+
generator.generateAccessions(1, INSTANCE_ID);
87+
88+
assertEquals(1, getAllUncompletedBlocksForCategoryId(contiguousIdBlockRepository, categoryId).size());
89+
ContiguousIdBlock uncompletedBlock = getAllUncompletedBlocksForCategoryId(contiguousIdBlockRepository, categoryId).get(0);
90+
assertEquals(0l, uncompletedBlock.getFirstValue());
91+
assertEquals(9l, uncompletedBlock.getLastValue());
92+
assertEquals(-1l, uncompletedBlock.getLastCommitted());
93+
94+
MonotonicRangePriorityQueue availableRanges = generator.getAvailableRanges();
95+
assertEquals(1, availableRanges.size());
96+
assertEquals(1l, availableRanges.peek().getStart());
97+
assertEquals(9l, availableRanges.peek().getEnd());
98+
}
99+
78100
@Test
79101
public void testRecoverState() throws AccessionCouldNotBeGeneratedException {
80102
String categoryId = "eva_2";
@@ -104,7 +126,7 @@ public void testRecoverState() throws AccessionCouldNotBeGeneratedException {
104126

105127
// run recover state
106128
MonotonicAccessionGenerator generator = getGenerator(categoryId, instanceId2);
107-
generator.generateAccessions(0, INSTANCE_ID);
129+
generator.generateAccessions(1, INSTANCE_ID);
108130

109131
// As we have already saved accessions in db from 100 to 124, the status should be
110132
// block-1 (100 to 109) : fully complete
@@ -118,7 +140,7 @@ public void testRecoverState() throws AccessionCouldNotBeGeneratedException {
118140

119141
MonotonicRangePriorityQueue availableRanges = generator.getAvailableRanges();
120142
assertEquals(1, availableRanges.size());
121-
assertEquals(125l, availableRanges.peek().getStart());
143+
assertEquals(126l, availableRanges.peek().getStart());
122144
assertEquals(129l, availableRanges.peek().getEnd());
123145
}
124146

@@ -205,14 +227,11 @@ public void testInitializeBlockManagerInMonotonicAccessionGenerator() throws Acc
205227
unreservedAndNotFullBlocks = blockInDBList.stream()
206228
.filter(b -> b.isNotFull() && b.isNotReserved())
207229
.collect(Collectors.toList());
208-
assertEquals(0, unreservedAndNotFullBlocks.size());
230+
assertEquals(1, unreservedAndNotFullBlocks.size());
209231
List<ContiguousIdBlock> reservedAndNotFullBlocks = blockInDBList.stream()
210232
.filter(b -> b.isNotFull() && b.isReserved())
211233
.collect(Collectors.toList());
212-
assertEquals(1, reservedAndNotFullBlocks.size());
213-
assertEquals(9, reservedAndNotFullBlocks.get(0).getLastValue());
214-
assertEquals(-1, reservedAndNotFullBlocks.get(0).getLastCommitted());
215-
assertEquals(true, reservedAndNotFullBlocks.get(0).isReserved());
234+
assertEquals(0, reservedAndNotFullBlocks.size());
216235
}
217236

218237
private List<TestModel> getObjectsForAccessionsInRange(int startRange, int endRange) {
@@ -221,7 +240,7 @@ private List<TestModel> getObjectsForAccessionsInRange(int startRange, int endRa
221240
}
222241

223242
private BasicAccessioningService<TestModel, String, Long> getAccessioningService(String categoryId,
224-
String instanceId) {
243+
String instanceId) {
225244
return new BasicAccessioningService<>(
226245
getGenerator(categoryId, instanceId),
227246
databaseService,

accession-commons-monotonic-generator-jpa/src/test/java/uk/ac/ebi/ampt2d/commons/accession/generators/monotonic/MonotonicAccessionGeneratorTest.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -315,7 +315,7 @@ public void assertRecoverNoPendingCommit() throws Exception {
315315
new MonotonicAccessionGenerator(CATEGORY_ID, service, monotonicDBService);
316316
ContiguousIdBlock block = findFirstByCategoryIdAndApplicationInstanceIdOrderByLastValueDesc(CATEGORY_ID);
317317
assertEquals(-1, block.getLastCommitted());
318-
generatorRecovering.generateAccessions(0, INSTANCE_ID);
318+
generatorRecovering.generateAccessions(1, INSTANCE_ID);
319319
assertFalse(generatorRecovering.getAvailableRanges().isEmpty());
320320
}
321321

@@ -331,10 +331,10 @@ public void assertRecoverPendingCommit() throws Exception {
331331
CATEGORY_ID, service, monotonicDBService);
332332
ContiguousIdBlock block = findFirstByCategoryIdAndApplicationInstanceIdOrderByLastValueDesc(CATEGORY_ID);
333333
assertEquals(1, block.getLastCommitted());
334-
generatorRecovering.generateAccessions(0, INSTANCE_ID);
334+
generatorRecovering.generateAccessions(1, INSTANCE_ID);
335335
assertEquals(1, generatorRecovering.getAvailableRanges().size());
336336
MonotonicRange monotonicRange = generatorRecovering.getAvailableRanges().peek();
337-
assertEquals(2, monotonicRange.getStart());
337+
assertEquals(3, monotonicRange.getStart());
338338
assertEquals(BLOCK_SIZE - 1, monotonicRange.getEnd());
339339
}
340340

@@ -484,9 +484,9 @@ public void testInitializeBlockManager() throws AccessionCouldNotBeGeneratedExce
484484
MonotonicAccessionGenerator generator1 = new MonotonicAccessionGenerator(CATEGORY_ID_2, service, monotonicDBService);
485485
assertEquals(0, generator1.getAvailableRanges().size());
486486
// its recover state reserves the UnCompleted block
487-
generator1.generateAccessions(0, INSTANCE_ID);
487+
generator1.generateAccessions(1, INSTANCE_ID);
488488
assertEquals(1, generator1.getAvailableRanges().size());
489-
assertEquals(new MonotonicRange(0, 9), generator1.getAvailableRanges().peek());
489+
assertEquals(new MonotonicRange(1, 9), generator1.getAvailableRanges().peek());
490490

491491
// Block is currently reserved by Generator-1
492492
blockInDBList = findAllByCategoryIdAndApplicationInstanceIdOrderByLastValueAsc(CATEGORY_ID_2);
@@ -508,9 +508,9 @@ public void testInitializeBlockManager() throws AccessionCouldNotBeGeneratedExce
508508
// Generator-3 can reserve the same Uncompleted block, once Generator-1 releases it
509509
generator1.shutDownAccessionGenerator();
510510
MonotonicAccessionGenerator generator3 = new MonotonicAccessionGenerator(CATEGORY_ID_2, service, monotonicDBService);
511-
generator3.generateAccessions(0, INSTANCE_ID);
511+
generator3.generateAccessions(1, INSTANCE_ID);
512512
assertEquals(1, generator3.getAvailableRanges().size());
513-
assertEquals(new MonotonicRange(0, 9), generator3.getAvailableRanges().peek());
513+
assertEquals(new MonotonicRange(1, 9), generator3.getAvailableRanges().peek());
514514
}
515515

516516
@Test

0 commit comments

Comments
 (0)