diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java index bbd728a04f7..ab2ca37c89f 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java @@ -25,7 +25,6 @@ import org.apache.commons.lang3.StringUtils; -import com.google.common.base.Joiner; import com.google.common.base.Optional; import com.google.common.base.Strings; import com.typesafe.config.Config; @@ -46,7 +45,6 @@ import org.apache.gobblin.runtime.api.SpecExecutor; import org.apache.gobblin.service.ExecutionStatus; import org.apache.gobblin.service.modules.flowgraph.DagNodeId; -import org.apache.gobblin.service.modules.flowgraph.DatasetDescriptorConfigKeys; import org.apache.gobblin.service.modules.flowgraph.FlowGraphConfigurationKeys; import org.apache.gobblin.service.modules.orchestration.DagProcessingEngine; import org.apache.gobblin.service.modules.orchestration.DagUtils; @@ -111,8 +109,6 @@ private static JobSpec buildJobSpec(FlowSpec flowSpec, Config jobConfig, Long fl String flowName = ConfigUtils.getString(flowConfig, ConfigurationKeys.FLOW_NAME_KEY, ""); String flowGroup = ConfigUtils.getString(flowConfig, ConfigurationKeys.FLOW_GROUP_KEY, ""); String flowFailureOption = ConfigUtils.getString(flowConfig, ConfigurationKeys.FLOW_FAILURE_OPTION, DagProcessingEngine.DEFAULT_FLOW_FAILURE_OPTION); - String flowInputPath = ConfigUtils.getString(flowConfig, DatasetDescriptorConfigKeys.FLOW_INPUT_DATASET_DESCRIPTOR_PREFIX - + "." + DatasetDescriptorConfigKeys.PATH_KEY, ""); Long flowModTime = ConfigUtils.getLong(flowConfig, FlowSpec.MODIFICATION_TIME_KEY, 0L); String jobName = ConfigUtils.getString(jobConfig, ConfigurationKeys.JOB_NAME_KEY, ""); @@ -121,15 +117,9 @@ private static JobSpec buildJobSpec(FlowSpec flowSpec, Config jobConfig, Long fl final int gaasJobExecutionIdHash = gaasJobExecutionId.hashCode(); // Passing the hashCode of the uniqueIdentifier to be used as jobExecutionId for backward compatibility if (!ConfigUtils.getBoolean(jobConfig, JOB_MAINTAIN_JOBNAME, false) || jobName.isEmpty()) { - // Modify the job name to include the flow group, flow name, edge id, and a random string to avoid collisions since - // job names are assumed to be unique within a dag. - int hash = flowInputPath.hashCode(); - jobName = Joiner.on(JOB_NAME_COMPONENT_SEPARATION_CHAR).join(flowGroup, flowName, jobName, edgeId, hash); - // jobNames are commonly used as a directory name, which is limited to 255 characters - if (jobName.length() >= MAX_JOB_NAME_LENGTH) { - // shorten job length to be 128 characters (flowGroup) + (hashed) flowName, hashCode length - jobName = Joiner.on(JOB_NAME_COMPONENT_SEPARATION_CHAR).join(flowGroup, flowName.hashCode(), hash); - } + jobName = gaasJobExecutionId; // Assigning jobName with the value of GaaSJobExecutionId + // which is a UUID and unique to avoid collisions + } JobSpec.Builder jobSpecBuilder = JobSpec.builder(jobSpecURIGenerator(flowGroup, jobName, flowSpec)).withConfig(jobConfig) .withDescription(flowSpec.getDescription()).withVersion(flowSpec.getVersion()); diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java index ee025224146..61b70356aa6 100644 --- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java +++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java @@ -237,8 +237,7 @@ public void testCompileFlow() throws URISyntaxException, IOException { Config jobConfig = jobSpec.getConfig(); String flowGroup = "testFlowGroup"; String flowName = "testFlowName"; - String expectedJobName1 = Joiner.on(JobExecutionPlan.Factory.JOB_NAME_COMPONENT_SEPARATION_CHAR). - join(flowGroup, flowName, "Distcp", "LocalFS-1", "HDFS-1", "localToHdfs"); + String expectedJobName1 = jobSpec.getConfig().getString(ConfigurationKeys.GAAS_JOB_EXEC_ID); String jobName1 = jobConfig.getString(ConfigurationKeys.JOB_NAME_KEY); Assert.assertTrue(jobName1.startsWith(expectedJobName1)); String from = jobConfig.getString("from"); @@ -267,8 +266,7 @@ public void testCompileFlow() throws URISyntaxException, IOException { DagNode secondHopNode = jobDag.getChildren(startNode).get(0); jobSpecWithExecutor = secondHopNode.getValue(); jobConfig = jobSpecWithExecutor.getJobSpec().getConfig(); - String expectedJobName2 = Joiner.on(JobExecutionPlan.Factory.JOB_NAME_COMPONENT_SEPARATION_CHAR). - join(flowGroup, flowName, "ConvertToJsonAndEncrypt", "HDFS-1", "HDFS-1", "hdfsConvertToJsonAndEncrypt"); + String expectedJobName2 = jobConfig.getString(ConfigurationKeys.GAAS_JOB_EXEC_ID); String jobName2 = jobConfig.getString(ConfigurationKeys.JOB_NAME_KEY); Assert.assertTrue(jobName2.startsWith(expectedJobName2)); Assert.assertEquals(jobConfig.getString(ConfigurationKeys.JOB_DEPENDENCIES), jobName1); @@ -287,8 +285,7 @@ public void testCompileFlow() throws URISyntaxException, IOException { DagNode thirdHopNode = jobDag.getChildren(secondHopNode).get(0); jobSpecWithExecutor = thirdHopNode.getValue(); jobConfig = jobSpecWithExecutor.getJobSpec().getConfig(); - String expectedJobName3 = Joiner.on(JobExecutionPlan.Factory.JOB_NAME_COMPONENT_SEPARATION_CHAR). - join(flowGroup, flowName, "Distcp", "HDFS-1", "HDFS-3", "hdfsToHdfs"); + String expectedJobName3 = jobConfig.getString(ConfigurationKeys.GAAS_JOB_EXEC_ID); String jobName3 = jobConfig.getString(ConfigurationKeys.JOB_NAME_KEY); Assert.assertTrue(jobName3.startsWith(expectedJobName3)); Assert.assertEquals(jobConfig.getString(ConfigurationKeys.JOB_DEPENDENCIES), jobName2); @@ -311,8 +308,7 @@ public void testCompileFlow() throws URISyntaxException, IOException { DagNode fourthHopNode = jobDag.getChildren(thirdHopNode).get(0); jobSpecWithExecutor = fourthHopNode.getValue(); jobConfig = jobSpecWithExecutor.getJobSpec().getConfig(); - String expectedJobName4 = Joiner.on(JobExecutionPlan.Factory.JOB_NAME_COMPONENT_SEPARATION_CHAR). - join(flowGroup, flowName, "DistcpToADL", "HDFS-3", "ADLS-1", "hdfsToAdl"); + String expectedJobName4 = jobConfig.getString(ConfigurationKeys.GAAS_JOB_EXEC_ID); String jobName4 = jobConfig.getString(ConfigurationKeys.JOB_NAME_KEY); Assert.assertTrue(jobName4.startsWith(expectedJobName4)); Assert.assertEquals(jobConfig.getString(ConfigurationKeys.JOB_DEPENDENCIES), jobName3); diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanDagFactoryTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanDagFactoryTest.java index ad11d139962..b17c137b665 100644 --- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanDagFactoryTest.java +++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanDagFactoryTest.java @@ -216,7 +216,7 @@ public void testCreateDagAdhoc() throws Exception { } @Test - public void testCreateDagLongName() throws Exception { + public void testCreateDagLongNameShouldHaveFixedUUIDLength() throws Exception { // flowName and flowGroup are both 128 characters long, the maximum for flowName and flowGroup Config flowConfig = ConfigBuilder.create().addPrimitive(ConfigurationKeys.FLOW_NAME_KEY, "uwXJwZPAPygvmSAfhtrzXL7ovIEKOBZdulBiNIGzaT7vILrK9QB5EDJj0fc4pkgNHuIKZ3d18TZzyH6a9HpaZACwpWpIpf8SYcSfKtXeoF8IJY064BqEUXR32k3ox31G") .addPrimitive(ConfigurationKeys.FLOW_GROUP_KEY, "4mdfSGSv6GoFW7ICWubN2ORK4s5PMTQ60yIWkcbJOVneTSPn12cXT5ueEgij907tjzLlbcjdVjWFITFf9Y5sB9i0EvKGmTbUF98hJGoQlAhmottaipDEFTdbyzt5Loxg") @@ -232,7 +232,7 @@ public void testCreateDagLongName() throws Exception { Dag dag1 = new JobExecutionPlanDagFactory().createDag(Arrays.asList(jobExecutionPlan)); - Assert.assertEquals(dag1.getStartNodes().get(0).getValue().getJobSpec().getConfig().getString(ConfigurationKeys.JOB_NAME_KEY).length(), 142); + Assert.assertEquals(dag1.getStartNodes().get(0).getValue().getJobSpec().getConfig().getString(ConfigurationKeys.JOB_NAME_KEY).length(), 32); } @Test