2727import com .flow .platform .core .queue .PlatformQueue ;
2828import com .flow .platform .core .queue .PriorityMessage ;
2929import com .flow .platform .core .queue .QueueListener ;
30+ import com .flow .platform .core .util .ThreadUtil ;
3031import com .flow .platform .domain .Cmd ;
3132import com .flow .platform .domain .CmdStatus ;
3233import com .flow .platform .util .Logger ;
@@ -48,6 +49,8 @@ public class CmdQueueConsumer implements QueueListener<PriorityMessage> {
4849
4950 private final static Logger LOGGER = new Logger (CmdQueueConsumer .class );
5051
52+ private final static long RETRY_WAIT_TIME = 1000 ; // in millis
53+
5154 @ Value ("${queue.cmd.idle_agent.period}" )
5255 private Integer idleAgentPeriod ; // period for check idle agent in seconds
5356
@@ -88,11 +91,6 @@ public void onQueueItem(PriorityMessage message) {
8891 return ;
8992 }
9093
91- boolean isTimeout = waitForIdleAgent (cmdId , idleAgentPeriod , idleAgentTimeout );
92- if (isTimeout ) {
93- LOGGER .trace ("wait for idle agent time out %s seconds for cmd %s" , idleAgentTimeout , cmdId );
94- }
95-
9694 Cmd cmd = cmdService .find (cmdId );
9795
9896 // do not re-enqueue if cmd been stopped or killed
@@ -114,36 +112,6 @@ public void onQueueItem(PriorityMessage message) {
114112 }
115113 }
116114
117- /**
118- * Block current thread and check idle agent
119- *
120- * @param cmdId cmd id
121- * @param period check idle agent period in seconds
122- * @param timeout timeout in seconds
123- * @return is time out exit or not
124- */
125- private boolean waitForIdleAgent (String cmdId , int period , int timeout ) {
126- Instant now = Instant .now ();
127-
128- while (true ) {
129- if (Duration .between (now , Instant .now ()).toMillis () >= timeout * 1000 ) {
130- return true ;
131- }
132-
133- try {
134- Thread .sleep (period * 1000 );
135- } catch (InterruptedException ignore ) {
136- }
137-
138- String zone = cmdService .find (cmdId ).getZoneName ();
139- int numOfIdle = agentService .findAvailable (zone ).size ();
140- if (numOfIdle > 0 ) {
141- LOGGER .trace ("has %s idle agent" , numOfIdle );
142- return false ;
143- }
144- }
145- }
146-
147115 /**
148116 * Re-enqueue cmd and return num of retry
149117 */
@@ -153,12 +121,7 @@ private void resend(final CmdQueueItem item, final int retry) {
153121 }
154122
155123 item .setRetry (retry );
156-
157- try {
158- Thread .sleep (1000 ); // wait 1 seconds and enqueue again with priority
159- } catch (InterruptedException ignore ) {
160- // do nothing
161- }
124+ ThreadUtil .sleep (RETRY_WAIT_TIME );
162125
163126 // reset cmd status
164127 PriorityMessage message = PriorityMessage .create (item .toBytes (), QueueConfig .MAX_PRIORITY );
0 commit comments