Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion mma-commons/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ limitations under the License.
<dependency>
<groupId>com.aliyun.odps</groupId>
<artifactId>odps-sdk-core</artifactId>
<classifier>shaded</classifier>
</dependency>
<dependency>
<groupId>com.aliyun.oss</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ public void validate() throws MmaException {
// 1. Hive(metadata), Hive(data) -> MC(metadata), MC(data)
// 2. MC(metadata), MC(metadata) -> OSS(metadata), OSS(data)
// 3. OSS(metadata), OSS(data) -> MC(metadata), MC(data)
// 4. MC(metadata), MC(data) -> MC(metadata), MC(data)
validateJobId();
validPartitionFilter();
MetaSourceType metaSourceType = MetaSourceType.valueOf(configuration.get(METADATA_SOURCE_TYPE));
Expand All @@ -118,6 +119,11 @@ public void validate() throws MmaException {
&& dataDestType.equals(DataDestType.MaxCompute)) {
validateHiveToMcCredentials();
validMcAuthType();
} else if (metaSourceType.equals(MetaSourceType.MaxCompute)
&& dataSourceType.equals(DataSourceType.MaxCompute)
&& metaDestType.equals(MetaDestType.MaxCompute)
&& dataDestType.equals(DataDestType.MaxCompute)) {
validateMcToMcCredentials();
} else {
throw new IllegalArgumentException("Unsupported source and dest combination.");
}
Expand Down Expand Up @@ -168,6 +174,13 @@ private void validateHiveToMcCredentials() throws MmaException {
ConfigurationUtils.validateMcDataDest(this);
}

private void validateMcToMcCredentials() throws MmaException {
ConfigurationUtils.validateMcMetaSource(this);
ConfigurationUtils.validateMcDataSource(this);
ConfigurationUtils.validateMcMetaDest(this);
ConfigurationUtils.validateMcDataDest(this);
}

private void validateJobId() throws MmaException {
String jobId = get(JobConfiguration.JOB_ID);
if (!StringUtils.isBlank(jobId) && !JOB_ID_PATTERN.matcher(jobId).matches()) {
Expand Down
1 change: 0 additions & 1 deletion mma-server/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ limitations under the License.
<dependency>
<groupId>com.aliyun.odps</groupId>
<artifactId>odps-sdk-core</artifactId>
<classifier>shaded</classifier>
</dependency>
<dependency>
<groupId>com.aliyun.oss</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ private static void initMmaServerConfigurationSingleton(Path path)
Map<String, String> map = GsonUtils.GSON.fromJson(
json, new TypeToken<Map<String, String>>() {}.getType());
MmaServerConfiguration.setInstance(map);
MetaLoaderConfig.setGlobalMetaLoader(MmaServerConfiguration.getInstance());
// MetaLoaderConfig.setGlobalMetaLoader(MmaServerConfiguration.getInstance());
}

private static void initMmaEventManagerSingleton() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ public abstract class AbstractAction<T> implements Action {
private ActionProgress progress;
private Long startTime;
private Long endTime;
// The exception.Message from failed action for users(UI).
private String reason;

Map<Resource, Long> resourceMap;
Future<T> future;
Expand Down Expand Up @@ -135,6 +137,7 @@ public void afterExecution() {
LOG.error("Action failed, actionId: {}, stack trace: {}",
id,
ExceptionUtils.getStackTrace(e));
this.reason = e.getMessage();
setProgress(ActionProgress.FAILED);
}
}
Expand Down Expand Up @@ -190,4 +193,8 @@ public void stop() {
this.future.cancel(true);
}
}

public String getReason() {
return this.reason;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
package com.aliyun.odps.mma.server.action;

import java.util.List;
import java.util.Calendar;
import java.util.stream.Collectors;

import com.aliyun.odps.Odps;
import com.aliyun.odps.mma.exception.MmaException;
import com.aliyun.odps.mma.meta.MetaSource.ColumnMetaModel;
import com.aliyun.odps.mma.meta.MetaSource.PartitionMetaModel;
import com.aliyun.odps.mma.server.action.executor.ActionExecutorFactory;
import com.aliyun.odps.task.CopyTask;
import com.aliyun.odps.task.copy.LocalDatasource;
import com.aliyun.odps.task.copy.Datasource.Direction;
import com.aliyun.odps.mma.config.JobConfiguration;

import com.aliyun.odps.mma.server.task.Task;
import com.aliyun.odps.mma.meta.MetaSource.TableMetaModel;
import com.aliyun.odps.mma.server.action.info.CopyTaskActionInfo;
import com.aliyun.odps.task.copy.TunnelDatasource;
import com.aliyun.oss.common.auth.HmacSHA1Signature;
import com.google.gson.JsonObject;
import org.apache.commons.codec.binary.Base64;

public abstract class CopyTaskAction extends AbstractAction<List<List<Object>>> {
private static final String TOKEN_TYPE = "1";
private static final String TOKEN_VERSION = "v1";
private static final String TOKEN_ALGORITHM_ID = "1";

// private static final String ODPS_COPY_STRIP_ENABLED = "odps.copy.strip.enabled";
// compatible: 是否兼容odps新的数据类型,如,struct/array等
private static final String ODPS_COPY_COMPATIBLE_ENABLED = "odps.copy.compatible.enabled";

private final TableMetaModel source;
private final TableMetaModel dest;
private final ActionExecutionContext context;

public CopyTaskAction(
String id,
TableMetaModel source,
TableMetaModel dest,
Task task,
ActionExecutionContext actionExecutionContext) {
super(id, task, actionExecutionContext);
actionInfo = new CopyTaskActionInfo();
this.source = source;
this.dest = dest;
this.context = actionExecutionContext;
}

@Override
void executeInternal() throws Exception {
future = ActionExecutorFactory.getCopyTaskExecutor().execute(
getSrcOdps(),
createCopyTask(getDestOdps(), Direction.EXPORT),
id,
(CopyTaskActionInfo) actionInfo
);
}

public abstract Odps getSrcOdps();

public abstract Odps getDestOdps();

@Override
void handleResult(List<List<Object>> result) {
((CopyTaskActionInfo) actionInfo).setResult(result);
}

@Override
public Object getResult() {
return ((CopyTaskActionInfo) actionInfo).getResult();
}

private CopyTask createCopyTask(Odps destOdps, Direction direction) throws MmaException {
String partitions = getPartitions(source);
TunnelDatasource tunnel = new TunnelDatasource(direction, dest.getDatabase(), dest.getTable(), partitions);
String token = createToken(direction, dest.getDatabase(), dest.getTable());
tunnel.setAccountType("token");
tunnel.setSignature(token);
tunnel.setOdpsEndPoint(destOdps.getEndpoint());

LocalDatasource local = new LocalDatasource(direction, source.getDatabase(), source.getTable(), partitions);
CopyTask task = new CopyTask("copy_task_" + Calendar.getInstance().getTimeInMillis());
/**
* Append(-a)和Overwrite(-o)的语义很明确
* 不过tunnel其实是只支持append操作
* 所以overwrite模式只不过是帮你执行了一下alter table drop partition和add partition的操作。
*/
task.setMode("Append");
task.setLocalInfo(local);
task.SetTunnelInfo(tunnel);

JsonObject params = new JsonObject();
params.addProperty(ODPS_COPY_COMPATIBLE_ENABLED, "false");
// params.addProperty(ODPS_COPY_STRIP_ENABLED, "false");
task.setProperty("settings", params.toString());

return task;
}

private String createToken(Direction direction, String project, String table) throws MmaException {
String grantee = "";
String accessId = context.getConfig().get(JobConfiguration.DATA_DEST_MC_ACCESS_KEY_ID);
String accessKey = context.getConfig().get(JobConfiguration.DATA_DEST_MC_ACCESS_KEY_SECRET);

int day = 60 * 60 * 24;
long expires = System.currentTimeMillis() / 1000 + day;

String policy;
String tableUrl = "projects/" + project + "/" + "tables/" + table;
String projectUrl = "projects/" + project;
String instanceUrl = "projects/" + project + "/instances/*";

switch (direction.toString()) {
case "EXPORT":
policy = "{\"Version\":\"1\",\"Statement\":[{\"Effect\":\"Allow\",\"Action\":[\"odps:Describe\",\"odps:Update\",\"odps:Alter\",\"odps:Drop\",\"odps:Create\"],\"Resource\":\"acs:odps:*:"
+ tableUrl.toLowerCase()
+ "\"},{\"Effect\":\"Allow\",\"Action\":[\"odps:CreateInstance\"],\"Resource\":\"acs:odps:*:"
+ projectUrl.toLowerCase()
+ "\"},{\"Effect\":\"Allow\",\"Action\":[\"odps:Read\"],\"Resource\":\"acs:odps:*:"
+ instanceUrl.toLowerCase() + "\"}]}";
break;
case "IMPORT":
policy = "{\"Version\":\"1\",\"Statement\":[{\"Effect\":\"Allow\",\"Action\":[\"odps:Describe\",\"odps:Select\",\"odps:Download\"],\"Resource\":\"acs:odps:*:"
+ tableUrl.toLowerCase() + "\"}]}";
break;
default:
throw new MmaException("Unknown Direction mode!");
}

String data = TOKEN_ALGORITHM_ID + TOKEN_TYPE + accessId + grantee + expires + policy;
HmacSHA1Signature signer = new HmacSHA1Signature();
String signature = signer.computeSignature(accessKey, data) + "#" + TOKEN_ALGORITHM_ID + "#" + TOKEN_TYPE
+ "#" + accessId + "#" + grantee + "#" + expires + "#" + policy;
String base64Signature = new String(Base64.encodeBase64(signature.getBytes()));

return TOKEN_VERSION + "." + base64Signature;
}

private static String getPartitions(TableMetaModel tableMetaModel) {
StringBuilder partitions = new StringBuilder();
if (!tableMetaModel.getPartitionColumns().isEmpty()) {
List<ColumnMetaModel> columnMetaModels = tableMetaModel.getPartitionColumns();
List<PartitionMetaModel> partitionMetaModels = tableMetaModel.getPartitions();
if (columnMetaModels.isEmpty() || partitionMetaModels.isEmpty()) {
return partitions.toString();
}

List<String> partitionColumns = columnMetaModels.stream().map(ColumnMetaModel::getColumnName).collect(Collectors.toList());
List<String> partitionValues = partitionMetaModels.get(0).getPartitionValues();
for (int i = 0; i < partitionColumns.size(); i++) {
partitions.append(partitionColumns.get(i)).append("=").append(partitionValues.get(i)).append(",");
}
return partitions.deleteCharAt(partitions.length() - 1).toString();
}
return partitions.toString();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,12 @@

package com.aliyun.odps.mma.server.action;

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import com.aliyun.odps.mma.meta.MetaSource;
import com.aliyun.odps.mma.server.resource.Resource;
import com.aliyun.odps.mma.util.McSqlUtils;
import com.aliyun.odps.mma.meta.MetaSource.TableMetaModel;
Expand Down Expand Up @@ -54,8 +57,20 @@ public boolean hasResults() {

@Override
public Map<String, String> getSettings() {
// TODO:
return new HashMap<>();
/**
* 如果create table的schema里包含TINYINT, SMALLINT, INT, CHAR, VARCHAR等2.0数据类型
* 而dest project暂未开启2.0模式,便会导致create table失败
* solution: set odps.sql.type.system.odps2=true
*/
Map<String, String> hints = new HashMap<>();
List<String> columnTypes = Arrays.asList("TINYINT", "SMALLINT", "INT", "CHAR", "VARCHAR");
for (MetaSource.ColumnMetaModel columnMetaModel : tableMetaModel.getColumns()) {
if (columnTypes.contains(columnMetaModel.getType().toUpperCase())) {
hints.put("odps.sql.type.system.odps2", "true");
break;
}
}
return hints;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* Copyright 1999-2021 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.aliyun.odps.mma.server.action;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

import com.aliyun.odps.mma.util.McSqlUtils;
import com.aliyun.odps.mma.server.action.info.McSqlActionInfo;
import com.aliyun.odps.mma.meta.MetaSource.TableMetaModel;
import com.aliyun.odps.mma.server.task.Task;

public class McInsertOverwriteAction extends McSqlAction {

private TableMetaModel source;
private TableMetaModel dest;

public McInsertOverwriteAction(
String id,
String mcAccessKeyId,
String mcAccessKeySecret,
String mcProject,
String mcEndpoint,
TableMetaModel source,
TableMetaModel dest,
Task task,
ActionExecutionContext context) {
super(id, mcAccessKeyId, mcAccessKeySecret, mcProject, mcEndpoint, task, context);
this.source = source;
this.dest = dest;
}

@Override
void handleResult(List<List<Object>> result) {
((McSqlActionInfo) actionInfo).setResult(result);
}

@Override
public String getSql() {
return McSqlUtils.getInsertOverwriteTableStatement(source, dest);
}

@Override
public boolean hasResults() {
return false;
}

@Override
public Map<String, String> getSettings() {
// TODO:
return new HashMap<>();
}

@Override
public String getName() {
return "Table data transmission";
}

@Override
public List<List<Object>> getResult() {
return ((McSqlActionInfo) actionInfo).getResult();
}
}
Loading