Skip to content

Commit 80b79a3

Browse files
author
Yang Guo
committed
add cmd queue retry feature toggle and provide get running status for platform queue
1 parent 9362e8d commit 80b79a3

File tree

13 files changed

+90
-8
lines changed

13 files changed

+90
-8
lines changed

docker/app-cc.properties

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ mq.host = amqp://localhost:5672
2828
mq.management.host = http://localhost:15672
2929

3030
#### cmd queue settings ###
31+
queue.cmd.retry.enable = false
3132
queue.cmd.rabbit.enable = false
3233
queue.cmd.rabbit.name = flow-cmd-queue-default
3334
queue.cmd.idle_agent.timeout = 30

platform-control-center/src/main/java/com/flow/platform/cc/config/QueueConfig.java

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,8 @@ public class QueueConfig {
4242

4343
public final static int CMD_QUEUE_DEFAULT_PRIORITY = 10;
4444

45+
public final static String PROP_CMD_QUEUE_RETRY = "queue.cmd.retry.enable";
46+
4547
private final static Logger LOGGER = new Logger(QueueConfig.class);
4648

4749
/**
@@ -70,14 +72,26 @@ public class QueueConfig {
7072
@Value("${queue.cmd.rabbit.enable}")
7173
private Boolean cmdQueueRabbitEnable;
7274

75+
/**
76+
* Enable cmd queue retry instead of pause/resume logic
77+
*/
78+
@Value("${queue.cmd.retry.enable}")
79+
private Boolean cmdQueueRetryEnable;
80+
81+
/**
82+
* AppConfig task executor
83+
*/
7384
@Autowired
74-
private ThreadPoolTaskExecutor taskExecutor; // from AppConfig
85+
private ThreadPoolTaskExecutor taskExecutor;
7586

7687
@PostConstruct
7788
public void init() {
7889
LOGGER.trace("Host: %s", host);
7990
LOGGER.trace("Management Host: %s", mgrHost);
91+
8092
LOGGER.trace("Cmd queue name: %s", cmdQueueName);
93+
LOGGER.trace("Cmd RabbitMQ enabled: %s", cmdQueueRabbitEnable);
94+
LOGGER.trace("Cmd queue retry enabled: %s", cmdQueueRetryEnable);
8195
}
8296

8397
@Bean

platform-control-center/src/main/java/com/flow/platform/cc/consumer/AgentResourceEventHandler.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,15 @@
1616

1717
package com.flow.platform.cc.consumer;
1818

19+
import com.flow.platform.cc.config.QueueConfig;
1920
import com.flow.platform.cc.event.AgentResourceEvent;
2021
import com.flow.platform.cc.event.AgentResourceEvent.Category;
2122
import com.flow.platform.cc.service.ZoneService;
2223
import com.flow.platform.core.queue.PlatformQueue;
2324
import com.flow.platform.util.Logger;
2425
import org.springframework.amqp.core.Message;
2526
import org.springframework.beans.factory.annotation.Autowired;
27+
import org.springframework.beans.factory.annotation.Value;
2628
import org.springframework.context.ApplicationListener;
2729
import org.springframework.stereotype.Component;
2830

@@ -40,6 +42,9 @@ public class AgentResourceEventHandler implements ApplicationListener<AgentResou
4042
@Autowired
4143
private PlatformQueue<Message> cmdQueue;
4244

45+
@Value("${queue.cmd.retry.enable}")
46+
private Boolean cmdQueueRetryEnable;
47+
4348
@Override
4449
public void onApplicationEvent(AgentResourceEvent event) {
4550
String zone = event.getZone();
@@ -48,6 +53,12 @@ public void onApplicationEvent(AgentResourceEvent event) {
4853
// cleanup agent from zone
4954
zoneService.keepIdleAgentTask();
5055

56+
// do not control cmd queue since enable retry
57+
Boolean isRetry = Boolean.parseBoolean(System.getProperty(QueueConfig.PROP_CMD_QUEUE_RETRY, "false"));
58+
if (cmdQueueRetryEnable || isRetry) {
59+
return;
60+
}
61+
5162
if (event.getCategory() == Category.FULL) {
5263
cmdQueue.pause();
5364
LOGGER.trace("Pause cmd queue since no agent resources");

platform-control-center/src/main/java/com/flow/platform/cc/service/AgentServiceImpl.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818

1919
import com.flow.platform.cc.config.TaskConfig;
2020
import com.flow.platform.cc.dao.AgentDao;
21+
import com.flow.platform.cc.event.AgentResourceEvent;
22+
import com.flow.platform.cc.event.AgentResourceEvent.Category;
2123
import com.flow.platform.cc.exception.AgentErr;
2224
import com.flow.platform.core.exception.IllegalParameterException;
2325
import com.flow.platform.core.exception.IllegalStatusException;
@@ -145,6 +147,11 @@ public void saveWithStatus(Agent agent, AgentStatus status) {
145147
if (statusIsChanged) {
146148
this.webhookCallback(agent);
147149
}
150+
151+
// boardcast AgentResourceEvent for release
152+
if (agent.getStatus() == AgentStatus.IDLE) {
153+
this.dispatchEvent(new AgentResourceEvent(this, agent.getZone(), Category.RELEASED));
154+
}
148155
}
149156

150157
@Override

platform-control-center/src/main/java/com/flow/platform/cc/service/CmdServiceImpl.java

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -305,11 +305,6 @@ private void updateAgentStatusFromCmd(Cmd cmd) {
305305

306306
Agent agent = agentService.find(agentPath);
307307
agentService.saveWithStatus(agent, isAgentBusy ? AgentStatus.BUSY : AgentStatus.IDLE);
308-
309-
// boardcast AgentResourceEvent for release
310-
if (agent.getStatus() == AgentStatus.IDLE) {
311-
this.dispatchEvent(new AgentResourceEvent(this, agent.getZone(), Category.RELEASED));
312-
}
313308
}
314309

315310
/**

platform-control-center/src/main/resources/app-default.properties

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ mq.host = amqp://localhost:5672
2828
mq.management.host = http://localhost:15672
2929

3030
#### cmd queue settings ###
31+
queue.cmd.retry.enable = false
3132
queue.cmd.rabbit.enable = false
3233
queue.cmd.rabbit.name = flow-cmd-queue-default
3334
queue.cmd.idle_agent.timeout = 30

platform-control-center/src/test/java/com/flow/platform/cc/test/consumer/CmdQueueConsumerTest.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import static com.github.tomakehurst.wiremock.client.WireMock.urlEqualTo;
2424
import static com.github.tomakehurst.wiremock.client.WireMock.verify;
2525

26+
import com.flow.platform.cc.config.QueueConfig;
2627
import com.flow.platform.cc.domain.CmdStatusItem;
2728
import com.flow.platform.cc.service.AgentService;
2829
import com.flow.platform.cc.service.CmdService;
@@ -47,6 +48,7 @@
4748
import org.junit.Test;
4849
import org.junit.runners.MethodSorters;
4950
import org.springframework.beans.factory.annotation.Autowired;
51+
import org.springframework.beans.factory.annotation.Value;
5052
import org.springframework.dao.CannotAcquireLockException;
5153

5254
/**
@@ -69,8 +71,13 @@ public class CmdQueueConsumerTest extends TestBase {
6971
@Autowired
7072
private CmdService cmdService;
7173

74+
@Value("${queue.cmd.retry.enable}")
75+
private Boolean cmdQueueRetryEnable;
76+
7277
@Before
7378
public void before() throws Throwable {
79+
System.setProperty(QueueConfig.PROP_CMD_QUEUE_RETRY, "true");
80+
7481
zoneService.createZone(new Zone(ZONE, "mock-cloud-provider"));
7582

7683
// ensure zookeeper node is created for zone
@@ -205,5 +212,6 @@ public void should_stop_queued_cmd() throws Throwable {
205212
@After
206213
public void deleteZone() {
207214
deleteNodeWithChildren(ZKHelper.buildPath(ZONE, null));
215+
System.setProperty(QueueConfig.PROP_CMD_QUEUE_RETRY, "false");
208216
}
209217
}

platform-control-center/src/test/java/com/flow/platform/cc/test/controller/CmdControllerTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ public void should_list_cmd_types() throws Throwable {
8484
String raw = result.getResponse().getContentAsString();
8585
CmdType[] types = gsonConfig.fromJson(raw, CmdType[].class);
8686
Assert.assertNotNull(types);
87-
Assert.assertTrue(types.length == 6);
87+
Assert.assertTrue(types.length == 7);
8888
}
8989

9090
@Test

platform-control-center/src/test/java/com/flow/platform/cc/test/service/CmdDispatchServiceTest.java

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@
2525
import com.flow.platform.cc.service.CmdService;
2626
import com.flow.platform.cc.test.TestBase;
2727
import com.flow.platform.cc.util.ZKHelper;
28+
import com.flow.platform.core.exception.FlowException;
29+
import com.flow.platform.core.queue.PlatformQueue;
2830
import com.flow.platform.domain.Agent;
2931
import com.flow.platform.domain.AgentPath;
3032
import com.flow.platform.domain.AgentStatus;
@@ -41,6 +43,7 @@
4143
import org.junit.FixMethodOrder;
4244
import org.junit.Test;
4345
import org.junit.runners.MethodSorters;
46+
import org.springframework.amqp.core.Message;
4447
import org.springframework.beans.factory.annotation.Autowired;
4548

4649
/**
@@ -61,6 +64,9 @@ public class CmdDispatchServiceTest extends TestBase {
6164
@Autowired
6265
private List<Zone> defaultZones;
6366

67+
@Autowired
68+
private PlatformQueue<Message> cmdQueue;
69+
6470
private AgentPath agentPath;
6571

6672
private Agent target;
@@ -91,8 +97,30 @@ public void toCreateSession() throws Throwable {
9197
}
9298

9399
@Test
94-
public void should_broadcast_agent_resource_event_if_no_available_agent() throws Throwable {
100+
public void should_broadcast_agent_resource_event_if_no_available_agent() throws Throwable {
95101
// given: make no available agent resource
102+
Assert.assertEquals(true, cmdQueue.isRunning());
103+
104+
// when: send create session cmd
105+
Cmd cmdToCreateSession = cmdService.create(new CmdInfo(agentPath, CmdType.CREATE_SESSION, null));
106+
107+
try {
108+
cmdDispatchService.dispatch(cmdToCreateSession.getId(), false);
109+
} catch (FlowException ignore) {
110+
111+
}
112+
113+
// then: queue should be pause since no available
114+
Assert.assertEquals(false, cmdQueue.isRunning());
115+
116+
// when: send delete session to release agent
117+
CmdInfo cmd = new CmdInfo(agentPath, CmdType.DELETE_SESSION, null);
118+
cmd.setSessionId(target.getSessionId());
119+
Cmd cmdToDeleteSession = cmdService.create(cmd);
120+
cmdDispatchService.dispatch(cmdToDeleteSession.getId(), false);
121+
122+
// then: queue should be resumed since agent resource released
123+
Assert.assertEquals(true, cmdQueue.isRunning());
96124
}
97125

98126
@Test

platform-control-center/src/test/resources/app-test.properties

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ mq.host = amqp://localhost:5672
3535
mq.management.host = http://localhost:15672
3636

3737
#### cmd queue settings ###
38+
queue.cmd.retry.enable = false
3839
queue.cmd.rabbit.enable = false
3940
queue.cmd.rabbit.name = flow-cmd-queue-default
4041
queue.cmd.idle_agent.timeout = 0

0 commit comments

Comments
 (0)