Skip to content

Commit fe33f00

Browse files
authored
Merge pull request #84 from iExecBlockchainComputing/feature/rework-queue-service-with-priority-queue
Feature/rework queue service with priority queue
2 parents 0343024 + c93331d commit fe33f00

File tree

3 files changed

+151
-181
lines changed

3 files changed

+151
-181
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ All notable changes to this project will be documented in this file.
66

77
### New Features
88
- Enable Prometheus actuator. (#79)
9+
- Rework `QueueService` with a thread pool based on a `PriorityBlockingQueue`. (#84)
910
### Bug Fixes
1011
- Fix security rule to access Swagger API. (#79)
1112
### Dependency Upgrades
Lines changed: 49 additions & 86 deletions
Original file line numberDiff line numberDiff line change
@@ -1,96 +1,54 @@
11
package com.iexec.blockchain.tool;
22

3-
import lombok.extern.slf4j.Slf4j;
4-
import org.springframework.scheduling.annotation.Scheduled;
3+
import lombok.EqualsAndHashCode;
54
import org.springframework.stereotype.Service;
65

7-
import java.util.Objects;
8-
import java.util.concurrent.CompletableFuture;
9-
import java.util.concurrent.ExecutorService;
10-
import java.util.concurrent.Executors;
11-
import java.util.concurrent.PriorityBlockingQueue;
6+
import javax.validation.constraints.NotNull;
7+
import java.util.concurrent.*;
128

139
/**
14-
* Execute {@link Runnable}s as they arrive.
15-
* Some {@link Runnable}s can have higher priority than others, so they can be treated before the others.
10+
* Execute {@link Runnable}s as they are submitted.
11+
* <p>
12+
* The thread pool uses a {@link java.util.concurrent.PriorityBlockingQueue} to execute tasks depending on a priority order.
13+
* The {@code ThreadPoolExecutor#newTaskFor} method is overridden to wrap the submitted {@link java.lang.Runnable} in a
14+
* {@link TaskWithPriority<Runnable>} where it will be cast back to a {@link BlockchainAction}.
15+
* <p>
16+
* The {@link TaskWithPriority} is a {@link java.lang.Comparable} with a deferred call to {@link BlockchainAction#compareTo(BlockchainAction)}.
17+
* This enables the thread pool to retrieve tasks from the queue depending on the implemented priority rule.
18+
* The priority rule is a simple {@code boolean} flag in {@link BlockchainAction}.
19+
* Tasks with a priority flag defined as {@literal true}, then a lower creation timestamp are sorted first.
1620
*/
17-
@Slf4j
1821
@Service
1922
public class QueueService {
20-
private final PriorityBlockingQueue<BlockchainAction> queue = new PriorityBlockingQueue<>();
21-
private final ExecutorService executorService;
22-
private CompletableFuture<Void> actionExecutor;
23+
private final PriorityBlockingQueue<Runnable> queue = new PriorityBlockingQueue<>();
24+
private final ThreadPoolExecutor executorService;
2325

2426
public QueueService() {
25-
executorService = Executors.newFixedThreadPool(1);
26-
}
27-
28-
/**
29-
* Scheduled method execution.
30-
* If the {@link QueueService#actionExecutor} isn't running, start a new thread.
31-
* Otherwise, do nothing.
32-
*/
33-
@Scheduled(fixedRate = 30000)
34-
private void startAsyncActionsExecution() {
35-
if (actionExecutor != null && !actionExecutor.isDone()) {
36-
return;
37-
}
38-
39-
actionExecutor = CompletableFuture.runAsync(this::executeActions, executorService);
40-
}
41-
42-
/**
43-
* Execute {@link Runnable}s as they come.
44-
* At each occurrence, execute the first action in the queue or wait for a new one to appear.
45-
* The first action is the action with the highest priority that is in the queue for the longer time.
46-
*/
47-
void executeActions() {
48-
while (Thread.currentThread().isAlive() && !Thread.currentThread().isInterrupted()) {
49-
executeFirstAction();
50-
}
51-
}
52-
53-
/**
54-
* Execute the first action in the queue or wait for a new one to appear.
55-
* The first action is the action with the highest priority that is in the queue for the longer time.
56-
*/
57-
private void executeFirstAction() {
58-
try {
59-
// Wait until a new action is available.
60-
Runnable runnable = queue.take().getRunnable();
61-
// Execute an action and wait for its completion.
62-
runnable.run();
63-
} catch (InterruptedException e) {
64-
log.error("Action thread got interrupted.", e);
65-
Thread.currentThread().interrupt();
66-
} catch (RuntimeException e) {
67-
log.error("An error occurred while executing an action.", e);
68-
}
27+
executorService = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, queue) {
28+
@Override
29+
protected <T> RunnableFuture<T> newTaskFor(@NotNull Runnable runnable, T value) {
30+
return new TaskWithPriority<>(runnable);
31+
}
32+
};
6933
}
7034

7135
/**
72-
* Add a {@link Runnable} to the low priority or the high priority queue, depending on {@code priority}.
73-
* If it's not priority, it will be executed once:
74-
* <ul>
75-
* <li>All low priority {@link Runnable}s inserted before this one have completed;</li>
76-
* <li>All high priority {@link Runnable}s inserted before the execution of this one have completed.</li>
77-
* </ul>
36+
* Submit a {@link Runnable} to the thread pool.
7837
*
79-
* If it's priority, it will be executed once
80-
* all high priority {@link Runnable}s inserted before this one have completed.
81-
*
82-
* @param runnable {@link Runnable} to execute.
83-
* @param priority Whether this {@link Runnable} is priority.
38+
* @param runnable {@link Runnable} to submit to the queue.
39+
* @param priority Whether this {@link Runnable} has a high ({@literal true}) or low ({@literal false}) priority.
40+
* @return A Future representing pending completion of the runnable.
8441
*/
85-
public void addExecutionToQueue(Runnable runnable, boolean priority) {
86-
queue.add(new BlockchainAction(runnable, priority));
42+
public Future<Void> addExecutionToQueue(Runnable runnable, boolean priority) {
43+
return executorService.submit(new BlockchainAction(runnable, priority), null);
8744
}
8845

8946
/**
90-
* Represent an action that could wait in a {@link java.util.Queue}.
47+
* Represent an action submitted to the {@link java.util.concurrent.PriorityBlockingQueue}.
9148
* It contains its timestamp creation, its priority and its {@link Runnable}.
9249
*/
93-
private static class BlockchainAction implements Comparable<BlockchainAction> {
50+
@EqualsAndHashCode
51+
static class BlockchainAction implements Comparable<BlockchainAction>, Runnable {
9452
private final Runnable runnable;
9553
private final boolean priority;
9654
private final long time;
@@ -101,15 +59,13 @@ public BlockchainAction(Runnable runnable, boolean priority) {
10159
this.time = System.nanoTime();
10260
}
10361

104-
public Runnable getRunnable() {
105-
return runnable;
62+
@Override
63+
public void run() {
64+
runnable.run();
10665
}
10766

10867
@Override
109-
public int compareTo(BlockchainAction other) {
110-
if (other == null) {
111-
return -1;
112-
}
68+
public int compareTo(@NotNull BlockchainAction other) {
11369
if (this.priority && !other.priority) {
11470
return -1;
11571
}
@@ -118,19 +74,26 @@ public int compareTo(BlockchainAction other) {
11874
}
11975
return Long.compare(this.time, other.time);
12076
}
77+
}
12178

122-
@Override
123-
public boolean equals(Object o) {
124-
if (this == o) return true;
125-
if (o == null || getClass() != o.getClass()) return false;
126-
BlockchainAction that = (BlockchainAction) o;
127-
return priority == that.priority && time == that.time && Objects.equals(runnable, that.runnable);
79+
/**
80+
* Wrap a {@link BlockchainAction} for a thread pool with a {@code PriorityBlockingQueue<Runnable>}.
81+
* <p>
82+
* These class instances are {@code Comparable} and defer the comparison to {@link BlockchainAction#compareTo(BlockchainAction)}.
83+
*/
84+
@EqualsAndHashCode(callSuper = true)
85+
static class TaskWithPriority<T> extends FutureTask<T> implements Comparable<TaskWithPriority<?>> {
86+
private final BlockchainAction action;
87+
TaskWithPriority(Runnable task) {
88+
super(task, null);
89+
this.action = (BlockchainAction) task;
12890
}
12991

13092
@Override
131-
public int hashCode() {
132-
return Objects.hash(runnable, priority, time);
93+
public int compareTo(@NotNull TaskWithPriority other) {
94+
return this.action.compareTo(other.action);
13395
}
13496
}
97+
13598
}
13699

0 commit comments

Comments
 (0)