Skip to content

Commit e4b39f8

Browse files
author
Yang Guo
committed
set default queue priority and range
1 parent 4a3c98c commit e4b39f8

File tree

4 files changed

+24
-8
lines changed

4 files changed

+24
-8
lines changed

platform-control-center/src/main/java/com/flow/platform/cc/config/QueueConfig.java

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import com.flow.platform.core.queue.PriorityMessage;
2222
import com.flow.platform.core.queue.RabbitQueue;
2323
import com.flow.platform.util.Logger;
24+
import com.google.common.collect.Range;
2425
import javax.annotation.PostConstruct;
2526
import org.springframework.beans.factory.annotation.Autowired;
2627
import org.springframework.beans.factory.annotation.Value;
@@ -36,9 +37,22 @@
3637
@Configuration
3738
public class QueueConfig {
3839

39-
public final static int CMD_QUEUE_MAX_LENGTH = 100;
40+
public final static int QUEUE_MAX_LENGTH = 100;
4041

41-
public final static int CMD_QUEUE_DEFAULT_PRIORITY = 10;
42+
/**
43+
* Default queue priority
44+
*/
45+
public final static int DEFAULT_PRIORITY = 1;
46+
47+
/**
48+
* The max queue priority for special case like retry
49+
*/
50+
public final static int MAX_PRIORITY = 100;
51+
52+
/**
53+
* The priority range for queue
54+
*/
55+
public final static Range PRIORITY_RANGE = Range.closed(1, 10);
4256

4357
public final static String PROP_CMD_QUEUE_RETRY = "queue.cmd.retry.enable";
4458

@@ -96,18 +110,18 @@ public void init() {
96110
public PlatformQueue<PriorityMessage> cmdQueue() {
97111
if (cmdQueueRabbitEnable) {
98112
LOGGER.trace("Apply RabbitMQ for cmd queue");
99-
return new RabbitQueue(taskExecutor, host, CMD_QUEUE_MAX_LENGTH, CMD_QUEUE_DEFAULT_PRIORITY, cmdQueueName);
113+
return new RabbitQueue(taskExecutor, host, QUEUE_MAX_LENGTH, DEFAULT_PRIORITY, cmdQueueName);
100114
}
101115

102116
LOGGER.trace("Apply in memory queue for cmd queue");
103-
return new InMemoryQueue<>(taskExecutor, CMD_QUEUE_MAX_LENGTH, "CmdQueue");
117+
return new InMemoryQueue<>(taskExecutor, QUEUE_MAX_LENGTH, "CmdQueue");
104118
}
105119

106120
/**
107121
* Queue to handle cmd status update
108122
*/
109123
@Bean
110124
public PlatformQueue<PriorityMessage> cmdStatusQueue() {
111-
return new InMemoryQueue<>(taskExecutor, 100, "CmdStatusQueue");
125+
return new InMemoryQueue<>(taskExecutor, QUEUE_MAX_LENGTH, "CmdStatusQueue");
112126
}
113127
}

platform-control-center/src/main/java/com/flow/platform/cc/consumer/CmdQueueConsumer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -161,7 +161,7 @@ private void resend(final CmdQueueItem item, final int retry) {
161161
}
162162

163163
// reset cmd status
164-
PriorityMessage message = PriorityMessage.create(item.toBytes(), QueueConfig.CMD_QUEUE_DEFAULT_PRIORITY);
164+
PriorityMessage message = PriorityMessage.create(item.toBytes(), QueueConfig.MAX_PRIORITY);
165165
cmdQueue.enqueue(message);
166166
LOGGER.trace("Re-enqueue item %s", item);
167167
}

platform-control-center/src/main/java/com/flow/platform/cc/controller/CmdController.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package com.flow.platform.cc.controller;
1818

19+
import com.flow.platform.cc.config.QueueConfig;
1920
import com.flow.platform.cc.domain.CmdStatusItem;
2021
import com.flow.platform.cc.service.CmdDispatchService;
2122
import com.flow.platform.cc.service.CmdService;
@@ -73,7 +74,7 @@ public Cmd sendCommand(@RequestBody CmdInfo cmd) {
7374

7475
@PostMapping(path = "/queue/send")
7576
public Cmd sendCommandToQueue(@RequestBody CmdInfo cmd, @RequestParam int priority, @RequestParam int retry) {
76-
if (!Range.closed(1, 10).contains(priority)) {
77+
if (!QueueConfig.PRIORITY_RANGE.contains(priority)) {
7778
throw new IllegalParameterException("Illegal priority value should between (1 - 10)");
7879
}
7980

platform-control-center/src/main/java/com/flow/platform/cc/service/CmdServiceImpl.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import static com.flow.platform.domain.CmdType.STOP;
2525

2626
import com.flow.platform.cc.config.AppConfig;
27+
import com.flow.platform.cc.config.QueueConfig;
2728
import com.flow.platform.cc.dao.AgentDao;
2829
import com.flow.platform.cc.dao.CmdDao;
2930
import com.flow.platform.cc.dao.CmdLogDao;
@@ -216,7 +217,7 @@ public Cmd enqueue(CmdInfo cmdInfo, int priority, int retry) {
216217
public void updateStatus(CmdStatusItem statusItem, boolean inQueue) {
217218
if (inQueue) {
218219
LOGGER.trace("Report cmd status from queue: %s", statusItem.getCmdId());
219-
cmdStatusQueue.enqueue(PriorityMessage.create(statusItem.toBytes(), 1));
220+
cmdStatusQueue.enqueue(PriorityMessage.create(statusItem.toBytes(), QueueConfig.DEFAULT_PRIORITY));
220221
return;
221222
}
222223

0 commit comments

Comments
 (0)