33import java .io .File ;
44import java .io .FileInputStream ;
55import java .io .IOException ;
6+ import java .sql .SQLException ;
67import java .util .ArrayList ;
78import java .util .Calendar ;
89import java .util .Collection ;
4748import org .fogbowcloud .manager .core .plugins .ComputePlugin ;
4849import org .fogbowcloud .manager .core .plugins .FederationMemberAuthorizationPlugin ;
4950import org .fogbowcloud .manager .core .plugins .FederationMemberPickerPlugin ;
50- import org .fogbowcloud .manager .core .plugins .LocalCredentialsPlugin ;
5151import org .fogbowcloud .manager .core .plugins .IdentityPlugin ;
5252import org .fogbowcloud .manager .core .plugins .ImageStoragePlugin ;
53+ import org .fogbowcloud .manager .core .plugins .LocalCredentialsPlugin ;
5354import org .fogbowcloud .manager .core .plugins .PrioritizationPlugin ;
5455import org .fogbowcloud .manager .core .plugins .accounting .ResourceUsage ;
5556import org .fogbowcloud .manager .core .plugins .util .SshClientPool ;
6263import org .fogbowcloud .manager .occi .model .ResourceRepository ;
6364import org .fogbowcloud .manager .occi .model .ResponseConstants ;
6465import org .fogbowcloud .manager .occi .model .Token ;
66+ import org .fogbowcloud .manager .occi .request .OrderDataStore ;
6567import org .fogbowcloud .manager .occi .request .Request ;
6668import org .fogbowcloud .manager .occi .request .RequestAttribute ;
6769import org .fogbowcloud .manager .occi .request .RequestConstants ;
7072import org .fogbowcloud .manager .occi .request .RequestType ;
7173import org .fogbowcloud .manager .xmpp .AsyncPacketSender ;
7274import org .fogbowcloud .manager .xmpp .ManagerPacketHelper ;
75+ import org .json .JSONException ;
7376import org .json .JSONObject ;
7477import org .restlet .Response ;
7578
@@ -84,6 +87,7 @@ public class ManagerController {
8487 private static final Logger LOGGER = Logger .getLogger (ManagerController .class );
8588
8689 private static final int DEFAULT_MAX_WHOISALIVE_MANAGER_COUNT = 100 ;
90+ private static final long DEFAULT_BD_UPDATE_PERIOD = 300000 ; // 5 minute
8791 private static final long DEFAULT_SCHEDULER_PERIOD = 30000 ; // 30 seconds
8892 protected static final int DEFAULT_ASYNC_REQUEST_WAITING_INTERVAL = 300000 ; // 5 minutes
8993 private static final long DEFAULT_INSTANCE_MONITORING_PERIOD = 120000 ; // 2 minutes
@@ -99,6 +103,7 @@ public class ManagerController {
99103 private final ManagerTimer servedRequestMonitoringTimer ;
100104 private final ManagerTimer garbageCollectorTimer ;
101105 private final ManagerTimer accountingUpdaterTimer ;
106+ private final ManagerTimer requestDBUpdaterTimer ;
102107
103108 private Map <String , Token > instanceIdToToken = new HashMap <String , Token >();
104109 private final List <FederationMember > members = Collections .synchronizedList (new LinkedList <FederationMember >());
@@ -126,6 +131,8 @@ public class ManagerController {
126131 private DateUtils dateUtils = new DateUtils ();
127132
128133 private PoolingHttpClientConnectionManager cm ;
134+
135+ private OrderDataStore requestDB ;
129136
130137 public ManagerController (Properties properties ) {
131138 this (properties , null );
@@ -143,15 +150,24 @@ public ManagerController(Properties properties, ScheduledExecutorService executo
143150 this .servedRequestMonitoringTimer = new ManagerTimer (Executors .newScheduledThreadPool (1 ));
144151 this .garbageCollectorTimer = new ManagerTimer (Executors .newScheduledThreadPool (1 ));
145152 this .accountingUpdaterTimer = new ManagerTimer (Executors .newScheduledThreadPool (1 ));
153+ this .requestDBUpdaterTimer = new ManagerTimer (Executors .newScheduledThreadPool (1 ));
146154 } else {
147155 this .requestSchedulerTimer = new ManagerTimer (executor );
148156 this .instanceMonitoringTimer = new ManagerTimer (executor );
149157 this .servedRequestMonitoringTimer = new ManagerTimer (executor );
150158 this .garbageCollectorTimer = new ManagerTimer (executor );
151159 this .accountingUpdaterTimer = new ManagerTimer (executor );
160+ this .requestDBUpdaterTimer = new ManagerTimer (executor );
152161 }
162+ requestDB = new OrderDataStore (properties );
163+ recoverPreviousRequests ();
164+ triggerRequestDBUpdater ();
153165 }
154166
167+ public void setDatabase (OrderDataStore database ) {
168+ this .requestDB = database ;
169+ }
170+
155171 public void setBenchmarkExecutor (ExecutorService benchmarkExecutor ) {
156172 this .benchmarkExecutor = benchmarkExecutor ;
157173 }
@@ -180,6 +196,45 @@ public void setAccountingPlugin(AccountingPlugin accountingPlugin) {
180196 }
181197 }
182198
199+ private void recoverPreviousRequests () {
200+ new Thread (new Runnable () {
201+ @ Override
202+ public void run () {
203+ try {
204+ initializeManager ();
205+ } catch (Exception e ) {
206+ LOGGER .error ("Could not recover requests." , e );
207+ }
208+ }
209+ }).start ();
210+ }
211+
212+ protected void initializeManager () throws SQLException , JSONException {
213+ LOGGER .debug ("Recovering previous requests." );
214+ for (Request request : this .requestDB .getOrders ()) {
215+ Instance instance = null ;
216+ try {
217+ if (request .getState ().equals (RequestState .FULFILLED ) ||
218+ request .getState ().equals (RequestState .DELETED )) {
219+ instance = getInstance (request );
220+ LOGGER .debug (instance .getId () + " was recovered to request " + request .getId ());
221+ }
222+ } catch (Exception e ) {
223+ LOGGER .debug (request .getGlobalInstanceId () + " does not exist anymore." );
224+ if (request .getState ().equals (RequestState .DELETED )) {
225+ continue ;
226+ }
227+ instanceRemoved (request );
228+ }
229+ requests .addRequest (request .getFederationToken ().getUser (), request );
230+ }
231+ if (!requestSchedulerTimer .isScheduled () &&
232+ requests .getRequestsIn (RequestState .OPEN ).size () > 0 ) {
233+ triggerRequestScheduler ();
234+ }
235+ LOGGER .debug ("Previous requests recovered." );
236+ }
237+
183238 private String getSSHCommonUser () {
184239 String sshCommonUser = properties .getProperty (ConfigurationConstants .SSH_COMMON_USER );
185240 return sshCommonUser == null ? DEFAULT_COMMON_SSH_USER : sshCommonUser ;
@@ -935,6 +990,45 @@ public List<Request> createRequests(String federationAccessTokenStr, List<Catego
935990
936991 return currentRequests ;
937992 }
993+
994+ private void triggerRequestDBUpdater () {
995+ String bdUpdaterPeriodStr = properties
996+ .getProperty (ConfigurationConstants .ORDER_BD_UPDATER_PERIOD_KEY );
997+ long schedulerPeriod = bdUpdaterPeriodStr == null ? DEFAULT_BD_UPDATE_PERIOD : Long
998+ .valueOf (bdUpdaterPeriodStr );
999+ requestDBUpdaterTimer .scheduleAtFixedRate (new TimerTask () {
1000+ @ Override
1001+ public void run () {
1002+ try {
1003+ updateRequestDB ();
1004+ } catch (Exception e ) {
1005+ LOGGER .error ("Could not update the database." , e );
1006+ }
1007+ }
1008+ }, 60000 , schedulerPeriod );
1009+ }
1010+
1011+ protected void updateRequestDB () throws SQLException , JSONException {
1012+ LOGGER .debug ("Database update start." );
1013+ List <Request > orders = this .requestDB .getOrders ();
1014+ Map <String , Request > ordersDB = new HashMap <String , Request >();
1015+ for (Request request : orders ) {
1016+ ordersDB .put (request .getId (), request );
1017+ }
1018+ List <Request > allRequests = new ArrayList <Request >(requests .getAllRequests ());
1019+ for (Request request : allRequests ) {
1020+ if (ordersDB .get (request .getId ()) == null ) {
1021+ requestDB .addOrder (request );
1022+ } else {
1023+ requestDB .updateOrder (request );
1024+ ordersDB .remove (request .getId ());
1025+ }
1026+ }
1027+ for (String key : ordersDB .keySet ()) {
1028+ requestDB .removeOrder (ordersDB .get (key ));
1029+ }
1030+ LOGGER .debug ("Database update finish." );
1031+ }
9381032
9391033 private Token getTokenFromLocalIdP (String localAccessTokenStr ) {
9401034 return localIdentityPlugin .getToken (localAccessTokenStr );
0 commit comments