Skip to content

Commit beb89fe

Browse files
author
Yang Guo
committed
refactor on priority queue item with timestamp
1 parent 1d24f3a commit beb89fe

File tree

7 files changed

+74
-24
lines changed

7 files changed

+74
-24
lines changed

platform-api/src/main/java/com/flow/platform/api/config/QueueConfig.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@
3131
@Configurable
3232
public class QueueConfig {
3333

34+
public final static long DEFAULT_CMD_CALLBACK_QUEUE_PRIORITY = 1L;
35+
3436
@Autowired
3537
private ThreadPoolTaskExecutor taskExecutor;
3638

platform-api/src/main/java/com/flow/platform/api/controller/CmdWebhookController.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package com.flow.platform.api.controller;
1818

19+
import com.flow.platform.api.config.QueueConfig;
1920
import com.flow.platform.api.domain.CmdCallbackQueueItem;
2021
import com.flow.platform.api.service.job.JobService;
2122
import com.flow.platform.core.exception.IllegalParameterException;
@@ -64,7 +65,7 @@ public void execute(@RequestBody Cmd cmd, @RequestParam String identifier) {
6465
cmd.getStatus(),
6566
cmd.getId(),
6667
cmd.getCmdResult());
67-
jobService.enqueue(new CmdCallbackQueueItem(jobId, cmd), System.nanoTime());
68+
jobService.enqueue(new CmdCallbackQueueItem(jobId, cmd), QueueConfig.DEFAULT_CMD_CALLBACK_QUEUE_PRIORITY);
6869
} catch (NumberFormatException warn) {
6970
LOGGER.warn("Invalid job id format");
7071
}

platform-control-center/src/main/java/com/flow/platform/cc/consumer/CmdQueueConsumer.java

Lines changed: 10 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -98,27 +98,22 @@ public void onQueueItem(PriorityMessage message) {
9898
cmd.setRetry(retry);
9999
cmdService.save(cmd);
100100

101-
// re-enqueue
102-
resend(cmd.getId(), retry);
101+
// do not retry
102+
if (retry <= 0) {
103+
return;
104+
}
105+
106+
// retry the message
107+
retry(message);
103108

104109
} catch (Throwable e) {
105110
LOGGER.error("Unexpected exception", e);
106111
}
107112
}
108113

109-
/**
110-
* Re-enqueue cmd and return num of retry
111-
*/
112-
private void resend(final String cmdId, final int retry) {
113-
if (retry <= 0) {
114-
return;
115-
}
116-
117-
ThreadUtil.sleep(RETRY_WAIT_TIME);
118-
119-
// reset cmd status
120-
PriorityMessage message = PriorityMessage.create(cmdId.getBytes(), QueueConfig.MAX_PRIORITY);
114+
private void retry(final PriorityMessage message) {
115+
message.setPriority(QueueConfig.MAX_PRIORITY);
121116
cmdQueue.enqueue(message);
122-
LOGGER.trace("Re-enqueue item %s", cmdId);
117+
ThreadUtil.sleep(RETRY_WAIT_TIME);
123118
}
124119
}

platform-core/src/main/java/com/flow/platform/core/queue/PriorityMessage.java

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,27 +27,44 @@ public class PriorityMessage extends Message implements PriorityQueueItem {
2727

2828
private Long priority;
2929

30+
private Long timestamp;
31+
3032
public static PriorityMessage create(byte[] content, long priority) {
3133
MessageProperties properties = new MessageProperties();
3234
return new PriorityMessage(content, properties, priority);
3335
}
3436

3537
public PriorityMessage(Message message) {
3638
super(message.getBody(), message.getMessageProperties());
39+
this.timestamp = System.nanoTime();
3740
}
3841

3942
public PriorityMessage(byte[] body, MessageProperties messageProperties, long priority) {
4043
super(body, messageProperties);
4144
this.priority = priority;
45+
this.timestamp = System.currentTimeMillis();
4246
}
4347

4448
@Override
4549
public Long getPriority() {
4650
return this.priority;
4751
}
4852

53+
public void setPriority(long priority) {
54+
this.priority = priority;
55+
}
56+
57+
@Override
58+
public Long getTimestamp() {
59+
return this.timestamp;
60+
}
61+
62+
public void setTimestamp(long timestamp) {
63+
this.timestamp = timestamp;
64+
}
65+
4966
@Override
5067
public int compareTo(PriorityQueueItem o) {
51-
return o.getPriority().compareTo(getPriority());
68+
return COMPARATOR.compare(this, o);
5269
}
5370
}

platform-core/src/test/java/com/flow/platform/core/test/PlatformQueueTest.java

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package com.flow.platform.core.test;
1818

1919
import com.flow.platform.core.queue.PriorityMessage;
20+
import com.flow.platform.core.util.ThreadUtil;
2021
import com.flow.platform.queue.PlatformQueue;
2122
import com.flow.platform.queue.QueueListener;
2223
import com.flow.platform.util.ObjectWrapper;
@@ -87,7 +88,7 @@ public void should_enqueue_for_in_memory_queue() throws Throwable {
8788
@Test
8889
public void should_enqueue_with_priority_in_memory_queue() throws Throwable {
8990
// given: queue listener
90-
final int size = 2;
91+
final int size = 4;
9192
CountDownLatch latch = new CountDownLatch(size);
9293
List<String> prioritizedList = new ArrayList<>(size);
9394

@@ -98,17 +99,24 @@ public void should_enqueue_with_priority_in_memory_queue() throws Throwable {
9899

99100
// when:
100101
inMemoryQueue.register(listener);
101-
inMemoryQueue.enqueue(PriorityMessage.create("1".getBytes(), 1)); // should be second
102-
inMemoryQueue.enqueue(PriorityMessage.create("2".getBytes(), 10)); // should be first
102+
inMemoryQueue.enqueue(PriorityMessage.create("1".getBytes(), 1));
103+
ThreadUtil.sleep(1);
104+
inMemoryQueue.enqueue(PriorityMessage.create("2".getBytes(), 1));
105+
ThreadUtil.sleep(1);
106+
inMemoryQueue.enqueue(PriorityMessage.create("3".getBytes(), 10));
107+
ThreadUtil.sleep(1);
108+
inMemoryQueue.enqueue(PriorityMessage.create("4".getBytes(), 10));
103109
inMemoryQueue.start();
104110

105111
// then:
106112
boolean await = latch.await(60, TimeUnit.SECONDS);
107113
Assert.assertTrue(await);
108114
Assert.assertEquals(size, prioritizedList.size());
109115

110-
Assert.assertEquals("2", prioritizedList.get(0));
111-
Assert.assertEquals("1", prioritizedList.get(1));
116+
Assert.assertEquals("4", prioritizedList.get(0));
117+
Assert.assertEquals("3", prioritizedList.get(1));
118+
Assert.assertEquals("2", prioritizedList.get(2));
119+
Assert.assertEquals("1", prioritizedList.get(3));
112120
}
113121

114122
@Test

platform-queue/src/main/java/com/flow/platform/queue/DefaultQueueMessage.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,23 +25,31 @@ public class DefaultQueueMessage implements PriorityQueueItem {
2525

2626
private Long priority;
2727

28+
private Long timestamp;
29+
2830
public DefaultQueueMessage(byte[] body, Long priority) {
2931
this.body = body;
3032
this.priority = priority;
33+
this.timestamp = System.currentTimeMillis();
3134
}
3235

3336
@Override
3437
public Long getPriority() {
3538
return priority;
3639
}
3740

41+
@Override
42+
public Long getTimestamp() {
43+
return timestamp;
44+
}
45+
3846
@Override
3947
public byte[] getBody() {
40-
return new byte[0];
48+
return body;
4149
}
4250

4351
@Override
4452
public int compareTo(PriorityQueueItem o) {
45-
return o.getPriority().compareTo(getPriority());
53+
return COMPARATOR.compare(this, o);
4654
}
4755
}

platform-queue/src/main/java/com/flow/platform/queue/PriorityQueueItem.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,31 @@
1616

1717
package com.flow.platform.queue;
1818

19+
import java.util.Comparator;
20+
import java.util.Objects;
21+
1922
/**
2023
* @author yang
2124
*/
2225
public interface PriorityQueueItem extends Comparable<PriorityQueueItem> {
2326

27+
ItemComparator COMPARATOR = new ItemComparator();
28+
2429
Long getPriority();
2530

31+
Long getTimestamp();
32+
2633
byte[] getBody();
34+
35+
class ItemComparator implements Comparator<PriorityQueueItem> {
36+
37+
@Override
38+
public int compare(PriorityQueueItem o1, PriorityQueueItem o2) {
39+
if (Objects.equals(o1.getPriority(), o2.getPriority())) {
40+
return o2.getTimestamp().compareTo(o1.getTimestamp());
41+
}
42+
43+
return o2.getPriority().compareTo(o1.getPriority());
44+
}
45+
}
2746
}

0 commit comments

Comments
 (0)