Skip to content

Commit 06c80cd

Browse files
Remove unfinished usage job entries of the host (apache#10848)
1 parent 7715b3d commit 06c80cd

File tree

3 files changed

+47
-15
lines changed

3 files changed

+47
-15
lines changed

engine/schema/src/main/java/com/cloud/usage/dao/UsageJobDao.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,4 +37,6 @@ public interface UsageJobDao extends GenericDao<UsageJobVO, Long> {
3737
UsageJobVO isOwner(String hostname, int pid);
3838

3939
void updateJobSuccess(Long jobId, long startMillis, long endMillis, long execTime, boolean success);
40+
41+
void removeLastOpenJobsOwned(String hostname, int pid);
4042
}

engine/schema/src/main/java/com/cloud/usage/dao/UsageJobDaoImpl.java

Lines changed: 35 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import java.util.List;
2323

2424

25+
import org.apache.commons.collections.CollectionUtils;
2526
import org.apache.log4j.Logger;
2627
import org.springframework.stereotype.Component;
2728

@@ -116,7 +117,7 @@ public Long checkHeartbeat(String hostname, int pid, int aggregationDuration) {
116117
public UsageJobVO isOwner(String hostname, int pid) {
117118
TransactionLegacy txn = TransactionLegacy.open(TransactionLegacy.USAGE_DB);
118119
try {
119-
if ((hostname == null) || (pid <= 0)) {
120+
if (hostname == null || pid <= 0) {
120121
return null;
121122
}
122123

@@ -176,7 +177,7 @@ public UsageJobVO getNextImmediateJob() {
176177
SearchCriteria<UsageJobVO> sc = createSearchCriteria();
177178
sc.addAnd("endMillis", SearchCriteria.Op.EQ, Long.valueOf(0));
178179
sc.addAnd("jobType", SearchCriteria.Op.EQ, Integer.valueOf(UsageJobVO.JOB_TYPE_SINGLE));
179-
sc.addAnd("scheduled", SearchCriteria.Op.EQ, Integer.valueOf(0));
180+
sc.addAnd("scheduled", SearchCriteria.Op.EQ, Integer.valueOf(UsageJobVO.JOB_NOT_SCHEDULED));
180181
List<UsageJobVO> jobs = search(sc, filter);
181182

182183
if ((jobs == null) || jobs.isEmpty()) {
@@ -196,4 +197,36 @@ public Date getLastHeartbeat() {
196197
}
197198
return jobs.get(0).getHeartbeat();
198199
}
200+
201+
private List<UsageJobVO> getLastOpenJobsOwned(String hostname, int pid) {
202+
SearchCriteria<UsageJobVO> sc = createSearchCriteria();
203+
sc.addAnd("endMillis", SearchCriteria.Op.EQ, Long.valueOf(0));
204+
sc.addAnd("host", SearchCriteria.Op.EQ, hostname);
205+
if (pid > 0) {
206+
sc.addAnd("pid", SearchCriteria.Op.EQ, Integer.valueOf(pid));
207+
}
208+
return listBy(sc);
209+
}
210+
211+
@Override
212+
public void removeLastOpenJobsOwned(String hostname, int pid) {
213+
if (hostname == null) {
214+
return;
215+
}
216+
217+
TransactionLegacy txn = TransactionLegacy.open(TransactionLegacy.USAGE_DB);
218+
try {
219+
List<UsageJobVO> jobs = getLastOpenJobsOwned(hostname, pid);
220+
if (CollectionUtils.isNotEmpty(jobs)) {
221+
s_logger.info(String.format("Found %s opens job, to remove", jobs.size()));
222+
for (UsageJobVO job : jobs) {
223+
s_logger.debug(String.format("Removing job - id: %d, pid: %d, job type: %d, scheduled: %d, heartbeat: %s",
224+
job.getId(), job.getPid(), job.getJobType(), job.getScheduled(), job.getHeartbeat()));
225+
remove(job.getId());
226+
}
227+
}
228+
} finally {
229+
txn.close();
230+
}
231+
}
199232
}

usage/src/main/java/com/cloud/usage/UsageManagerImpl.java

Lines changed: 10 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -319,6 +319,9 @@ public boolean start() {
319319
s_logger.info("Starting Usage Manager");
320320
}
321321

322+
_usageJobDao.removeLastOpenJobsOwned(_hostname, 0);
323+
Runtime.getRuntime().addShutdownHook(new AbandonJob());
324+
322325
// use the configured exec time and aggregation duration for scheduling the job
323326
_scheduledFuture =
324327
_executor.scheduleAtFixedRate(this, _jobExecTime.getTimeInMillis() - System.currentTimeMillis(), _aggregationDuration * 60 * 1000, TimeUnit.MILLISECONDS);
@@ -331,7 +334,6 @@ public boolean start() {
331334
_sanity = _sanityExecutor.scheduleAtFixedRate(new SanityCheck(), 1, _sanityCheckInterval, TimeUnit.DAYS);
332335
}
333336

334-
Runtime.getRuntime().addShutdownHook(new AbandonJob());
335337
TransactionLegacy usageTxn = TransactionLegacy.open(TransactionLegacy.USAGE_DB);
336338
try {
337339
if (_heartbeatLock.lock(3)) { // 3 second timeout
@@ -2255,17 +2257,17 @@ protected void runInContext() {
22552257
// the aggregation range away from executing the next job
22562258
long now = System.currentTimeMillis();
22572259
long timeToJob = _jobExecTime.getTimeInMillis() - now;
2258-
long timeSinceJob = 0;
2260+
long timeSinceLastSuccessJob = 0;
22592261
long aggregationDurationMillis = _aggregationDuration * 60L * 1000L;
22602262
long lastSuccess = _usageJobDao.getLastJobSuccessDateMillis();
22612263
if (lastSuccess > 0) {
2262-
timeSinceJob = now - lastSuccess;
2264+
timeSinceLastSuccessJob = now - lastSuccess;
22632265
}
22642266

2265-
if ((timeSinceJob > 0) && (timeSinceJob > (aggregationDurationMillis - 100))) {
2267+
if ((timeSinceLastSuccessJob > 0) && (timeSinceLastSuccessJob > (aggregationDurationMillis - 100))) {
22662268
if (timeToJob > (aggregationDurationMillis / 2)) {
22672269
if (s_logger.isDebugEnabled()) {
2268-
s_logger.debug("it's been " + timeSinceJob + " ms since last usage job and " + timeToJob +
2270+
s_logger.debug("it's been " + timeSinceLastSuccessJob + " ms since last usage job and " + timeToJob +
22692271
" ms until next job, scheduling an immediate job to catch up (aggregation duration is " + _aggregationDuration + " minutes)");
22702272
}
22712273
scheduleParse();
@@ -2352,17 +2354,12 @@ protected void runInContext() {
23522354
}
23532355
}
23542356
}
2357+
23552358
private class AbandonJob extends Thread {
23562359
@Override
23572360
public void run() {
2358-
s_logger.info("exitting Usage Manager");
2359-
deleteOpenjob();
2360-
}
2361-
private void deleteOpenjob() {
2362-
UsageJobVO job = _usageJobDao.isOwner(_hostname, _pid);
2363-
if (job != null) {
2364-
_usageJobDao.remove(job.getId());
2365-
}
2361+
s_logger.info("exiting Usage Manager");
2362+
_usageJobDao.removeLastOpenJobsOwned(_hostname, _pid);
23662363
}
23672364
}
23682365
}

0 commit comments

Comments
 (0)