Skip to content

Commit 984e9dc

Browse files
author
Yang Guo
committed
get rid of cmd queue item instead of cmd id as queue item
1 parent 3a4c960 commit 984e9dc

File tree

10 files changed

+34
-128
lines changed

10 files changed

+34
-128
lines changed

platform-control-center/src/main/java/com/flow/platform/cc/consumer/CmdQueueConsumer.java

Lines changed: 12 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
package com.flow.platform.cc.consumer;
1818

1919
import com.flow.platform.cc.config.QueueConfig;
20-
import com.flow.platform.cc.domain.CmdQueueItem;
2120
import com.flow.platform.cc.exception.AgentErr;
2221
import com.flow.platform.cc.service.AgentService;
2322
import com.flow.platform.cc.service.CmdDispatchService;
@@ -76,36 +75,37 @@ public void init() {
7675

7776
@Override
7877
public void onQueueItem(PriorityMessage message) {
79-
CmdQueueItem item = CmdQueueItem.parse(message.getBody(), CmdQueueItem.class);
80-
String cmdId = item.getCmdId();
81-
LOGGER.trace("Receive a cmd queue item: %s", item);
78+
String cmdId = new String(message.getBody());
79+
LOGGER.trace("Receive a cmd queue item: %s", cmdId);
80+
81+
Cmd cmd = cmdService.find(cmdId);
8282

8383
try {
84-
cmdDispatchService.dispatch(cmdId, false);
84+
cmdDispatchService.dispatch(cmd);
8585
} catch (IllegalParameterException e) {
8686
LOGGER.warn("Illegal cmd id: %s", e.getMessage());
8787
} catch (IllegalStatusException e) {
8888
LOGGER.warn("Illegal cmd status: %s", e.getMessage());
8989
} catch (AgentErr.NotAvailableException | AgentErr.NotFoundException | ZkException.NotExitException e) {
90-
if (item.getRetry() <= 0) {
90+
if (cmd.getRetry() <= 0) {
9191
return;
9292
}
9393

94-
Cmd cmd = cmdService.find(cmdId);
94+
cmd = cmdService.find(cmdId);
9595

9696
// do not re-enqueue if cmd been stopped or killed
9797
if (cmd.getStatus() == CmdStatus.STOPPED || cmd.getStatus() == CmdStatus.KILLED) {
9898
return;
9999
}
100100

101101
// reset cmd status to pending, record num of retry
102-
int retry = item.getRetry() - 1;
102+
int retry = cmd.getRetry() - 1;
103103
cmd.setStatus(CmdStatus.PENDING);
104104
cmd.setRetry(retry);
105105
cmdService.save(cmd);
106106

107107
// re-enqueue
108-
resend(item, retry);
108+
resend(cmd.getId(), retry);
109109

110110
} catch (Throwable e) {
111111
LOGGER.error("Unexpected exception", e);
@@ -115,17 +115,16 @@ public void onQueueItem(PriorityMessage message) {
115115
/**
116116
* Re-enqueue cmd and return num of retry
117117
*/
118-
private void resend(final CmdQueueItem item, final int retry) {
118+
private void resend(final String cmdId, final int retry) {
119119
if (retry <= 0) {
120120
return;
121121
}
122122

123-
item.setRetry(retry);
124123
ThreadUtil.sleep(RETRY_WAIT_TIME);
125124

126125
// reset cmd status
127-
PriorityMessage message = PriorityMessage.create(item.toBytes(), QueueConfig.MAX_PRIORITY);
126+
PriorityMessage message = PriorityMessage.create(cmdId.getBytes(), QueueConfig.MAX_PRIORITY);
128127
cmdQueue.enqueue(message);
129-
LOGGER.trace("Re-enqueue item %s", item);
128+
LOGGER.trace("Re-enqueue item %s", cmdId);
130129
}
131130
}

platform-control-center/src/main/java/com/flow/platform/cc/controller/CmdController.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ public CmdType[] getCmdTypes() {
6969
@PostMapping(path = "/send")
7070
public Cmd sendCommand(@RequestBody CmdInfo cmd) {
7171
Cmd cmdToExec = cmdService.create(cmd);
72-
return cmdDispatchService.dispatch(cmdToExec.getId(), false);
72+
return cmdDispatchService.dispatch(cmdToExec);
7373
}
7474

7575
@PostMapping(path = "/queue/send")

platform-control-center/src/main/java/com/flow/platform/cc/domain/CmdQueueItem.java

Lines changed: 0 additions & 82 deletions
This file was deleted.

platform-control-center/src/main/java/com/flow/platform/cc/service/AgentServiceImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -256,7 +256,7 @@ public void sessionTimeoutTask() {
256256
for (Agent agent : agents) {
257257
if (agent.getSessionId() != null && isSessionTimeout(agent, now, zone.getAgentSessionTimeout())) {
258258
Cmd delSessionCmd = cmdService.create(new CmdInfo(agent.getPath(), CmdType.DELETE_SESSION, null));
259-
cmdDispatchService.dispatch(delSessionCmd.getId(), false);
259+
cmdDispatchService.dispatch(delSessionCmd);
260260
LOGGER.traceMarker("sessionTimeoutTask", "Send DELETE_SESSION to agent %s", agent);
261261
}
262262
}

platform-control-center/src/main/java/com/flow/platform/cc/service/CmdDispatchService.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,11 +26,9 @@ public interface CmdDispatchService {
2626
/**
2727
* Dispatch cmd to agent
2828
*
29-
* @param cmdId cmd id
30-
* @param reset is reset cmd status to pending
3129
* @return cmd
3230
*/
33-
Cmd dispatch(String cmdId, boolean reset);
31+
Cmd dispatch(Cmd cmd);
3432

3533
/**
3634
* Task to check cmd is executing timeout

platform-control-center/src/main/java/com/flow/platform/cc/service/CmdDispatchServiceImpl.java

Lines changed: 5 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -100,15 +100,9 @@ public void init() {
100100

101101
@Override
102102
@Transactional(noRollbackFor = {Throwable.class})
103-
public Cmd dispatch(String cmdId, boolean reset) {
104-
Cmd cmd = cmdService.find(cmdId);
103+
public Cmd dispatch(Cmd cmd) {
105104
if (cmd == null) {
106-
throw new IllegalParameterException(String.format("Cmd '%s' does not exist", cmdId));
107-
}
108-
109-
// reset cmd status to pending
110-
if (reset) {
111-
cmd.setStatus(CmdStatus.PENDING);
105+
throw new IllegalParameterException("Cmd is null while dispatching");
112106
}
113107

114108
// do not run cmd if not in working status
@@ -165,7 +159,7 @@ public void checkTimeoutTask() {
165159
for (Cmd cmd : workingCmdList) {
166160
if (cmd.isCmdTimeout()) {
167161
Cmd killCmd = cmdService.create(new CmdInfo(cmd.getAgentPath(), CmdType.KILL, null));
168-
dispatch(killCmd.getId(), false);
162+
dispatch(killCmd);
169163
LOGGER.traceMarker("checkTimeoutTask", "Send KILL for timeout cmd %s", cmd);
170164

171165
// update cmd status via queue
@@ -201,11 +195,11 @@ private Cmd createDeleteSessionCmd(Agent target) {
201195
private void cleanCurrentCmd(Cmd current) {
202196
if (Strings.isNullOrEmpty(current.getSessionId())) {
203197
Cmd cmdToKill = cmdService.create(new CmdInfo(current.getAgentPath(), CmdType.KILL, null));
204-
dispatch(cmdToKill.getId(), false);
198+
dispatch(cmdToKill);
205199
} else {
206200
Agent agent = agentService.find(current.getAgentPath());
207201
Cmd cmdToDelSession = createDeleteSessionCmd(agent);
208-
dispatch(cmdToDelSession.getId(), false);
202+
dispatch(cmdToDelSession);
209203
}
210204
}
211205

platform-control-center/src/main/java/com/flow/platform/cc/service/CmdServiceImpl.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@
2929
import com.flow.platform.cc.dao.CmdDao;
3030
import com.flow.platform.cc.dao.CmdLogDao;
3131
import com.flow.platform.cc.dao.CmdResultDao;
32-
import com.flow.platform.cc.domain.CmdQueueItem;
3332
import com.flow.platform.cc.domain.CmdStatusItem;
3433
import com.flow.platform.cc.exception.AgentErr;
3534
import com.flow.platform.core.exception.IllegalParameterException;
@@ -99,6 +98,9 @@ public class CmdServiceImpl extends WebhookServiceImplBase implements CmdService
9998
@Autowired
10099
private AgentDao agentDao;
101100

101+
/**
102+
* The queue item is cmd id as string
103+
*/
102104
@Autowired
103105
private PlatformQueue<PriorityMessage> cmdQueue;
104106

@@ -205,9 +207,7 @@ public List<Cmd> listWorkingCmd(AgentPath agentPath) {
205207
@Transactional(propagation = Propagation.NEVER)
206208
public Cmd enqueue(CmdInfo cmdInfo, int priority, int retry) {
207209
Cmd cmd = create(cmdInfo, retry);
208-
209-
CmdQueueItem item = new CmdQueueItem(cmd.getId(), retry);
210-
PriorityMessage message = PriorityMessage.create(item.toBytes(), priority);
210+
PriorityMessage message = PriorityMessage.create(cmd.getId().getBytes(), priority);
211211
cmdQueue.enqueue(message);
212212

213213
return cmd;

platform-control-center/src/main/java/com/flow/platform/cc/service/ZoneServiceImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -189,7 +189,7 @@ public boolean keepIdleAgentMaxSize(final Zone zone, final InstanceManager insta
189189

190190
// send shutdown cmd
191191
Cmd shutdown = cmdService.create(new CmdInfo(idleAgent.getPath(), CmdType.SHUTDOWN, "flow.ci"));
192-
cmdDispatchService.dispatch(shutdown.getId(), false);
192+
cmdDispatchService.dispatch(shutdown);
193193
LOGGER.traceMarker("keepIdleAgentMaxSize", "Send SHUTDOWN to idle agent: %s", idleAgent);
194194

195195
// add instance to cleanup list

platform-control-center/src/test/java/com/flow/platform/cc/test/service/CmdDispatchServiceTest.java

Lines changed: 8 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -39,15 +39,12 @@
3939
import com.flow.platform.domain.CmdType;
4040
import com.flow.platform.domain.Zone;
4141
import java.util.List;
42-
import java.util.concurrent.CountDownLatch;
43-
import java.util.concurrent.TimeUnit;
4442
import org.junit.After;
4543
import org.junit.Assert;
4644
import org.junit.Before;
4745
import org.junit.FixMethodOrder;
4846
import org.junit.Test;
4947
import org.junit.runners.MethodSorters;
50-
import org.springframework.amqp.core.Message;
5148
import org.springframework.beans.factory.annotation.Autowired;
5249

5350
/**
@@ -91,7 +88,7 @@ public void toCreateSession() throws Throwable {
9188
// when: create cmd and dispatch to agent
9289
Cmd cmd = cmdService.create(new CmdInfo(zoneName, agentName, CmdType.CREATE_SESSION, null));
9390
Assert.assertNotNull(cmd.getSessionId());
94-
cmd = cmdDispatchService.dispatch(cmd.getId(), false);
91+
cmd = cmdDispatchService.dispatch(cmd);
9592

9693
// then: check agent is locked by session
9794
target = agentService.find(cmd.getAgentPath());
@@ -109,7 +106,7 @@ public void should_broadcast_agent_resource_event_if_no_available_agent() throws
109106
Cmd cmdToCreateSession = cmdService.create(new CmdInfo(agentPath, CmdType.CREATE_SESSION, null));
110107

111108
try {
112-
cmdDispatchService.dispatch(cmdToCreateSession.getId(), false);
109+
cmdDispatchService.dispatch(cmdToCreateSession);
113110
} catch (FlowException ignore) {
114111

115112
}
@@ -121,7 +118,7 @@ public void should_broadcast_agent_resource_event_if_no_available_agent() throws
121118
CmdInfo cmd = new CmdInfo(agentPath, CmdType.DELETE_SESSION, null);
122119
cmd.setSessionId(target.getSessionId());
123120
Cmd cmdToDeleteSession = cmdService.create(cmd);
124-
cmdDispatchService.dispatch(cmdToDeleteSession.getId(), false);
121+
cmdDispatchService.dispatch(cmdToDeleteSession);
125122

126123
// then: queue should be resumed since agent resource released
127124
Assert.assertEquals(true, cmdQueue.isRunning());
@@ -133,7 +130,7 @@ public void should_raise_exception_if_create_session_again() throws Throwable {
133130

134131
// should throw agent not available exception
135132
try {
136-
cmdDispatchService.dispatch(cmdToFail.getId(), false);
133+
cmdDispatchService.dispatch(cmdToFail);
137134
fail();
138135
} catch (Throwable e) {
139136
Assert.assertEquals(AgentErr.NotAvailableException.class, e.getClass());
@@ -147,7 +144,7 @@ public void should_send_cmd_with_session() throws Throwable {
147144
cmdWithSession.setSessionId(target.getSessionId());
148145

149146
Cmd cmd = cmdService.create(cmdWithSession);
150-
cmd = cmdDispatchService.dispatch(cmd.getId(), false);
147+
cmd = cmdDispatchService.dispatch(cmd);
151148

152149
// then: check target is found correctly by session id
153150
Agent sessionAgent = agentService.find(cmd.getAgentPath());
@@ -169,7 +166,7 @@ public void should_running_cmd_been_killed_when_delete_session() throws Throwabl
169166
// when: delete session
170167
CmdInfo cmdToDelSession = new CmdInfo(agentPath.getZone(), null, CmdType.DELETE_SESSION, null);
171168
cmdToDelSession.setSessionId(target.getSessionId());
172-
cmdDispatchService.dispatch(cmdService.create(cmdToDelSession).getId(), false);
169+
cmdDispatchService.dispatch(cmdService.create(cmdToDelSession));
173170

174171
// then: new kill cmd should been sent to agent
175172
Cmd killCmd = Cmd.parse(zkClient.getData(ZKHelper.buildPath(agentPath)), Cmd.class);
@@ -199,7 +196,7 @@ public void should_cmd_status_not_changed_for_finished_when_delete_session() thr
199196
// when: delete session
200197
CmdInfo cmdToDelSession = new CmdInfo(agentPath.getZone(), null, CmdType.DELETE_SESSION, null);
201198
cmdToDelSession.setSessionId(cmd.getSessionId());
202-
cmdDispatchService.dispatch(cmdService.create(cmdToDelSession).getId(), false);
199+
cmdDispatchService.dispatch(cmdService.create(cmdToDelSession));
203200

204201
// then: cmd in agent not changed
205202
Cmd notChangeCmd = Cmd.parse(zkClient.getData(ZKHelper.buildPath(agentPath)), Cmd.class);
@@ -223,6 +220,6 @@ private Cmd startRunShell(String zone, String sessionId) {
223220
CmdInfo cmdWithSession = new CmdInfo(zone, null, CmdType.RUN_SHELL, "echo hello");
224221
cmdWithSession.setSessionId(sessionId);
225222
Cmd cmd = cmdService.create(cmdWithSession);
226-
return cmdDispatchService.dispatch(cmd.getId(), false);
223+
return cmdDispatchService.dispatch(cmd);
227224
}
228225
}

platform-control-center/src/test/java/com/flow/platform/cc/test/service/CmdServiceTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -478,7 +478,7 @@ public void should_write_cmd_log() throws Throwable {
478478

479479
private Cmd send(CmdInfo info) {
480480
Cmd cmd = cmdService.create(info);
481-
cmdDispatchService.dispatch(cmd.getId(), false);
481+
cmdDispatchService.dispatch(cmd);
482482
return cmdService.find(cmd.getId());
483483
}
484484
}

0 commit comments

Comments
 (0)