Skip to content

Commit 5e4cf40

Browse files
author
yang.guo
authored
Merge pull request #175 from FlowCI/feature/171
Feature/171
2 parents 308037a + 7be5f0c commit 5e4cf40

File tree

11 files changed

+105
-19
lines changed

11 files changed

+105
-19
lines changed

platform-api/src/main/java/com/flow/platform/api/consumer/CmdCallbackQueueConsumer.java

Lines changed: 25 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,12 @@
1818

1919
import com.flow.platform.api.domain.CmdCallbackQueueItem;
2020
import com.flow.platform.api.service.job.JobService;
21+
import com.flow.platform.core.exception.FlowException;
2122
import com.flow.platform.core.exception.NotFoundException;
2223
import com.flow.platform.core.queue.PlatformQueue;
2324
import com.flow.platform.core.queue.PriorityMessage;
2425
import com.flow.platform.core.queue.QueueListener;
26+
import com.flow.platform.core.util.ThreadUtil;
2527
import com.flow.platform.util.Logger;
2628
import javax.annotation.PostConstruct;
2729
import org.springframework.beans.factory.annotation.Autowired;
@@ -35,6 +37,9 @@ public class CmdCallbackQueueConsumer implements QueueListener<PriorityMessage>
3537

3638
private final static Logger LOGGER = new Logger(CmdCallbackQueueConsumer.class);
3739

40+
// requeue 1 s
41+
private final static int REQUEUE_DELAY_TIME = 1000;
42+
3843
@Autowired
3944
private PlatformQueue<PriorityMessage> cmdCallbackQueue;
4045

@@ -58,20 +63,34 @@ public void onQueueItem(PriorityMessage message) {
5863
jobService.callback(item);
5964
} catch (NotFoundException notFoundException) {
6065

66+
// detect retry times is reach the limit or not
67+
detectRetryTimes(item);
68+
6169
// re-enqueue cmd callback if job not found since transaction problem
62-
reEnqueueJobCallback(item, 1000);
70+
reEnqueueJobCallback(item, REQUEUE_DELAY_TIME, message.getMessageProperties().getPriority());
6371

6472
} catch (Throwable throwable) {
6573
LOGGER.traceMarker("onQueueItem", String.format("exception - %s", throwable));
6674
}
6775
}
6876

69-
private void reEnqueueJobCallback(CmdCallbackQueueItem item, long wait) {
70-
try {
71-
Thread.sleep(wait);
72-
} catch (Throwable ignore) {
77+
private void detectRetryTimes(CmdCallbackQueueItem item) {
78+
if (item.getRetryTimes() <= 0) {
79+
throw new FlowException("retry times has reach the limit");
7380
}
81+
}
82+
83+
private void reEnqueueJobCallback(CmdCallbackQueueItem item, long wait, int priority) {
84+
85+
// sleep seconds
86+
ThreadUtil.sleep(wait);
87+
88+
// set retry times
89+
item.setRetryTimes(item.getRetryTimes() - 1);
90+
91+
//priority inc 1
92+
priority = priority + 1;
7493

75-
jobService.enterQueue(item);
94+
jobService.enterQueue(item, priority);
7695
}
7796
}

platform-api/src/main/java/com/flow/platform/api/controller/CmdWebhookController.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ public void execute(@RequestBody Cmd cmd, @RequestParam String identifier) {
6464
cmd.getStatus(),
6565
cmd.getId(),
6666
cmd.getCmdResult());
67-
jobService.enterQueue(new CmdCallbackQueueItem(jobId, cmd));
67+
jobService.enterQueue(new CmdCallbackQueueItem(jobId, cmd), 1);
6868
} catch (NumberFormatException warn) {
6969
LOGGER.warn("Invalid job id format");
7070
}

platform-api/src/main/java/com/flow/platform/api/dao/job/NodeResultDaoImpl.java

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@
2828
import javax.persistence.criteria.Predicate;
2929
import javax.persistence.criteria.Root;
3030
import org.hibernate.Session;
31-
import org.hibernate.query.Query;
3231
import org.springframework.stereotype.Repository;
3332

3433
/**
@@ -100,11 +99,8 @@ public int update(BigInteger jobId, NodeStatus target) {
10099

101100
@Override
102101
public void delete(List<BigInteger> jobIds) {
103-
execute((Session session) -> {
104-
String delete = String.format("delete from NodeResult where job_id in (:list)");
105-
Query query = session.createQuery(delete);
106-
query.setParameterList("list", jobIds);
107-
return true;
108-
});
102+
execute((Session session) -> session.createQuery("delete from NodeResult where key.jobId in ( :jobIds )")
103+
.setParameterList("jobIds", jobIds)
104+
.executeUpdate());
109105
}
110106
}

platform-api/src/main/java/com/flow/platform/api/domain/CmdCallbackQueueItem.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,14 +31,19 @@ public class CmdCallbackQueueItem extends Jsonable {
3131

3232
private final Cmd cmd;
3333

34-
private Integer retryTimes = 0;
34+
// default retry times 5
35+
private Integer retryTimes = 5;
3536

3637
public CmdCallbackQueueItem(BigInteger jobId, Cmd cmd) {
3738
this.jobId = jobId;
3839
this.cmd = cmd;
3940
this.path = cmd.getExtra();
4041
}
4142

43+
public void setRetryTimes(Integer retryTimes) {
44+
this.retryTimes = retryTimes;
45+
}
46+
4247
public BigInteger getJobId() {
4348
return jobId;
4449
}

platform-api/src/main/java/com/flow/platform/api/service/job/JobService.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,7 @@ void createWithYmlLoad(String path,
110110
/**
111111
* Enqueue cmd callback item
112112
*/
113-
void enterQueue(CmdCallbackQueueItem cmdQueueItem);
113+
void enterQueue(CmdCallbackQueueItem cmdQueueItem, int priority);
114114

115115
/**
116116
* stop job

platform-api/src/main/java/com/flow/platform/api/service/job/JobServiceImpl.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -462,8 +462,8 @@ private void onRunShellCallback(String path, Cmd cmd, Job job) {
462462
}
463463

464464
@Override
465-
public void enterQueue(CmdCallbackQueueItem cmdQueueItem) {
466-
cmdCallbackQueue.enqueue(PriorityMessage.create(cmdQueueItem.toBytes(), 1));
465+
public void enterQueue(CmdCallbackQueueItem cmdQueueItem, int priority) {
466+
cmdCallbackQueue.enqueue(PriorityMessage.create(cmdQueueItem.toBytes(), priority));
467467
}
468468

469469
@Override

platform-api/src/main/java/com/flow/platform/api/service/node/NodeCrontabService.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,4 +44,9 @@ public interface NodeCrontabService extends ContextEvent {
4444
* List current triggers
4545
*/
4646
List<Trigger> triggers();
47+
48+
/**
49+
* Clean triggers
50+
*/
51+
void cleanTriggers();
4752
}

platform-api/src/main/java/com/flow/platform/api/service/node/NodeCrontabServiceImpl.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -165,4 +165,13 @@ public List<Trigger> triggers() {
165165
throw new IllegalStatusException(e.getMessage());
166166
}
167167
}
168+
169+
@Override
170+
public void cleanTriggers() {
171+
try {
172+
quartzScheduler.getTriggersOfJob(nodeCrontabDetail.getKey()).clear();
173+
} catch (SchedulerException e) {
174+
throw new IllegalStatusException(e.getMessage());
175+
}
176+
}
168177
}

platform-api/src/test/java/com/flow/platform/api/test/service/JobServiceTest.java

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@
3232
import com.flow.platform.api.test.TestBase;
3333
import com.flow.platform.api.util.CommonUtil;
3434
import com.flow.platform.core.exception.IllegalStatusException;
35+
import com.flow.platform.core.queue.PlatformQueue;
36+
import com.flow.platform.core.queue.PriorityMessage;
3537
import com.flow.platform.core.util.ThreadUtil;
3638
import com.flow.platform.domain.Cmd;
3739
import com.flow.platform.domain.CmdResult;
@@ -42,10 +44,14 @@
4244
import java.io.IOException;
4345
import java.time.ZonedDateTime;
4446
import java.util.List;
47+
import java.util.concurrent.CountDownLatch;
48+
import java.util.concurrent.TimeUnit;
49+
import java.util.concurrent.atomic.AtomicInteger;
4550
import org.junit.Assert;
4651
import org.junit.Before;
4752
import org.junit.Test;
4853
import org.springframework.beans.factory.annotation.Autowired;
54+
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
4955

5056
/**
5157
* @author yh@firim
@@ -55,6 +61,9 @@ public class JobServiceTest extends TestBase {
5561
@Autowired
5662
private JobNodeService jobNodeService;
5763

64+
@Autowired
65+
private PlatformQueue<PriorityMessage> cmdCallbackQueue;
66+
5867
@Before
5968
public void init() {
6069
stubDemo();
@@ -301,4 +310,28 @@ public void should_get_latest_job_by_node_path() throws IOException {
301310
Assert.assertEquals(1, jobs.size());
302311
Assert.assertEquals("2", jobs.get(0).getNumber().toString());
303312
}
313+
314+
@Test
315+
public void should_cmd_enqueue_limit_times_success() throws InterruptedException {
316+
Cmd cmd = new Cmd("default", "test", CmdType.RUN_SHELL, "echo 1");
317+
CountDownLatch countDownLatch = new CountDownLatch(5);
318+
AtomicInteger atomicInteger = new AtomicInteger(0);
319+
320+
// register new queue to get item info
321+
cmdCallbackQueue.register(message -> {
322+
CmdCallbackQueueItem item = CmdCallbackQueueItem.parse(message.getBody(), CmdCallbackQueueItem.class);
323+
atomicInteger.set(item.getRetryTimes());
324+
countDownLatch.countDown();
325+
});
326+
327+
// when: enter queue one not found job id
328+
jobService.enterQueue(new CmdCallbackQueueItem(CommonUtil.randomId(), cmd), 1);
329+
countDownLatch.await(6, TimeUnit.SECONDS);
330+
331+
// then: should try 5 times
332+
Assert.assertEquals(1, atomicInteger.get());
333+
334+
// then: cmdCallbackQueue size should 0
335+
Assert.assertEquals(0, cmdCallbackQueue.size());
336+
}
304337
}

platform-api/src/test/java/com/flow/platform/api/test/service/NodeCrontabServiceTest.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ public class NodeCrontabServiceTest extends TestBase {
4242
@Before
4343
public void before() {
4444
stubDemo();
45+
flowCrontabService.cleanTriggers();
4546
}
4647

4748
@Test

0 commit comments

Comments
 (0)