Skip to content

Commit ee7c453

Browse files
author
Yang Guo
committed
deal with diff zk event for agent stauts
1 parent 36c1a03 commit ee7c453

File tree

7 files changed

+59
-22
lines changed

7 files changed

+59
-22
lines changed

platform-control-center/src/main/java/com/flow/platform/cc/service/AgentService.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,9 +35,9 @@ public interface AgentService extends WebhookService {
3535
int IDLE_AGENT_TASK_HEARTBEAT = 30 * 1000; // millisecond
3636

3737
/**
38-
* Async update agent offline and online list by zone, will send to agent report queue
38+
* Async update agent offline or online status
3939
*/
40-
void reportOnline(String zone, Set<String> agents);
40+
void report(AgentPath path, AgentStatus status);
4141

4242
/**
4343
* List agent by zone name

platform-control-center/src/main/java/com/flow/platform/cc/service/AgentServiceImpl.java

Lines changed: 29 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
import org.springframework.beans.factory.annotation.Autowired;
5050
import org.springframework.beans.factory.annotation.Value;
5151
import org.springframework.dao.DataAccessException;
52+
import org.springframework.dao.DataIntegrityViolationException;
5253
import org.springframework.scheduling.annotation.Scheduled;
5354
import org.springframework.stereotype.Service;
5455
import org.springframework.transaction.annotation.Isolation;
@@ -89,14 +90,34 @@ public class AgentServiceImpl extends WebhookServiceImplBase implements AgentSer
8990
private String secretKey;
9091

9192
@Override
92-
public void reportOnline(String zone, Set<String> agents) {
93-
// set offline agents
94-
agentDao.batchUpdateStatus(zone, AgentStatus.OFFLINE, agents, true);
95-
96-
// send to report queue
97-
for (String agent : agents) {
98-
AgentPath key = new AgentPath(zone, agent);
99-
agentReportQueue.enqueue(key);
93+
public void report(AgentPath path, AgentStatus status) {
94+
Agent exist = find(path);
95+
96+
// For agent offline status
97+
if (status == AgentStatus.OFFLINE) {
98+
saveWithStatus(exist, AgentStatus.OFFLINE);
99+
return;
100+
}
101+
102+
// create new agent with idle status
103+
if (exist == null) {
104+
try {
105+
exist = create(path, null);
106+
LOGGER.trace("Create agent %s from report", path);
107+
} catch (DataIntegrityViolationException ignore) {
108+
// agent been created at some other threads
109+
return;
110+
}
111+
}
112+
113+
// update exist offline agent to idle status
114+
if (exist.getStatus() == AgentStatus.OFFLINE) {
115+
saveWithStatus(exist, AgentStatus.IDLE);
116+
}
117+
118+
// do not update agent status when its busy
119+
if (exist.getStatus() == AgentStatus.BUSY) {
120+
// do nothing
100121
}
101122
}
102123

platform-control-center/src/main/java/com/flow/platform/cc/service/ZoneServiceImpl.java

Lines changed: 20 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,9 @@
2222
import com.flow.platform.core.context.ContextEvent;
2323
import com.flow.platform.core.context.SpringContext;
2424
import com.flow.platform.domain.Agent;
25+
import com.flow.platform.domain.AgentPath;
2526
import com.flow.platform.domain.AgentSettings;
27+
import com.flow.platform.domain.AgentStatus;
2628
import com.flow.platform.domain.Cmd;
2729
import com.flow.platform.domain.CmdInfo;
2830
import com.flow.platform.domain.CmdType;
@@ -41,6 +43,7 @@
4143
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
4244
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent.Type;
4345
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
46+
import org.apache.zookeeper.ZKUtil;
4447
import org.springframework.beans.factory.annotation.Autowired;
4548
import org.springframework.scheduling.annotation.Scheduled;
4649
import org.springframework.stereotype.Service;
@@ -116,7 +119,9 @@ public String createZone(Zone zone) {
116119
List<String> agents = zkClient.getChildren(zonePath);
117120

118121
if (!agents.isEmpty()) {
119-
agentService.reportOnline(zone.getName(), Sets.newHashSet(agents));
122+
for (String agent : agents) {
123+
agentService.report(new AgentPath(zone.getName(), agent), AgentStatus.IDLE);
124+
}
120125
}
121126

122127
ZoneEventListener zoneEventWatcher = zoneEventWatchers.computeIfAbsent(zone, ZoneEventListener::new);
@@ -236,14 +241,20 @@ private class ZoneEventListener implements PathChildrenCacheListener {
236241

237242
@Override
238243
public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
239-
// TODO: should optimize by event type
240-
Type type = event.getType();
241-
LOGGER.debugMarker("ZoneEventListener", "Receive zookeeper event %s", type);
242-
243-
taskExecutor.execute(() -> {
244-
List<String> agents = zkClient.getChildren(zone.getPath());
245-
agentService.reportOnline(zone.getName(), Sets.newHashSet(agents));
246-
});
244+
final Type eventType = event.getType();
245+
final String path = event.getData().getPath();
246+
final String name = ZKHelper.getNameFromPath(path);
247+
LOGGER.debugMarker("ZoneEventListener", "Receive zookeeper event %s %s", eventType, path);
248+
249+
if (eventType == Type.CHILD_ADDED || eventType == Type.CHILD_UPDATED) {
250+
agentService.report(new AgentPath(zone.getName(), name), AgentStatus.IDLE);
251+
return;
252+
}
253+
254+
if (eventType == Type.CHILD_REMOVED) {
255+
agentService.report(new AgentPath(zone.getName(), name), AgentStatus.OFFLINE);
256+
return;
257+
}
247258
}
248259
}
249260
}

platform-control-center/src/main/java/com/flow/platform/cc/util/ZKHelper.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,4 +52,8 @@ public static String buildPath(String zone, String name) {
5252
public static String buildPath(AgentPath agentPath) {
5353
return ZKPaths.makePath(ROOT_NODE, agentPath.getZone(), agentPath.getName());
5454
}
55+
56+
public static String getNameFromPath(String path) {
57+
return ZKPaths.getNodeFromPath(path);
58+
}
5559
}

platform-control-center/src/test/java/com/flow/platform/cc/test/controller/CmdControllerTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import com.flow.platform.cc.test.TestBase;
3131
import com.flow.platform.cc.util.ZKHelper;
3232
import com.flow.platform.domain.AgentPath;
33+
import com.flow.platform.domain.AgentStatus;
3334
import com.flow.platform.domain.Cmd;
3435
import com.flow.platform.domain.CmdInfo;
3536
import com.flow.platform.domain.CmdReport;
@@ -94,7 +95,7 @@ public void should_update_cmd_status() throws Throwable {
9495
String agent = "test-001";
9596

9697
AgentPath path = new AgentPath(zone, agent);
97-
agentService.reportOnline(zone, Sets.newHashSet(agent));
98+
agentService.report(path, AgentStatus.IDLE);
9899
Thread.sleep(1000);
99100

100101
CmdInfo base = new CmdInfo(zone, agent, CmdType.STOP, null);

platform-control-center/src/test/java/com/flow/platform/cc/test/service/AgentServiceTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -252,10 +252,10 @@ public void should_delete_agent() throws Exception{
252252
// when: report status
253253
AgentPath pathObj = new AgentPath(zoneName, agentName);
254254
Agent created = agentService.find(pathObj);
255+
Assert.assertNotNull(created);
255256
agentService.saveWithStatus(created, AgentStatus.BUSY);
256257

257258
agentService.delete(created);
258259
Assert.assertNull(agentService.find(new AgentPath(zoneName, agentName)));
259-
260260
}
261261
}

platform-control-center/src/test/java/com/flow/platform/cc/test/service/CmdServiceTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -180,7 +180,7 @@ public void should_report_cmd_status() throws Throwable {
180180
// given:
181181
String zoneName = defaultZones.get(0).getName();
182182
AgentPath agentPath = new AgentPath(zoneName, "test-agent-for-report-cmd");
183-
agentService.reportOnline(zoneName, Sets.newHashSet("test-agent-for-report-cmd"));
183+
agentService.report(agentPath, AgentStatus.IDLE);
184184
Thread.sleep(5000);
185185

186186
Agent agent = agentService.find(agentPath);

0 commit comments

Comments
 (0)