Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ public abstract class AbstractConfiguration implements Map<String, String>, Conf
public static final String JOB_MAX_ATTEMPT_TIMES = "mma.job.max.attempt.times";
public static final String JOB_MAX_ATTEMPT_TIMES_DEFAULT_VALUE = "3";
public static final String JOB_EXECUTION_MC_PROJECT = "mma.job.execution.mc.project";
public static final String JOB_EXECUTION_MC_SETTINGS = "mma.job.execution.mc.settings";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

和hive的对齐比较好:mma.data.source.hive.settings.transmission

public static final String JOB_NUM_DATA_WORKER = "mma.job.num.data.worker";
public static final String JOB_NUM_DATA_WORKER_DEFAULT_VALUE = "5";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,12 @@
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.Validate;

import com.aliyun.odps.Odps;
Expand All @@ -43,12 +46,25 @@

public class ConfigurationUtils {

public static Map<String, String> getSQLSettings(String config) throws MmaException {
Map<String, String> sqlSettings = new HashMap<>();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

用一个immutable map更好些

if (!StringUtils.isBlank(config)) {
for (String s : config.split(";")) {
String[] kv = s.split("=");
if (kv.length != 2) {
throw new MmaException("Unsupported SQL setting format: " + s);
}
sqlSettings.put(kv[0].trim(), kv[1].trim());
}
}
return sqlSettings;
}

public static class PartitionComparator implements Comparator<List<String>> {

List<PartitionOrderType> partitionOrders;

public PartitionComparator(List<PartitionOrderType> partitionOrders){
public PartitionComparator(List<PartitionOrderType> partitionOrders) {
this.partitionOrders = partitionOrders;
}

Expand Down Expand Up @@ -96,7 +112,8 @@ public static void validateMcMetaSource(AbstractConfiguration config) throws Mma
getCannotBeNullOrEmptyErrorMessage(AbstractConfiguration.METADATA_SOURCE_MC_ACCESS_KEY_ID));
String accessKeySecret = Validate.notBlank(
config.get(AbstractConfiguration.METADATA_SOURCE_MC_ACCESS_KEY_SECRET),
getCannotBeNullOrEmptyErrorMessage(AbstractConfiguration.METADATA_SOURCE_MC_ACCESS_KEY_SECRET));
getCannotBeNullOrEmptyErrorMessage(
AbstractConfiguration.METADATA_SOURCE_MC_ACCESS_KEY_SECRET));
validateMcCredentials(endpoint, accessKeyId, accessKeySecret);
}

Expand Down Expand Up @@ -155,7 +172,8 @@ public static void validateMcMetaDest(AbstractConfiguration config) throws MmaEx
getCannotBeNullOrEmptyErrorMessage(AbstractConfiguration.METADATA_DEST_MC_ACCESS_KEY_ID));
String accessKeySecret = Validate.notBlank(
config.get(AbstractConfiguration.METADATA_DEST_MC_ACCESS_KEY_SECRET),
getCannotBeNullOrEmptyErrorMessage(AbstractConfiguration.METADATA_DEST_MC_ACCESS_KEY_SECRET));
getCannotBeNullOrEmptyErrorMessage(
AbstractConfiguration.METADATA_DEST_MC_ACCESS_KEY_SECRET));
validateMcCredentials(endpoint, accessKeyId, accessKeySecret);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
/*
* Copyright 1999-2021 Alibaba Group Holding Ltd.
*
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
*
* http://www.apache.org/licenses/LICENSE-2.0
*
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
Expand All @@ -16,10 +16,14 @@

package com.aliyun.odps.mma.server.action;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

import com.aliyun.odps.mma.config.AbstractConfiguration;
import com.aliyun.odps.mma.config.ConfigurationUtils;
import com.aliyun.odps.mma.config.JobConfiguration;
import com.aliyun.odps.mma.exception.MmaException;
import com.aliyun.odps.mma.meta.MetaSource;
import com.aliyun.odps.mma.util.McSqlUtils;
import com.aliyun.odps.mma.server.action.info.McSqlActionInfo;
import com.aliyun.odps.mma.meta.MetaSource.TableMetaModel;
Expand All @@ -29,6 +33,7 @@ public class McToMcTableDataTransmissionAction extends McSqlAction {

private TableMetaModel source;
private TableMetaModel dest;
private Map<String, String> settings;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

放在Task层面解析好一些,后续如果mc -> oss或oss -> mc需要加上数据验证的步骤,这个settings是需要复用的


public McToMcTableDataTransmissionAction(
String id,
Expand All @@ -39,10 +44,14 @@ public McToMcTableDataTransmissionAction(
TableMetaModel source,
TableMetaModel dest,
Task task,
ActionExecutionContext context) {
ActionExecutionContext context) throws MmaException {
super(id, mcAccessKeyId, mcAccessKeySecret, mcProject, mcEndpoint, task, context);
this.source = source;
this.dest = dest;

JobConfiguration config = context.getConfig();
settings = ConfigurationUtils.getSQLSettings(
config.get(AbstractConfiguration.JOB_EXECUTION_MC_SETTINGS));
}

@Override
Expand All @@ -62,8 +71,7 @@ public boolean hasResults() {

@Override
public Map<String, String> getSettings() {
// TODO:
return new HashMap<>();
return settings;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.commons.lang3.StringUtils;

import com.aliyun.odps.mma.config.AbstractConfiguration;
import com.aliyun.odps.mma.config.ConfigurationUtils;
import com.aliyun.odps.mma.config.JobConfiguration;
import com.aliyun.odps.mma.exception.MmaException;
import com.aliyun.odps.mma.server.action.ActionExecutionContext;
Expand Down Expand Up @@ -53,17 +54,8 @@ private void init() throws MmaException {
config.get(JobConfiguration.DEST_CATALOG_NAME));
ActionExecutionContext context = new ActionExecutionContext(config);

Map<String, String> userHiveSettingsMap = new HashMap<>();
String userHiveSettings = config.get(AbstractConfiguration.DATA_SOURCE_HIVE_RUNTIME_CONFIG);
if (!StringUtils.isBlank(userHiveSettings)) {
for (String s : userHiveSettings.split(";")) {
String[] kv = s.split("=");
if (kv.length != 2) {
throw new MmaException("Unsupported Hive setting format: " + s);
}
userHiveSettingsMap.put(kv[0].trim(), kv[1].trim());
}
}
Map<String, String> userHiveSettingsMap = ConfigurationUtils.getSQLSettings(
config.get(AbstractConfiguration.DATA_SOURCE_HIVE_RUNTIME_CONFIG));

HiveToMcTableDataTransmissionAction dataTransmissionAction =
new HiveToMcTableDataTransmissionAction(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.util.List;

import com.aliyun.odps.mma.config.JobConfiguration;
import com.aliyun.odps.mma.exception.MmaException;
import com.aliyun.odps.mma.server.action.ActionExecutionContext;
import com.aliyun.odps.mma.server.action.McToMcTableDataTransmissionAction;
import com.aliyun.odps.mma.server.job.Job;
Expand All @@ -33,12 +34,12 @@ public McToOssTableDataTransmissionTask(
TableMetaModel source,
TableMetaModel dest,
Job job,
List<Job> subJobs) {
List<Job> subJobs) throws MmaException {
super(id, rootJobId, config, source, dest, job, subJobs);
init();
}

private void init() {
private void init() throws MmaException {
ActionExecutionContext context = new ActionExecutionContext(config);
String executionProject = config.getOrDefault(
JobConfiguration.JOB_EXECUTION_MC_PROJECT,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.util.List;

import com.aliyun.odps.mma.config.JobConfiguration;
import com.aliyun.odps.mma.exception.MmaException;
import com.aliyun.odps.mma.server.action.ActionExecutionContext;
import com.aliyun.odps.mma.server.action.McToMcTableDataTransmissionAction;
import com.aliyun.odps.mma.server.job.Job;
Expand All @@ -33,12 +34,12 @@ public OssToMcTableDataTransmissionTask(
TableMetaModel ossTableMetaModel,
TableMetaModel mcTableMetaModel,
Job job,
List<Job> subJobs) {
List<Job> subJobs) throws MmaException {
super(id, rootJobId, config, ossTableMetaModel, mcTableMetaModel, job, subJobs);
init();
}

private void init() {
private void init() throws MmaException {
ActionExecutionContext context = new ActionExecutionContext(config);
String executionProject = config.getOrDefault(
JobConfiguration.JOB_EXECUTION_MC_PROJECT,
Expand Down