Skip to content

Commit dd40502

Browse files
committed
add enqueue retry times and add item priority
1 parent cf43bad commit dd40502

File tree

3 files changed

+38
-2
lines changed

3 files changed

+38
-2
lines changed

platform-api/src/main/java/com/flow/platform/api/consumer/CmdCallbackQueueConsumer.java

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,12 @@ public class CmdCallbackQueueConsumer implements QueueListener<PriorityMessage>
3535

3636
private final static Logger LOGGER = new Logger(CmdCallbackQueueConsumer.class);
3737

38+
// retry time set 5 times
39+
private final static int RETRY_TIMES = 5;
40+
41+
// requeue 1 s
42+
private final static int REQUEUE_DELAY_TIME = 1000;
43+
3844
@Autowired
3945
private PlatformQueue<PriorityMessage> cmdCallbackQueue;
4046

@@ -58,20 +64,35 @@ public void onQueueItem(PriorityMessage message) {
5864
jobService.callback(item);
5965
} catch (NotFoundException notFoundException) {
6066

67+
// detect retry times is reach the limit or not
68+
detectRetryTimes(item);
69+
6170
// re-enqueue cmd callback if job not found since transaction problem
62-
reEnqueueJobCallback(item, 1000);
71+
reEnqueueJobCallback(item, REQUEUE_DELAY_TIME);
6372

6473
} catch (Throwable throwable) {
6574
LOGGER.traceMarker("onQueueItem", String.format("exception - %s", throwable));
6675
}
6776
}
6877

78+
private void detectRetryTimes(CmdCallbackQueueItem item) {
79+
if (item.getRetryTimes() > RETRY_TIMES) {
80+
throw new NotFoundException(String.format("retry times has reach the limit"));
81+
}
82+
}
83+
6984
private void reEnqueueJobCallback(CmdCallbackQueueItem item, long wait) {
7085
try {
7186
Thread.sleep(wait);
7287
} catch (Throwable ignore) {
7388
}
7489

90+
// set item priority priority inc
91+
item.setPriority(item.getPriority() + 1);
92+
93+
// set retry times
94+
item.setRetryTimes(item.getRetryTimes() + 1);
95+
7596
jobService.enterQueue(item);
7697
}
7798
}

platform-api/src/main/java/com/flow/platform/api/domain/CmdCallbackQueueItem.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,12 +33,27 @@ public class CmdCallbackQueueItem extends Jsonable {
3333

3434
private Integer retryTimes = 0;
3535

36+
// priority default 1
37+
private Integer priority = 1;
38+
3639
public CmdCallbackQueueItem(BigInteger jobId, Cmd cmd) {
3740
this.jobId = jobId;
3841
this.cmd = cmd;
3942
this.path = cmd.getExtra();
4043
}
4144

45+
public void setRetryTimes(Integer retryTimes) {
46+
this.retryTimes = retryTimes;
47+
}
48+
49+
public Integer getPriority() {
50+
return priority;
51+
}
52+
53+
public void setPriority(Integer priority) {
54+
this.priority = priority;
55+
}
56+
4257
public BigInteger getJobId() {
4358
return jobId;
4459
}

platform-api/src/main/java/com/flow/platform/api/service/job/JobServiceImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -463,7 +463,7 @@ private void onRunShellCallback(String path, Cmd cmd, Job job) {
463463

464464
@Override
465465
public void enterQueue(CmdCallbackQueueItem cmdQueueItem) {
466-
cmdCallbackQueue.enqueue(PriorityMessage.create(cmdQueueItem.toBytes(), 1));
466+
cmdCallbackQueue.enqueue(PriorityMessage.create(cmdQueueItem.toBytes(), cmdQueueItem.getPriority()));
467467
}
468468

469469
@Override

0 commit comments

Comments
 (0)