Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import com.aliyun.odps.mma.exception.MmaException;
import com.aliyun.odps.mma.server.action.Action;
import com.aliyun.odps.mma.server.action.executor.ActionExecutorFactory;
import com.aliyun.odps.mma.server.job.AbstractJob;
import com.aliyun.odps.mma.server.job.Job;
import com.aliyun.odps.mma.job.JobStatus;
import com.aliyun.odps.mma.server.task.Task;
Expand Down Expand Up @@ -164,9 +165,7 @@ void handleTerminatedJobs() {
for (Job job : runningJobs) {
// TODO: remove later
LOG.info("Job id: {}, status: {}", job.getId(), job.getStatus());
if (JobStatus.SUCCEEDED.equals(job.getStatus())
|| JobStatus.FAILED.equals(job.getStatus())
|| JobStatus.CANCELED.equals(job.getStatus())) {
if (job.isTerminated()) {
LOG.info("Job terminated, id: {}, status: {}", job.getId(), job.getStatus());

// TODO: a better way to print to the dw console
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
/*
* Copyright 1999-2021 Alibaba Group Holding Ltd.
*
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
*
* http://www.apache.org/licenses/LICENSE-2.0
*
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
Expand Down Expand Up @@ -34,6 +34,7 @@
import com.aliyun.odps.mma.server.task.Task;

public abstract class AbstractJob implements Job {

private static final Logger LOG = LogManager.getLogger(AbstractJob.class);

Job parentJob;
Expand Down Expand Up @@ -349,11 +350,14 @@ void update(JobBuilder jobBuilder) {
reload();
}

boolean isTerminated() {
@Override
public boolean isTerminated() {
JobStatus status = getStatus();
return JobStatus.SUCCEEDED.equals(status)
|| JobStatus.FAILED.equals(status)
|| JobStatus.CANCELED.equals(status);
boolean statusTerminated = JobStatus.SUCCEEDED.equals(status)
|| JobStatus.FAILED.equals(status)
|| JobStatus.CANCELED.equals(status);
boolean noMoreTask = getExecutableTasks().isEmpty();
return statusTerminated && noMoreTask;
}

String generateTaskIdPrefix() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ public synchronized void setStatus(Task task) {
getStatus(),
task.getId(),
task.getProgress());
return;
}

TaskProgress taskStatus = task.getProgress();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,14 +182,13 @@ public synchronized void setStatus(Task task) {
return;
}

if (JobStatus.SUCCEEDED.equals(getStatus())
|| JobStatus.FAILED.equals(getStatus())
|| JobStatus.CANCELED.equals(getStatus())) {
if (isTerminated()) {
LOG.info("Job has terminated, id: {}, status: {}, task id: {}, task status: {}",
record.getJobId(),
getStatus(),
task.getId(),
task.getProgress());
return;
}

TaskProgress taskStatus = task.getProgress();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,20 @@
*/
public interface Job {

/**
* return executable tasks for scheduler
* don't return null
* - exception => set fail status => return empty list
* @return
*/
List<Task> getExecutableTasks();

List<Job> getSubJobs();

JobStatus getStatus();

boolean isTerminated();

List<Task> getTasks();

Job getParentJob();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import com.aliyun.odps.mma.config.DataSourceType;
import com.aliyun.odps.mma.config.JobConfiguration;
import com.aliyun.odps.mma.config.MmaConfig.OssConfig;
import com.aliyun.odps.mma.exception.MmaException;
import com.aliyun.odps.mma.job.JobStatus;
import com.aliyun.odps.mma.meta.transform.SchemaTransformer.SchemaTransformResult;
import com.aliyun.odps.mma.meta.transform.SchemaTransformerFactory;
Expand All @@ -51,6 +52,7 @@
public class McToOssTableJob extends AbstractTableJob {

private static final Logger LOG = LogManager.getLogger(McToOssTableJob.class);
private Task finalCleanUpTask;

public McToOssTableJob(
Job parentJob,
Expand Down Expand Up @@ -128,6 +130,7 @@ DirectedAcyclicGraph<Task, DefaultEdge> generateDag() throws Exception {
mcExternalTableMetaModel,
pendingSubJobs);
Task cleanUpTask = getCleanUpTask(mcExternalTableMetaModel);
finalCleanUpTask = getCleanUpTask(mcExternalTableMetaModel);

dag.addVertex(metadataTransmissionTask);
dag.addVertex(setUpTask);
Expand Down Expand Up @@ -183,6 +186,19 @@ private Task getSetUpTask(
this);
}


@Override
synchronized void fail(String reason) {
super.fail(reason);
dag.addVertex(finalCleanUpTask);
}

@Override
public synchronized void stop() throws MmaException {
super.stop();
dag.addVertex(finalCleanUpTask);
}

private List<Task> getDataTransmissionTasks(
MetaSource metaSource,
TableMetaModel mcTableMetaModel,
Expand Down Expand Up @@ -240,14 +256,13 @@ private Task getCleanUpTask(TableMetaModel tableMetaModel) {

@Override
public synchronized void setStatus(Task task) {
if (JobStatus.SUCCEEDED.equals(getStatus())
|| JobStatus.FAILED.equals(getStatus())
|| JobStatus.CANCELED.equals(getStatus())) {
if (isTerminated()) {
LOG.info("Job has terminated, id: {}, status: {}, task id: {}, task status: {}",
record.getJobId(),
getStatus(),
task.getId(),
task.getProgress());
return;
}

TaskProgress taskStatus = task.getProgress();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,12 @@
import com.aliyun.odps.mma.config.DataSourceType;
import com.aliyun.odps.mma.config.JobConfiguration;
import com.aliyun.odps.mma.config.MmaConfig.OssConfig;
import com.aliyun.odps.mma.exception.MmaException;
import com.aliyun.odps.mma.job.JobStatus;
import com.aliyun.odps.mma.meta.transform.SchemaTransformer.SchemaTransformResult;
import com.aliyun.odps.mma.meta.transform.SchemaTransformerFactory;
import com.aliyun.odps.mma.server.OssUtils;
import com.aliyun.odps.mma.server.action.ActionUtils;
import com.aliyun.odps.mma.server.meta.MetaManager;
import com.aliyun.odps.mma.meta.MetaSource;
import com.aliyun.odps.mma.meta.MetaSource.TableMetaModel;
Expand All @@ -51,6 +53,7 @@
public class OssToMcTableJob extends AbstractTableJob {

private static final Logger LOG = LogManager.getLogger(OssToMcTableJob.class);
private Task finalCleanUpTask;

OssToMcTableJob(
Job parentJob,
Expand Down Expand Up @@ -128,6 +131,8 @@ DirectedAcyclicGraph<Task, DefaultEdge> generateDag() throws Exception {
externalTableMetaModel,
pendingSubJobs);
Task cleanUpTask = getCleanUpTask(externalTableMetaModel);
finalCleanUpTask = getCleanUpTask(externalTableMetaModel);

dag.addVertex(setUpTask);
dataTransmissionTasks.forEach(dag::addVertex);
dag.addVertex(cleanUpTask);
Expand All @@ -142,6 +147,18 @@ DirectedAcyclicGraph<Task, DefaultEdge> generateDag() throws Exception {
}
}

@Override
synchronized void fail(String reason) {
super.fail(reason);
dag.addVertex(finalCleanUpTask);
}

@Override
public synchronized void stop() throws MmaException {
super.stop();
dag.addVertex(finalCleanUpTask);
}

private Task getSetUpTask(
MetaSource metaSource,
TableMetaModel ossTableMetaModel,
Expand Down Expand Up @@ -256,14 +273,13 @@ private List<TablePartitionGroup> getTablePartitionGroups(

@Override
public synchronized void setStatus(Task task) {
if (JobStatus.SUCCEEDED.equals(getStatus())
|| JobStatus.FAILED.equals(getStatus())
|| JobStatus.CANCELED.equals(getStatus())) {
if (isTerminated()) {
LOG.info("Job has terminated, id: {}, status: {}, task id: {}, task status: {}",
record.getJobId(),
getStatus(),
task.getId(),
task.getProgress());
return;
}

TaskProgress taskStatus = task.getProgress();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package com.aliyun.odps.mma.server.job;

import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

Expand Down Expand Up @@ -47,7 +48,7 @@ public synchronized void setStatus(Task task) {

@Override
public List<Task> getExecutableTasks() {
throw new UnsupportedOperationException();
return new LinkedList<>();
}

@Override
Expand Down