2626import com .cloud .user .Account ;
2727import com .cloud .user .User ;
2828import com .cloud .utils .DateUtil ;
29+ import com .cloud .utils .Pair ;
2930import com .cloud .utils .StringUtils ;
3031import com .cloud .utils .component .ComponentContext ;
3132import com .cloud .utils .component .ManagerBase ;
3233import com .cloud .utils .concurrency .NamedThreadFactory ;
3334import com .cloud .utils .db .GlobalLock ;
3435import com .cloud .vm .VmDetailConstants ;
3536import com .cloud .vm .dao .UserVmDetailsDao ;
37+ import com .google .gson .Gson ;
38+ import com .google .gson .reflect .TypeToken ;
3639import org .apache .cloudstack .api .ApiCommandResourceType ;
3740import org .apache .cloudstack .api .ApiConstants ;
3841import org .apache .cloudstack .api .command .user .vm .DestroyVMCmd ;
3942import org .apache .cloudstack .api .command .user .vm .StopVMCmd ;
4043import org .apache .cloudstack .framework .config .ConfigKey ;
4144import org .apache .cloudstack .framework .config .Configurable ;
45+ import org .apache .cloudstack .framework .jobs .AsyncJob ;
4246import org .apache .cloudstack .framework .jobs .AsyncJobDispatcher ;
4347import org .apache .cloudstack .framework .jobs .AsyncJobManager ;
4448import org .apache .cloudstack .framework .jobs .impl .AsyncJobVO ;
49+ import org .apache .cloudstack .framework .messagebus .MessageBus ;
50+ import org .apache .cloudstack .framework .messagebus .MessageSubscriber ;
51+ import org .apache .cloudstack .jobs .JobInfo ;
4552import org .apache .cloudstack .managed .context .ManagedContextRunnable ;
4653import org .apache .commons .lang3 .time .DateUtils ;
4754
4855import javax .inject .Inject ;
4956import javax .naming .ConfigurationException ;
57+
5058import java .util .ArrayList ;
5159import java .util .Calendar ;
5260import java .util .Date ;
@@ -70,11 +78,17 @@ public class VMLeaseManagerImpl extends ManagerBase implements VMLeaseManager, C
7078
7179 @ Inject
7280 private AsyncJobManager asyncJobManager ;
81+ @ Inject
82+ private MessageBus messageBus ;
7383
7484 private AsyncJobDispatcher asyncJobDispatcher ;
7585
7686 ScheduledExecutorService vmLeaseExecutor ;
7787 ScheduledExecutorService vmLeaseExpiryEventExecutor ;
88+ Gson gson = ApiGsonHelper .getBuilder ().create ();
89+ VMLeaseManagerSubscriber leaseManagerSubscriber ;
90+
91+ public static final String JOB_INITIATOR = "jobInitiator" ;
7892
7993 @ Override
8094 public String getConfigComponentName () {
@@ -148,6 +162,7 @@ private void scheduleLeaseExecutors() {
148162 vmLeaseExpiryEventExecutor = Executors .newSingleThreadScheduledExecutor (new NamedThreadFactory ("VmLeaseExpiryEventExecutor" ));
149163 vmLeaseExpiryEventExecutor .scheduleAtFixedRate (new VMLeaseExpiryEventSchedulerTask (), 5L , InstanceLeaseExpiryEventSchedulerInterval .value (), TimeUnit .SECONDS );
150164 }
165+ addLeaseExpiryListener ();
151166 }
152167
153168 private void shutDownLeaseExecutors () {
@@ -162,6 +177,7 @@ private void shutDownLeaseExecutors() {
162177 vmLeaseExpiryEventExecutor .shutdown ();
163178 vmLeaseExpiryEventExecutor = null ;
164179 }
180+ removeLeaseExpiryListener ();
165181 }
166182
167183 class VMLeaseSchedulerTask extends ManagedContextRunnable {
@@ -257,7 +273,6 @@ protected void reallyRun() {
257273 if (jobId != null ) {
258274 submittedJobIds .add (jobId );
259275 successfulInstanceIds .add (instanceId );
260- userVmDetailsDao .addDetail (instanceId , VmDetailConstants .INSTANCE_LEASE_EXECUTION , LeaseActionExecution .DONE .name (), false );
261276 } else {
262277 failedToSubmitInstanceIds .add (instanceId );
263278 }
@@ -294,10 +309,10 @@ long executeStopInstanceJob(UserVmJoinVO vm, boolean isForced, long eventId) {
294309 params .put ("ctxAccountId" , String .valueOf (Account .ACCOUNT_ID_SYSTEM ));
295310 params .put (ApiConstants .CTX_START_EVENT_ID , String .valueOf (eventId ));
296311 params .put (ApiConstants .FORCED , String .valueOf (isForced ));
312+ params .put (JOB_INITIATOR , VMLeaseManager .class .getSimpleName ());
297313 final StopVMCmd cmd = new StopVMCmd ();
298314 ComponentContext .inject (cmd );
299- AsyncJobVO job = new AsyncJobVO ("" , User .UID_SYSTEM , vm .getAccountId (), StopVMCmd .class .getName (),
300- ApiGsonHelper .getBuilder ().create ().toJson (params ), vm .getId (),
315+ AsyncJobVO job = new AsyncJobVO ("" , User .UID_SYSTEM , vm .getAccountId (), StopVMCmd .class .getName (), gson .toJson (params ), vm .getId (),
301316 cmd .getApiResourceType () != null ? cmd .getApiResourceType ().toString () : null , null );
302317 job .setDispatcher (asyncJobDispatcher .getName ());
303318 return asyncJobManager .submitAsyncJob (job );
@@ -310,12 +325,11 @@ long executeDestroyInstanceJob(UserVmJoinVO vm, boolean isForced, long eventId)
310325 params .put ("ctxAccountId" , String .valueOf (Account .ACCOUNT_ID_SYSTEM ));
311326 params .put (ApiConstants .CTX_START_EVENT_ID , String .valueOf (eventId ));
312327 params .put (ApiConstants .FORCED , String .valueOf (isForced ));
313-
328+ params . put ( JOB_INITIATOR , VMLeaseManager . class . getSimpleName ());
314329 final DestroyVMCmd cmd = new DestroyVMCmd ();
315330 ComponentContext .inject (cmd );
316331
317- AsyncJobVO job = new AsyncJobVO ("" , User .UID_SYSTEM , vm .getAccountId (), DestroyVMCmd .class .getName (),
318- ApiGsonHelper .getBuilder ().create ().toJson (params ), vm .getId (),
332+ AsyncJobVO job = new AsyncJobVO ("" , User .UID_SYSTEM , vm .getAccountId (), DestroyVMCmd .class .getName (), gson .toJson (params ), vm .getId (),
319333 cmd .getApiResourceType () != null ? cmd .getApiResourceType ().toString () : null , null );
320334 job .setDispatcher (asyncJobDispatcher .getName ());
321335 return asyncJobManager .submitAsyncJob (job );
@@ -335,4 +349,46 @@ public ExpiryAction getLeaseExpiryAction(UserVmJoinVO instance) {
335349 }
336350 return expiryAction ;
337351 }
352+
353+ private void addLeaseExpiryListener () {
354+ logger .debug ("Adding Lease subscriber for async job events" );
355+ if (this .leaseManagerSubscriber == null ) {
356+ this .leaseManagerSubscriber = new VMLeaseManagerSubscriber ();
357+ }
358+ messageBus .subscribe (AsyncJob .Topics .JOB_EVENT_PUBLISH , this .leaseManagerSubscriber );
359+ }
360+
361+ private void removeLeaseExpiryListener () {
362+ logger .debug ("Removing Lease subscriber for async job events" );
363+ messageBus .unsubscribe (AsyncJob .Topics .JOB_EVENT_PUBLISH , this .leaseManagerSubscriber );
364+ this .leaseManagerSubscriber = null ;
365+ }
366+
367+ class VMLeaseManagerSubscriber implements MessageSubscriber {
368+ @ Override
369+ public void onPublishMessage (String senderAddress , String subject , Object args ) {
370+ try {
371+ @ SuppressWarnings ("unchecked" )
372+ Pair <AsyncJob , String > eventInfo = (Pair <AsyncJob , String >) args ;
373+ AsyncJob asyncExpiryJob = eventInfo .first ();
374+ if (!"ApiAsyncJobDispatcher" .equalsIgnoreCase (asyncExpiryJob .getDispatcher ()) || !"complete" .equalsIgnoreCase (eventInfo .second ())) {
375+ return ;
376+ }
377+ String cmd = asyncExpiryJob .getCmd ();
378+ if ((cmd .equalsIgnoreCase (StopVMCmd .class .getName ()) || cmd .equalsIgnoreCase (DestroyVMCmd .class .getName ()))
379+ && asyncExpiryJob .getStatus () == JobInfo .Status .SUCCEEDED && asyncExpiryJob .getInstanceId () != null ) {
380+
381+ Map <String , String > params = gson .fromJson (asyncExpiryJob .getCmdInfo (), new TypeToken <Map <String , String >>() {
382+ }.getType ());
383+
384+ if (VMLeaseManager .class .getSimpleName ().equals (params .get (JOB_INITIATOR ))) {
385+ logger .debug ("Lease expiry job: {} successfully executed for instanceId: {}" , asyncExpiryJob .getId (), asyncExpiryJob .getInstanceId ());
386+ userVmDetailsDao .addDetail (asyncExpiryJob .getInstanceId (), VmDetailConstants .INSTANCE_LEASE_EXECUTION , LeaseActionExecution .DONE .name (), false );
387+ }
388+ }
389+ } catch (final Exception e ) {
390+ logger .error ("Caught exception while executing lease expiry job" , e );
391+ }
392+ }
393+ }
338394}
0 commit comments