Skip to content

Commit aadfd46

Browse files
committed
change retryTimes dec and set priority param
1 parent 0a01de0 commit aadfd46

File tree

9 files changed

+35
-30
lines changed

9 files changed

+35
-30
lines changed

platform-api/src/main/java/com/flow/platform/api/consumer/CmdCallbackQueueConsumer.java

Lines changed: 9 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -36,9 +36,6 @@ public class CmdCallbackQueueConsumer implements QueueListener<PriorityMessage>
3636

3737
private final static Logger LOGGER = new Logger(CmdCallbackQueueConsumer.class);
3838

39-
// retry time set 5 times
40-
private final static int RETRY_TIMES = 5;
41-
4239
// requeue 1 s
4340
private final static int REQUEUE_DELAY_TIME = 1000;
4441

@@ -69,31 +66,31 @@ public void onQueueItem(PriorityMessage message) {
6966
detectRetryTimes(item);
7067

7168
// re-enqueue cmd callback if job not found since transaction problem
72-
reEnqueueJobCallback(item, REQUEUE_DELAY_TIME);
69+
reEnqueueJobCallback(item, REQUEUE_DELAY_TIME, message.getMessageProperties().getPriority());
7370

7471
} catch (Throwable throwable) {
7572
LOGGER.traceMarker("onQueueItem", String.format("exception - %s", throwable));
7673
}
7774
}
7875

7976
private void detectRetryTimes(CmdCallbackQueueItem item) {
80-
if (item.getRetryTimes() > RETRY_TIMES) {
81-
throw new FlowException(String.format("retry times has reach the limit"));
77+
if (item.getRetryTimes() <= 0) {
78+
throw new FlowException("retry times has reach the limit");
8279
}
8380
}
8481

85-
private void reEnqueueJobCallback(CmdCallbackQueueItem item, long wait) {
82+
private void reEnqueueJobCallback(CmdCallbackQueueItem item, long wait, int priority) {
8683
try {
8784
Thread.sleep(wait);
8885
} catch (Throwable ignore) {
8986
}
9087

91-
// set item priority priority inc
92-
item.setPriority(item.getPriority() + 1);
93-
9488
// set retry times
95-
item.setRetryTimes(item.getRetryTimes() + 1);
89+
item.setRetryTimes(item.getRetryTimes() - 1);
90+
91+
//priority inc 1
92+
priority = priority + 1;
9693

97-
jobService.enterQueue(item);
94+
jobService.enterQueue(item, priority);
9895
}
9996
}

platform-api/src/main/java/com/flow/platform/api/controller/CmdWebhookController.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ public void execute(@RequestBody Cmd cmd, @RequestParam String identifier) {
6464
cmd.getStatus(),
6565
cmd.getId(),
6666
cmd.getCmdResult());
67-
jobService.enterQueue(new CmdCallbackQueueItem(jobId, cmd));
67+
jobService.enterQueue(new CmdCallbackQueueItem(jobId, cmd), 1);
6868
} catch (NumberFormatException warn) {
6969
LOGGER.warn("Invalid job id format");
7070
}

platform-api/src/main/java/com/flow/platform/api/domain/CmdCallbackQueueItem.java

Lines changed: 2 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -31,10 +31,8 @@ public class CmdCallbackQueueItem extends Jsonable {
3131

3232
private final Cmd cmd;
3333

34-
private Integer retryTimes = 1;
35-
36-
// priority default 1
37-
private Integer priority = 1;
34+
// default retry times 5
35+
private Integer retryTimes = 5;
3836

3937
public CmdCallbackQueueItem(BigInteger jobId, Cmd cmd) {
4038
this.jobId = jobId;
@@ -46,14 +44,6 @@ public void setRetryTimes(Integer retryTimes) {
4644
this.retryTimes = retryTimes;
4745
}
4846

49-
public Integer getPriority() {
50-
return priority;
51-
}
52-
53-
public void setPriority(Integer priority) {
54-
this.priority = priority;
55-
}
56-
5747
public BigInteger getJobId() {
5848
return jobId;
5949
}

platform-api/src/main/java/com/flow/platform/api/service/job/JobService.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,7 @@ void createWithYmlLoad(String path,
110110
/**
111111
* Enqueue cmd callback item
112112
*/
113-
void enterQueue(CmdCallbackQueueItem cmdQueueItem);
113+
void enterQueue(CmdCallbackQueueItem cmdQueueItem, int priority);
114114

115115
/**
116116
* stop job

platform-api/src/main/java/com/flow/platform/api/service/job/JobServiceImpl.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -462,8 +462,8 @@ private void onRunShellCallback(String path, Cmd cmd, Job job) {
462462
}
463463

464464
@Override
465-
public void enterQueue(CmdCallbackQueueItem cmdQueueItem) {
466-
cmdCallbackQueue.enqueue(PriorityMessage.create(cmdQueueItem.toBytes(), cmdQueueItem.getPriority()));
465+
public void enterQueue(CmdCallbackQueueItem cmdQueueItem, int priority) {
466+
cmdCallbackQueue.enqueue(PriorityMessage.create(cmdQueueItem.toBytes(), priority));
467467
}
468468

469469
@Override

platform-api/src/main/java/com/flow/platform/api/service/node/NodeCrontabService.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,4 +44,9 @@ public interface NodeCrontabService extends ContextEvent {
4444
* List current triggers
4545
*/
4646
List<Trigger> triggers();
47+
48+
/**
49+
* Clean triggers
50+
*/
51+
void cleanTriggers();
4752
}

platform-api/src/main/java/com/flow/platform/api/service/node/NodeCrontabServiceImpl.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -165,4 +165,13 @@ public List<Trigger> triggers() {
165165
throw new IllegalStatusException(e.getMessage());
166166
}
167167
}
168+
169+
@Override
170+
public void cleanTriggers() {
171+
try {
172+
quartzScheduler.getTriggersOfJob(nodeCrontabDetail.getKey()).clear();
173+
} catch (SchedulerException e) {
174+
throw new IllegalStatusException(e.getMessage());
175+
}
176+
}
168177
}

platform-api/src/test/java/com/flow/platform/api/test/service/JobServiceTest.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -325,10 +325,13 @@ public void should_cmd_enqueue_limit_times_success() throws InterruptedException
325325
});
326326

327327
// when: enter queue one not found job id
328-
jobService.enterQueue(new CmdCallbackQueueItem(CommonUtil.randomId(), cmd));
328+
jobService.enterQueue(new CmdCallbackQueueItem(CommonUtil.randomId(), cmd), 1);
329329
countDownLatch.await(6, TimeUnit.SECONDS);
330330

331331
// then: should try 5 times
332-
Assert.assertEquals(5, atomicInteger.get());
332+
Assert.assertEquals(1, atomicInteger.get());
333+
334+
// then: cmdCallbackQueue size should 0
335+
Assert.assertEquals(0, cmdCallbackQueue.size());
333336
}
334337
}

platform-api/src/test/java/com/flow/platform/api/test/service/NodeCrontabServiceTest.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ public class NodeCrontabServiceTest extends TestBase {
4242
@Before
4343
public void before() {
4444
stubDemo();
45+
flowCrontabService.cleanTriggers();
4546
}
4647

4748
@Test

0 commit comments

Comments
 (0)