Skip to content

Commit 308037a

Browse files
author
yang.guo
authored
Merge pull request #174 from FlowCI/hotfix/173
Hotfix/173
2 parents 66d2989 + 3666604 commit 308037a

File tree

15 files changed

+59
-184
lines changed

15 files changed

+59
-184
lines changed

config/app-api.properties

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,8 @@ system.email = [email protected]
2929
system.username = admin
3030
system.password = 123456
3131

32-
task.job.toggle.execution_timeout = false
33-
## 6s expire job
34-
task.job.toggle.execution_create_session_duration = 600
35-
## 1h expire job
32+
task.job.toggle.execution_timeout = true
33+
## expired in 1800 seconds for create session
34+
task.job.toggle.execution_create_session_duration = 1800
35+
## expired in 3600 seconds for job running
3636
task.job.toggle.execution_running_duration = 3600

platform-api/src/main/java/com/flow/platform/api/service/job/JobServiceImpl.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -662,14 +662,14 @@ public void checkTimeoutTask() {
662662

663663
LOGGER.trace("job timeout task start");
664664

665-
// create session job timeout 6s time out
665+
// job timeout on create session
666666
ZonedDateTime finishZoneDateTime = ZonedDateTime.now().minusSeconds(jobExecuteTimeoutCreateSessionDuration);
667667
List<Job> jobs = jobDao.listForExpired(finishZoneDateTime, JobStatus.SESSION_CREATING);
668668
for (Job job : jobs) {
669669
updateJobAndNodeResultTimeout(job);
670670
}
671671

672-
// running job timeout 1h time out
672+
// job timeout on running
673673
ZonedDateTime finishRunningZoneDateTime = ZonedDateTime.now().minusSeconds(jobExecuteTimeoutRunningDuration);
674674
List<Job> runningJobs = jobDao.listForExpired(finishRunningZoneDateTime, JobStatus.RUNNING);
675675
for (Job job : runningJobs) {

platform-api/src/main/resources/app-default.properties

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,9 +29,9 @@ system.email = [email protected]
2929
system.username = admin
3030
system.password = 123456
3131

32-
task.job.toggle.execution_timeout = false
33-
## 6s expire job
34-
task.job.toggle.execution_create_session_duration = 600
35-
## 1h expire job
32+
task.job.toggle.execution_timeout = true
33+
## expired in 1800 seconds for job create session
34+
task.job.toggle.execution_create_session_duration = 1800
35+
## expired in 3600 seconds for running job
3636
task.job.toggle.execution_running_duration = 3600
3737

platform-api/src/test/java/com/flow/platform/api/test/service/JobServiceTest.java

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import com.flow.platform.api.test.TestBase;
3333
import com.flow.platform.api.util.CommonUtil;
3434
import com.flow.platform.core.exception.IllegalStatusException;
35+
import com.flow.platform.core.util.ThreadUtil;
3536
import com.flow.platform.domain.Cmd;
3637
import com.flow.platform.domain.CmdResult;
3738
import com.flow.platform.domain.CmdStatus;
@@ -260,19 +261,16 @@ public void should_stop_running_job_success() throws IOException {
260261
Assert.assertEquals(NodeStatus.STOPPED, stoppedJob.getRootResult().getStatus());
261262
}
262263

263-
264264
@Test
265265
public void should_job_time_out_and_reject_callback() throws IOException, InterruptedException {
266+
// given: job created from flow yml
266267
Node rootForFlow = createRootFlow("flow1", "demo_flow2.yaml");
267-
268268
Job job = jobService.createFromFlowYml(rootForFlow.getPath(), JobCategory.TAG, null, mockUser);
269-
270269
Assert.assertNotNull(job.getEnv("FLOW_WORKSPACE"));
271270
Assert.assertNotNull(job.getEnv("FLOW_VERSION"));
272271

273-
Thread.sleep(7000);
274-
275272
// when: check job timeout
273+
ThreadUtil.sleep(20000);
276274
jobService.checkTimeoutTask();
277275

278276
// then: job status should be timeout

platform-control-center/src/main/java/com/flow/platform/cc/consumer/CmdQueueConsumer.java

Lines changed: 16 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
package com.flow.platform.cc.consumer;
1818

1919
import com.flow.platform.cc.config.QueueConfig;
20-
import com.flow.platform.cc.domain.CmdQueueItem;
2120
import com.flow.platform.cc.exception.AgentErr;
2221
import com.flow.platform.cc.service.AgentService;
2322
import com.flow.platform.cc.service.CmdDispatchService;
@@ -27,6 +26,7 @@
2726
import com.flow.platform.core.queue.PlatformQueue;
2827
import com.flow.platform.core.queue.PriorityMessage;
2928
import com.flow.platform.core.queue.QueueListener;
29+
import com.flow.platform.core.util.ThreadUtil;
3030
import com.flow.platform.domain.Cmd;
3131
import com.flow.platform.domain.CmdStatus;
3232
import com.flow.platform.util.Logger;
@@ -48,6 +48,8 @@ public class CmdQueueConsumer implements QueueListener<PriorityMessage> {
4848

4949
private final static Logger LOGGER = new Logger(CmdQueueConsumer.class);
5050

51+
private final static long RETRY_WAIT_TIME = 1000; // in millis
52+
5153
@Value("${queue.cmd.idle_agent.period}")
5254
private Integer idleAgentPeriod; // period for check idle agent in seconds
5355

@@ -73,96 +75,56 @@ public void init() {
7375

7476
@Override
7577
public void onQueueItem(PriorityMessage message) {
76-
CmdQueueItem item = CmdQueueItem.parse(message.getBody(), CmdQueueItem.class);
77-
String cmdId = item.getCmdId();
78-
LOGGER.trace("Receive a cmd queue item: %s", item);
78+
String cmdId = new String(message.getBody());
79+
LOGGER.trace("Receive a cmd queue item: %s", cmdId);
80+
81+
Cmd cmd = cmdService.find(cmdId);
7982

8083
try {
81-
cmdDispatchService.dispatch(cmdId, false);
84+
cmdDispatchService.dispatch(cmd);
8285
} catch (IllegalParameterException e) {
8386
LOGGER.warn("Illegal cmd id: %s", e.getMessage());
8487
} catch (IllegalStatusException e) {
8588
LOGGER.warn("Illegal cmd status: %s", e.getMessage());
8689
} catch (AgentErr.NotAvailableException | AgentErr.NotFoundException | ZkException.NotExitException e) {
87-
if (item.getRetry() <= 0) {
90+
if (cmd.getRetry() <= 0) {
8891
return;
8992
}
9093

91-
boolean isTimeout = waitForIdleAgent(cmdId, idleAgentPeriod, idleAgentTimeout);
92-
if (isTimeout) {
93-
LOGGER.trace("wait for idle agent time out %s seconds for cmd %s", idleAgentTimeout, cmdId);
94-
}
95-
96-
Cmd cmd = cmdService.find(cmdId);
94+
cmd = cmdService.find(cmdId);
9795

9896
// do not re-enqueue if cmd been stopped or killed
9997
if (cmd.getStatus() == CmdStatus.STOPPED || cmd.getStatus() == CmdStatus.KILLED) {
10098
return;
10199
}
102100

103101
// reset cmd status to pending, record num of retry
104-
int retry = item.getRetry() - 1;
102+
int retry = cmd.getRetry() - 1;
105103
cmd.setStatus(CmdStatus.PENDING);
106104
cmd.setRetry(retry);
107105
cmdService.save(cmd);
108106

109107
// re-enqueue
110-
resend(item, retry);
108+
resend(cmd.getId(), retry);
111109

112110
} catch (Throwable e) {
113111
LOGGER.error("Unexpected exception", e);
114112
}
115113
}
116114

117-
/**
118-
* Block current thread and check idle agent
119-
*
120-
* @param cmdId cmd id
121-
* @param period check idle agent period in seconds
122-
* @param timeout timeout in seconds
123-
* @return is time out exit or not
124-
*/
125-
private boolean waitForIdleAgent(String cmdId, int period, int timeout) {
126-
Instant now = Instant.now();
127-
128-
while (true) {
129-
if (Duration.between(now, Instant.now()).toMillis() >= timeout * 1000) {
130-
return true;
131-
}
132-
133-
try {
134-
Thread.sleep(period * 1000);
135-
} catch (InterruptedException ignore) {
136-
}
137-
138-
String zone = cmdService.find(cmdId).getZoneName();
139-
int numOfIdle = agentService.findAvailable(zone).size();
140-
if (numOfIdle > 0) {
141-
LOGGER.trace("has %s idle agent", numOfIdle);
142-
return false;
143-
}
144-
}
145-
}
146-
147115
/**
148116
* Re-enqueue cmd with max priority; does nothing when retry <= 0
149117
*/
150-
private void resend(final CmdQueueItem item, final int retry) {
118+
private void resend(final String cmdId, final int retry) {
151119
if (retry <= 0) {
152120
return;
153121
}
154122

155-
item.setRetry(retry);
156-
157-
try {
158-
Thread.sleep(1000); // wait 1 seconds and enqueue again with priority
159-
} catch (InterruptedException ignore) {
160-
// do nothing
161-
}
123+
ThreadUtil.sleep(RETRY_WAIT_TIME);
162124

163125
// reset cmd status
164-
PriorityMessage message = PriorityMessage.create(item.toBytes(), QueueConfig.MAX_PRIORITY);
126+
PriorityMessage message = PriorityMessage.create(cmdId.getBytes(), QueueConfig.MAX_PRIORITY);
165127
cmdQueue.enqueue(message);
166-
LOGGER.trace("Re-enqueue item %s", item);
128+
LOGGER.trace("Re-enqueue item %s", cmdId);
167129
}
168130
}

platform-control-center/src/main/java/com/flow/platform/cc/controller/CmdController.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ public CmdType[] getCmdTypes() {
6969
@PostMapping(path = "/send")
7070
public Cmd sendCommand(@RequestBody CmdInfo cmd) {
7171
Cmd cmdToExec = cmdService.create(cmd);
72-
return cmdDispatchService.dispatch(cmdToExec.getId(), false);
72+
return cmdDispatchService.dispatch(cmdToExec);
7373
}
7474

7575
@PostMapping(path = "/queue/send")

platform-control-center/src/main/java/com/flow/platform/cc/domain/CmdQueueItem.java

Lines changed: 0 additions & 82 deletions
This file was deleted.

platform-control-center/src/main/java/com/flow/platform/cc/service/AgentServiceImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -256,7 +256,7 @@ public void sessionTimeoutTask() {
256256
for (Agent agent : agents) {
257257
if (agent.getSessionId() != null && isSessionTimeout(agent, now, zone.getAgentSessionTimeout())) {
258258
Cmd delSessionCmd = cmdService.create(new CmdInfo(agent.getPath(), CmdType.DELETE_SESSION, null));
259-
cmdDispatchService.dispatch(delSessionCmd.getId(), false);
259+
cmdDispatchService.dispatch(delSessionCmd);
260260
LOGGER.traceMarker("sessionTimeoutTask", "Send DELETE_SESSION to agent %s", agent);
261261
}
262262
}

platform-control-center/src/main/java/com/flow/platform/cc/service/CmdDispatchService.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,11 +26,9 @@ public interface CmdDispatchService {
2626
/**
2727
* Dispatch cmd to agent
2828
*
29-
* @param cmdId cmd id
30-
* @param reset is reset cmd status to pending
3129
* @return cmd
3230
*/
33-
Cmd dispatch(String cmdId, boolean reset);
31+
Cmd dispatch(Cmd cmd);
3432

3533
/**
3634
* Task to check cmd is executing timeout

platform-control-center/src/main/java/com/flow/platform/cc/service/CmdDispatchServiceImpl.java

Lines changed: 5 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -100,15 +100,9 @@ public void init() {
100100

101101
@Override
102102
@Transactional(noRollbackFor = {Throwable.class})
103-
public Cmd dispatch(String cmdId, boolean reset) {
104-
Cmd cmd = cmdService.find(cmdId);
103+
public Cmd dispatch(Cmd cmd) {
105104
if (cmd == null) {
106-
throw new IllegalParameterException(String.format("Cmd '%s' does not exist", cmdId));
107-
}
108-
109-
// reset cmd status to pending
110-
if (reset) {
111-
cmd.setStatus(CmdStatus.PENDING);
105+
throw new IllegalParameterException("Cmd is null while dispatching");
112106
}
113107

114108
// do not run cmd if not in working status
@@ -165,7 +159,7 @@ public void checkTimeoutTask() {
165159
for (Cmd cmd : workingCmdList) {
166160
if (cmd.isCmdTimeout()) {
167161
Cmd killCmd = cmdService.create(new CmdInfo(cmd.getAgentPath(), CmdType.KILL, null));
168-
dispatch(killCmd.getId(), false);
162+
dispatch(killCmd);
169163
LOGGER.traceMarker("checkTimeoutTask", "Send KILL for timeout cmd %s", cmd);
170164

171165
// update cmd status via queue
@@ -201,11 +195,11 @@ private Cmd createDeleteSessionCmd(Agent target) {
201195
private void cleanCurrentCmd(Cmd current) {
202196
if (Strings.isNullOrEmpty(current.getSessionId())) {
203197
Cmd cmdToKill = cmdService.create(new CmdInfo(current.getAgentPath(), CmdType.KILL, null));
204-
dispatch(cmdToKill.getId(), false);
198+
dispatch(cmdToKill);
205199
} else {
206200
Agent agent = agentService.find(current.getAgentPath());
207201
Cmd cmdToDelSession = createDeleteSessionCmd(agent);
208-
dispatch(cmdToDelSession.getId(), false);
202+
dispatch(cmdToDelSession);
209203
}
210204
}
211205

0 commit comments

Comments
 (0)