diff --git a/mma-server/src/main/java/com/aliyun/odps/mma/server/JobScheduler.java b/mma-server/src/main/java/com/aliyun/odps/mma/server/JobScheduler.java index 764b7b77..0eaac259 100644 --- a/mma-server/src/main/java/com/aliyun/odps/mma/server/JobScheduler.java +++ b/mma-server/src/main/java/com/aliyun/odps/mma/server/JobScheduler.java @@ -31,6 +31,7 @@ import com.aliyun.odps.mma.exception.MmaException; import com.aliyun.odps.mma.server.action.Action; import com.aliyun.odps.mma.server.action.executor.ActionExecutorFactory; +import com.aliyun.odps.mma.server.job.AbstractJob; import com.aliyun.odps.mma.server.job.Job; import com.aliyun.odps.mma.job.JobStatus; import com.aliyun.odps.mma.server.task.Task; @@ -164,9 +165,7 @@ void handleTerminatedJobs() { for (Job job : runningJobs) { // TODO: remove later LOG.info("Job id: {}, status: {}", job.getId(), job.getStatus()); - if (JobStatus.SUCCEEDED.equals(job.getStatus()) - || JobStatus.FAILED.equals(job.getStatus()) - || JobStatus.CANCELED.equals(job.getStatus())) { + if (job.isTerminated()) { LOG.info("Job terminated, id: {}, status: {}", job.getId(), job.getStatus()); // TODO: a better way to print to the dw console diff --git a/mma-server/src/main/java/com/aliyun/odps/mma/server/job/AbstractJob.java b/mma-server/src/main/java/com/aliyun/odps/mma/server/job/AbstractJob.java index b64ce1f8..e9661d06 100644 --- a/mma-server/src/main/java/com/aliyun/odps/mma/server/job/AbstractJob.java +++ b/mma-server/src/main/java/com/aliyun/odps/mma/server/job/AbstractJob.java @@ -1,12 +1,12 @@ /* * Copyright 1999-2021 Alibaba Group Holding Ltd. - * + * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -34,6 +34,7 @@ import com.aliyun.odps.mma.server.task.Task; public abstract class AbstractJob implements Job { + private static final Logger LOG = LogManager.getLogger(AbstractJob.class); Job parentJob; @@ -349,11 +350,14 @@ void update(JobBuilder jobBuilder) { reload(); } - boolean isTerminated() { + @Override + public boolean isTerminated() { JobStatus status = getStatus(); - return JobStatus.SUCCEEDED.equals(status) - || JobStatus.FAILED.equals(status) - || JobStatus.CANCELED.equals(status); + boolean statusTerminated = JobStatus.SUCCEEDED.equals(status) + || JobStatus.FAILED.equals(status) + || JobStatus.CANCELED.equals(status); + boolean noMoreTask = getExecutableTasks().isEmpty(); + return statusTerminated && noMoreTask; } String generateTaskIdPrefix() { diff --git a/mma-server/src/main/java/com/aliyun/odps/mma/server/job/AbstractSingleTaskJob.java b/mma-server/src/main/java/com/aliyun/odps/mma/server/job/AbstractSingleTaskJob.java index 8e9f84d5..082f1e1c 100644 --- a/mma-server/src/main/java/com/aliyun/odps/mma/server/job/AbstractSingleTaskJob.java +++ b/mma-server/src/main/java/com/aliyun/odps/mma/server/job/AbstractSingleTaskJob.java @@ -86,6 +86,7 @@ public synchronized void setStatus(Task task) { getStatus(), task.getId(), task.getProgress()); + return; } TaskProgress taskStatus = task.getProgress(); diff --git a/mma-server/src/main/java/com/aliyun/odps/mma/server/job/HiveToMcTableJob.java b/mma-server/src/main/java/com/aliyun/odps/mma/server/job/HiveToMcTableJob.java index b40145f3..919bb2ba 100644 --- a/mma-server/src/main/java/com/aliyun/odps/mma/server/job/HiveToMcTableJob.java +++ b/mma-server/src/main/java/com/aliyun/odps/mma/server/job/HiveToMcTableJob.java @@ -182,14 +182,13 @@ public synchronized void setStatus(Task task) { return; } - if (JobStatus.SUCCEEDED.equals(getStatus()) - || JobStatus.FAILED.equals(getStatus()) - || JobStatus.CANCELED.equals(getStatus())) { + if (isTerminated()) { LOG.info("Job has terminated, id: {}, status: {}, task id: {}, task status: {}", record.getJobId(), getStatus(), task.getId(), task.getProgress()); + return; } TaskProgress taskStatus = task.getProgress(); diff --git a/mma-server/src/main/java/com/aliyun/odps/mma/server/job/Job.java b/mma-server/src/main/java/com/aliyun/odps/mma/server/job/Job.java index 441370d9..c2ffd84c 100644 --- a/mma-server/src/main/java/com/aliyun/odps/mma/server/job/Job.java +++ b/mma-server/src/main/java/com/aliyun/odps/mma/server/job/Job.java @@ -29,12 +29,20 @@ */ public interface Job { + /** + * return executable tasks for scheduler + * don't return null + * - exception => set fail status => return empty list + * @return + */ List getExecutableTasks(); List getSubJobs(); JobStatus getStatus(); + boolean isTerminated(); + List getTasks(); Job getParentJob(); diff --git a/mma-server/src/main/java/com/aliyun/odps/mma/server/job/McToOssTableJob.java b/mma-server/src/main/java/com/aliyun/odps/mma/server/job/McToOssTableJob.java index d6ddd0c9..3b56d4fb 100644 --- a/mma-server/src/main/java/com/aliyun/odps/mma/server/job/McToOssTableJob.java +++ b/mma-server/src/main/java/com/aliyun/odps/mma/server/job/McToOssTableJob.java @@ -31,6 +31,7 @@ import com.aliyun.odps.mma.config.DataSourceType; import com.aliyun.odps.mma.config.JobConfiguration; import com.aliyun.odps.mma.config.MmaConfig.OssConfig; +import com.aliyun.odps.mma.exception.MmaException; import com.aliyun.odps.mma.job.JobStatus; import com.aliyun.odps.mma.meta.transform.SchemaTransformer.SchemaTransformResult; import com.aliyun.odps.mma.meta.transform.SchemaTransformerFactory; @@ -51,6 +52,7 @@ public class McToOssTableJob extends AbstractTableJob { private static final Logger LOG = LogManager.getLogger(McToOssTableJob.class); + private Task finalCleanUpTask; public McToOssTableJob( Job parentJob, @@ -128,6 +130,7 @@ DirectedAcyclicGraph generateDag() throws Exception { mcExternalTableMetaModel, pendingSubJobs); Task cleanUpTask = getCleanUpTask(mcExternalTableMetaModel); + finalCleanUpTask = getCleanUpTask(mcExternalTableMetaModel); dag.addVertex(metadataTransmissionTask); dag.addVertex(setUpTask); @@ -183,6 +186,19 @@ private Task getSetUpTask( this); } + + @Override + synchronized void fail(String reason) { + super.fail(reason); + dag.addVertex(finalCleanUpTask); + } + + @Override + public synchronized void stop() throws MmaException { + super.stop(); + dag.addVertex(finalCleanUpTask); + } + private List getDataTransmissionTasks( MetaSource metaSource, TableMetaModel mcTableMetaModel, @@ -240,14 +256,13 @@ private Task getCleanUpTask(TableMetaModel tableMetaModel) { @Override public synchronized void setStatus(Task task) { - if (JobStatus.SUCCEEDED.equals(getStatus()) - || JobStatus.FAILED.equals(getStatus()) - || JobStatus.CANCELED.equals(getStatus())) { + if (isTerminated()) { LOG.info("Job has terminated, id: {}, status: {}, task id: {}, task status: {}", record.getJobId(), getStatus(), task.getId(), task.getProgress()); + return; } TaskProgress taskStatus = task.getProgress(); diff --git a/mma-server/src/main/java/com/aliyun/odps/mma/server/job/OssToMcTableJob.java b/mma-server/src/main/java/com/aliyun/odps/mma/server/job/OssToMcTableJob.java index 8ca8a3ee..fe4c317c 100644 --- a/mma-server/src/main/java/com/aliyun/odps/mma/server/job/OssToMcTableJob.java +++ b/mma-server/src/main/java/com/aliyun/odps/mma/server/job/OssToMcTableJob.java @@ -32,10 +32,12 @@ import com.aliyun.odps.mma.config.DataSourceType; import com.aliyun.odps.mma.config.JobConfiguration; import com.aliyun.odps.mma.config.MmaConfig.OssConfig; +import com.aliyun.odps.mma.exception.MmaException; import com.aliyun.odps.mma.job.JobStatus; import com.aliyun.odps.mma.meta.transform.SchemaTransformer.SchemaTransformResult; import com.aliyun.odps.mma.meta.transform.SchemaTransformerFactory; import com.aliyun.odps.mma.server.OssUtils; +import com.aliyun.odps.mma.server.action.ActionUtils; import com.aliyun.odps.mma.server.meta.MetaManager; import com.aliyun.odps.mma.meta.MetaSource; import com.aliyun.odps.mma.meta.MetaSource.TableMetaModel; @@ -51,6 +53,7 @@ public class OssToMcTableJob extends AbstractTableJob { private static final Logger LOG = LogManager.getLogger(OssToMcTableJob.class); + private Task finalCleanUpTask; OssToMcTableJob( Job parentJob, @@ -128,6 +131,8 @@ DirectedAcyclicGraph generateDag() throws Exception { externalTableMetaModel, pendingSubJobs); Task cleanUpTask = getCleanUpTask(externalTableMetaModel); + finalCleanUpTask = getCleanUpTask(externalTableMetaModel); + dag.addVertex(setUpTask); dataTransmissionTasks.forEach(dag::addVertex); dag.addVertex(cleanUpTask); @@ -142,6 +147,18 @@ DirectedAcyclicGraph generateDag() throws Exception { } } + @Override + synchronized void fail(String reason) { + super.fail(reason); + dag.addVertex(finalCleanUpTask); + } + + @Override + public synchronized void stop() throws MmaException { + super.stop(); + dag.addVertex(finalCleanUpTask); + } + private Task getSetUpTask( MetaSource metaSource, TableMetaModel ossTableMetaModel, @@ -256,14 +273,13 @@ private List getTablePartitionGroups( @Override public synchronized void setStatus(Task task) { - if (JobStatus.SUCCEEDED.equals(getStatus()) - || JobStatus.FAILED.equals(getStatus()) - || JobStatus.CANCELED.equals(getStatus())) { + if (isTerminated()) { LOG.info("Job has terminated, id: {}, status: {}, task id: {}, task status: {}", record.getJobId(), getStatus(), task.getId(), task.getProgress()); + return; } TaskProgress taskStatus = task.getProgress(); diff --git a/mma-server/src/main/java/com/aliyun/odps/mma/server/job/PartitionJob.java b/mma-server/src/main/java/com/aliyun/odps/mma/server/job/PartitionJob.java index f2528695..688f458c 100644 --- a/mma-server/src/main/java/com/aliyun/odps/mma/server/job/PartitionJob.java +++ b/mma-server/src/main/java/com/aliyun/odps/mma/server/job/PartitionJob.java @@ -17,6 +17,7 @@ package com.aliyun.odps.mma.server.job; import java.util.HashMap; +import java.util.LinkedList; import java.util.List; import java.util.Map; @@ -47,7 +48,7 @@ public synchronized void setStatus(Task task) { @Override public List getExecutableTasks() { - throw new UnsupportedOperationException(); + return new LinkedList<>(); } @Override