Skip to content

Commit 55c1a6d

Browse files
author
yang.guo
authored
Merge pull request #115 from FlowCI/feature/cc/agent_status_change
Feature/cc/agent status change
2 parents 36c1a03 + aec59cf commit 55c1a6d

File tree

14 files changed

+129
-26
lines changed

14 files changed

+129
-26
lines changed

platform-api/src/main/java/com/flow/platform/api/controller/AgentController.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
import com.flow.platform.api.events.AgentStatusChangeEvent;
2323
import com.flow.platform.api.service.AgentService;
2424
import com.flow.platform.core.exception.IllegalParameterException;
25-
import com.flow.platform.core.service.ApplicationEventService;
2625
import com.flow.platform.domain.Agent;
2726
import com.flow.platform.domain.AgentPath;
2827
import com.flow.platform.domain.AgentSettings;
@@ -42,7 +41,7 @@
4241
*/
4342
@RestController
4443
@RequestMapping(path = "/agents")
45-
public class AgentController extends ApplicationEventService {
44+
public class AgentController {
4645

4746
@Autowired
4847
private AgentService agentService;
@@ -193,7 +192,7 @@ public BooleanValue shutDown(@RequestParam String zone,
193192
*/
194193
@PostMapping(path = "/callback")
195194
public void callback(@RequestBody Agent agent) {
196-
this.dispatchEvent(new AgentStatusChangeEvent(this, agent));
195+
agentService.onAgentStatusChange(agent);
197196
}
198197

199198
/**

platform-api/src/main/java/com/flow/platform/api/dao/job/JobDao.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,11 @@ public interface JobDao extends BaseDao<BigInteger, Job> {
6969
*/
7070
Job get(String path, Integer number);
7171

72+
/**
73+
* Get job by agent session id
74+
*/
75+
Job get(String sessionId);
76+
7277
/**
7378
* get max build number for node path
7479
*/

platform-api/src/main/java/com/flow/platform/api/dao/job/JobDaoImpl.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,8 @@ public Job get(BigInteger key) {
8787
});
8888
}
8989

90+
91+
9092
@Override
9193
public List<Job> latestByPath(List<String> paths) {
9294
return execute((Session session) -> {
@@ -163,6 +165,13 @@ public Job get(String path, Integer number) {
163165
});
164166
}
165167

168+
@Override
169+
public Job get(String sessionId) {
170+
return execute(session -> session.createQuery("from Job where sessionId = :sessionId", Job.class)
171+
.setParameter("sessionId", sessionId)
172+
.uniqueResult());
173+
}
174+
166175
@Override
167176
public Integer maxBuildNumber(String path) {
168177
return execute((Session session) -> {

platform-api/src/main/java/com/flow/platform/api/service/AgentService.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,4 +58,9 @@ public interface AgentService {
5858
* @param agentPath required
5959
*/
6060
void sendSysCmd(AgentPath agentPath);
61+
62+
/**
63+
* Handle agent status callback from the control center (cc)
64+
*/
65+
void onAgentStatusChange(Agent agent);
6166
}

platform-api/src/main/java/com/flow/platform/api/service/AgentServiceImpl.java

Lines changed: 39 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,17 @@
1919
import com.flow.platform.api.dao.job.JobDao;
2020
import com.flow.platform.api.domain.AgentWithFlow;
2121
import com.flow.platform.api.domain.job.Job;
22+
import com.flow.platform.api.domain.job.JobStatus;
23+
import com.flow.platform.api.domain.job.NodeResult;
2224
import com.flow.platform.api.domain.job.NodeStatus;
25+
import com.flow.platform.api.events.AgentStatusChangeEvent;
2326
import com.flow.platform.api.service.job.CmdService;
27+
import com.flow.platform.api.service.job.JobService;
28+
import com.flow.platform.api.service.job.NodeResultService;
2429
import com.flow.platform.api.util.PlatformURL;
2530
import com.flow.platform.core.exception.HttpException;
2631
import com.flow.platform.core.exception.IllegalStatusException;
32+
import com.flow.platform.core.service.ApplicationEventService;
2733
import com.flow.platform.domain.Agent;
2834
import com.flow.platform.domain.AgentPath;
2935
import com.flow.platform.domain.AgentPathWithWebhook;
@@ -53,7 +59,7 @@
5359
*/
5460

5561
@Service
56-
public class AgentServiceImpl implements AgentService {
62+
public class AgentServiceImpl extends ApplicationEventService implements AgentService {
5763

5864
private final Logger LOGGER = new Logger(AgentService.class);
5965

@@ -71,6 +77,12 @@ public class AgentServiceImpl implements AgentService {
7177
@Autowired
7278
private CmdService cmdService;
7379

80+
@Autowired
81+
private JobService jobService;
82+
83+
@Autowired
84+
private NodeResultService nodeResultService;
85+
7486
@Value(value = "${domain}")
7587
private String domain;
7688

@@ -228,4 +240,30 @@ private Agent findAgent(AgentPath agentPath) {
228240
return agent;
229241
}
230242

243+
@Override
244+
public void onAgentStatusChange(Agent agent) {
245+
this.dispatchEvent(new AgentStatusChangeEvent(this, agent));
246+
247+
// do not check the related job unless the agent status is offline
248+
if (agent.getStatus() != AgentStatus.OFFLINE) {
249+
return;
250+
}
251+
252+
// find related job and set job to failure
253+
String sessionId = agent.getSessionId();
254+
if (Strings.isNullOrEmpty(sessionId)) {
255+
return;
256+
}
257+
258+
// find agent related job by session id
259+
Job job = jobService.find(sessionId);
260+
if (job == null) {
261+
return;
262+
}
263+
264+
if (Job.RUNNING_STATUS.contains(job.getStatus())) {
265+
job.setFailureMessage(String.format("Agent %s is offline when job running", agent.getPath()));
266+
jobService.updateJobStatusAndSave(job, JobStatus.FAILURE);
267+
}
268+
}
231269
}

platform-api/src/main/java/com/flow/platform/api/service/job/JobService.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,11 @@ public interface JobService {
4545
*/
4646
Job find(BigInteger jobId);
4747

48+
/**
49+
* Find by agent session id
50+
*/
51+
Job find(String sessionId);
52+
4853
/**
4954
* Get job yml content
5055
*/

platform-api/src/main/java/com/flow/platform/api/service/job/JobServiceImpl.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,11 @@ public Job find(BigInteger jobId) {
146146
return find(job);
147147
}
148148

149+
@Override
150+
public Job find(String sessionId) {
151+
return jobDao.get(sessionId);
152+
}
153+
149154
@Override
150155
public String findYml(String path, Integer number) {
151156
Job job = find(path, number);

platform-control-center/src/main/java/com/flow/platform/cc/service/AgentService.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,9 +35,9 @@ public interface AgentService extends WebhookService {
3535
int IDLE_AGENT_TASK_HEARTBEAT = 30 * 1000; // millisecond
3636

3737
/**
38-
* Async update agent offline and online list by zone, will send to agent report queue
38+
* Async update agent offline or online status
3939
*/
40-
void reportOnline(String zone, Set<String> agents);
40+
void report(AgentPath path, AgentStatus status);
4141

4242
/**
4343
* List agent by zone name

platform-control-center/src/main/java/com/flow/platform/cc/service/AgentServiceImpl.java

Lines changed: 29 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
import org.springframework.beans.factory.annotation.Autowired;
5050
import org.springframework.beans.factory.annotation.Value;
5151
import org.springframework.dao.DataAccessException;
52+
import org.springframework.dao.DataIntegrityViolationException;
5253
import org.springframework.scheduling.annotation.Scheduled;
5354
import org.springframework.stereotype.Service;
5455
import org.springframework.transaction.annotation.Isolation;
@@ -89,14 +90,34 @@ public class AgentServiceImpl extends WebhookServiceImplBase implements AgentSer
8990
private String secretKey;
9091

9192
@Override
92-
public void reportOnline(String zone, Set<String> agents) {
93-
// set offline agents
94-
agentDao.batchUpdateStatus(zone, AgentStatus.OFFLINE, agents, true);
95-
96-
// send to report queue
97-
for (String agent : agents) {
98-
AgentPath key = new AgentPath(zone, agent);
99-
agentReportQueue.enqueue(key);
93+
public void report(AgentPath path, AgentStatus status) {
94+
Agent exist = find(path);
95+
96+
// For agent offline status
97+
if (status == AgentStatus.OFFLINE) {
98+
saveWithStatus(exist, AgentStatus.OFFLINE);
99+
return;
100+
}
101+
102+
// create new agent with idle status
103+
if (exist == null) {
104+
try {
105+
exist = create(path, null);
106+
LOGGER.trace("Create agent %s from report", path);
107+
} catch (DataIntegrityViolationException ignore) {
108+
// agent has already been created by another thread
109+
return;
110+
}
111+
}
112+
113+
// update existing offline agent to idle status
114+
if (exist.getStatus() == AgentStatus.OFFLINE) {
115+
saveWithStatus(exist, AgentStatus.IDLE);
116+
}
117+
118+
// do not update agent status when it's busy
119+
if (exist.getStatus() == AgentStatus.BUSY) {
120+
// do nothing
100121
}
101122
}
102123

platform-control-center/src/main/java/com/flow/platform/cc/service/ZoneServiceImpl.java

Lines changed: 20 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,9 @@
2222
import com.flow.platform.core.context.ContextEvent;
2323
import com.flow.platform.core.context.SpringContext;
2424
import com.flow.platform.domain.Agent;
25+
import com.flow.platform.domain.AgentPath;
2526
import com.flow.platform.domain.AgentSettings;
27+
import com.flow.platform.domain.AgentStatus;
2628
import com.flow.platform.domain.Cmd;
2729
import com.flow.platform.domain.CmdInfo;
2830
import com.flow.platform.domain.CmdType;
@@ -41,6 +43,7 @@
4143
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
4244
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent.Type;
4345
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
46+
import org.apache.zookeeper.ZKUtil;
4447
import org.springframework.beans.factory.annotation.Autowired;
4548
import org.springframework.scheduling.annotation.Scheduled;
4649
import org.springframework.stereotype.Service;
@@ -116,7 +119,9 @@ public String createZone(Zone zone) {
116119
List<String> agents = zkClient.getChildren(zonePath);
117120

118121
if (!agents.isEmpty()) {
119-
agentService.reportOnline(zone.getName(), Sets.newHashSet(agents));
122+
for (String agent : agents) {
123+
agentService.report(new AgentPath(zone.getName(), agent), AgentStatus.IDLE);
124+
}
120125
}
121126

122127
ZoneEventListener zoneEventWatcher = zoneEventWatchers.computeIfAbsent(zone, ZoneEventListener::new);
@@ -236,14 +241,20 @@ private class ZoneEventListener implements PathChildrenCacheListener {
236241

237242
@Override
238243
public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
239-
// TODO: should optimize by event type
240-
Type type = event.getType();
241-
LOGGER.debugMarker("ZoneEventListener", "Receive zookeeper event %s", type);
242-
243-
taskExecutor.execute(() -> {
244-
List<String> agents = zkClient.getChildren(zone.getPath());
245-
agentService.reportOnline(zone.getName(), Sets.newHashSet(agents));
246-
});
244+
final Type eventType = event.getType();
245+
final String path = event.getData().getPath();
246+
final String name = ZKHelper.getNameFromPath(path);
247+
LOGGER.debugMarker("ZoneEventListener", "Receive zookeeper event %s %s", eventType, path);
248+
249+
if (eventType == Type.CHILD_ADDED || eventType == Type.CHILD_UPDATED) {
250+
agentService.report(new AgentPath(zone.getName(), name), AgentStatus.IDLE);
251+
return;
252+
}
253+
254+
if (eventType == Type.CHILD_REMOVED) {
255+
agentService.report(new AgentPath(zone.getName(), name), AgentStatus.OFFLINE);
256+
return;
257+
}
247258
}
248259
}
249260
}

0 commit comments

Comments
 (0)