diff --git a/mma-commons/pom.xml b/mma-commons/pom.xml index bd9a8dbb..871541d1 100644 --- a/mma-commons/pom.xml +++ b/mma-commons/pom.xml @@ -34,7 +34,6 @@ limitations under the License. com.aliyun.odps odps-sdk-core - shaded com.aliyun.oss diff --git a/mma-commons/src/main/java/com/aliyun/odps/mma/config/JobConfiguration.java b/mma-commons/src/main/java/com/aliyun/odps/mma/config/JobConfiguration.java index f4f3e782..0108beaf 100644 --- a/mma-commons/src/main/java/com/aliyun/odps/mma/config/JobConfiguration.java +++ b/mma-commons/src/main/java/com/aliyun/odps/mma/config/JobConfiguration.java @@ -96,6 +96,7 @@ public void validate() throws MmaException { // 1. Hive(metadata), Hive(data) -> MC(metadata), MC(data) // 2. MC(metadata), MC(metadata) -> OSS(metadata), OSS(data) // 3. OSS(metadata), OSS(data) -> MC(metadata), MC(data) + // 4. MC(metadata), MC(data) -> MC(metadata), MC(data) validateJobId(); validPartitionFilter(); MetaSourceType metaSourceType = MetaSourceType.valueOf(configuration.get(METADATA_SOURCE_TYPE)); @@ -118,6 +119,11 @@ public void validate() throws MmaException { && dataDestType.equals(DataDestType.MaxCompute)) { validateHiveToMcCredentials(); validMcAuthType(); + } else if (metaSourceType.equals(MetaSourceType.MaxCompute) + && dataSourceType.equals(DataSourceType.MaxCompute) + && metaDestType.equals(MetaDestType.MaxCompute) + && dataDestType.equals(DataDestType.MaxCompute)) { + validateMcToMcCredentials(); } else { throw new IllegalArgumentException("Unsupported source and dest combination."); } @@ -168,6 +174,13 @@ private void validateHiveToMcCredentials() throws MmaException { ConfigurationUtils.validateMcDataDest(this); } + private void validateMcToMcCredentials() throws MmaException { + ConfigurationUtils.validateMcMetaSource(this); + ConfigurationUtils.validateMcDataSource(this); + ConfigurationUtils.validateMcMetaDest(this); + ConfigurationUtils.validateMcDataDest(this); + } + private void validateJobId() throws MmaException { String jobId = get(JobConfiguration.JOB_ID); if (!StringUtils.isBlank(jobId) && !JOB_ID_PATTERN.matcher(jobId).matches()) { diff --git a/mma-server/pom.xml b/mma-server/pom.xml index 642ce8fe..5af6b1a8 100644 --- a/mma-server/pom.xml +++ b/mma-server/pom.xml @@ -35,7 +35,6 @@ limitations under the License. com.aliyun.odps odps-sdk-core - shaded com.aliyun.oss diff --git a/mma-server/src/main/java/com/aliyun/odps/mma/server/MmaServerMain.java b/mma-server/src/main/java/com/aliyun/odps/mma/server/MmaServerMain.java index f84259f9..381c72a5 100644 --- a/mma-server/src/main/java/com/aliyun/odps/mma/server/MmaServerMain.java +++ b/mma-server/src/main/java/com/aliyun/odps/mma/server/MmaServerMain.java @@ -122,7 +122,7 @@ private static void initMmaServerConfigurationSingleton(Path path) Map map = GsonUtils.GSON.fromJson( json, new TypeToken>() {}.getType()); MmaServerConfiguration.setInstance(map); - MetaLoaderConfig.setGlobalMetaLoader(MmaServerConfiguration.getInstance()); + // MetaLoaderConfig.setGlobalMetaLoader(MmaServerConfiguration.getInstance()); } private static void initMmaEventManagerSingleton() { diff --git a/mma-server/src/main/java/com/aliyun/odps/mma/server/action/AbstractAction.java b/mma-server/src/main/java/com/aliyun/odps/mma/server/action/AbstractAction.java index 04f05bb7..e82e881c 100644 --- a/mma-server/src/main/java/com/aliyun/odps/mma/server/action/AbstractAction.java +++ b/mma-server/src/main/java/com/aliyun/odps/mma/server/action/AbstractAction.java @@ -37,6 +37,8 @@ public abstract class AbstractAction implements Action { private ActionProgress progress; private Long startTime; private Long endTime; + // The exception.Message from failed action for users(UI). + private String reason; Map resourceMap; Future future; @@ -135,6 +137,7 @@ public void afterExecution() { LOG.error("Action failed, actionId: {}, stack trace: {}", id, ExceptionUtils.getStackTrace(e)); + this.reason = e.getMessage(); setProgress(ActionProgress.FAILED); } } @@ -190,4 +193,8 @@ public void stop() { this.future.cancel(true); } } + + public String getReason() { + return this.reason; + } } diff --git a/mma-server/src/main/java/com/aliyun/odps/mma/server/action/CopyTaskAction.java b/mma-server/src/main/java/com/aliyun/odps/mma/server/action/CopyTaskAction.java new file mode 100644 index 00000000..9778bc78 --- /dev/null +++ b/mma-server/src/main/java/com/aliyun/odps/mma/server/action/CopyTaskAction.java @@ -0,0 +1,160 @@ +package com.aliyun.odps.mma.server.action; + +import java.util.List; +import java.util.Calendar; +import java.util.stream.Collectors; + +import com.aliyun.odps.Odps; +import com.aliyun.odps.mma.exception.MmaException; +import com.aliyun.odps.mma.meta.MetaSource.ColumnMetaModel; +import com.aliyun.odps.mma.meta.MetaSource.PartitionMetaModel; +import com.aliyun.odps.mma.server.action.executor.ActionExecutorFactory; +import com.aliyun.odps.task.CopyTask; +import com.aliyun.odps.task.copy.LocalDatasource; +import com.aliyun.odps.task.copy.Datasource.Direction; +import com.aliyun.odps.mma.config.JobConfiguration; + +import com.aliyun.odps.mma.server.task.Task; +import com.aliyun.odps.mma.meta.MetaSource.TableMetaModel; +import com.aliyun.odps.mma.server.action.info.CopyTaskActionInfo; +import com.aliyun.odps.task.copy.TunnelDatasource; +import com.aliyun.oss.common.auth.HmacSHA1Signature; +import com.google.gson.JsonObject; +import org.apache.commons.codec.binary.Base64; + +public abstract class CopyTaskAction extends AbstractAction>> { + private static final String TOKEN_TYPE = "1"; + private static final String TOKEN_VERSION = "v1"; + private static final String TOKEN_ALGORITHM_ID = "1"; + + // private static final String ODPS_COPY_STRIP_ENABLED = "odps.copy.strip.enabled"; + // compatible: 是否兼容odps新的数据类型,如,struct/array等 + private static final String ODPS_COPY_COMPATIBLE_ENABLED = "odps.copy.compatible.enabled"; + + private final TableMetaModel source; + private final TableMetaModel dest; + private final ActionExecutionContext context; + + public CopyTaskAction( + String id, + TableMetaModel source, + TableMetaModel dest, + Task task, + ActionExecutionContext actionExecutionContext) { + super(id, task, actionExecutionContext); + actionInfo = new CopyTaskActionInfo(); + this.source = source; + this.dest = dest; + this.context = actionExecutionContext; + } + + @Override + void executeInternal() throws Exception { + future = ActionExecutorFactory.getCopyTaskExecutor().execute( + getSrcOdps(), + createCopyTask(getDestOdps(), Direction.EXPORT), + id, + (CopyTaskActionInfo) actionInfo + ); + } + + public abstract Odps getSrcOdps(); + + public abstract Odps getDestOdps(); + + @Override + void handleResult(List> result) { + ((CopyTaskActionInfo) actionInfo).setResult(result); + } + + @Override + public Object getResult() { + return ((CopyTaskActionInfo) actionInfo).getResult(); + } + + private CopyTask createCopyTask(Odps destOdps, Direction direction) throws MmaException { + String partitions = getPartitions(source); + TunnelDatasource tunnel = new TunnelDatasource(direction, dest.getDatabase(), dest.getTable(), partitions); + String token = createToken(direction, dest.getDatabase(), dest.getTable()); + tunnel.setAccountType("token"); + tunnel.setSignature(token); + tunnel.setOdpsEndPoint(destOdps.getEndpoint()); + + LocalDatasource local = new LocalDatasource(direction, source.getDatabase(), source.getTable(), partitions); + CopyTask task = new CopyTask("copy_task_" + Calendar.getInstance().getTimeInMillis()); + /** + * Append(-a)和Overwrite(-o)的语义很明确 + * 不过tunnel其实是只支持append操作 + * 所以overwrite模式只不过是帮你执行了一下alter table drop partition和add partition的操作。 + */ + task.setMode("Append"); + task.setLocalInfo(local); + task.SetTunnelInfo(tunnel); + + JsonObject params = new JsonObject(); + params.addProperty(ODPS_COPY_COMPATIBLE_ENABLED, "false"); + // params.addProperty(ODPS_COPY_STRIP_ENABLED, "false"); + task.setProperty("settings", params.toString()); + + return task; + } + + private String createToken(Direction direction, String project, String table) throws MmaException { + String grantee = ""; + String accessId = context.getConfig().get(JobConfiguration.DATA_DEST_MC_ACCESS_KEY_ID); + String accessKey = context.getConfig().get(JobConfiguration.DATA_DEST_MC_ACCESS_KEY_SECRET); + + int day = 60 * 60 * 24; + long expires = System.currentTimeMillis() / 1000 + day; + + String policy; + String tableUrl = "projects/" + project + "/" + "tables/" + table; + String projectUrl = "projects/" + project; + String instanceUrl = "projects/" + project + "/instances/*"; + + switch (direction.toString()) { + case "EXPORT": + policy = "{\"Version\":\"1\",\"Statement\":[{\"Effect\":\"Allow\",\"Action\":[\"odps:Describe\",\"odps:Update\",\"odps:Alter\",\"odps:Drop\",\"odps:Create\"],\"Resource\":\"acs:odps:*:" + + tableUrl.toLowerCase() + + "\"},{\"Effect\":\"Allow\",\"Action\":[\"odps:CreateInstance\"],\"Resource\":\"acs:odps:*:" + + projectUrl.toLowerCase() + + "\"},{\"Effect\":\"Allow\",\"Action\":[\"odps:Read\"],\"Resource\":\"acs:odps:*:" + + instanceUrl.toLowerCase() + "\"}]}"; + break; + case "IMPORT": + policy = "{\"Version\":\"1\",\"Statement\":[{\"Effect\":\"Allow\",\"Action\":[\"odps:Describe\",\"odps:Select\",\"odps:Download\"],\"Resource\":\"acs:odps:*:" + + tableUrl.toLowerCase() + "\"}]}"; + break; + default: + throw new MmaException("Unknown Direction mode!"); + } + + String data = TOKEN_ALGORITHM_ID + TOKEN_TYPE + accessId + grantee + expires + policy; + HmacSHA1Signature signer = new HmacSHA1Signature(); + String signature = signer.computeSignature(accessKey, data) + "#" + TOKEN_ALGORITHM_ID + "#" + TOKEN_TYPE + + "#" + accessId + "#" + grantee + "#" + expires + "#" + policy; + String base64Signature = new String(Base64.encodeBase64(signature.getBytes())); + + return TOKEN_VERSION + "." + base64Signature; + } + + private static String getPartitions(TableMetaModel tableMetaModel) { + StringBuilder partitions = new StringBuilder(); + if (!tableMetaModel.getPartitionColumns().isEmpty()) { + List columnMetaModels = tableMetaModel.getPartitionColumns(); + List partitionMetaModels = tableMetaModel.getPartitions(); + if (columnMetaModels.isEmpty() || partitionMetaModels.isEmpty()) { + return partitions.toString(); + } + + List partitionColumns = columnMetaModels.stream().map(ColumnMetaModel::getColumnName).collect(Collectors.toList()); + List partitionValues = partitionMetaModels.get(0).getPartitionValues(); + for (int i = 0; i < partitionColumns.size(); i++) { + partitions.append(partitionColumns.get(i)).append("=").append(partitionValues.get(i)).append(","); + } + return partitions.deleteCharAt(partitions.length() - 1).toString(); + } + return partitions.toString(); + } + +} diff --git a/mma-server/src/main/java/com/aliyun/odps/mma/server/action/McCreateTableAction.java b/mma-server/src/main/java/com/aliyun/odps/mma/server/action/McCreateTableAction.java index 7a30bec5..ef297c3d 100644 --- a/mma-server/src/main/java/com/aliyun/odps/mma/server/action/McCreateTableAction.java +++ b/mma-server/src/main/java/com/aliyun/odps/mma/server/action/McCreateTableAction.java @@ -16,9 +16,12 @@ package com.aliyun.odps.mma.server.action; +import java.util.Arrays; import java.util.HashMap; +import java.util.List; import java.util.Map; +import com.aliyun.odps.mma.meta.MetaSource; import com.aliyun.odps.mma.server.resource.Resource; import com.aliyun.odps.mma.util.McSqlUtils; import com.aliyun.odps.mma.meta.MetaSource.TableMetaModel; @@ -54,8 +57,20 @@ public boolean hasResults() { @Override public Map getSettings() { - // TODO: - return new HashMap<>(); + /** + * 如果create table的schema里包含TINYINT, SMALLINT, INT, CHAR, VARCHAR等2.0数据类型 + * 而dest project暂未开启2.0模式,便会导致create table失败 + * solution: set odps.sql.type.system.odps2=true + */ + Map hints = new HashMap<>(); + List columnTypes = Arrays.asList("TINYINT", "SMALLINT", "INT", "CHAR", "VARCHAR"); + for (MetaSource.ColumnMetaModel columnMetaModel : tableMetaModel.getColumns()) { + if (columnTypes.contains(columnMetaModel.getType().toUpperCase())) { + hints.put("odps.sql.type.system.odps2", "true"); + break; + } + } + return hints; } @Override diff --git a/mma-server/src/main/java/com/aliyun/odps/mma/server/action/McInsertOverwriteAction.java b/mma-server/src/main/java/com/aliyun/odps/mma/server/action/McInsertOverwriteAction.java new file mode 100644 index 00000000..2524e2d5 --- /dev/null +++ b/mma-server/src/main/java/com/aliyun/odps/mma/server/action/McInsertOverwriteAction.java @@ -0,0 +1,78 @@ +/* + * Copyright 1999-2021 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.aliyun.odps.mma.server.action; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import com.aliyun.odps.mma.util.McSqlUtils; +import com.aliyun.odps.mma.server.action.info.McSqlActionInfo; +import com.aliyun.odps.mma.meta.MetaSource.TableMetaModel; +import com.aliyun.odps.mma.server.task.Task; + +public class McInsertOverwriteAction extends McSqlAction { + + private TableMetaModel source; + private TableMetaModel dest; + + public McInsertOverwriteAction( + String id, + String mcAccessKeyId, + String mcAccessKeySecret, + String mcProject, + String mcEndpoint, + TableMetaModel source, + TableMetaModel dest, + Task task, + ActionExecutionContext context) { + super(id, mcAccessKeyId, mcAccessKeySecret, mcProject, mcEndpoint, task, context); + this.source = source; + this.dest = dest; + } + + @Override + void handleResult(List> result) { + ((McSqlActionInfo) actionInfo).setResult(result); + } + + @Override + public String getSql() { + return McSqlUtils.getInsertOverwriteTableStatement(source, dest); + } + + @Override + public boolean hasResults() { + return false; + } + + @Override + public Map getSettings() { + // TODO: + return new HashMap<>(); + } + + @Override + public String getName() { + return "Table data transmission"; + } + + @Override + public List> getResult() { + return ((McSqlActionInfo) actionInfo).getResult(); + } +} diff --git a/mma-server/src/main/java/com/aliyun/odps/mma/server/action/McToMcFunctionAction.java b/mma-server/src/main/java/com/aliyun/odps/mma/server/action/McToMcFunctionAction.java new file mode 100644 index 00000000..3928eda7 --- /dev/null +++ b/mma-server/src/main/java/com/aliyun/odps/mma/server/action/McToMcFunctionAction.java @@ -0,0 +1,109 @@ +package com.aliyun.odps.mma.server.action; + +import java.util.ArrayList; +import java.util.List; +import lombok.Getter; +import lombok.Setter; + +import com.aliyun.odps.Odps; +import com.aliyun.odps.Function; +import com.aliyun.odps.mma.config.JobConfiguration; +import com.aliyun.odps.mma.server.OdpsUtils; +import com.aliyun.odps.mma.server.action.info.DefaultActionInfo; +import com.aliyun.odps.mma.server.task.Task; +import com.aliyun.odps.mma.util.GsonUtils; + +public class McToMcFunctionAction extends DefaultAction { + private final Result result = new Result(); + + private final JobConfiguration config; + private final Odps srcOdps; + private final Odps destOdps; + + public McToMcFunctionAction(String id, Task task, ActionExecutionContext context, + JobConfiguration config, Odps srcOdps, Odps destOdps){ + super(id, task, context); + this.config = config; + this.srcOdps = srcOdps; + this.destOdps = destOdps; + } + + @Override + void handleResult(Object result) { + ((DefaultActionInfo) actionInfo).setResult(result.toString()); + } + + @Override + public String getName() { + return "McToMC Function transmission"; + } + + @Override + public Object getResult() { + if (ActionProgress.FAILED.equals(getProgress())) { + if (result.getAll().size() == 1) { + result.setFailed(result.getAll()); + result.setReason(getReason()); + } else { + List var1 = new ArrayList<>(result.getAll()); + var1.removeAll(result.getSuccess()); + result.setFailed(var1); + result.setReason(String.format( + "Maybe the first function has sync failed which in the failed list, detail: %s.", + getReason())); + } + ((DefaultActionInfo) actionInfo).setResult(GsonUtils.GSON.toJson(result, Result.class)); + } + return ((DefaultActionInfo) actionInfo).getResult(); + } + + @Override + public Object call() throws Exception { + List sync = new ArrayList<>(); + if (config.containsKey(JobConfiguration.SOURCE_OBJECT_NAME) + && !config.get(JobConfiguration.SOURCE_OBJECT_NAME).isEmpty()) { + result.getAll().add(config.get(JobConfiguration.SOURCE_OBJECT_NAME)); + Function function = OdpsUtils.getFunction( + srcOdps, + config.get(JobConfiguration.SOURCE_CATALOG_NAME), + config.get(JobConfiguration.SOURCE_OBJECT_NAME)); + sync.add(new McFunctionInfo(function)); + } else { + for (Function function : srcOdps.functions()) { + result.getAll().add(function.getName()); + sync.add(new McFunctionInfo(function)); + } + } + + for (McFunctionInfo mcFunctionInfo : sync) { + OdpsUtils.createFunction(destOdps, destOdps.getDefaultProject(), mcFunctionInfo, true); + result.getSuccess().add(mcFunctionInfo.getFunctionName()); + } + + return GsonUtils.GSON.toJson(result, Result.class); + } + + @Setter + @Getter + private static class Result { + private List all; + private List success; + private List failed; + private String reason; + + Result () { + this.all = new ArrayList<>(); + this.success = new ArrayList<>(); + this.failed = new ArrayList<>(); + this.reason = ""; + } + + @Override + public String toString() { + return "[all]: " + all.toString() + + "\n[success]: " + success.toString() + + "\n[failed]: " + failed.toString() + + "\n[reason]: " + reason; + } + } +} diff --git a/mma-server/src/main/java/com/aliyun/odps/mma/server/action/McToMcResourceAction.java b/mma-server/src/main/java/com/aliyun/odps/mma/server/action/McToMcResourceAction.java new file mode 100644 index 00000000..877bb550 --- /dev/null +++ b/mma-server/src/main/java/com/aliyun/odps/mma/server/action/McToMcResourceAction.java @@ -0,0 +1,222 @@ +package com.aliyun.odps.mma.server.action; + +import java.util.List; +import java.util.ArrayList; + +import lombok.Getter; +import lombok.Setter; +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.LogManager; + +import com.aliyun.odps.FileResource; +import com.aliyun.odps.Odps; +import com.aliyun.odps.OdpsException; +import com.aliyun.odps.Resource; +import com.aliyun.odps.TableResource; + +import com.aliyun.odps.mma.config.JobConfiguration; +import com.aliyun.odps.mma.exception.MmaException; +import com.aliyun.odps.mma.server.OdpsUtils; +import com.aliyun.odps.mma.server.action.info.DefaultActionInfo; +import com.aliyun.odps.mma.server.task.Task; +import com.aliyun.odps.mma.util.GsonUtils; +import com.aliyun.odps.utils.StringUtils; + + +public class McToMcResourceAction extends DefaultAction { + private static final Logger LOG = LogManager.getLogger(McToMcResourceAction.class); + + private final JobConfiguration config; + private final Odps srcOdps; + private final Odps destOdps; + private final String destProject; + + private final Result result = new Result(); + + public McToMcResourceAction(String id, Task task, ActionExecutionContext context, + JobConfiguration config, Odps srcOdps, Odps destOdps) { + super(id, task, context); + this.config = config; + this.srcOdps = srcOdps; + this.destOdps = destOdps; + this.destProject = destOdps.getDefaultProject(); + } + + @Override + void handleResult(Object result) { + ((DefaultActionInfo) actionInfo).setResult(result.toString()); + } + + @Override + public String getName() { + return "McToMC Resources transmission"; + } + + @Override + public Object getResult() { + /** + * Success: + * status.1 success.size + exist.size + invalid.size == all.size (normal) + * Failed: + * status.1 success.size + exist.size + invalid.size != all.size (abnormal) + * status.2 exists.size + invalid.size == all.size (abnormal) + * status 2.1 sync.size == 0 + */ + if (ActionProgress.FAILED.equals(getProgress())) { + if (!result.successExistInvalidAll()) { + // For specified resource which has sync failed. + if (result.getAll().size() == 1) { + result.setFailed(result.getAll()); + result.setReason(getReason()); + } else { + List var1 = new ArrayList<>(result.getAll()); + List var2 = new ArrayList<>(result.getSuccess()); + var2.addAll(result.getExist()); + var2.addAll(result.getInvalid()); + var1.removeAll(var2); + result.setFailed(var1); + result.setReason(String.format( + "Maybe the first resource has sync failed which in the failed list, detail: %s.", + getReason())); + } + } + ((DefaultActionInfo) actionInfo).setResult(GsonUtils.GSON.toJson(result, Result.class)); + } + return ((DefaultActionInfo) actionInfo).getResult(); + } + + @Override + public Object call() throws Exception { + List sync = new ArrayList<>(); + if (config.containsKey(JobConfiguration.SOURCE_OBJECT_NAME) + && !config.get(JobConfiguration.SOURCE_OBJECT_NAME).isEmpty()) { + result.getAll().add(config.get(JobConfiguration.SOURCE_OBJECT_NAME)); + Resource specResource = OdpsUtils.getResource( + srcOdps, + config.get(JobConfiguration.SOURCE_CATALOG_NAME), + config.get(JobConfiguration.SOURCE_OBJECT_NAME)); + if (StringUtils.isNullOrEmpty(specResource.getName()) + || specResource.getType().equals(Resource.Type.UNKOWN)) { + result.getInvalid().add(specResource.getName()); + result.setReason("Resource name is EMPTY or type is UNKNOWN."); + LOG.error("Invalid resource name or type {} for task {}.", specResource.getName(), id); + throw new MmaException("ERROR: Resource name is empty."); + } + sync.add(new McResourceInfo(specResource)); + } else { + List resourceNames = new ArrayList<>(); + for (Resource resource : destOdps.resources()) { + resourceNames.add(resource.getName()); + } + + for (Resource resource : srcOdps.resources()) { + result.getAll().add(resource.getName()); + if (StringUtils.isNullOrEmpty(resource.getName()) + || resource.getType().equals(Resource.Type.UNKOWN)) { + result.getInvalid().add(resource.getName()); + continue; + } + + if (resourceNames.contains(resource.getName())) { + Resource tmp = destOdps.resources().get(resource.getName()); + String contentMD5 = ((FileResource) tmp).getContentMd5(); + // TABLE.TYPE doesn't have contentMd5 which will be updated everytime. + if (!resource.getType().equals(Resource.Type.TABLE) + && ((FileResource) resource).getContentMd5().equals(contentMD5)) { + result.getExist().add(resource.getName()); + continue; + } + } + sync.add(new McResourceInfo(resource)); + } + + if (result.existAll()) { + result.setReason("All resources has already exist."); + LOG.info("All resources has already exist {} for task {}", result.getExist(), id); + return GsonUtils.GSON.toJson(result, Result.class); + } + + if (result.getExist().size() != 0) { + LOG.info("These resources has already exist which don't need to be update {} for task {}.", result.getExist(), id); + } + + if (result.invalidAll()) { + result.setReason("All resources are invalid (resource.name == null || resource.type = unknown)."); + LOG.warn("All resources are invalid {} for task {}", result.getInvalid(), id); + return GsonUtils.GSON.toJson(result, Result.class); + } + + if (result.getInvalid().size() != 0) { + LOG.warn("Invalid resources lists {} for task {}.", result.getInvalid(), id); + } + + if (sync.size() == 0) { + result.setReason("All resources has already exist or which are invalid."); + LOG.error("All resources has already exist or which are invalid for task {}.", id); + throw new MmaException("Error: All resources has already exist or which are invalid."); + } + } + + for (McResourceInfo resourceInfo : sync) { + if (Resource.Type.TABLE.equals(resourceInfo.getType())) { + OdpsUtils.addTableResource(destOdps, destProject, (TableResource) resourceInfo.toResource(), true); + } else { + addFileResource(srcOdps, destOdps, resourceInfo); + } + result.getSuccess().add(resourceInfo.getAlias()); + } + + return GsonUtils.GSON.toJson(result, Result.class); + } + + /** + * This method doesn't need isUpdate parameter like addTableResource method, + * and the files [FILE|JAR|PY|ARCHIVE] that are not updated will be processed by the blacklist + */ + private void addFileResource( + Odps srcOdps, + Odps destOdps, + McResourceInfo resourceInfo) throws OdpsException, MmaException { + boolean exists = destOdps.resources().exists(resourceInfo.getAlias()); + if (exists) { + LOG.info("Update resource {} because of exists for task {}.", resourceInfo.getAlias(), id); + destOdps.resources().update((FileResource) resourceInfo.toResource(), srcOdps.resources().getResourceAsStream(resourceInfo.getAlias())); + } else { + LOG.info("Create new resource {} for task {}.", resourceInfo.getAlias(), id); + destOdps.resources().create((FileResource) resourceInfo.toResource(), srcOdps.resources().getResourceAsStream(resourceInfo.getAlias())); + } + } + + @Setter + @Getter + private static class Result { + private List all = new ArrayList<>(); + private List success = new ArrayList<>(); + private List exist = new ArrayList<>(); + private List invalid = new ArrayList<>(); + private List failed = new ArrayList<>(); + private String reason; + + private boolean existAll() { + return exist.size() == all.size(); + } + + private boolean invalidAll() { + return invalid.size() == all.size(); + } + + private boolean successExistInvalidAll() { + return (success.size() + exist.size() + invalid.size()) == all.size(); + } + + @Override + public String toString() { + return "[all]: " + all.toString() + + "\n[success]: " + success.toString() + + "\n[exist]: " + exist.toString() + + "\n[invalid]: " + invalid.toString() + + "\n[failed]: " + failed.toString() + + "\n[reason]: " + reason; + } + } +} diff --git a/mma-server/src/main/java/com/aliyun/odps/mma/server/action/McToMcTableDataTransmissionAction.java b/mma-server/src/main/java/com/aliyun/odps/mma/server/action/McToMcTableDataTransmissionAction.java index 8e01fc1b..d3aa0fd2 100644 --- a/mma-server/src/main/java/com/aliyun/odps/mma/server/action/McToMcTableDataTransmissionAction.java +++ b/mma-server/src/main/java/com/aliyun/odps/mma/server/action/McToMcTableDataTransmissionAction.java @@ -1,78 +1,84 @@ -/* - * Copyright 1999-2021 Alibaba Group Holding Ltd. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - package com.aliyun.odps.mma.server.action; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import com.aliyun.odps.mma.util.McSqlUtils; -import com.aliyun.odps.mma.server.action.info.McSqlActionInfo; -import com.aliyun.odps.mma.meta.MetaSource.TableMetaModel; +import com.aliyun.odps.Odps; +import com.aliyun.odps.mma.config.AbstractConfiguration; +import com.aliyun.odps.mma.config.JobConfiguration; +import com.aliyun.odps.mma.server.OdpsUtils; +import com.aliyun.odps.mma.server.config.MmaServerConfiguration; +import com.aliyun.odps.mma.server.resource.Resource; import com.aliyun.odps.mma.server.task.Task; +import com.aliyun.odps.mma.meta.MetaSource.TableMetaModel; -public class McToMcTableDataTransmissionAction extends McSqlAction { +public class McToMcTableDataTransmissionAction extends CopyTaskAction { + private final String srcProject; + private final String srcOdpsAccessId; + private final String srcOdpsAccessKey; + private final String srcOdpsEndpoint; + private final String destProject; + private final String destOdpsAccessId; + private final String destOdpsAccessKey; + private final String destOdpsEndpoint; - private TableMetaModel source; - private TableMetaModel dest; public McToMcTableDataTransmissionAction( - String id, - String mcAccessKeyId, - String mcAccessKeySecret, - String mcProject, - String mcEndpoint, - TableMetaModel source, - TableMetaModel dest, - Task task, - ActionExecutionContext context) { - super(id, mcAccessKeyId, mcAccessKeySecret, mcProject, mcEndpoint, task, context); - this.source = source; - this.dest = dest; - } + String id, + TableMetaModel source, + TableMetaModel dest, + String srcProject, + String srcOdpsAccessId, + String srcOdpsAccessKey, + String srcOdpsEndpoint, + String destProject, + String destOdpsAccessId, + String destOdpsAccessKey, + String destOdpsEndpoint, + Task task, ActionExecutionContext context) { + super(id, source, dest, task, context); + this.srcProject = srcProject; + this.srcOdpsAccessId = srcOdpsAccessId; + this.srcOdpsAccessKey = srcOdpsAccessKey; + this.srcOdpsEndpoint = srcOdpsEndpoint; + this.destProject = destProject; + this.destOdpsAccessId = destOdpsAccessId; + this.destOdpsAccessKey = destOdpsAccessKey; + this.destOdpsEndpoint = destOdpsEndpoint; - @Override - void handleResult(List> result) { - ((McSqlActionInfo) actionInfo).setResult(result); - } + JobConfiguration config = actionExecutionContext.getConfig(); + MmaServerConfiguration mmaServerConfiguration = MmaServerConfiguration.getInstance(); + long numDataWorkerResource = Long.parseLong( + config.getOrDefault( + JobConfiguration.JOB_NUM_DATA_WORKER, + mmaServerConfiguration.getOrDefault( + AbstractConfiguration.JOB_NUM_DATA_WORKER, + AbstractConfiguration.JOB_NUM_DATA_WORKER_DEFAULT_VALUE) + ) + ); - @Override - public String getSql() { - return McSqlUtils.getInsertOverwriteTableStatement(source, dest); - } + resourceMap.put(Resource.DATA_WORKER, numDataWorkerResource); - @Override - public boolean hasResults() { - return false; } @Override - public Map getSettings() { - // TODO: - return new HashMap<>(); + public String getName() { + return "Table data transmission"; } @Override - public String getName() { - return "Table data transmission"; + public Odps getSrcOdps() { + return OdpsUtils.getOdps( + this.srcOdpsAccessId, + this.srcOdpsAccessKey, + this.srcOdpsEndpoint, + this.srcProject); } @Override - public List> getResult() { - return ((McSqlActionInfo) actionInfo).getResult(); + public Odps getDestOdps() { + return OdpsUtils.getOdps( + this.destOdpsAccessId, + this.destOdpsAccessKey, + this.destOdpsEndpoint, + this.destProject); } + } diff --git a/mma-server/src/main/java/com/aliyun/odps/mma/server/action/executor/ActionExecutorFactory.java b/mma-server/src/main/java/com/aliyun/odps/mma/server/action/executor/ActionExecutorFactory.java index fb2a5387..e2100177 100644 --- a/mma-server/src/main/java/com/aliyun/odps/mma/server/action/executor/ActionExecutorFactory.java +++ b/mma-server/src/main/java/com/aliyun/odps/mma/server/action/executor/ActionExecutorFactory.java @@ -21,6 +21,7 @@ public class ActionExecutorFactory { private static HiveSqlExecutor hiveSqlExecutor = null; private static McSqlExecutor mcSqlExecutor = null; private static DefaultExecutor defaultExecutor = null; + private static CopyTaskExecutor copyTaskExecutor = null; public static HiveSqlExecutor getHiveSqlExecutor() { if (hiveSqlExecutor == null) { @@ -46,6 +47,14 @@ public static DefaultExecutor getDefaultExecutor() { return defaultExecutor; } + public static CopyTaskExecutor getCopyTaskExecutor() { + if (copyTaskExecutor == null) { + copyTaskExecutor = new CopyTaskExecutor(); + } + + return copyTaskExecutor; + } + public static void shutdown() { if (hiveSqlExecutor != null) { hiveSqlExecutor.shutdown(); diff --git a/mma-server/src/main/java/com/aliyun/odps/mma/server/action/executor/CopyTaskExecutor.java b/mma-server/src/main/java/com/aliyun/odps/mma/server/action/executor/CopyTaskExecutor.java new file mode 100644 index 00000000..44e037da --- /dev/null +++ b/mma-server/src/main/java/com/aliyun/odps/mma/server/action/executor/CopyTaskExecutor.java @@ -0,0 +1,73 @@ +package com.aliyun.odps.mma.server.action.executor; + +import java.util.Collections; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.Future; + +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.LogManager; + +import com.aliyun.odps.Task; +import com.aliyun.odps.Instance; +import com.aliyun.odps.Odps; +import com.aliyun.odps.OdpsException; +import com.aliyun.odps.mma.server.action.info.CopyTaskActionInfo; + +public class CopyTaskExecutor extends AbstractActionExecutor { + private static final Logger LOG = LogManager.getLogger("ExecutorLogger"); + + private static class CopyTaskCallable implements Callable>> { + private final Odps odps; + private final Task copyTask; + private final String actionId; + private final CopyTaskActionInfo actionInfo; + + CopyTaskCallable( + Odps odps, + Task copyTask, + String actionId, + CopyTaskActionInfo actionInfo) { + this.odps = odps; + this.copyTask = copyTask; + this.actionId = actionId; + this.actionInfo = actionInfo; + + } + + @Override + public List> call() throws Exception { + LOG.info("ActionId: {}, Executing copyTask: {}, properties {}", + this.actionId, + this.copyTask.getName(), + this.copyTask.getProperties()); + Instance instance = this.odps.instances().create(odps.getDefaultProject(), copyTask); + + this.actionInfo.setInstanceId(instance.getId()); + LOG.info("ActionId: {}, InstanceId: {}", this.actionId, instance.getId()); + + try { + this.actionInfo.setLogView(this.odps.logview().generateLogView(instance, 72L)); + LOG.info("ActionId: {}, LogView {}", this.actionId, this.actionInfo.getLogView()); + } catch (OdpsException e) { + LOG.warn("ActionId: {}, failed to generate logview", this.actionId); + } + + instance.waitForSuccess(); + + LOG.info("Action execute result: {}", instance.getTaskResults()); + return Collections.emptyList(); + } + } + + public Future>> execute( + Odps odps, + Task copyTask, + String actionId, + CopyTaskActionInfo actionInfo) { + CopyTaskCallable callable = new CopyTaskCallable(odps, copyTask, actionId, actionInfo); + + return this.executor.submit(callable); + } + +} diff --git a/mma-server/src/main/java/com/aliyun/odps/mma/server/action/info/CopyTaskActionInfo.java b/mma-server/src/main/java/com/aliyun/odps/mma/server/action/info/CopyTaskActionInfo.java new file mode 100644 index 00000000..2bb2cf42 --- /dev/null +++ b/mma-server/src/main/java/com/aliyun/odps/mma/server/action/info/CopyTaskActionInfo.java @@ -0,0 +1,57 @@ +package com.aliyun.odps.mma.server.action.info; + +import java.util.List; + +import com.aliyun.odps.utils.StringUtils; + +public class CopyTaskActionInfo extends AbstractActionInfo { + private String instanceId; + private String logView; + private Float progress; + private List> result; + + public synchronized String getInstanceId() { + return instanceId; + } + + public synchronized String getLogView() { + return logView; + } + + public synchronized Float getProgress() { + return progress; + } + + public synchronized List> getResult() { + return result; + } + + public synchronized void setInstanceId(String instanceId) { + this.instanceId = instanceId; + } + + public synchronized void setLogView(String logView) { + this.logView = logView; + } + + public synchronized void setProgress(Float progress) { + this.progress = progress; + } + + public void setResult(List> result) { + this.result = result; + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder("["); + sb.append(this.getClass().getSimpleName()); + String instanceId = getInstanceId(); + if (!StringUtils.isNullOrEmpty(instanceId)) { + sb.append(" ").append(instanceId); + } + sb.append("]"); + + return sb.toString(); + } +} diff --git a/mma-server/src/main/java/com/aliyun/odps/mma/server/action/info/DefaultActionInfo.java b/mma-server/src/main/java/com/aliyun/odps/mma/server/action/info/DefaultActionInfo.java index fa1d369c..09b7a322 100644 --- a/mma-server/src/main/java/com/aliyun/odps/mma/server/action/info/DefaultActionInfo.java +++ b/mma-server/src/main/java/com/aliyun/odps/mma/server/action/info/DefaultActionInfo.java @@ -21,12 +21,22 @@ public class DefaultActionInfo extends AbstractActionInfo { + private String result = ""; + private Map kvs = new ConcurrentHashMap(); public String get(String key) { return this.kvs.get(key); } public void put(String key, String value) { this.kvs.put(key, value); } + public synchronized String getResult() { + return result; + } + + public void setResult(String result) { + this.result = result; + } + @Override public String toString() { return null; } } diff --git a/mma-server/src/main/java/com/aliyun/odps/mma/server/job/AbstractSingleTaskJob.java b/mma-server/src/main/java/com/aliyun/odps/mma/server/job/AbstractSingleTaskJob.java index 381a8761..5a17feb7 100644 --- a/mma-server/src/main/java/com/aliyun/odps/mma/server/job/AbstractSingleTaskJob.java +++ b/mma-server/src/main/java/com/aliyun/odps/mma/server/job/AbstractSingleTaskJob.java @@ -16,7 +16,7 @@ /** * @author yida */ -public abstract class AbstractSingleTaskJob extends AbstractJob{ +public abstract class AbstractSingleTaskJob extends AbstractJob { private static final Logger LOG = LogManager.getLogger(AbstractSingleTaskJob.class); private Task task; diff --git a/mma-server/src/main/java/com/aliyun/odps/mma/server/job/JobManager.java b/mma-server/src/main/java/com/aliyun/odps/mma/server/job/JobManager.java index ba70efba..634962f3 100644 --- a/mma-server/src/main/java/com/aliyun/odps/mma/server/job/JobManager.java +++ b/mma-server/src/main/java/com/aliyun/odps/mma/server/job/JobManager.java @@ -16,13 +16,13 @@ package com.aliyun.odps.mma.server.job; -import java.util.ArrayList; import java.util.Arrays; +import java.util.ArrayList; import java.util.Collections; +import java.util.Map; import java.util.HashMap; import java.util.HashSet; import java.util.List; -import java.util.Map; import java.util.Objects; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; @@ -381,7 +381,7 @@ public synchronized void removeJob(String jobId) { } public synchronized void removeSubJob(String parentJobId, String jobId) { - LOG.info("Remove sub job, parent job id: {}, sub job id: {}"); + LOG.info("Remove sub job, parent job id: {}, sub job id: {}", parentJobId, jobId); removeJobInternal(parentJobId, jobId); } @@ -467,13 +467,20 @@ boolean isHiveToMcJob() { && metaDestType.equals(MetaDestType.MaxCompute) && dataDestType.equals(DataDestType.MaxCompute); } + boolean isMcToMcJob() { + return metaSourceType.equals(MetaSourceType.MaxCompute) && dataSourceType.equals(DataSourceType.MaxCompute) + && metaDestType.equals(MetaDestType.MaxCompute) && dataDestType.equals(DataDestType.MaxCompute); + } + void check() { Set mcOSSValidType = new HashSet<>(Arrays.asList(CATALOG, TABLE, FUNCTION, RESOURCE)); Set hiveMcValidType = new HashSet<>(Arrays.asList(CATALOG, TABLE)); + Set mcToMcValidType = new HashSet<>(Arrays.asList(CATALOG, TABLE, FUNCTION, RESOURCE)); boolean isValidMcToOSSJob = isMcToOSSJob() && mcOSSValidType.contains(objectType); boolean isValidOSSToMcJob = isOSSToMcJob() && mcOSSValidType.contains(objectType); boolean isValidHiveToMcJob = isHiveToMcJob() && hiveMcValidType.contains(objectType); - if (!isValidMcToOSSJob && !isValidOSSToMcJob && !isValidHiveToMcJob) { + boolean isValidMcToMcJob = isMcToMcJob() && mcToMcValidType.contains(objectType); + if (!isValidMcToOSSJob && !isValidOSSToMcJob && !isValidHiveToMcJob && !isValidMcToMcJob) { throw new IllegalArgumentException("Unsupported source and dest combination"); } } @@ -536,6 +543,8 @@ private Job getJobInternal(Job parentJob, JobRecord record, boolean refresh) { } else { throw new IllegalArgumentException("Unsupported object type " + jobDescribe.objectType); } + } else if (jobDescribe.isMcToMcJob()) { + job = getMcToMcJob(parentJob, config, record); } else { throw new IllegalArgumentException("Unsupported source and dest combination."); } @@ -544,6 +553,62 @@ private Job getJobInternal(Job parentJob, JobRecord record, boolean refresh) { return job; } + private Job getMcToMcJob( + Job parentJob, + JobConfiguration config, + JobRecord record) { + ObjectType objectType = ObjectType.valueOf(config.get(JobConfiguration.OBJECT_TYPE)); + switch (objectType) { + case CATALOG: { + return getMcToMcCatalogJob(parentJob, record); + } + case TABLE: { + return getMcToMcTableJob(parentJob, record); + } + case PARTITION: { + return getPartitionJob(parentJob, record); + } + case RESOURCE: { + return getMcToMcResourceJob(parentJob, record); + } + case FUNCTION: { + return getMcToMcFunctionJob(parentJob, record); + } + default: + throw new IllegalArgumentException("Unsupported object type " + objectType); + } + } + + private Job getMcToMcCatalogJob( + Job parentJob, + JobRecord record) { + return new McToMcCatalogJob(parentJob, record, this, metaManager, metaSourceFactory); + } + + private Job getMcToMcTableJob( + Job parentJob, + JobRecord record) { + return new McToMcTableJob(parentJob, record, this, metaManager, metaSourceFactory); + } + + private Job getPartitionJob( + Job parentJob, + JobRecord record) { + return new PartitionJob(parentJob, record, this, metaManager, metaSourceFactory); + } + + private Job getMcToMcResourceJob( + Job parentJob, + JobRecord record) { + return new McToMcResourceJob(parentJob, record, this, metaManager, metaSourceFactory); + } + + private Job getMcToMcFunctionJob( + Job parentJob, + JobRecord record) { + return new McToMcFunctionJob(parentJob, record, this, metaManager, metaSourceFactory); + } + public Job getSubJobById(Job parentJob, String subJobId) { LOG.info("Get sub job, parent job id: {}, job id: {}", parentJob.getId(), subJobId); JobRecord subRecord = metaManager.getSubJobById(parentJob.getId(), subJobId); diff --git a/mma-server/src/main/java/com/aliyun/odps/mma/server/job/McToMcCatalogJob.java b/mma-server/src/main/java/com/aliyun/odps/mma/server/job/McToMcCatalogJob.java new file mode 100644 index 00000000..39adb829 --- /dev/null +++ b/mma-server/src/main/java/com/aliyun/odps/mma/server/job/McToMcCatalogJob.java @@ -0,0 +1,78 @@ +package com.aliyun.odps.mma.server.job; + +import java.util.List; +import java.util.LinkedList; +import java.util.stream.Collectors; + +import com.aliyun.odps.mma.server.meta.generated.JobRecord; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import com.aliyun.odps.mma.config.JobConfiguration; +import com.aliyun.odps.mma.config.ObjectType; +import com.aliyun.odps.mma.job.JobStatus; +import com.aliyun.odps.mma.meta.MetaSourceFactory; +import com.aliyun.odps.mma.server.meta.MetaManager; +import com.aliyun.odps.mma.server.task.Task; + +public class McToMcCatalogJob extends CatalogJob { + private static final Logger LOG = LogManager.getLogger(McToMcCatalogJob.class); + + public McToMcCatalogJob(Job parentJob, + JobRecord record, + JobManager jobManager, + MetaManager metaManager, + MetaSourceFactory metaSourceFactory) { + super(parentJob, record, jobManager, metaManager, metaSourceFactory); + } + + @Override + public synchronized List getExecutableTasks() { + LOG.info("Create the Mc2Mc catalogJob, job id: {}", record.getJobId()); + List subJobs = getSubJobs(); + List ret = new LinkedList<>(); + + // Step.1: sync table jobs. + getJobsByObjectType(ret, subJobs, ObjectType.TABLE); + if (!ret.isEmpty()) { + LOG.info("The first step is to sync the table jobs: {}", ret); + return ret; + } + if (!subJobs.stream() + .filter(subJob -> ObjectType.TABLE.name().equals(subJob.getJobConfiguration().get(JobConfiguration.OBJECT_TYPE))) + .allMatch(subJob -> JobStatus.SUCCEEDED.equals(subJob.getStatus()))) { + LOG.info("The subjobs of ObjectType.TABLE from McToMcCatalogJob aren't finished yet."); + return ret; + } + + // Step.2: sync resource jobs. + getJobsByObjectType(ret, subJobs, ObjectType.RESOURCE); + if (!ret.isEmpty()) { + LOG.info("The second step is to sync the resource jobs: {}", ret); + return ret; + } + + if (!subJobs.stream() + .filter(subJob -> ObjectType.RESOURCE.name().equals(subJob.getJobConfiguration().get(JobConfiguration.OBJECT_TYPE))) + .allMatch(subJob -> JobStatus.SUCCEEDED.equals(subJob.getStatus()))) { + LOG.info("The subjobs of ObjectType.RESOURCE from McToMcCatalogJob aren't finished yet."); + return ret; + } + + // Step.3: sync function jobs. + getJobsByObjectType(ret, subJobs, ObjectType.FUNCTION); + LOG.info("The third step is to sync the functions jobs: {}", ret); + return ret; + } + + + void getJobsByObjectType(List ret, List subJobs, ObjectType objectType) { + List unSucceededJob = subJobs.stream() + .filter(subJob -> objectType.name().equals(subJob.getJobConfiguration().get(JobConfiguration.OBJECT_TYPE)) + && !JobStatus.SUCCEEDED.equals(subJob.getStatus())) + .collect(Collectors.toList()); + for (Job job : unSucceededJob) { + ret.addAll(job.getExecutableTasks()); + } + } +} diff --git a/mma-server/src/main/java/com/aliyun/odps/mma/server/job/McToMcFunctionJob.java b/mma-server/src/main/java/com/aliyun/odps/mma/server/job/McToMcFunctionJob.java new file mode 100644 index 00000000..d6e800d0 --- /dev/null +++ b/mma-server/src/main/java/com/aliyun/odps/mma/server/job/McToMcFunctionJob.java @@ -0,0 +1,74 @@ +package com.aliyun.odps.mma.server.job; + +import com.aliyun.odps.mma.job.JobStatus; +import com.aliyun.odps.mma.meta.MetaSourceFactory; +import com.aliyun.odps.mma.server.meta.MetaManager; +import com.aliyun.odps.mma.server.meta.generated.JobRecord; +import com.aliyun.odps.mma.server.task.McToMcFunctionTask; +import com.aliyun.odps.mma.server.task.Task; +import com.aliyun.odps.mma.server.task.TaskProgress; +import com.google.common.collect.Lists; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +public class McToMcFunctionJob extends AbstractSingleTaskJob { + private static final Logger LOG = LogManager.getLogger(McToMcFunctionJob.class); + + private Task task; + + public McToMcFunctionJob(Job parentJob, + JobRecord record, + JobManager jobManager, + MetaManager metaManager, + MetaSourceFactory metaSourceFactory) { + super(parentJob, record, jobManager, metaManager, metaSourceFactory); + } + + @Override + Task generateTask() { + String taskIdPrefix = generateTaskIdPrefix(); + this.task = new McToMcFunctionTask( + taskIdPrefix + ".functionTransmission", + getRootJobId(), + config, + this + ); + return task; + } + + @Override + public synchronized void setStatus(Task task) { + if (this.task != task) { + LOG.info("Outdated task(McToMcFunction) found, job id: {}, task id:{}", + record.getJobId(), + task.getId()); + return; + } + + if (this.isTerminated()) { + LOG.info("Job(McToMcFunction) has terminated, id: {}, status: {}, task id: {}, task status: {}", + record.getJobId(), + getStatus(), + task.getId(), + task.getProgress()); + } + + TaskProgress taskStatus = task.getProgress(); + + switch (taskStatus) { + case SUCCEEDED: + setStatusInternal(JobStatus.SUCCEEDED); + setInfo(Lists.newArrayList(task.getDag().iterator()).get(0).getResult().toString()); + break; + case FAILED: + setStatusInternal(JobStatus.FAILED); + fail(Lists.newArrayList(task.getDag().iterator()).get(0).getResult().toString()); + break; + case RUNNING: + setStatusInternal(JobStatus.RUNNING); + break; + default: + break; + } + } +} diff --git a/mma-server/src/main/java/com/aliyun/odps/mma/server/job/McToMcResourceJob.java b/mma-server/src/main/java/com/aliyun/odps/mma/server/job/McToMcResourceJob.java new file mode 100644 index 00000000..6bcea950 --- /dev/null +++ b/mma-server/src/main/java/com/aliyun/odps/mma/server/job/McToMcResourceJob.java @@ -0,0 +1,72 @@ +package com.aliyun.odps.mma.server.job; + +import com.aliyun.odps.mma.server.meta.generated.JobRecord; +import com.google.common.collect.Lists; +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.LogManager; + +import com.aliyun.odps.mma.job.JobStatus; +import com.aliyun.odps.mma.server.task.Task; +import com.aliyun.odps.mma.meta.MetaSourceFactory; +import com.aliyun.odps.mma.server.meta.MetaManager; +import com.aliyun.odps.mma.server.task.McToMcResourceTask; +import com.aliyun.odps.mma.server.task.TaskProgress; + +public class McToMcResourceJob extends AbstractSingleTaskJob { + private static final Logger LOG = LogManager.getLogger(McToMcResourceJob.class); + private Task task; + + public McToMcResourceJob(Job parentJob, + JobRecord record, + JobManager jobManager, + MetaManager metaManager, + MetaSourceFactory metaSourceFactory) { + super(parentJob, record, jobManager, metaManager, metaSourceFactory); + } + + @Override + Task generateTask() { + String taskIdPrefix = generateTaskIdPrefix(); + this.task = new McToMcResourceTask( + taskIdPrefix + ".resourceTransmission", + getRootJobId(), + config, + this + ); + return task; + } + + @Override + public synchronized void setStatus(Task task) { + if (this.task != task) { + LOG.info("Outdated task(McToMcResource) found, job id: {}, task id:{}", record.getJobId(), task.getId()); + return; + } + + if (this.isTerminated()) { + LOG.info("Job(McToMcResource) has terminated, id: {}, status: {}, task id: {}, task status: {}", + record.getJobId(), + getStatus(), + task.getId(), + task.getProgress()); + } + + TaskProgress taskStatus = task.getProgress(); + + switch (taskStatus) { + case SUCCEEDED: + setStatusInternal(JobStatus.SUCCEEDED); + setInfo(Lists.newArrayList(task.getDag().iterator()).get(0).getResult().toString()); + break; + case FAILED: + setStatusInternal(JobStatus.FAILED); + fail(Lists.newArrayList(task.getDag().iterator()).get(0).getResult().toString()); + break; + case RUNNING: + setStatusInternal(JobStatus.RUNNING); + break; + default: + break; + } + } +} diff --git a/mma-server/src/main/java/com/aliyun/odps/mma/server/job/McToMcTableJob.java b/mma-server/src/main/java/com/aliyun/odps/mma/server/job/McToMcTableJob.java new file mode 100644 index 00000000..7094fb13 --- /dev/null +++ b/mma-server/src/main/java/com/aliyun/odps/mma/server/job/McToMcTableJob.java @@ -0,0 +1,276 @@ +package com.aliyun.odps.mma.server.job; + +import java.util.List; +import java.util.LinkedList; +import java.util.Collections; +import java.util.stream.Collectors; + +import com.aliyun.odps.mma.server.meta.generated.JobRecord; +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.LogManager; +import org.apache.commons.lang.exception.ExceptionUtils; +import org.jgrapht.graph.DefaultEdge; +import org.jgrapht.graph.DirectedAcyclicGraph; + +import com.aliyun.odps.mma.job.JobStatus; +import com.aliyun.odps.mma.config.DataSourceType; +import com.aliyun.odps.mma.config.JobConfiguration; +import com.aliyun.odps.mma.config.ConfigurationUtils; +import com.aliyun.odps.mma.meta.MetaSource; +import com.aliyun.odps.mma.meta.MetaSource.TableMetaModel; +import com.aliyun.odps.mma.meta.MetaSource.PartitionMetaModel; +import com.aliyun.odps.mma.meta.MetaSourceFactory; +import com.aliyun.odps.mma.server.meta.MetaManager; +import com.aliyun.odps.mma.server.task.Task; +import com.aliyun.odps.mma.server.task.TaskProgress; +import com.aliyun.odps.mma.server.task.McToMcTableSetUpTask; +import com.aliyun.odps.mma.server.task.McToMcTableDataTransmissionTask; +import com.aliyun.odps.mma.meta.transform.SchemaTransformerFactory; +import com.aliyun.odps.mma.meta.transform.SchemaTransformer.SchemaTransformResult; + +public class McToMcTableJob extends AbstractTableJob { + private static final Logger LOG = LogManager.getLogger(McToOssTableJob.class); + + McToMcTableJob( + Job parentJob, + JobRecord record, + JobManager jobManager, + MetaManager metaManager, + MetaSourceFactory metaSourceFactory) { + super(parentJob, record, jobManager, metaManager, metaSourceFactory); + } + + @Override + DirectedAcyclicGraph generateDag() throws Exception { + LOG.info("Generate the Mc2Mc tableJob DAG, job id: {}", record.getJobId()); + + try { + MetaSource metaSource = metaSourceFactory.getMetaSource(config); + + TableMetaModel sourceMcTableMetaModel = metaSource.getTableMeta( + config.get(JobConfiguration.SOURCE_CATALOG_NAME), + config.get(JobConfiguration.SOURCE_OBJECT_NAME)); + + SchemaTransformResult schemaTransformResult = SchemaTransformerFactory + .get(DataSourceType.MaxCompute) + .transform(sourceMcTableMetaModel, config); + TableMetaModel sinkMcTableMetaModel = schemaTransformResult.getTableMetaModel(); + + List pendingSubJobs = null; + if (!sourceMcTableMetaModel.getPartitionColumns().isEmpty()) { + pendingSubJobs = jobManager.listSubJobsByStatus(this, JobStatus.PENDING); + } + + DirectedAcyclicGraph dag = new DirectedAcyclicGraph<>(DefaultEdge.class); + Task setUpTask = getSetUpTask( + metaSource, + sourceMcTableMetaModel, + sinkMcTableMetaModel, + pendingSubJobs); + List dataTransmissionTasks = getDataTransmissionTasks( + metaSource, + sourceMcTableMetaModel, + sinkMcTableMetaModel, + pendingSubJobs); + + dag.addVertex(setUpTask); + dataTransmissionTasks.forEach(dag::addVertex); + dataTransmissionTasks.forEach(t -> dag.addEdge(setUpTask, t)); + return dag; + } catch (Exception e) { + String stackTrace = ExceptionUtils.getFullStackTrace(e); + fail(stackTrace); + throw e; + } + + } + + private Task getSetUpTask(MetaSource metaSource, + TableMetaModel sourceMcTableMetaModel, + TableMetaModel destMcTableMetaModel, + List pendingSubJobs) throws Exception { + List groupDests = null; + if (!sourceMcTableMetaModel.getPartitionColumns().isEmpty()) { + groupDests = getStaticTablePartitionGroups( + metaSource, + sourceMcTableMetaModel, + destMcTableMetaModel, + pendingSubJobs) + .stream() + .map(TablePartitionGroup::getDest) + .collect(Collectors.toList()); + } + String taskIdPrefix = generateTaskIdPrefix(); + return new McToMcTableSetUpTask( + taskIdPrefix + ".SetUp", + getRootJobId(), + config, + destMcTableMetaModel, + groupDests, + this); + } + + private List getDataTransmissionTasks(MetaSource metaSource, + TableMetaModel sourceMcTableMetaModel, + TableMetaModel destMcTableMetaModel, + List pendingSubJobs) throws Exception { + List ret = new LinkedList<>(); + boolean isPartitioned = !sourceMcTableMetaModel.getPartitionColumns().isEmpty(); + String taskIdPrefix = generateTaskIdPrefix(); + String rootJobId = getRootJobId(); + + if (isPartitioned) { + List groups = getTablePartitionGroups( + metaSource, + sourceMcTableMetaModel, + destMcTableMetaModel, + pendingSubJobs); + + for (int i = 0; i < groups.size(); i++) { + String taskId = taskIdPrefix + ".DataTransmission" + ".part." + i; + Task task = new McToMcTableDataTransmissionTask( + taskId, + rootJobId, + config, + groups.get(i).getSource(), + groups.get(i).getDest(), + this, + groups.get(i).getJobs()); + LOG.info( + "McToMcTableJob data transmission tasks generated, id: {}, rootJobId: {}, jobs: {}", + taskId, + rootJobId, + groups.get(i).getJobs().stream().map(Job::getId).collect(Collectors.toList())); + ret.add(task); + } + } else { + Task task = new McToMcTableDataTransmissionTask( + taskIdPrefix + ".DataTransmission", + rootJobId, + config, + sourceMcTableMetaModel, + destMcTableMetaModel, + this, + Collections.emptyList()); + ret.add(task); + } + + return ret; + } + + List getTablePartitionGroups( + MetaSource metaSource, + TableMetaModel source, + TableMetaModel dest, + List pendingSubJobs) throws Exception { + + if (source.getPartitionColumns().isEmpty() || pendingSubJobs.isEmpty()) { + // Not a partitioned table or the number of partitions to transfer is zero + LOG.info( + "McToMcTableJob create partition group, database: {}, table: {}, is partitioned: {}, num partitions: {}", + source.getDatabase(), + source.getTable(), + !source.getPartitionColumns().isEmpty(), + pendingSubJobs.size()); + return Collections.singletonList( + new TablePartitionGroup(source, dest, Collections.singletonList(this))); + } + + return generateAdaptiveTablePartitionGroups(metaSource, source, dest, pendingSubJobs); + + } + + private List generateAdaptiveTablePartitionGroups( + MetaSource metaSource, + TableMetaModel source, + TableMetaModel dest, + List pendingSubJobs) throws Exception { + + LOG.info("McToMcTableJob catalog: {}, table: {}, enter getAdaptiveTablePartitionGroups", + source.getDatabase(), source.getTable()); + + if (source.getPartitionColumns().isEmpty()) { + LOG.info("McToMcTableJob database: {}, table: {}, non-partitioned table is not supported", + source.getDatabase(), source.getTable()); + return null; + } + + List ret = new LinkedList<>(); + for (Job job : pendingSubJobs) { + List partitionVals = ConfigurationUtils.getPartitionValuesFromPartitionIdentifier( + job.getJobConfiguration().get(JobConfiguration.SOURCE_OBJECT_NAME)); + // TODO: The source table metadata already contains the partition metadata + PartitionMetaModel partitionMetaModel = metaSource.getPartitionMeta( + source.getDatabase(), source.getTable(), partitionVals); + // Make sure that the size of each partition is valid. + if (partitionMetaModel.getSize() == null) { + LOG.info( + "McToMcTableJob database: {}, table: {}, partition: {}, size is not valid", + source.getDatabase(), + source.getTable(), + partitionMetaModel.getPartitionValues()); + // Tips: different from others. + continue; + } + TableMetaModel.TableMetaModelBuilder sourceBuilder = new TableMetaModel.TableMetaModelBuilder(source); + TableMetaModel.TableMetaModelBuilder destBuilder = new TableMetaModel.TableMetaModelBuilder(dest); + LOG.info("McToMcTableJob Database: {}, table: {}, partition: {}", + source.getDatabase(), + source.getTable(), + partitionMetaModel.getPartitionValues()); + sourceBuilder.partitions(Collections.singletonList(partitionMetaModel)); + destBuilder.partitions(Collections.singletonList(partitionMetaModel)); + ret.add( + new TablePartitionGroup( + sourceBuilder.build(), + destBuilder.build(), + Collections.singletonList(job))); + } + return ret; + } + + @Override + public synchronized void setStatus(Task task) { + + if (JobStatus.SUCCEEDED.equals(getStatus()) + || JobStatus.FAILED.equals(getStatus()) + || JobStatus.CANCELED.equals(getStatus())) { + LOG.info("Job has terminated, id: {}, status: {}, task id: {}, task status: {}", + record.getJobId(), + getStatus(), + task.getId(), + task.getProgress()); + } + + TaskProgress taskStatus = task.getProgress(); + + switch (taskStatus) { + case SUCCEEDED: + if (task instanceof McToMcTableDataTransmissionTask) { + handleDataTransmissionTask((McToMcTableDataTransmissionTask) task); + } + if (dag.vertexSet() + .stream() + .filter(t -> t instanceof McToMcTableDataTransmissionTask) + .allMatch(t -> TaskProgress.SUCCEEDED.equals(t.getProgress()))) { + setStatusInternal(JobStatus.SUCCEEDED); + } + break; + case FAILED: + if (task instanceof McToMcTableDataTransmissionTask) { + handleDataTransmissionTask((McToMcTableDataTransmissionTask) task); + } else { + String reason = String.format("%s failed, id: %s", task.getClass(), task.getId()); + fail(reason); + } + break; + case RUNNING: + LOG.info("Job running, id: {}", record.getJobId()); + setStatusInternal(JobStatus.RUNNING); + break; + default: + } + + } + +} diff --git a/mma-server/src/main/java/com/aliyun/odps/mma/server/task/McToMcFunctionTask.java b/mma-server/src/main/java/com/aliyun/odps/mma/server/task/McToMcFunctionTask.java new file mode 100644 index 00000000..0e73d7e5 --- /dev/null +++ b/mma-server/src/main/java/com/aliyun/odps/mma/server/task/McToMcFunctionTask.java @@ -0,0 +1,49 @@ +package com.aliyun.odps.mma.server.task; + +import com.aliyun.odps.Odps; +import com.aliyun.odps.mma.config.JobConfiguration; +import com.aliyun.odps.mma.server.OdpsUtils; +import com.aliyun.odps.mma.server.action.ActionExecutionContext; +import com.aliyun.odps.mma.server.action.McToMcFunctionAction; +import com.aliyun.odps.mma.server.job.Job; + +public class McToMcFunctionTask extends DagTask { + private final Job job; + + public McToMcFunctionTask(String id, String rootJobId, JobConfiguration config, Job job) { + super(id, rootJobId, config); + this.job = job; + init(); + } + + private void init() { + ActionExecutionContext context = new ActionExecutionContext(config); + Odps srcOdps = OdpsUtils.getOdps( + config.get(JobConfiguration.DATA_SOURCE_MC_ACCESS_KEY_ID), + config.get(JobConfiguration.DATA_SOURCE_MC_ACCESS_KEY_SECRET), + config.get(JobConfiguration.DATA_SOURCE_MC_ENDPOINT), + config.get(JobConfiguration.SOURCE_CATALOG_NAME) + ); + Odps destOdps = OdpsUtils.getOdps( + config.get(JobConfiguration.DATA_DEST_MC_ACCESS_KEY_ID), + config.get(JobConfiguration.DATA_DEST_MC_ACCESS_KEY_SECRET), + config.get(JobConfiguration.DATA_DEST_MC_ENDPOINT), + config.get(JobConfiguration.DEST_CATALOG_NAME) + ); + + McToMcFunctionAction action = new McToMcFunctionAction( + id, + this, + context, + config, + srcOdps, + destOdps + ); + dag.addVertex(action); + } + + @Override + void updateMetadata() { + job.setStatus(this); + } +} diff --git a/mma-server/src/main/java/com/aliyun/odps/mma/server/task/McToMcResourceTask.java b/mma-server/src/main/java/com/aliyun/odps/mma/server/task/McToMcResourceTask.java new file mode 100644 index 00000000..f0e9d9ea --- /dev/null +++ b/mma-server/src/main/java/com/aliyun/odps/mma/server/task/McToMcResourceTask.java @@ -0,0 +1,50 @@ +package com.aliyun.odps.mma.server.task; + +import com.aliyun.odps.Odps; +import com.aliyun.odps.mma.config.JobConfiguration; +import com.aliyun.odps.mma.server.OdpsUtils; +import com.aliyun.odps.mma.server.action.ActionExecutionContext; +import com.aliyun.odps.mma.server.action.McToMcResourceAction; +import com.aliyun.odps.mma.server.job.Job; + +public class McToMcResourceTask extends DagTask { + private final Job job; + + public McToMcResourceTask(String id, String rootJobId, JobConfiguration config, Job job) { + super(id, rootJobId, config); + this.job = job; + init(); + } + + private void init() { + ActionExecutionContext context = new ActionExecutionContext(config); + Odps srcOdps = OdpsUtils.getOdps( + config.get(JobConfiguration.DATA_SOURCE_MC_ACCESS_KEY_ID), + config.get(JobConfiguration.DATA_SOURCE_MC_ACCESS_KEY_SECRET), + config.get(JobConfiguration.DATA_SOURCE_MC_ENDPOINT), + config.get(JobConfiguration.SOURCE_CATALOG_NAME) + ); + Odps destOdps = OdpsUtils.getOdps( + config.get(JobConfiguration.DATA_DEST_MC_ACCESS_KEY_ID), + config.get(JobConfiguration.DATA_DEST_MC_ACCESS_KEY_SECRET), + config.get(JobConfiguration.DATA_DEST_MC_ENDPOINT), + config.get(JobConfiguration.DEST_CATALOG_NAME) + ); + + McToMcResourceAction action = new McToMcResourceAction( + id, + this, + context, + config, + srcOdps, + destOdps + ); + dag.addVertex(action); + } + + @Override + void updateMetadata() { + job.setStatus(this); + } + +} diff --git a/mma-server/src/main/java/com/aliyun/odps/mma/server/task/McToMcTableDataTransmissionTask.java b/mma-server/src/main/java/com/aliyun/odps/mma/server/task/McToMcTableDataTransmissionTask.java new file mode 100644 index 00000000..5cac0c3c --- /dev/null +++ b/mma-server/src/main/java/com/aliyun/odps/mma/server/task/McToMcTableDataTransmissionTask.java @@ -0,0 +1,52 @@ +package com.aliyun.odps.mma.server.task; + +import java.util.List; + +import com.aliyun.odps.mma.server.job.Job; +import com.aliyun.odps.mma.config.JobConfiguration; +import com.aliyun.odps.mma.meta.MetaSource.TableMetaModel; +import com.aliyun.odps.mma.server.action.ActionExecutionContext; +import com.aliyun.odps.mma.server.action.McToMcTableDataTransmissionAction; + +public class McToMcTableDataTransmissionTask extends TableDataTransmissionTask { + + public McToMcTableDataTransmissionTask( + String id, + String rootJobId, + JobConfiguration config, + TableMetaModel source, + TableMetaModel dest, + Job job, + List subJobs) { + super(id, rootJobId, config, source, dest, job, subJobs); + init(); + } + + private void init() { + ActionExecutionContext context = new ActionExecutionContext(config); + + McToMcTableDataTransmissionAction mcToMcTableDataTransmissionAction = + new McToMcTableDataTransmissionAction( + id + ".DataTransmission", + source, + dest, + config.get(JobConfiguration.SOURCE_CATALOG_NAME), + config.get(JobConfiguration.DATA_SOURCE_MC_ACCESS_KEY_ID), + config.get(JobConfiguration.DATA_SOURCE_MC_ACCESS_KEY_SECRET), + config.get(JobConfiguration.DATA_SOURCE_MC_ENDPOINT), + config.get(JobConfiguration.DEST_CATALOG_NAME), + config.get(JobConfiguration.DATA_DEST_MC_ACCESS_KEY_ID), + config.get(JobConfiguration.DATA_DEST_MC_ACCESS_KEY_SECRET), + config.get(JobConfiguration.DATA_DEST_MC_ENDPOINT), + this, + context + ); + + dag.addVertex(mcToMcTableDataTransmissionAction); + } + + @Override + void updateMetadata() { + job.setStatus(this); + } +} diff --git a/mma-server/src/main/java/com/aliyun/odps/mma/server/task/McToMcTableSetUpTask.java b/mma-server/src/main/java/com/aliyun/odps/mma/server/task/McToMcTableSetUpTask.java new file mode 100644 index 00000000..23f74989 --- /dev/null +++ b/mma-server/src/main/java/com/aliyun/odps/mma/server/task/McToMcTableSetUpTask.java @@ -0,0 +1,92 @@ +package com.aliyun.odps.mma.server.task; + +import java.util.List; + +import com.aliyun.odps.mma.server.job.Job; +import com.aliyun.odps.mma.meta.MetaSource; +import com.aliyun.odps.mma.config.JobConfiguration; +import com.aliyun.odps.mma.meta.MetaSource.TableMetaModel; +import com.aliyun.odps.mma.server.action.ActionExecutionContext; +import com.aliyun.odps.mma.server.action.McAddPartitionsAction; +import com.aliyun.odps.mma.server.action.McCreateTableAction; +import com.aliyun.odps.mma.server.action.McDropPartitionAction; +import com.aliyun.odps.mma.server.action.McDropTableAction; + +public class McToMcTableSetUpTask extends DagTask { + private final TableMetaModel tableMetaModel; + private final List partitionGroups; + private final Job job; + + public McToMcTableSetUpTask( + String id, + String rootJobId, + JobConfiguration config, + TableMetaModel tableMetaModel, + List partitionGroups, + Job job) { + super(id, rootJobId, config); + this.job = job; + this.tableMetaModel = tableMetaModel; + this.partitionGroups = partitionGroups; + init(); + } + + private void init() { + ActionExecutionContext context = new ActionExecutionContext(config); + + McCreateTableAction mcCreateTableAction = new McCreateTableAction( + this.getId() + ".CreateTable", + config.get(JobConfiguration.DATA_DEST_MC_ACCESS_KEY_ID), + config.get(JobConfiguration.DATA_DEST_MC_ACCESS_KEY_SECRET), + config.get(JobConfiguration.DEST_CATALOG_NAME), + config.get(JobConfiguration.DATA_DEST_MC_ENDPOINT), + tableMetaModel, + this, + context); + dag.addVertex(mcCreateTableAction); + + if (tableMetaModel.getPartitionColumns().isEmpty()) { + McDropTableAction mcDropTableAction = new McDropTableAction( + this.getId() + ".DropTable", + config.get(JobConfiguration.DATA_DEST_MC_ACCESS_KEY_ID), + config.get(JobConfiguration.DATA_DEST_MC_ACCESS_KEY_SECRET), + config.get(JobConfiguration.DEST_CATALOG_NAME), + config.get(JobConfiguration.DATA_DEST_MC_ENDPOINT), + tableMetaModel, + this, + context); + dag.addVertex(mcDropTableAction); + dag.addEdge(mcDropTableAction, mcCreateTableAction); + } else { + int idx = 0; + for (TableMetaModel managedPartitionGroup : partitionGroups) { + McDropPartitionAction mcDropPartitionAction = new McDropPartitionAction( + this.getId() + ".DropPartitions.part." + idx, + config.get(JobConfiguration.DATA_DEST_MC_ACCESS_KEY_ID), + config.get(JobConfiguration.DATA_DEST_MC_ACCESS_KEY_SECRET), + config.get(JobConfiguration.DEST_CATALOG_NAME), + config.get(JobConfiguration.DATA_DEST_MC_ENDPOINT), + managedPartitionGroup, + this, + context); + dag.addVertex(mcDropPartitionAction); + dag.addEdge(mcCreateTableAction, mcDropPartitionAction); + McAddPartitionsAction mcAddPartitionsAction = new McAddPartitionsAction( + this.getId() + ".AddPartitions.part." + idx, + config.get(JobConfiguration.DATA_DEST_MC_ACCESS_KEY_ID), + config.get(JobConfiguration.DATA_DEST_MC_ACCESS_KEY_SECRET), + config.get(JobConfiguration.DEST_CATALOG_NAME), + config.get(JobConfiguration.DATA_DEST_MC_ENDPOINT), + managedPartitionGroup, + this, + context); + dag.addVertex(mcAddPartitionsAction); + dag.addEdge(mcDropPartitionAction, mcAddPartitionsAction); + idx += 1; + } + } + } + + @Override + void updateMetadata() { job.setStatus(this); } +} diff --git a/mma-server/src/main/java/com/aliyun/odps/mma/server/task/McToOssTableDataTransmissionTask.java b/mma-server/src/main/java/com/aliyun/odps/mma/server/task/McToOssTableDataTransmissionTask.java index f2904997..829d2df5 100644 --- a/mma-server/src/main/java/com/aliyun/odps/mma/server/task/McToOssTableDataTransmissionTask.java +++ b/mma-server/src/main/java/com/aliyun/odps/mma/server/task/McToOssTableDataTransmissionTask.java @@ -20,9 +20,9 @@ import com.aliyun.odps.mma.config.JobConfiguration; import com.aliyun.odps.mma.server.action.ActionExecutionContext; -import com.aliyun.odps.mma.server.action.McToMcTableDataTransmissionAction; import com.aliyun.odps.mma.server.action.McVerificationAction; import com.aliyun.odps.mma.server.action.VerificationAction; +import com.aliyun.odps.mma.server.action.McInsertOverwriteAction; import com.aliyun.odps.mma.server.job.Job; import com.aliyun.odps.mma.meta.MetaSource.TableMetaModel; @@ -45,7 +45,7 @@ private void init() { String executionProject = config.getOrDefault( JobConfiguration.JOB_EXECUTION_MC_PROJECT, config.get(JobConfiguration.SOURCE_CATALOG_NAME)); - McToMcTableDataTransmissionAction action = new McToMcTableDataTransmissionAction( + McInsertOverwriteAction action = new McInsertOverwriteAction( id + ".DataTransmission", config.get(JobConfiguration.DATA_SOURCE_MC_ACCESS_KEY_ID), config.get(JobConfiguration.DATA_SOURCE_MC_ACCESS_KEY_SECRET), diff --git a/mma-server/src/main/java/com/aliyun/odps/mma/server/task/OssToMcTableDataTransmissionTask.java b/mma-server/src/main/java/com/aliyun/odps/mma/server/task/OssToMcTableDataTransmissionTask.java index 8c8f33fd..6cf302a5 100644 --- a/mma-server/src/main/java/com/aliyun/odps/mma/server/task/OssToMcTableDataTransmissionTask.java +++ b/mma-server/src/main/java/com/aliyun/odps/mma/server/task/OssToMcTableDataTransmissionTask.java @@ -20,9 +20,9 @@ import com.aliyun.odps.mma.config.JobConfiguration; import com.aliyun.odps.mma.server.action.ActionExecutionContext; -import com.aliyun.odps.mma.server.action.McToMcTableDataTransmissionAction; import com.aliyun.odps.mma.server.action.McVerificationAction; import com.aliyun.odps.mma.server.action.VerificationAction; +import com.aliyun.odps.mma.server.action.McInsertOverwriteAction; import com.aliyun.odps.mma.server.job.Job; import com.aliyun.odps.mma.meta.MetaSource.TableMetaModel; @@ -45,7 +45,7 @@ private void init() { String executionProject = config.getOrDefault( JobConfiguration.JOB_EXECUTION_MC_PROJECT, config.get(JobConfiguration.DEST_CATALOG_NAME)); - McToMcTableDataTransmissionAction action = new McToMcTableDataTransmissionAction( + McInsertOverwriteAction action = new McInsertOverwriteAction( id + ".DataTransmission", config.get(JobConfiguration.DATA_DEST_MC_ACCESS_KEY_ID), config.get(JobConfiguration.DATA_DEST_MC_ACCESS_KEY_SECRET), diff --git a/mma-server/src/main/java/com/aliyun/odps/mma/server/ui/utils/UiUtils.java b/mma-server/src/main/java/com/aliyun/odps/mma/server/ui/utils/UiUtils.java index deceadd8..244705bd 100644 --- a/mma-server/src/main/java/com/aliyun/odps/mma/server/ui/utils/UiUtils.java +++ b/mma-server/src/main/java/com/aliyun/odps/mma/server/ui/utils/UiUtils.java @@ -59,6 +59,7 @@ import com.aliyun.odps.mma.config.ObjectType; import com.aliyun.odps.mma.server.action.Action; import com.aliyun.odps.mma.server.action.info.AbstractActionInfo; +import com.aliyun.odps.mma.server.action.info.CopyTaskActionInfo; import com.aliyun.odps.mma.server.action.info.HiveSqlActionInfo; import com.aliyun.odps.mma.server.action.info.McSqlActionInfo; import com.aliyun.odps.mma.server.action.info.VerificationActionInfo; @@ -202,6 +203,27 @@ public static DomContent actionInfoTable(Action action) { return ul( listEntries.toArray(new DomContent[0]) ); + } else if (action instanceof CopyTaskActionInfo) { + CopyTaskActionInfo mcSqlActionInfo = (CopyTaskActionInfo) actionInfo; + List listEntries = new LinkedList<>(); + listEntries.add( + actionInfoEntry("MC instance ID", mcSqlActionInfo.getInstanceId()) + ); + if (mcSqlActionInfo.getLogView() != null) { + listEntries.add( + actionInfoEntry("MC instance tracking URL", + a(mcSqlActionInfo.getLogView()).withStyle("word-break: break-all;")) + ); + } else { + listEntries.add( + actionInfoEntry("MC instance tracking URL", "N/A") + ); + } + // TODO: progress + + return ul( + listEntries.toArray(new DomContent[0]) + ); } else if (actionInfo instanceof VerificationActionInfo) { VerificationActionInfo verificationActionInfo = (VerificationActionInfo) actionInfo; @@ -691,9 +713,10 @@ public static String getSource(Job job) { JobConfiguration config = job.getJobConfiguration(); ObjectType objectType = ObjectType.valueOf(config.get(JobConfiguration.OBJECT_TYPE)); switch (objectType) { - case CATALOG: { + case CATALOG: + case RESOURCE: + case FUNCTION: return config.get(JobConfiguration.SOURCE_CATALOG_NAME); - } case PARTITION: { String catalogName = StringUtils.defaultString( config.get(JobConfiguration.SOURCE_CATALOG_NAME), @@ -725,9 +748,10 @@ public static String getDestination(Job job) { JobConfiguration config = job.getJobConfiguration(); ObjectType objectType = ObjectType.valueOf(config.get(JobConfiguration.OBJECT_TYPE)); switch (objectType) { - case CATALOG: { + case CATALOG: + case RESOURCE: + case FUNCTION: return config.get(JobConfiguration.DEST_CATALOG_NAME); - } case PARTITION: { String catalogName = StringUtils.defaultString( config.get(JobConfiguration.DEST_CATALOG_NAME), @@ -741,7 +765,8 @@ public static String getDestination(Job job) { "%s.%s partition %s", catalogName, tableName, - GsonUtils.GSON.toJson(partitionValues)); } + GsonUtils.GSON.toJson(partitionValues)); + } default: { String catalogName = StringUtils.defaultString( config.get(JobConfiguration.DEST_CATALOG_NAME), diff --git a/pom.xml b/pom.xml index b90c4180..b790b4e3 100644 --- a/pom.xml +++ b/pom.xml @@ -25,7 +25,7 @@ limitations under the License. 0.1.1 - 0.36.4-public + 0.39.3 3.8.0 1.1.0 2.6.0 @@ -114,7 +114,6 @@ limitations under the License. com.aliyun.odps odps-sdk-core ${odps.sdk.version} - shaded com.google.protobuf