Skip to content

Commit ce4f48c

Browse files
author
yang.guo
authored
Merge pull request #114 from FlowCI/feature/cc/cmd_queue_pause_bug
Feature/cc/cmd queue pause bug
2 parents 15df698 + e165577 commit ce4f48c

File tree

3 files changed

+54
-31
lines changed

3 files changed

+54
-31
lines changed

platform-api/src/main/java/com/flow/platform/api/service/node/YmlServiceImpl.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,10 @@ public Node loadYmlContent(final Node root, final Consumer<Yml> onSuccess, final
167167
} catch (ExecutionException | TaskRejectedException e) {
168168
LOGGER.warn("Fail to get task executor for node: " + root.getPath());
169169
nodeService.updateYmlState(root, YmlStatusValue.ERROR, e.getMessage());
170+
171+
if (onError != null) {
172+
onError.accept(e);
173+
}
170174
}
171175

172176
return root;

platform-control-center/src/main/java/com/flow/platform/cc/service/AgentService.java

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,9 @@
3030
*/
3131
public interface AgentService extends WebhookService {
3232

33-
int AGENT_SESSION_TIMEOUT_TASK_PERIOD = 60 * 1000; // millisecond
33+
int SESSION_TIMEOUT_TASK_HEARTBEAT = 60 * 1000; // millisecond
34+
35+
int IDLE_AGENT_TASK_HEARTBEAT = 30 * 1000; // millisecond
3436

3537
/**
3638
* Async update agent offline and online list by zone, will send to agent report queue
@@ -77,11 +79,6 @@ public interface AgentService extends WebhookService {
7779
*/
7880
boolean isSessionTimeout(Agent agent, ZonedDateTime compareDate, long timeoutInSeconds);
7981

80-
/**
81-
* To check agent session timeout
82-
*/
83-
void sessionTimeoutTask();
84-
8582
/**
8683
* Create agent and return token
8784
*/
@@ -105,4 +102,14 @@ public interface AgentService extends WebhookService {
105102
* delete agent
106103
*/
107104
void delete(Agent agent);
105+
106+
/**
107+
* To check agent session timeout
108+
*/
109+
void sessionTimeoutTask();
110+
111+
/**
112+
* Task to check num of idle agent for zone
113+
*/
114+
void idleAgentTask();
108115
}

platform-control-center/src/main/java/com/flow/platform/cc/service/AgentServiceImpl.java

Lines changed: 37 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -165,31 +165,6 @@ public boolean isSessionTimeout(Agent agent, ZonedDateTime compareDate, long tim
165165
return sessionAlive >= timeoutInSeconds;
166166
}
167167

168-
@Override
169-
@Transactional(propagation = Propagation.NEVER)
170-
@Scheduled(initialDelay = 10 * 1000, fixedDelay = AGENT_SESSION_TIMEOUT_TASK_PERIOD)
171-
public void sessionTimeoutTask() {
172-
if (!taskConfig.isEnableAgentSessionTimeoutTask()) {
173-
return;
174-
}
175-
176-
LOGGER.traceMarker("sessionTimeoutTask", "start");
177-
ZonedDateTime now = DateUtil.utcNow();
178-
179-
for (Zone zone : zoneService.getZones()) {
180-
Collection<Agent> agents = listForOnline(zone.getName());
181-
for (Agent agent : agents) {
182-
if (agent.getSessionId() != null && isSessionTimeout(agent, now, zone.getAgentSessionTimeout())) {
183-
Cmd delSessionCmd = cmdService.create(new CmdInfo(agent.getPath(), CmdType.DELETE_SESSION, null));
184-
cmdDispatchService.dispatch(delSessionCmd.getId(), false);
185-
LOGGER.traceMarker("sessionTimeoutTask", "Send DELETE_SESSION to agent %s", agent);
186-
}
187-
}
188-
}
189-
190-
LOGGER.traceMarker("sessionTimeoutTask", "end");
191-
}
192-
193168
@Override
194169
public Agent create(AgentPath agentPath, String webhook) {
195170
Agent agent = agentDao.get(agentPath);
@@ -246,4 +221,41 @@ public void delete(Agent agent){
246221
}
247222

248223
}
224+
225+
@Override
226+
@Transactional(propagation = Propagation.NEVER)
227+
@Scheduled(initialDelay = 10 * 1000, fixedDelay = SESSION_TIMEOUT_TASK_HEARTBEAT)
228+
public void sessionTimeoutTask() {
229+
if (!taskConfig.isEnableAgentSessionTimeoutTask()) {
230+
return;
231+
}
232+
233+
LOGGER.traceMarker("sessionTimeoutTask", "start");
234+
ZonedDateTime now = DateUtil.utcNow();
235+
236+
for (Zone zone : zoneService.getZones()) {
237+
Collection<Agent> agents = listForOnline(zone.getName());
238+
for (Agent agent : agents) {
239+
if (agent.getSessionId() != null && isSessionTimeout(agent, now, zone.getAgentSessionTimeout())) {
240+
Cmd delSessionCmd = cmdService.create(new CmdInfo(agent.getPath(), CmdType.DELETE_SESSION, null));
241+
cmdDispatchService.dispatch(delSessionCmd.getId(), false);
242+
LOGGER.traceMarker("sessionTimeoutTask", "Send DELETE_SESSION to agent %s", agent);
243+
}
244+
}
245+
}
246+
247+
LOGGER.traceMarker("sessionTimeoutTask", "end");
248+
}
249+
250+
@Override
251+
@Transactional(propagation = Propagation.NEVER)
252+
@Scheduled(initialDelay = 10 * 1000, fixedDelay = IDLE_AGENT_TASK_HEARTBEAT)
253+
public void idleAgentTask() {
254+
for (Zone zone : zoneService.getZones()) {
255+
List<Agent> availableList = findAvailable(zone.getName());
256+
if (availableList.size() > 0) {
257+
this.dispatchEvent(new AgentResourceEvent(this, zone.getName(), Category.RELEASED));
258+
}
259+
}
260+
}
249261
}

0 commit comments

Comments
 (0)