3232import com .flow .platform .api .test .TestBase ;
3333import com .flow .platform .api .util .CommonUtil ;
3434import com .flow .platform .core .exception .IllegalStatusException ;
35+ import com .flow .platform .core .queue .PlatformQueue ;
36+ import com .flow .platform .core .queue .PriorityMessage ;
3537import com .flow .platform .core .util .ThreadUtil ;
3638import com .flow .platform .domain .Cmd ;
3739import com .flow .platform .domain .CmdResult ;
4244import java .io .IOException ;
4345import java .time .ZonedDateTime ;
4446import java .util .List ;
47+ import java .util .concurrent .CountDownLatch ;
48+ import java .util .concurrent .TimeUnit ;
49+ import java .util .concurrent .atomic .AtomicInteger ;
4550import org .junit .Assert ;
4651import org .junit .Before ;
4752import org .junit .Test ;
4853import org .springframework .beans .factory .annotation .Autowired ;
54+ import org .springframework .scheduling .concurrent .ThreadPoolTaskExecutor ;
4955
5056/**
5157 * @author yh@firim
@@ -55,6 +61,9 @@ public class JobServiceTest extends TestBase {
5561 @ Autowired
5662 private JobNodeService jobNodeService ;
5763
64+ @ Autowired
65+ private PlatformQueue <PriorityMessage > cmdCallbackQueue ;
66+
5867 @ Before
5968 public void init () {
6069 stubDemo ();
@@ -301,4 +310,25 @@ public void should_get_latest_job_by_node_path() throws IOException {
301310 Assert .assertEquals (1 , jobs .size ());
302311 Assert .assertEquals ("2" , jobs .get (0 ).getNumber ().toString ());
303312 }
313+
314+ @ Test
315+ public void should_cmd_enqueue_limit_times_success () throws InterruptedException {
316+ Cmd cmd = new Cmd ("default" , "test" , CmdType .RUN_SHELL , "echo 1" );
317+ CountDownLatch countDownLatch = new CountDownLatch (5 );
318+ AtomicInteger atomicInteger = new AtomicInteger (0 );
319+
320+ // register new queue to get item info
321+ cmdCallbackQueue .register (message -> {
322+ CmdCallbackQueueItem item = CmdCallbackQueueItem .parse (message .getBody (), CmdCallbackQueueItem .class );
323+ atomicInteger .set (item .getRetryTimes ());
324+ countDownLatch .countDown ();
325+ });
326+
327+ // when: enter queue one not found job id
328+ jobService .enterQueue (new CmdCallbackQueueItem (CommonUtil .randomId (), cmd ));
329+ countDownLatch .await (6 , TimeUnit .SECONDS );
330+
331+ // then: should try 5 times
332+ Assert .assertEquals (5 , atomicInteger .get ());
333+ }
304334}
0 commit comments