Skip to content

Commit 9a1b6df

Browse files
committed
合并代码
2 parents 47e556b + ad44e91 commit 9a1b6df

File tree

23 files changed

+203
-212
lines changed

23 files changed

+203
-212
lines changed

platform-agent/src/main/java/com/flow/platform/agent/CmdManager.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -258,6 +258,7 @@ public synchronized void kill() {
258258
for (Map.Entry<Cmd, CmdResult> entry : running.entrySet()) {
259259
CmdResult r = entry.getValue();
260260
Cmd cmd = entry.getKey();
261+
finished.put(cmd, r);
261262

262263
r.getProcess().destroy();
263264

platform-api/src/main/java/com/flow/platform/api/config/AppConfig.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import com.flow.platform.core.config.DatabaseConfig;
2424
import com.flow.platform.core.queue.InMemoryQueue;
2525
import com.flow.platform.core.queue.PlatformQueue;
26+
import com.flow.platform.core.queue.PriorityMessage;
2627
import com.flow.platform.core.util.ThreadUtil;
2728
import com.flow.platform.util.Logger;
2829
import java.io.IOException;
@@ -34,15 +35,12 @@
3435
import org.apache.velocity.app.VelocityEngine;
3536
import org.apache.velocity.runtime.RuntimeConstants;
3637
import org.apache.velocity.runtime.resource.loader.ClasspathResourceLoader;
37-
import org.springframework.beans.factory.annotation.Autowired;
3838
import org.springframework.beans.factory.annotation.Value;
3939
import org.springframework.context.annotation.Bean;
4040
import org.springframework.context.annotation.Configuration;
4141
import org.springframework.context.annotation.Import;
4242
import org.springframework.context.event.ApplicationEventMulticaster;
4343
import org.springframework.context.event.SimpleApplicationEventMulticaster;
44-
import org.springframework.scheduling.TaskScheduler;
45-
import org.springframework.scheduling.annotation.EnableScheduling;
4644
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
4745

4846
/**
@@ -136,7 +134,7 @@ public ThreadPoolTaskExecutor taskExecutor() {
136134
* Queue to process cmd callback task
137135
*/
138136
@Bean
139-
public PlatformQueue<CmdCallbackQueueItem> cmdCallbackQueue() {
137+
public PlatformQueue<PriorityMessage> cmdCallbackQueue() {
140138
return new InMemoryQueue<>(executor, 50, "CmdCallbackQueue");
141139
}
142140

platform-api/src/main/java/com/flow/platform/api/consumer/CmdCallbackQueueConsumer.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import com.flow.platform.api.service.job.JobService;
2121
import com.flow.platform.core.exception.NotFoundException;
2222
import com.flow.platform.core.queue.PlatformQueue;
23+
import com.flow.platform.core.queue.PriorityMessage;
2324
import com.flow.platform.core.queue.QueueListener;
2425
import com.flow.platform.util.Logger;
2526
import javax.annotation.PostConstruct;
@@ -30,12 +31,12 @@
3031
* @author yh@firim
3132
*/
3233
@Component
33-
public class CmdCallbackQueueConsumer implements QueueListener<CmdCallbackQueueItem> {
34+
public class CmdCallbackQueueConsumer implements QueueListener<PriorityMessage> {
3435

3536
private final static Logger LOGGER = new Logger(CmdCallbackQueueConsumer.class);
3637

3738
@Autowired
38-
private PlatformQueue<CmdCallbackQueueItem> cmdCallbackQueue;
39+
private PlatformQueue<PriorityMessage> cmdCallbackQueue;
3940

4041
@Autowired
4142
private JobService jobService;
@@ -46,10 +47,13 @@ public void init() {
4647
}
4748

4849
@Override
49-
public void onQueueItem(CmdCallbackQueueItem item) {
50-
if (item == null) {
50+
public void onQueueItem(PriorityMessage message) {
51+
if (message == null) {
5152
return;
5253
}
54+
55+
CmdCallbackQueueItem item = CmdCallbackQueueItem.parse(message.getBody(), CmdCallbackQueueItem.class);
56+
5357
try {
5458
jobService.callback(item);
5559
} catch (NotFoundException notFoundException) {

platform-api/src/main/java/com/flow/platform/api/domain/CmdCallbackQueueItem.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,13 @@
1717
package com.flow.platform.api.domain;
1818

1919
import com.flow.platform.domain.Cmd;
20+
import com.flow.platform.domain.Jsonable;
2021
import java.math.BigInteger;
2122

2223
/**
2324
* @author yh@firim
2425
*/
25-
public class CmdCallbackQueueItem {
26+
public class CmdCallbackQueueItem extends Jsonable {
2627

2728
private final BigInteger jobId;
2829

platform-api/src/main/java/com/flow/platform/api/service/job/JobServiceImpl.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@
5555
import com.flow.platform.core.exception.IllegalStatusException;
5656
import com.flow.platform.core.exception.NotFoundException;
5757
import com.flow.platform.core.queue.PlatformQueue;
58+
import com.flow.platform.core.queue.PriorityMessage;
5859
import com.flow.platform.core.service.ApplicationEventService;
5960
import com.flow.platform.domain.Cmd;
6061
import com.flow.platform.domain.CmdInfo;
@@ -108,7 +109,7 @@ public class JobServiceImpl extends ApplicationEventService implements JobServic
108109
private JobNodeService jobNodeService;
109110

110111
@Autowired
111-
private PlatformQueue<CmdCallbackQueueItem> cmdCallbackQueue;
112+
private PlatformQueue<PriorityMessage> cmdCallbackQueue;
112113

113114
@Autowired
114115
private JobDao jobDao;
@@ -458,7 +459,7 @@ private void onRunShellCallback(String path, Cmd cmd, Job job) {
458459

459460
@Override
460461
public void enterQueue(CmdCallbackQueueItem cmdQueueItem) {
461-
cmdCallbackQueue.enqueue(cmdQueueItem);
462+
cmdCallbackQueue.enqueue(PriorityMessage.create(cmdQueueItem.toBytes(), 1));
462463
}
463464

464465
@Override

platform-control-center/src/main/java/com/flow/platform/cc/config/QueueConfig.java

Lines changed: 22 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,13 @@
1616

1717
package com.flow.platform.cc.config;
1818

19-
import com.flow.platform.cc.domain.CmdStatusItem;
2019
import com.flow.platform.core.queue.InMemoryQueue;
2120
import com.flow.platform.core.queue.PlatformQueue;
21+
import com.flow.platform.core.queue.PriorityMessage;
2222
import com.flow.platform.core.queue.RabbitQueue;
23-
import com.flow.platform.domain.AgentPath;
2423
import com.flow.platform.util.Logger;
24+
import com.google.common.collect.Range;
2525
import javax.annotation.PostConstruct;
26-
import org.springframework.amqp.core.Message;
2726
import org.springframework.beans.factory.annotation.Autowired;
2827
import org.springframework.beans.factory.annotation.Value;
2928
import org.springframework.context.annotation.Bean;
@@ -38,9 +37,22 @@
3837
@Configuration
3938
public class QueueConfig {
4039

41-
public final static int CMD_QUEUE_MAX_LENGTH = 100;
40+
public final static int QUEUE_MAX_LENGTH = 100;
4241

43-
public final static int CMD_QUEUE_DEFAULT_PRIORITY = 10;
42+
/**
43+
* Default queue priority
44+
*/
45+
public final static int DEFAULT_PRIORITY = 1;
46+
47+
/**
48+
* The max queue priority for special case like retry
49+
*/
50+
public final static int MAX_PRIORITY = 100;
51+
52+
/**
53+
* The priority range for queue
54+
*/
55+
public final static Range PRIORITY_RANGE = Range.closed(1, 10);
4456

4557
public final static String PROP_CMD_QUEUE_RETRY = "queue.cmd.retry.enable";
4658

@@ -95,29 +107,21 @@ public void init() {
95107
}
96108

97109
@Bean
98-
public PlatformQueue<Message> cmdQueue() {
110+
public PlatformQueue<PriorityMessage> cmdQueue() {
99111
if (cmdQueueRabbitEnable) {
100112
LOGGER.trace("Apply RabbitMQ for cmd queue");
101-
return new RabbitQueue(taskExecutor, host, CMD_QUEUE_MAX_LENGTH, CMD_QUEUE_DEFAULT_PRIORITY, cmdQueueName);
113+
return new RabbitQueue(taskExecutor, host, QUEUE_MAX_LENGTH, DEFAULT_PRIORITY, cmdQueueName);
102114
}
103115

104116
LOGGER.trace("Apply in memory queue for cmd queue");
105-
return new InMemoryQueue<>(taskExecutor, CMD_QUEUE_MAX_LENGTH, "CmdQueue");
106-
}
107-
108-
/**
109-
* Queue to handle agent report online in sync
110-
*/
111-
@Bean
112-
public PlatformQueue<AgentPath> agentReportQueue() {
113-
return new InMemoryQueue<>(taskExecutor, 100, "AgentReportQueue");
117+
return new InMemoryQueue<>(taskExecutor, QUEUE_MAX_LENGTH, "CmdQueue");
114118
}
115119

116120
/**
117121
* Queue to handle cmd status update
118122
*/
119123
@Bean
120-
public PlatformQueue<CmdStatusItem> cmdStatusQueue() {
121-
return new InMemoryQueue<>(taskExecutor, 100, "CmdStatusQueue");
124+
public PlatformQueue<PriorityMessage> cmdStatusQueue() {
125+
return new InMemoryQueue<>(taskExecutor, QUEUE_MAX_LENGTH, "CmdStatusQueue");
122126
}
123127
}

platform-control-center/src/main/java/com/flow/platform/cc/consumer/AgentReportQueueConsumer.java

Lines changed: 0 additions & 91 deletions
This file was deleted.

platform-control-center/src/main/java/com/flow/platform/cc/consumer/AgentResourceEventHandler.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import com.flow.platform.cc.event.AgentResourceEvent.Category;
2222
import com.flow.platform.cc.service.ZoneService;
2323
import com.flow.platform.core.queue.PlatformQueue;
24+
import com.flow.platform.core.queue.PriorityMessage;
2425
import com.flow.platform.util.Logger;
2526
import org.springframework.amqp.core.Message;
2627
import org.springframework.beans.factory.annotation.Autowired;
@@ -40,7 +41,7 @@ public class AgentResourceEventHandler implements ApplicationListener<AgentResou
4041
private ZoneService zoneService;
4142

4243
@Autowired
43-
private PlatformQueue<Message> cmdQueue;
44+
private PlatformQueue<PriorityMessage> cmdQueue;
4445

4546
@Value("${queue.cmd.retry.enable}")
4647
private Boolean cmdQueueRetryEnable;

platform-control-center/src/main/java/com/flow/platform/cc/consumer/CmdQueueConsumer.java

Lines changed: 5 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import com.flow.platform.core.exception.IllegalParameterException;
2626
import com.flow.platform.core.exception.IllegalStatusException;
2727
import com.flow.platform.core.queue.PlatformQueue;
28+
import com.flow.platform.core.queue.PriorityMessage;
2829
import com.flow.platform.core.queue.QueueListener;
2930
import com.flow.platform.domain.Cmd;
3031
import com.flow.platform.domain.CmdStatus;
@@ -33,10 +34,6 @@
3334
import java.time.Duration;
3435
import java.time.Instant;
3536
import javax.annotation.PostConstruct;
36-
import org.springframework.amqp.core.Message;
37-
import org.springframework.amqp.core.MessageProperties;
38-
import org.springframework.amqp.rabbit.annotation.RabbitListener;
39-
import org.springframework.amqp.rabbit.core.RabbitTemplate;
4037
import org.springframework.beans.factory.annotation.Autowired;
4138
import org.springframework.beans.factory.annotation.Value;
4239
import org.springframework.stereotype.Component;
@@ -47,7 +44,7 @@
4744
4845
*/
4946
@Component
50-
public class CmdQueueConsumer implements QueueListener<Message> {
47+
public class CmdQueueConsumer implements QueueListener<PriorityMessage> {
5148

5249
private final static Logger LOGGER = new Logger(CmdQueueConsumer.class);
5350

@@ -67,15 +64,15 @@ public class CmdQueueConsumer implements QueueListener<Message> {
6764
private AgentService agentService;
6865

6966
@Autowired
70-
private PlatformQueue<Message> cmdQueue;
67+
private PlatformQueue<PriorityMessage> cmdQueue;
7168

7269
@PostConstruct
7370
public void init() {
7471
cmdQueue.register(this);
7572
}
7673

7774
@Override
78-
public void onQueueItem(Message message) {
75+
public void onQueueItem(PriorityMessage message) {
7976
CmdQueueItem item = CmdQueueItem.parse(message.getBody(), CmdQueueItem.class);
8077
String cmdId = item.getCmdId();
8178
LOGGER.trace("Receive a cmd queue item: %s", item);
@@ -155,7 +152,6 @@ private void resend(final CmdQueueItem item, final int retry) {
155152
return;
156153
}
157154

158-
item.setPriority(QueueConfig.CMD_QUEUE_DEFAULT_PRIORITY);
159155
item.setRetry(retry);
160156

161157
try {
@@ -165,9 +161,7 @@ private void resend(final CmdQueueItem item, final int retry) {
165161
}
166162

167163
// reset cmd status
168-
MessageProperties properties = new MessageProperties();
169-
properties.setPriority(item.getPriority());
170-
Message message = new Message(item.toBytes(), properties);
164+
PriorityMessage message = PriorityMessage.create(item.toBytes(), QueueConfig.MAX_PRIORITY);
171165
cmdQueue.enqueue(message);
172166
LOGGER.trace("Re-enqueue item %s", item);
173167
}

0 commit comments

Comments
 (0)