Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@

import org.apache.commons.lang3.StringUtils;

import com.google.common.base.Joiner;
import com.google.common.base.Optional;
import com.google.common.base.Strings;
import com.typesafe.config.Config;
Expand All @@ -46,7 +45,6 @@
import org.apache.gobblin.runtime.api.SpecExecutor;
import org.apache.gobblin.service.ExecutionStatus;
import org.apache.gobblin.service.modules.flowgraph.DagNodeId;
import org.apache.gobblin.service.modules.flowgraph.DatasetDescriptorConfigKeys;
import org.apache.gobblin.service.modules.flowgraph.FlowGraphConfigurationKeys;
import org.apache.gobblin.service.modules.orchestration.DagProcessingEngine;
import org.apache.gobblin.service.modules.orchestration.DagUtils;
Expand Down Expand Up @@ -111,8 +109,6 @@ private static JobSpec buildJobSpec(FlowSpec flowSpec, Config jobConfig, Long fl
String flowName = ConfigUtils.getString(flowConfig, ConfigurationKeys.FLOW_NAME_KEY, "");
String flowGroup = ConfigUtils.getString(flowConfig, ConfigurationKeys.FLOW_GROUP_KEY, "");
String flowFailureOption = ConfigUtils.getString(flowConfig, ConfigurationKeys.FLOW_FAILURE_OPTION, DagProcessingEngine.DEFAULT_FLOW_FAILURE_OPTION);
String flowInputPath = ConfigUtils.getString(flowConfig, DatasetDescriptorConfigKeys.FLOW_INPUT_DATASET_DESCRIPTOR_PREFIX
+ "." + DatasetDescriptorConfigKeys.PATH_KEY, "");
Long flowModTime = ConfigUtils.getLong(flowConfig, FlowSpec.MODIFICATION_TIME_KEY, 0L);

String jobName = ConfigUtils.getString(jobConfig, ConfigurationKeys.JOB_NAME_KEY, "");
Expand All @@ -121,15 +117,9 @@ private static JobSpec buildJobSpec(FlowSpec flowSpec, Config jobConfig, Long fl
final int gaasJobExecutionIdHash = gaasJobExecutionId.hashCode(); // Passing the hashCode of the uniqueIdentifier to be used as jobExecutionId for backward compatibility

if (!ConfigUtils.getBoolean(jobConfig, JOB_MAINTAIN_JOBNAME, false) || jobName.isEmpty()) {
// Modify the job name to include the flow group, flow name, edge id, and a random string to avoid collisions since
// job names are assumed to be unique within a dag.
int hash = flowInputPath.hashCode();
jobName = Joiner.on(JOB_NAME_COMPONENT_SEPARATION_CHAR).join(flowGroup, flowName, jobName, edgeId, hash);
// jobNames are commonly used as a directory name, which is limited to 255 characters
if (jobName.length() >= MAX_JOB_NAME_LENGTH) {
// shorten job length to be 128 characters (flowGroup) + (hashed) flowName, hashCode length
jobName = Joiner.on(JOB_NAME_COMPONENT_SEPARATION_CHAR).join(flowGroup, flowName.hashCode(), hash);
}
jobName = gaasJobExecutionId; // Assigning jobName with the value of GaaSJobExecutionId
// which is a UUID and unique to avoid collisions

}
JobSpec.Builder jobSpecBuilder = JobSpec.builder(jobSpecURIGenerator(flowGroup, jobName, flowSpec)).withConfig(jobConfig)
.withDescription(flowSpec.getDescription()).withVersion(flowSpec.getVersion());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -237,8 +237,7 @@ public void testCompileFlow() throws URISyntaxException, IOException {
Config jobConfig = jobSpec.getConfig();
String flowGroup = "testFlowGroup";
String flowName = "testFlowName";
String expectedJobName1 = Joiner.on(JobExecutionPlan.Factory.JOB_NAME_COMPONENT_SEPARATION_CHAR).
join(flowGroup, flowName, "Distcp", "LocalFS-1", "HDFS-1", "localToHdfs");
String expectedJobName1 = jobSpec.getConfig().getString(ConfigurationKeys.GAAS_JOB_EXEC_ID);
String jobName1 = jobConfig.getString(ConfigurationKeys.JOB_NAME_KEY);
Assert.assertTrue(jobName1.startsWith(expectedJobName1));
String from = jobConfig.getString("from");
Expand Down Expand Up @@ -267,8 +266,7 @@ public void testCompileFlow() throws URISyntaxException, IOException {
DagNode<JobExecutionPlan> secondHopNode = jobDag.getChildren(startNode).get(0);
jobSpecWithExecutor = secondHopNode.getValue();
jobConfig = jobSpecWithExecutor.getJobSpec().getConfig();
String expectedJobName2 = Joiner.on(JobExecutionPlan.Factory.JOB_NAME_COMPONENT_SEPARATION_CHAR).
join(flowGroup, flowName, "ConvertToJsonAndEncrypt", "HDFS-1", "HDFS-1", "hdfsConvertToJsonAndEncrypt");
String expectedJobName2 = jobConfig.getString(ConfigurationKeys.GAAS_JOB_EXEC_ID);
String jobName2 = jobConfig.getString(ConfigurationKeys.JOB_NAME_KEY);
Assert.assertTrue(jobName2.startsWith(expectedJobName2));
Assert.assertEquals(jobConfig.getString(ConfigurationKeys.JOB_DEPENDENCIES), jobName1);
Expand All @@ -287,8 +285,7 @@ public void testCompileFlow() throws URISyntaxException, IOException {
DagNode<JobExecutionPlan> thirdHopNode = jobDag.getChildren(secondHopNode).get(0);
jobSpecWithExecutor = thirdHopNode.getValue();
jobConfig = jobSpecWithExecutor.getJobSpec().getConfig();
String expectedJobName3 = Joiner.on(JobExecutionPlan.Factory.JOB_NAME_COMPONENT_SEPARATION_CHAR).
join(flowGroup, flowName, "Distcp", "HDFS-1", "HDFS-3", "hdfsToHdfs");
String expectedJobName3 = jobConfig.getString(ConfigurationKeys.GAAS_JOB_EXEC_ID);
String jobName3 = jobConfig.getString(ConfigurationKeys.JOB_NAME_KEY);
Assert.assertTrue(jobName3.startsWith(expectedJobName3));
Assert.assertEquals(jobConfig.getString(ConfigurationKeys.JOB_DEPENDENCIES), jobName2);
Expand All @@ -311,8 +308,7 @@ public void testCompileFlow() throws URISyntaxException, IOException {
DagNode<JobExecutionPlan> fourthHopNode = jobDag.getChildren(thirdHopNode).get(0);
jobSpecWithExecutor = fourthHopNode.getValue();
jobConfig = jobSpecWithExecutor.getJobSpec().getConfig();
String expectedJobName4 = Joiner.on(JobExecutionPlan.Factory.JOB_NAME_COMPONENT_SEPARATION_CHAR).
join(flowGroup, flowName, "DistcpToADL", "HDFS-3", "ADLS-1", "hdfsToAdl");
String expectedJobName4 = jobConfig.getString(ConfigurationKeys.GAAS_JOB_EXEC_ID);
String jobName4 = jobConfig.getString(ConfigurationKeys.JOB_NAME_KEY);
Assert.assertTrue(jobName4.startsWith(expectedJobName4));
Assert.assertEquals(jobConfig.getString(ConfigurationKeys.JOB_DEPENDENCIES), jobName3);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ public void testCreateDagAdhoc() throws Exception {
}

@Test
public void testCreateDagLongName() throws Exception {
public void testCreateDagLongNameShouldHaveFixedUUIDLength() throws Exception {
// flowName and flowGroup are both 128 characters long, the maximum for flowName and flowGroup
Config flowConfig = ConfigBuilder.create().addPrimitive(ConfigurationKeys.FLOW_NAME_KEY, "uwXJwZPAPygvmSAfhtrzXL7ovIEKOBZdulBiNIGzaT7vILrK9QB5EDJj0fc4pkgNHuIKZ3d18TZzyH6a9HpaZACwpWpIpf8SYcSfKtXeoF8IJY064BqEUXR32k3ox31G")
.addPrimitive(ConfigurationKeys.FLOW_GROUP_KEY, "4mdfSGSv6GoFW7ICWubN2ORK4s5PMTQ60yIWkcbJOVneTSPn12cXT5ueEgij907tjzLlbcjdVjWFITFf9Y5sB9i0EvKGmTbUF98hJGoQlAhmottaipDEFTdbyzt5Loxg")
Expand All @@ -232,7 +232,7 @@ public void testCreateDagLongName() throws Exception {

Dag<JobExecutionPlan> dag1 = new JobExecutionPlanDagFactory().createDag(Arrays.asList(jobExecutionPlan));

Assert.assertEquals(dag1.getStartNodes().get(0).getValue().getJobSpec().getConfig().getString(ConfigurationKeys.JOB_NAME_KEY).length(), 142);
Assert.assertEquals(dag1.getStartNodes().get(0).getValue().getJobSpec().getConfig().getString(ConfigurationKeys.JOB_NAME_KEY).length(), 32);
}

@Test
Expand Down
Loading