Skip to content

Commit 642791e

Browse files
author
Yang Guo
committed
apply priority message for queue item
1 parent e7e32a3 commit 642791e

File tree

21 files changed

+128
-204
lines changed

21 files changed

+128
-204
lines changed

platform-agent/src/main/java/com/flow/platform/agent/CmdManager.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -258,6 +258,7 @@ public synchronized void kill() {
258258
for (Map.Entry<Cmd, CmdResult> entry : running.entrySet()) {
259259
CmdResult r = entry.getValue();
260260
Cmd cmd = entry.getKey();
261+
finished.put(cmd, r);
261262

262263
r.getProcess().destroy();
263264

platform-api/src/main/java/com/flow/platform/api/config/AppConfig.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import com.flow.platform.core.config.DatabaseConfig;
2424
import com.flow.platform.core.queue.InMemoryQueue;
2525
import com.flow.platform.core.queue.PlatformQueue;
26+
import com.flow.platform.core.queue.PriorityMessage;
2627
import com.flow.platform.core.util.ThreadUtil;
2728
import com.flow.platform.util.Logger;
2829
import java.io.IOException;
@@ -34,15 +35,12 @@
3435
import org.apache.velocity.app.VelocityEngine;
3536
import org.apache.velocity.runtime.RuntimeConstants;
3637
import org.apache.velocity.runtime.resource.loader.ClasspathResourceLoader;
37-
import org.springframework.beans.factory.annotation.Autowired;
3838
import org.springframework.beans.factory.annotation.Value;
3939
import org.springframework.context.annotation.Bean;
4040
import org.springframework.context.annotation.Configuration;
4141
import org.springframework.context.annotation.Import;
4242
import org.springframework.context.event.ApplicationEventMulticaster;
4343
import org.springframework.context.event.SimpleApplicationEventMulticaster;
44-
import org.springframework.scheduling.TaskScheduler;
45-
import org.springframework.scheduling.annotation.EnableScheduling;
4644
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
4745

4846
/**
@@ -136,7 +134,7 @@ public ThreadPoolTaskExecutor taskExecutor() {
136134
* Queue to process cmd callback task
137135
*/
138136
@Bean
139-
public PlatformQueue<CmdCallbackQueueItem> cmdCallbackQueue() {
137+
public PlatformQueue<PriorityMessage> cmdCallbackQueue() {
140138
return new InMemoryQueue<>(executor, 50, "CmdCallbackQueue");
141139
}
142140

platform-api/src/main/java/com/flow/platform/api/consumer/CmdCallbackQueueConsumer.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import com.flow.platform.api.service.job.JobService;
2121
import com.flow.platform.core.exception.NotFoundException;
2222
import com.flow.platform.core.queue.PlatformQueue;
23+
import com.flow.platform.core.queue.PriorityMessage;
2324
import com.flow.platform.core.queue.QueueListener;
2425
import com.flow.platform.util.Logger;
2526
import javax.annotation.PostConstruct;
@@ -30,12 +31,12 @@
3031
* @author yh@firim
3132
*/
3233
@Component
33-
public class CmdCallbackQueueConsumer implements QueueListener<CmdCallbackQueueItem> {
34+
public class CmdCallbackQueueConsumer implements QueueListener<PriorityMessage> {
3435

3536
private final static Logger LOGGER = new Logger(CmdCallbackQueueConsumer.class);
3637

3738
@Autowired
38-
private PlatformQueue<CmdCallbackQueueItem> cmdCallbackQueue;
39+
private PlatformQueue<PriorityMessage> cmdCallbackQueue;
3940

4041
@Autowired
4142
private JobService jobService;
@@ -46,10 +47,13 @@ public void init() {
4647
}
4748

4849
@Override
49-
public void onQueueItem(CmdCallbackQueueItem item) {
50-
if (item == null) {
50+
public void onQueueItem(PriorityMessage message) {
51+
if (message == null) {
5152
return;
5253
}
54+
55+
CmdCallbackQueueItem item = CmdCallbackQueueItem.parse(message.getBody(), CmdCallbackQueueItem.class);
56+
5357
try {
5458
jobService.callback(item);
5559
} catch (NotFoundException notFoundException) {

platform-api/src/main/java/com/flow/platform/api/domain/CmdCallbackQueueItem.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,13 @@
1717
package com.flow.platform.api.domain;
1818

1919
import com.flow.platform.domain.Cmd;
20+
import com.flow.platform.domain.Jsonable;
2021
import java.math.BigInteger;
2122

2223
/**
2324
* @author yh@firim
2425
*/
25-
public class CmdCallbackQueueItem {
26+
public class CmdCallbackQueueItem extends Jsonable {
2627

2728
private final BigInteger jobId;
2829

platform-api/src/main/java/com/flow/platform/api/service/job/JobServiceImpl.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@
5555
import com.flow.platform.core.exception.IllegalStatusException;
5656
import com.flow.platform.core.exception.NotFoundException;
5757
import com.flow.platform.core.queue.PlatformQueue;
58+
import com.flow.platform.core.queue.PriorityMessage;
5859
import com.flow.platform.core.service.ApplicationEventService;
5960
import com.flow.platform.domain.Cmd;
6061
import com.flow.platform.domain.CmdInfo;
@@ -108,7 +109,7 @@ public class JobServiceImpl extends ApplicationEventService implements JobServic
108109
private JobNodeService jobNodeService;
109110

110111
@Autowired
111-
private PlatformQueue<CmdCallbackQueueItem> cmdCallbackQueue;
112+
private PlatformQueue<PriorityMessage> cmdCallbackQueue;
112113

113114
@Autowired
114115
private JobDao jobDao;
@@ -458,7 +459,7 @@ private void onRunShellCallback(String path, Cmd cmd, Job job) {
458459

459460
@Override
460461
public void enterQueue(CmdCallbackQueueItem cmdQueueItem) {
461-
cmdCallbackQueue.enqueue(cmdQueueItem);
462+
cmdCallbackQueue.enqueue(PriorityMessage.create(cmdQueueItem.toBytes(), 1));
462463
}
463464

464465
@Override

platform-control-center/src/main/java/com/flow/platform/cc/config/QueueConfig.java

Lines changed: 3 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,12 @@
1616

1717
package com.flow.platform.cc.config;
1818

19-
import com.flow.platform.cc.domain.CmdStatusItem;
2019
import com.flow.platform.core.queue.InMemoryQueue;
2120
import com.flow.platform.core.queue.PlatformQueue;
21+
import com.flow.platform.core.queue.PriorityMessage;
2222
import com.flow.platform.core.queue.RabbitQueue;
23-
import com.flow.platform.domain.AgentPath;
2423
import com.flow.platform.util.Logger;
2524
import javax.annotation.PostConstruct;
26-
import org.springframework.amqp.core.Message;
2725
import org.springframework.beans.factory.annotation.Autowired;
2826
import org.springframework.beans.factory.annotation.Value;
2927
import org.springframework.context.annotation.Bean;
@@ -95,7 +93,7 @@ public void init() {
9593
}
9694

9795
@Bean
98-
public PlatformQueue<Message> cmdQueue() {
96+
public PlatformQueue<PriorityMessage> cmdQueue() {
9997
if (cmdQueueRabbitEnable) {
10098
LOGGER.trace("Apply RabbitMQ for cmd queue");
10199
return new RabbitQueue(taskExecutor, host, CMD_QUEUE_MAX_LENGTH, CMD_QUEUE_DEFAULT_PRIORITY, cmdQueueName);
@@ -105,19 +103,11 @@ public PlatformQueue<Message> cmdQueue() {
105103
return new InMemoryQueue<>(taskExecutor, CMD_QUEUE_MAX_LENGTH, "CmdQueue");
106104
}
107105

108-
/**
109-
* Queue to handle agent report online in sync
110-
*/
111-
@Bean
112-
public PlatformQueue<AgentPath> agentReportQueue() {
113-
return new InMemoryQueue<>(taskExecutor, 100, "AgentReportQueue");
114-
}
115-
116106
/**
117107
* Queue to handle cmd status update
118108
*/
119109
@Bean
120-
public PlatformQueue<CmdStatusItem> cmdStatusQueue() {
110+
public PlatformQueue<PriorityMessage> cmdStatusQueue() {
121111
return new InMemoryQueue<>(taskExecutor, 100, "CmdStatusQueue");
122112
}
123113
}

platform-control-center/src/main/java/com/flow/platform/cc/consumer/AgentReportQueueConsumer.java

Lines changed: 0 additions & 91 deletions
This file was deleted.

platform-control-center/src/main/java/com/flow/platform/cc/consumer/AgentResourceEventHandler.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import com.flow.platform.cc.event.AgentResourceEvent.Category;
2222
import com.flow.platform.cc.service.ZoneService;
2323
import com.flow.platform.core.queue.PlatformQueue;
24+
import com.flow.platform.core.queue.PriorityMessage;
2425
import com.flow.platform.util.Logger;
2526
import org.springframework.amqp.core.Message;
2627
import org.springframework.beans.factory.annotation.Autowired;
@@ -40,7 +41,7 @@ public class AgentResourceEventHandler implements ApplicationListener<AgentResou
4041
private ZoneService zoneService;
4142

4243
@Autowired
43-
private PlatformQueue<Message> cmdQueue;
44+
private PlatformQueue<PriorityMessage> cmdQueue;
4445

4546
@Value("${queue.cmd.retry.enable}")
4647
private Boolean cmdQueueRetryEnable;

platform-control-center/src/main/java/com/flow/platform/cc/consumer/CmdQueueConsumer.java

Lines changed: 5 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import com.flow.platform.core.exception.IllegalParameterException;
2626
import com.flow.platform.core.exception.IllegalStatusException;
2727
import com.flow.platform.core.queue.PlatformQueue;
28+
import com.flow.platform.core.queue.PriorityMessage;
2829
import com.flow.platform.core.queue.QueueListener;
2930
import com.flow.platform.domain.Cmd;
3031
import com.flow.platform.domain.CmdStatus;
@@ -33,10 +34,6 @@
3334
import java.time.Duration;
3435
import java.time.Instant;
3536
import javax.annotation.PostConstruct;
36-
import org.springframework.amqp.core.Message;
37-
import org.springframework.amqp.core.MessageProperties;
38-
import org.springframework.amqp.rabbit.annotation.RabbitListener;
39-
import org.springframework.amqp.rabbit.core.RabbitTemplate;
4037
import org.springframework.beans.factory.annotation.Autowired;
4138
import org.springframework.beans.factory.annotation.Value;
4239
import org.springframework.stereotype.Component;
@@ -47,7 +44,7 @@
4744
4845
*/
4946
@Component
50-
public class CmdQueueConsumer implements QueueListener<Message> {
47+
public class CmdQueueConsumer implements QueueListener<PriorityMessage> {
5148

5249
private final static Logger LOGGER = new Logger(CmdQueueConsumer.class);
5350

@@ -67,15 +64,15 @@ public class CmdQueueConsumer implements QueueListener<Message> {
6764
private AgentService agentService;
6865

6966
@Autowired
70-
private PlatformQueue<Message> cmdQueue;
67+
private PlatformQueue<PriorityMessage> cmdQueue;
7168

7269
@PostConstruct
7370
public void init() {
7471
cmdQueue.register(this);
7572
}
7673

7774
@Override
78-
public void onQueueItem(Message message) {
75+
public void onQueueItem(PriorityMessage message) {
7976
CmdQueueItem item = CmdQueueItem.parse(message.getBody(), CmdQueueItem.class);
8077
String cmdId = item.getCmdId();
8178
LOGGER.trace("Receive a cmd queue item: %s", item);
@@ -155,7 +152,6 @@ private void resend(final CmdQueueItem item, final int retry) {
155152
return;
156153
}
157154

158-
item.setPriority(QueueConfig.CMD_QUEUE_DEFAULT_PRIORITY);
159155
item.setRetry(retry);
160156

161157
try {
@@ -165,9 +161,7 @@ private void resend(final CmdQueueItem item, final int retry) {
165161
}
166162

167163
// reset cmd status
168-
MessageProperties properties = new MessageProperties();
169-
properties.setPriority(item.getPriority());
170-
Message message = new Message(item.toBytes(), properties);
164+
PriorityMessage message = PriorityMessage.create(item.toBytes(), QueueConfig.CMD_QUEUE_DEFAULT_PRIORITY);
171165
cmdQueue.enqueue(message);
172166
LOGGER.trace("Re-enqueue item %s", item);
173167
}

platform-control-center/src/main/java/com/flow/platform/cc/consumer/CmdStatusQueueConsumer.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,9 @@
1919
import com.flow.platform.cc.domain.CmdStatusItem;
2020
import com.flow.platform.cc.service.CmdService;
2121
import com.flow.platform.core.queue.PlatformQueue;
22+
import com.flow.platform.core.queue.PriorityMessage;
2223
import com.flow.platform.core.queue.QueueListener;
24+
import com.flow.platform.domain.CmdStatus;
2325
import com.flow.platform.util.Logger;
2426
import javax.annotation.PostConstruct;
2527
import org.springframework.beans.factory.annotation.Autowired;
@@ -31,12 +33,12 @@
3133
* @author yang
3234
*/
3335
@Component
34-
public class CmdStatusQueueConsumer implements QueueListener<CmdStatusItem> {
36+
public class CmdStatusQueueConsumer implements QueueListener<PriorityMessage> {
3537

3638
private final static Logger LOGGER = new Logger(CmdStatusQueueConsumer.class);
3739

3840
@Autowired
39-
private PlatformQueue<CmdStatusItem> cmdStatusQueue;
41+
private PlatformQueue<PriorityMessage> cmdStatusQueue;
4042

4143
@Autowired
4244
private CmdService cmdService;
@@ -48,14 +50,15 @@ public void init() {
4850
}
4951

5052
@Override
51-
public void onQueueItem(CmdStatusItem item) {
53+
public void onQueueItem(PriorityMessage item) {
5254
if (item == null) {
5355
return;
5456
}
5557

5658
try {
59+
CmdStatusItem statusItem = CmdStatusItem.parse(item.getBody(), CmdStatusItem.class);
5760
LOGGER.debug(Thread.currentThread().getName() + " : " + item.toString());
58-
cmdService.updateStatus(item, false);
61+
cmdService.updateStatus(statusItem, false);
5962
} catch (Throwable e) {
6063
LOGGER.error("Update cmd error:", e);
6164
}

0 commit comments

Comments
 (0)