Skip to content

Commit 9898be6

Browse files
sureshanapartidhslove
authored andcommitted
Remove unfinished usage job entries of the host (apache#10848)
1 parent 4dee6b0 commit 9898be6

File tree

3 files changed

+47
-15
lines changed

3 files changed

+47
-15
lines changed

engine/schema/src/main/java/com/cloud/usage/dao/UsageJobDao.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,4 +37,6 @@ public interface UsageJobDao extends GenericDao<UsageJobVO, Long> {
3737
UsageJobVO isOwner(String hostname, int pid);
3838

3939
void updateJobSuccess(Long jobId, long startMillis, long endMillis, long execTime, boolean success);
40+
41+
void removeLastOpenJobsOwned(String hostname, int pid);
4042
}

engine/schema/src/main/java/com/cloud/usage/dao/UsageJobDaoImpl.java

Lines changed: 35 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import java.util.List;
2323

2424

25+
import org.apache.commons.collections.CollectionUtils;
2526
import org.springframework.stereotype.Component;
2627

2728
import com.cloud.usage.UsageJobVO;
@@ -114,7 +115,7 @@ public Long checkHeartbeat(String hostname, int pid, int aggregationDuration) {
114115
public UsageJobVO isOwner(String hostname, int pid) {
115116
TransactionLegacy txn = TransactionLegacy.open(TransactionLegacy.USAGE_DB);
116117
try {
117-
if ((hostname == null) || (pid <= 0)) {
118+
if (hostname == null || pid <= 0) {
118119
return null;
119120
}
120121

@@ -174,7 +175,7 @@ public UsageJobVO getNextImmediateJob() {
174175
SearchCriteria<UsageJobVO> sc = createSearchCriteria();
175176
sc.addAnd("endMillis", SearchCriteria.Op.EQ, Long.valueOf(0));
176177
sc.addAnd("jobType", SearchCriteria.Op.EQ, Integer.valueOf(UsageJobVO.JOB_TYPE_SINGLE));
177-
sc.addAnd("scheduled", SearchCriteria.Op.EQ, Integer.valueOf(0));
178+
sc.addAnd("scheduled", SearchCriteria.Op.EQ, Integer.valueOf(UsageJobVO.JOB_NOT_SCHEDULED));
178179
List<UsageJobVO> jobs = search(sc, filter);
179180

180181
if ((jobs == null) || jobs.isEmpty()) {
@@ -194,4 +195,36 @@ public Date getLastHeartbeat() {
194195
}
195196
return jobs.get(0).getHeartbeat();
196197
}
198+
199+
private List<UsageJobVO> getLastOpenJobsOwned(String hostname, int pid) {
200+
SearchCriteria<UsageJobVO> sc = createSearchCriteria();
201+
sc.addAnd("endMillis", SearchCriteria.Op.EQ, Long.valueOf(0));
202+
sc.addAnd("host", SearchCriteria.Op.EQ, hostname);
203+
if (pid > 0) {
204+
sc.addAnd("pid", SearchCriteria.Op.EQ, Integer.valueOf(pid));
205+
}
206+
return listBy(sc);
207+
}
208+
209+
@Override
210+
public void removeLastOpenJobsOwned(String hostname, int pid) {
211+
if (hostname == null) {
212+
return;
213+
}
214+
215+
TransactionLegacy txn = TransactionLegacy.open(TransactionLegacy.USAGE_DB);
216+
try {
217+
List<UsageJobVO> jobs = getLastOpenJobsOwned(hostname, pid);
218+
if (CollectionUtils.isNotEmpty(jobs)) {
219+
s_logger.info(String.format("Found %s opens job, to remove", jobs.size()));
220+
for (UsageJobVO job : jobs) {
221+
s_logger.debug(String.format("Removing job - id: %d, pid: %d, job type: %d, scheduled: %d, heartbeat: %s",
222+
job.getId(), job.getPid(), job.getJobType(), job.getScheduled(), job.getHeartbeat()));
223+
remove(job.getId());
224+
}
225+
}
226+
} finally {
227+
txn.close();
228+
}
229+
}
197230
}

usage/src/main/java/com/cloud/usage/UsageManagerImpl.java

Lines changed: 10 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -324,6 +324,9 @@ public boolean start() {
324324
logger.info("Starting Usage Manager");
325325
}
326326

327+
_usageJobDao.removeLastOpenJobsOwned(_hostname, 0);
328+
Runtime.getRuntime().addShutdownHook(new AbandonJob());
329+
327330
// use the configured exec time and aggregation duration for scheduling the job
328331
_scheduledFuture =
329332
_executor.scheduleAtFixedRate(this, _jobExecTime.getTimeInMillis() - System.currentTimeMillis(), _aggregationDuration * 60 * 1000, TimeUnit.MILLISECONDS);
@@ -336,7 +339,6 @@ public boolean start() {
336339
_sanity = _sanityExecutor.scheduleAtFixedRate(new SanityCheck(), 1, _sanityCheckInterval, TimeUnit.DAYS);
337340
}
338341

339-
Runtime.getRuntime().addShutdownHook(new AbandonJob());
340342
TransactionLegacy usageTxn = TransactionLegacy.open(TransactionLegacy.USAGE_DB);
341343
try {
342344
if (_heartbeatLock.lock(3)) { // 3 second timeout
@@ -2263,17 +2265,17 @@ protected void runInContext() {
22632265
// the aggregation range away from executing the next job
22642266
long now = System.currentTimeMillis();
22652267
long timeToJob = _jobExecTime.getTimeInMillis() - now;
2266-
long timeSinceJob = 0;
2268+
long timeSinceLastSuccessJob = 0;
22672269
long aggregationDurationMillis = _aggregationDuration * 60L * 1000L;
22682270
long lastSuccess = _usageJobDao.getLastJobSuccessDateMillis();
22692271
if (lastSuccess > 0) {
2270-
timeSinceJob = now - lastSuccess;
2272+
timeSinceLastSuccessJob = now - lastSuccess;
22712273
}
22722274

2273-
if ((timeSinceJob > 0) && (timeSinceJob > (aggregationDurationMillis - 100))) {
2275+
if ((timeSinceLastSuccessJob > 0) && (timeSinceLastSuccessJob > (aggregationDurationMillis - 100))) {
22742276
if (timeToJob > (aggregationDurationMillis / 2)) {
22752277
if (logger.isDebugEnabled()) {
2276-
logger.debug("it's been " + timeSinceJob + " ms since last usage job and " + timeToJob +
2278+
logger.debug("it's been " + timeSinceLastSuccessJob + " ms since last usage job and " + timeToJob +
22772279
" ms until next job, scheduling an immediate job to catch up (aggregation duration is " + _aggregationDuration + " minutes)");
22782280
}
22792281
scheduleParse();
@@ -2360,17 +2362,12 @@ protected void runInContext() {
23602362
}
23612363
}
23622364
}
2365+
23632366
private class AbandonJob extends Thread {
23642367
@Override
23652368
public void run() {
2366-
logger.info("exitting Usage Manager");
2367-
deleteOpenjob();
2368-
}
2369-
private void deleteOpenjob() {
2370-
UsageJobVO job = _usageJobDao.isOwner(_hostname, _pid);
2371-
if (job != null) {
2372-
_usageJobDao.remove(job.getId());
2373-
}
2369+
logger.info("exiting Usage Manager");
2370+
_usageJobDao.removeLastOpenJobsOwned(_hostname, _pid);
23742371
}
23752372
}
23762373
}

0 commit comments

Comments
 (0)