Skip to content

Commit 78bdf13

Browse files
author
yang.guo
authored
Merge pull request #107 from FlowCI/feature/cc/optimize_cmd_queue
Feature/cc/optimize cmd queue
2 parents 67e56b0 + 80b79a3 commit 78bdf13

File tree

19 files changed

+287
-76
lines changed

19 files changed

+287
-76
lines changed

docker/app-cc.properties

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ mq.host = amqp://localhost:5672
2828
mq.management.host = http://localhost:15672
2929

3030
#### cmd queue settings ###
31+
queue.cmd.retry.enable = false
3132
queue.cmd.rabbit.enable = false
3233
queue.cmd.rabbit.name = flow-cmd-queue-default
3334
queue.cmd.idle_agent.timeout = 30

platform-control-center/src/main/java/com/flow/platform/cc/config/QueueConfig.java

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,8 @@ public class QueueConfig {
4242

4343
public final static int CMD_QUEUE_DEFAULT_PRIORITY = 10;
4444

45+
public final static String PROP_CMD_QUEUE_RETRY = "queue.cmd.retry.enable";
46+
4547
private final static Logger LOGGER = new Logger(QueueConfig.class);
4648

4749
/**
@@ -70,14 +72,26 @@ public class QueueConfig {
7072
@Value("${queue.cmd.rabbit.enable}")
7173
private Boolean cmdQueueRabbitEnable;
7274

75+
/**
76+
* Enable cmd queue retry instead of pause/resume logic
77+
*/
78+
@Value("${queue.cmd.retry.enable}")
79+
private Boolean cmdQueueRetryEnable;
80+
81+
/**
82+
* AppConfig task executor
83+
*/
7384
@Autowired
74-
private ThreadPoolTaskExecutor taskExecutor; // from AppConfig
85+
private ThreadPoolTaskExecutor taskExecutor;
7586

7687
@PostConstruct
7788
public void init() {
7889
LOGGER.trace("Host: %s", host);
7990
LOGGER.trace("Management Host: %s", mgrHost);
91+
8092
LOGGER.trace("Cmd queue name: %s", cmdQueueName);
93+
LOGGER.trace("Cmd RabbitMQ enabled: %s", cmdQueueRabbitEnable);
94+
LOGGER.trace("Cmd queue retry enabled: %s", cmdQueueRetryEnable);
8195
}
8296

8397
@Bean
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
/*
2+
* Copyright 2017 flow.ci
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.flow.platform.cc.consumer;
18+
19+
import com.flow.platform.cc.config.QueueConfig;
20+
import com.flow.platform.cc.event.AgentResourceEvent;
21+
import com.flow.platform.cc.event.AgentResourceEvent.Category;
22+
import com.flow.platform.cc.service.ZoneService;
23+
import com.flow.platform.core.queue.PlatformQueue;
24+
import com.flow.platform.util.Logger;
25+
import org.springframework.amqp.core.Message;
26+
import org.springframework.beans.factory.annotation.Autowired;
27+
import org.springframework.beans.factory.annotation.Value;
28+
import org.springframework.context.ApplicationListener;
29+
import org.springframework.stereotype.Component;
30+
31+
/**
32+
* @author yang
33+
*/
34+
@Component
35+
public class AgentResourceEventHandler implements ApplicationListener<AgentResourceEvent> {
36+
37+
private final static Logger LOGGER = new Logger(AgentResourceEventHandler.class);
38+
39+
@Autowired
40+
private ZoneService zoneService;
41+
42+
@Autowired
43+
private PlatformQueue<Message> cmdQueue;
44+
45+
@Value("${queue.cmd.retry.enable}")
46+
private Boolean cmdQueueRetryEnable;
47+
48+
@Override
49+
public void onApplicationEvent(AgentResourceEvent event) {
50+
String zone = event.getZone();
51+
LOGGER.trace("AgentResourceEvent received for zone '%s' with '%s'", zone, event.getCategory());
52+
53+
// cleanup agent from zone
54+
zoneService.keepIdleAgentTask();
55+
56+
// do not control cmd queue since enable retry
57+
Boolean isRetry = Boolean.parseBoolean(System.getProperty(QueueConfig.PROP_CMD_QUEUE_RETRY, "false"));
58+
if (cmdQueueRetryEnable || isRetry) {
59+
return;
60+
}
61+
62+
if (event.getCategory() == Category.FULL) {
63+
cmdQueue.pause();
64+
LOGGER.trace("Pause cmd queue since no agent resources");
65+
return;
66+
}
67+
68+
if (event.getCategory() == Category.RELEASED) {
69+
cmdQueue.resume();
70+
LOGGER.trace("Resume cmd queue since has agent resource released");
71+
}
72+
}
73+
}

platform-control-center/src/main/java/com/flow/platform/cc/consumer/NoAvailbleResourceEventHandler.java

Lines changed: 0 additions & 44 deletions
This file was deleted.

platform-control-center/src/main/java/com/flow/platform/cc/event/NoAvailableResourceEvent.java renamed to platform-control-center/src/main/java/com/flow/platform/cc/event/AgentResourceEvent.java

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,16 +25,32 @@
2525
*
2626
* @author yang
2727
*/
28-
public class NoAvailableResourceEvent extends ApplicationEvent {
28+
public class AgentResourceEvent extends ApplicationEvent {
2929

30-
private String zone;
30+
public enum Category {
31+
RELEASED,
3132

32-
public NoAvailableResourceEvent(Object source, String zone) {
33+
/**
34+
* All agent resource occupied
35+
*/
36+
FULL
37+
}
38+
39+
private final String zone;
40+
41+
private final Category category;
42+
43+
public AgentResourceEvent(Object source, String zone, Category category) {
3344
super(source);
3445
this.zone = zone;
46+
this.category = category;
3547
}
3648

3749
public String getZone() {
3850
return zone;
3951
}
52+
53+
public Category getCategory() {
54+
return category;
55+
}
4056
}

platform-control-center/src/main/java/com/flow/platform/cc/service/AgentServiceImpl.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818

1919
import com.flow.platform.cc.config.TaskConfig;
2020
import com.flow.platform.cc.dao.AgentDao;
21+
import com.flow.platform.cc.event.AgentResourceEvent;
22+
import com.flow.platform.cc.event.AgentResourceEvent.Category;
2123
import com.flow.platform.cc.exception.AgentErr;
2224
import com.flow.platform.core.exception.IllegalParameterException;
2325
import com.flow.platform.core.exception.IllegalStatusException;
@@ -145,6 +147,11 @@ public void saveWithStatus(Agent agent, AgentStatus status) {
145147
if (statusIsChanged) {
146148
this.webhookCallback(agent);
147149
}
150+
151+
// boardcast AgentResourceEvent for release
152+
if (agent.getStatus() == AgentStatus.IDLE) {
153+
this.dispatchEvent(new AgentResourceEvent(this, agent.getZone(), Category.RELEASED));
154+
}
148155
}
149156

150157
@Override

platform-control-center/src/main/java/com/flow/platform/cc/service/CmdDispatchServiceImpl.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,8 @@
1818

1919
import com.flow.platform.cc.config.TaskConfig;
2020
import com.flow.platform.cc.domain.CmdStatusItem;
21-
import com.flow.platform.cc.event.NoAvailableResourceEvent;
21+
import com.flow.platform.cc.event.AgentResourceEvent;
22+
import com.flow.platform.cc.event.AgentResourceEvent.Category;
2223
import com.flow.platform.cc.exception.AgentErr;
2324
import com.flow.platform.cc.exception.AgentErr.NotAvailableException;
2425
import com.flow.platform.cc.util.ZKHelper;
@@ -130,11 +131,9 @@ public Cmd dispatch(String cmdId, boolean reset) {
130131
CmdStatusItem statusItem = new CmdStatusItem(cmd.getId(), CmdStatus.REJECTED, null, false, true);
131132
cmdService.updateStatus(statusItem, false);
132133

133-
// broadcast NoAvailableResourceEvent with zone name
134-
String zone = cmd.getAgentPath().getZone();
135-
134+
// broadcast AgentResourceEvent with zone name
136135
if (e instanceof NotAvailableException) {
137-
this.dispatchEvent(new NoAvailableResourceEvent(this, zone));
136+
this.dispatchEvent(new AgentResourceEvent(this, cmd.getAgentPath().getZone(), Category.FULL));
138137
}
139138

140139
throw e;

platform-control-center/src/main/java/com/flow/platform/cc/service/CmdServiceImpl.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@
2929
import com.flow.platform.cc.dao.CmdResultDao;
3030
import com.flow.platform.cc.domain.CmdQueueItem;
3131
import com.flow.platform.cc.domain.CmdStatusItem;
32+
import com.flow.platform.cc.event.AgentResourceEvent;
33+
import com.flow.platform.cc.event.AgentResourceEvent.Category;
3234
import com.flow.platform.cc.exception.AgentErr;
3335
import com.flow.platform.core.queue.PlatformQueue;
3436
import com.flow.platform.core.service.WebhookServiceImplBase;

platform-control-center/src/main/java/com/flow/platform/cc/service/ZoneServiceImpl.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
package com.flow.platform.cc.service;
1818

1919
import com.flow.platform.cc.config.TaskConfig;
20-
import com.flow.platform.cc.event.NoAvailableResourceEvent;
2120
import com.flow.platform.cc.util.ZKHelper;
2221
import com.flow.platform.cloud.InstanceManager;
2322
import com.flow.platform.core.context.ContextEvent;
@@ -45,8 +44,6 @@
4544
import org.springframework.beans.factory.annotation.Autowired;
4645
import org.springframework.scheduling.annotation.Scheduled;
4746
import org.springframework.stereotype.Service;
48-
import org.springframework.transaction.annotation.Isolation;
49-
import org.springframework.transaction.annotation.Transactional;
5047

5148
/**
5249

platform-control-center/src/main/resources/app-default.properties

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ mq.host = amqp://localhost:5672
2828
mq.management.host = http://localhost:15672
2929

3030
#### cmd queue settings ###
31+
queue.cmd.retry.enable = false
3132
queue.cmd.rabbit.enable = false
3233
queue.cmd.rabbit.name = flow-cmd-queue-default
3334
queue.cmd.idle_agent.timeout = 30

0 commit comments

Comments
 (0)