Skip to content

Commit 8094f3a

Browse files
authored
Merge pull request #98 from apriltuesday/EVA-4056
EVA-4056: Add trace logging and release unused accessions when prefiltering
2 parents 102bfbd + fec067e commit 8094f3a

File tree

3 files changed

+49
-4
lines changed

3 files changed

+49
-4
lines changed

accession-commons-core/src/main/java/uk/ac/ebi/ampt2d/commons/accession/core/BasicAccessioningService.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333

3434
import java.io.Serializable;
3535
import java.util.ArrayList;
36+
import java.util.Collections;
3637
import java.util.List;
3738
import java.util.Map;
3839
import java.util.Set;
@@ -94,6 +95,8 @@ private Map<HASH, MODEL> mapHashOfMessages(List<? extends MODEL> messages) {
9495
* See {@link #getPreexistingAccessions(List)} } for more details.
9596
*/
9697
private List<GetOrCreateAccessionWrapper<MODEL, HASH, ACCESSION>> saveAccessions(List<AccessionWrapper<MODEL, HASH, ACCESSION>> accessions) {
98+
logger.trace("Accessions to save: {}", accessions.stream().map(AccessionWrapper::getAccession).collect(
99+
Collectors.toList()));
97100
switch (this.accessionSaveMode) {
98101
case PREFILTER_EXISTING:
99102
return saveAccessionsPrefilteringExisting(accessions);
@@ -130,6 +133,16 @@ private List<GetOrCreateAccessionWrapper<MODEL, HASH, ACCESSION>> saveAccessions
130133
List<AccessionWrapper<MODEL, HASH, ACCESSION>> preexistingAccessions = dbService.findAllByHash(allHashes);
131134
Set<HASH> preexistingHashes = preexistingAccessions.stream().map(AccessionWrapper::getHash).collect(Collectors.toSet());
132135

136+
// release accessions associated with pre-existing hashes
137+
Set<ACCESSION> accessionsToRelease = accessions
138+
.stream()
139+
.filter(accession -> preexistingHashes.contains(accession.getHash()))
140+
.map(AccessionWrapper::getAccession)
141+
.collect(Collectors.toSet());
142+
SaveResponse<ACCESSION> response = new SaveResponse<>(Collections.emptySet(), accessionsToRelease);
143+
accessionGenerator.postSave(response);
144+
145+
// save rest of the accessions
133146
List<AccessionWrapper<MODEL, HASH, ACCESSION>> accessionsToSave = accessions.stream()
134147
.filter(accession -> !preexistingHashes.contains(accession.getHash()))
135148
.collect(Collectors.toList());

accession-commons-monotonic-generator-jpa/src/main/java/uk/ac/ebi/ampt2d/commons/accession/generators/monotonic/BlockManager.java

Lines changed: 27 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717
*/
1818
package uk.ac.ebi.ampt2d.commons.accession.generators.monotonic;
1919

20+
import org.slf4j.Logger;
21+
import org.slf4j.LoggerFactory;
2022
import org.springframework.data.util.Pair;
2123
import uk.ac.ebi.ampt2d.commons.accession.core.exceptions.AccessionCouldNotBeGeneratedException;
2224
import uk.ac.ebi.ampt2d.commons.accession.core.exceptions.AccessionIsNotPendingException;
@@ -36,6 +38,8 @@
3638
*/
3739
class BlockManager {
3840

41+
private static final Logger logger = LoggerFactory.getLogger(BlockManager.class);
42+
3943
private final PriorityQueue<ContiguousIdBlock> assignedBlocks;
4044

4145
private final MonotonicRangePriorityQueue availableRanges;
@@ -52,6 +56,7 @@ public BlockManager() {
5256
}
5357

5458
public void addBlock(ContiguousIdBlock block) {
59+
logger.trace("Adding block: {}", block);
5560
assignedBlocks.add(block);
5661
availableRanges.add(new MonotonicRange(block.getLastCommitted() + 1, block.getLastValue()));
5762
}
@@ -67,12 +72,14 @@ public MonotonicRangePriorityQueue getAvailableRanges() {
6772
* @return Array of monotonically increasing IDs
6873
*/
6974
public long[] pollNext(int maxValues) throws AccessionCouldNotBeGeneratedException {
75+
logger.trace("Polling for {} values", maxValues);
7076
if (!hasAvailableAccessions(maxValues)) {
7177
throw new AccessionCouldNotBeGeneratedException("Block manager doesn't have " + maxValues + " values available.");
7278
}
7379
MonotonicRange monotonicRange = pollNextMonotonicRange(maxValues);
7480
long[] ids = monotonicRange.getIds();
7581
generatedAccessions.addAll(LongStream.of(ids).boxed().collect(Collectors.toList()));
82+
logger.trace("Generated accessions: {}", ids);
7683
return ids;
7784
}
7885

@@ -98,6 +105,7 @@ public boolean hasAvailableAccessions(int accessionsNeeded) {
98105
}
99106

100107
public Set<ContiguousIdBlock> commit(long[] accessions) throws AccessionIsNotPendingException {
108+
logger.trace("Inside commit for accessions: {}", accessions);
101109
assertAccessionsArePending(accessions);
102110
return doCommit(accessions);
103111
}
@@ -119,17 +127,31 @@ private Set<ContiguousIdBlock> doCommit(long[] accessions) {
119127
addToCommitted(accessions);
120128

121129
ContiguousIdBlock block = assignedBlocks.peek();
122-
while (block != null && committedAccessions.peek() != null &&
123-
committedAccessions.peek() == block.getLastCommitted() + 1) {
124-
//Next value continues sequence, change last committed value
130+
logger.trace("Trying to commit within block: {}", block);
131+
while (true) {
132+
if (block == null) {
133+
logger.trace("No more blocks");
134+
break;
135+
} else if (committedAccessions.peek() == null) {
136+
logger.trace("No more accessions to commit");
137+
break;
138+
} else if (committedAccessions.peek() != block.getLastCommitted() + 1) {
139+
logger.trace("Next accession to commit is not in sequence: {} != {} + 1",
140+
committedAccessions.peek(), block.getLastCommitted());
141+
break;
142+
}
143+
// Next value continues sequence, change last committed value
144+
logger.trace("Setting last committed to {}", committedAccessions.peek());
125145
block.setLastCommitted(committedAccessions.poll());
126146
blocksToUpdate.add(block);
127147
if (!block.isNotFull()) {
128148
assignedBlocks.poll();
129149
block = assignedBlocks.peek();
150+
logger.trace("Trying to commit within block: {}", block);
130151
}
131152
}
132153

154+
logger.trace("Blocks to update: {}", blocksToUpdate);
133155
return blocksToUpdate;
134156
}
135157

@@ -141,6 +163,7 @@ private void addToCommitted(long[] accessions) {
141163
}
142164

143165
public void release(long[] accessions) throws AccessionIsNotPendingException {
166+
logger.trace("Inside release for accessions: {}", accessions);
144167
assertAccessionsArePending(accessions);
145168
doRelease(accessions);
146169
}
@@ -158,6 +181,7 @@ private void doRelease(long[] accessions) {
158181
* @throws AccessionIsNotPendingException When the generated accession does not match with the accession to commit
159182
*/
160183
public Set<ContiguousIdBlock> recoverState(long[] committedElements) throws AccessionIsNotPendingException {
184+
logger.trace("Inside recoverState for accessions: {}", committedElements);
161185
List<MonotonicRange> ranges = MonotonicRange.convertToMonotonicRanges(committedElements);
162186
List<MonotonicRange> newAvailableRanges = new ArrayList<>();
163187
for (MonotonicRange monotonicRange : this.availableRanges) {

accession-commons-monotonic-generator-jpa/src/main/java/uk/ac/ebi/ampt2d/commons/accession/generators/monotonic/MonotonicAccessionGenerator.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717
*/
1818
package uk.ac.ebi.ampt2d.commons.accession.generators.monotonic;
1919

20+
import org.slf4j.Logger;
21+
import org.slf4j.LoggerFactory;
2022
import uk.ac.ebi.ampt2d.commons.accession.block.initialization.BlockInitializationException;
2123
import uk.ac.ebi.ampt2d.commons.accession.core.exceptions.AccessionCouldNotBeGeneratedException;
2224
import uk.ac.ebi.ampt2d.commons.accession.core.exceptions.AccessionGeneratorShutDownException;
@@ -43,6 +45,8 @@
4345
*/
4446
public class MonotonicAccessionGenerator<MODEL> implements AccessionGenerator<MODEL, Long> {
4547

48+
private static final Logger logger = LoggerFactory.getLogger(MonotonicAccessionGenerator.class);
49+
4650
private final BlockManager blockManager;
4751
private final String categoryId;
4852
private final ContiguousIdBlockService blockService;
@@ -68,6 +72,7 @@ private static void assertBlockParametersAreInitialized(ContiguousIdBlockService
6872
}
6973

7074
private boolean recoverAndReserveUncompletedBlock(String applicationInstanceId) {
75+
logger.trace("Reserving incomplete block");
7176
if (monotonicDatabaseService != null) {
7277
ContiguousIdBlock uncompletedBlock = blockService
7378
.reserveFirstUncompletedBlockForCategoryIdAndApplicationInstanceId(categoryId, applicationInstanceId);
@@ -100,6 +105,7 @@ private void recoverStateForElements(long[] committedElements) throws AccessionI
100105
public synchronized long[] generateAccessions(int numAccessionsToGenerate, String applicationInstanceId)
101106
throws AccessionCouldNotBeGeneratedException {
102107
checkAccessionGeneratorNotShutDown();
108+
logger.trace("Generating {} accessions for application ID {}", numAccessionsToGenerate, applicationInstanceId);
103109
long[] accessions = new long[numAccessionsToGenerate];
104110
reserveBlocksUntilSizeIs(numAccessionsToGenerate, applicationInstanceId);
105111

@@ -128,6 +134,7 @@ private synchronized void reserveBlocksUntilSizeIs(int totalAccessionsToGenerate
128134
}
129135

130136
private synchronized void reserveBlock(String categoryId, String instanceId) {
137+
logger.trace("Inside reserveBlock");
131138
if (UNCOMPLETED_BLOCKS_AVAILABLE) {
132139
boolean reservedUncompleted = recoverAndReserveUncompletedBlock(instanceId);
133140
if (!reservedUncompleted) {
@@ -140,6 +147,7 @@ private synchronized void reserveBlock(String categoryId, String instanceId) {
140147

141148

142149
private synchronized void reserveNewBlock(String categoryId, String instanceId) {
150+
logger.trace("Reserving new block");
143151
blockManager.addBlock(blockService.reserveNewBlock(categoryId, instanceId));
144152
}
145153

@@ -183,7 +191,7 @@ public synchronized void postSave(SaveResponse<Long> response) {
183191
public void shutDownAccessionGenerator() {
184192
List<ContiguousIdBlock> blockList = blockManager.getAssignedBlocks();
185193
blockList.stream().forEach(block -> block.releaseReserved());
186-
blockService.save(blockList);
194+
ExponentialBackOff.execute(() -> blockService.save(blockList), 10, 30);
187195
blockManager.shutDownBlockManager();
188196
SHUTDOWN = true;
189197
}

0 commit comments

Comments
 (0)