Skip to content

Commit be7c1a7

Browse files
[GOBBLIN-2200] Onboard a new job executionId to deprecate Azkaban executionId (#4108)
* created a new unique jobExecutionIdentifier to deprecate Azkaban executionId
1 parent b535daf commit be7c1a7

File tree

6 files changed

+48
-7
lines changed

6 files changed

+48
-7
lines changed

gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1047,6 +1047,12 @@ public class ConfigurationKeys {
10471047
public static final String AZKABAN_FLOW_ID = "azkaban.flow.flowid";
10481048
public static final String AZKABAN_JOB_ID = "azkaban.job.id";
10491049
public static final String AZKABAN_EXEC_ID = "azkaban.flow.execid";
1050+
// Configuration Key for setting a unique job execution identifier in GaaS, the value is a UUID
1051+
public static final String GAAS_JOB_EXEC_ID = "gaas.job.execid";
1052+
1053+
// Configuration Key for storing hash of gaas.job.execid, to be used as jobExecutionId(integer) for backwards compatibility
1054+
public static final String GAAS_JOB_EXEC_ID_HASH = "gaas.job.executionid.hash";
1055+
10501056
public static final String AZKABAN_URL = "azkaban.link.execution.url";
10511057
public static final String AZKABAN_FLOW_URL = "azkaban.link.workflow.url";
10521058
public static final String AZKABAN_JOB_URL = "azkaban.link.job.url";

gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanJobLauncher.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import java.util.concurrent.TimeUnit;
3232
import java.util.concurrent.TimeoutException;
3333

34+
import org.apache.commons.lang3.StringUtils;
3435
import org.apache.hadoop.conf.Configuration;
3536
import org.apache.hadoop.security.Credentials;
3637
import org.apache.log4j.Level;
@@ -385,7 +386,11 @@ private boolean isCurrentTimeInRange() {
385386
*/
386387
private static List<? extends Tag<?>> addAdditionalMetadataTags(Properties jobProps) {
387388
List<Tag<?>> metadataTags = Lists.newArrayList();
388-
String jobExecutionId = jobProps.getProperty(AZKABAN_FLOW_EXEC_ID, "");
389+
String jobExecutionId = jobProps.getProperty(ConfigurationKeys.GAAS_JOB_EXEC_ID_HASH, StringUtils.EMPTY);
390+
//As a fallback setting the value of jobExecutionId to Azkaban Flow exec Id if GAAS_JOB_EXEC_ID is not set
391+
if (Strings.isNullOrEmpty(jobExecutionId)) {
392+
jobExecutionId = jobProps.getProperty(AZKABAN_FLOW_EXEC_ID, StringUtils.EMPTY);
393+
}
389394
// Display the proxy URL in the metadata tag if it exists
390395
String jobExecutionUrl = jobProps.getProperty(AZKABAN_LINK_JOBEXEC_PROXY_URL, jobProps.getProperty(AZKABAN_LINK_JOBEXEC_URL, ""));
391396

@@ -407,7 +412,7 @@ private static List<? extends Tag<?>> addAdditionalMetadataTags(Properties jobPr
407412
metadataTags.add(new Tag<>(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD,
408413
jobProps.getProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, jobExecutionId)));
409414

410-
//Use azkaban.flow.execid as the jobExecutionId
415+
//Use gaas.job.execid.hash as the jobExecutionId
411416
metadataTags.add(new Tag<>(TimingEvent.FlowEventConstants.JOB_EXECUTION_ID_FIELD, jobExecutionId));
412417

413418
metadataTags.add(new Tag<>(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD,

gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import java.net.URI;
2121
import java.net.URISyntaxException;
22+
import java.util.UUID;
2223
import java.util.concurrent.ExecutionException;
2324
import java.util.concurrent.Future;
2425

@@ -116,6 +117,9 @@ private static JobSpec buildJobSpec(FlowSpec flowSpec, Config jobConfig, Long fl
116117

117118
String jobName = ConfigUtils.getString(jobConfig, ConfigurationKeys.JOB_NAME_KEY, "");
118119
String edgeId = ConfigUtils.getString(jobConfig, FlowGraphConfigurationKeys.FLOW_EDGE_ID_KEY, "");
120+
final String gaasJobExecutionId = UUID.randomUUID().toString(); // Creating a unique Identifier for JobExecution
121+
final int gaasJobExecutionIdHash = gaasJobExecutionId.hashCode(); // Passing the hashCode of the uniqueIdentifier to be used as jobExecutionId for backward compatibility
122+
119123
if (!ConfigUtils.getBoolean(jobConfig, JOB_MAINTAIN_JOBNAME, false) || jobName.isEmpty()) {
120124
// Modify the job name to include the flow group, flow name, edge id, and a random string to avoid collisions since
121125
// job names are assumed to be unique within a dag.
@@ -152,6 +156,8 @@ private static JobSpec buildJobSpec(FlowSpec flowSpec, Config jobConfig, Long fl
152156
.withValue(ConfigurationKeys.FLOW_FAILURE_OPTION, ConfigValueFactory.fromAnyRef(flowFailureOption))
153157
.withValue(ConfigurationKeys.FLOW_EDGE_ID_KEY, ConfigValueFactory.fromAnyRef(edgeId))
154158
.withValue(FlowSpec.MODIFICATION_TIME_KEY, ConfigValueFactory.fromAnyRef(flowModTime))
159+
.withValue(ConfigurationKeys.GAAS_JOB_EXEC_ID, ConfigValueFactory.fromAnyRef(gaasJobExecutionId)) // Setting a unique Identifier for jobExecution
160+
.withValue(ConfigurationKeys.GAAS_JOB_EXEC_ID_HASH, ConfigValueFactory.fromAnyRef(gaasJobExecutionIdHash))
155161
);
156162

157163
//Add tracking config to JobSpec.

gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtilsTest.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,10 @@
4444
import org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
4545
import org.apache.gobblin.service.modules.orchestration.DagManagerMetrics;
4646
import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
47+
import org.apache.gobblin.util.ConfigUtils;
48+
import org.apache.commons.lang.StringUtils;
4749
import org.mockito.Mockito;
50+
import org.testng.Assert;
4851
import org.testng.annotations.BeforeMethod;
4952
import org.testng.annotations.Test;
5053

@@ -78,6 +81,27 @@ public void testSubmitNextNodesSuccess() throws URISyntaxException, IOException
7881
Mockito.verifyNoMoreInteractions(dagManagementStateStore);
7982
}
8083

84+
@Test
85+
public void testGaaSJobExecutionIdInjection() throws URISyntaxException, IOException {
86+
Dag.DagId dagId = new Dag.DagId("testFlowGroup", "testFlowName", 2345678);
87+
List<JobExecutionPlan> jobExecutionPlans = getJobExecutionPlans();
88+
List<Dag.DagNode<JobExecutionPlan>> dagNodeList = jobExecutionPlans.stream()
89+
.map(Dag.DagNode<JobExecutionPlan>::new)
90+
.collect(Collectors.toList());
91+
Dag<JobExecutionPlan> dag = new Dag<>(dagNodeList);
92+
Mockito.doNothing().when(dagManagementStateStore).addJobDagAction(Mockito.anyString(), Mockito.anyString(), Mockito.anyLong(), Mockito.anyString(), Mockito.any());
93+
DagProcUtils.submitNextNodes(dagManagementStateStore, dag, dagId);
94+
// Assertion to test that GaaS job execution Id has been successfully injected
95+
for(JobExecutionPlan jobExecutionPlan : jobExecutionPlans) {
96+
final String gaasJobExecutionId = ConfigUtils.getString(jobExecutionPlan.getJobSpec().getConfig(), ConfigurationKeys.GAAS_JOB_EXEC_ID, StringUtils.EMPTY);
97+
final Long gaasJobExecutionIdHash = Long.parseLong(ConfigUtils.getString(jobExecutionPlan.getJobSpec().getConfig(), ConfigurationKeys.GAAS_JOB_EXEC_ID_HASH, StringUtils.EMPTY));
98+
99+
Assert.assertNotNull(gaasJobExecutionId);
100+
Assert.assertEquals(gaasJobExecutionId.length(), 36);
101+
Assert.assertNotNull(gaasJobExecutionIdHash);
102+
}
103+
}
104+
81105
@Test
82106
public void testWhenSubmitToExecutorSuccess() throws URISyntaxException, IOException {
83107
Dag.DagId dagId = new Dag.DagId("flowGroup1", "flowName1", 2345680);

gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/assistance/Help.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,6 @@
5959
public class Help {
6060
public static final int MAX_DESERIALIZATION_FS_LOAD_ATTEMPTS = 5;
6161
public static final int LOG_CACHE_STATS_EVERY_N_ACCESSES = 1000;
62-
public static final String AZKABAN_FLOW_EXEC_ID_KEY = "azkaban.flow.execid";
6362
public static final String USER_TO_PROXY_KEY = "user.to.proxy";
6463
public static final String USER_TO_PROXY_SEARCH_KEY = "userToProxy";
6564
public static final String GAAS_FLOW_ID_SEARCH_KEY = "gaasFlowIdSearchKey";
@@ -104,9 +103,9 @@ public static String calcPerExecQualifierWithOptFlowExecId(FileSystemJobStateful
104103
public static String calcPerExecQualifier(Config workerConfig) {
105104
String userToProxy = workerConfig.hasPath(USER_TO_PROXY_KEY)
106105
? workerConfig.getString(USER_TO_PROXY_KEY) : "";
107-
String azFlowExecId = workerConfig.hasPath(AZKABAN_FLOW_EXEC_ID_KEY)
108-
? workerConfig.getString(AZKABAN_FLOW_EXEC_ID_KEY) : UUID.randomUUID().toString();
109-
return userToProxy + "_" + azFlowExecId;
106+
String gaasFlowExecId = workerConfig.hasPath(ConfigurationKeys.GAAS_JOB_EXEC_ID)
107+
? workerConfig.getString(ConfigurationKeys.GAAS_JOB_EXEC_ID) : UUID.randomUUID().toString();
108+
return userToProxy + "_" + gaasFlowExecId;
110109
}
111110

112111
public static FileSystem loadFileSystem(FileSystemApt a) throws IOException {
@@ -279,4 +278,4 @@ public static void reportTroubleshooterIssues(AutomaticTroubleshooter troublesho
279278
troubleshooter.logIssueSummary();
280279
troubleshooter.reportJobIssuesAsEvents(eventSubmitter);
281280
}
282-
}
281+
}

gobblin-utility/src/main/java/org/apache/gobblin/util/AzkabanTags.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ public class AzkabanTags {
3838
.put(ConfigurationKeys.AZKABAN_FLOW_ID, "azkabanFlowId")
3939
.put(ConfigurationKeys.AZKABAN_JOB_ID, "azkabanJobId")
4040
.put(ConfigurationKeys.AZKABAN_EXEC_ID, "azkabanExecId")
41+
.put(ConfigurationKeys.GAAS_JOB_EXEC_ID, "gaasJobExecId")
4142
.put(ConfigurationKeys.AZKABAN_URL, "azkabanURL")
4243
.put(ConfigurationKeys.AZKABAN_FLOW_URL, "azkabanFlowURL")
4344
.put(ConfigurationKeys.AZKABAN_JOB_URL, "azkabanJobURL")

0 commit comments

Comments
 (0)