
Commit 8d5589d

add a new status SKIPPED for skipped jobs and flows
fix merge conflicts; add tests
1 parent f40bb44 commit 8d5589d

File tree

20 files changed (+338, -35 lines)


gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixRetriggeringJobCallable.java

Lines changed: 1 addition & 0 deletions
@@ -279,6 +279,7 @@ private void runJobExecutionLauncher() throws JobException {

     try {
       if (planningJobIdFromStore.isPresent() && !canRun(planningJobIdFromStore.get(), planningJobHelixManager)) {
+        // todo it should emit SKIPPED_JOB event that sets the job status SKIPPED rather than CANCELLED
         TimingEvent timer = new TimingEvent(eventSubmitter, TimingEvent.JOB_SKIPPED_TIME);
         HashMap<String, String> metadata = new HashMap<>(Tag.toMap(Tag.tagValuesToString(
             HelixUtils.initBaseEventTags(jobProps, Lists.newArrayList()))));

gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java

Lines changed: 1 addition & 0 deletions
@@ -43,6 +43,7 @@ public static class LauncherTimings {
     public static final String JOB_PENDING_RESUME = "JobPendingResume";
     public static final String JOB_ORCHESTRATED = "JobOrchestrated";
     public static final String JOB_PREPARE = "JobPrepareTimer";
+    public static final String JOB_SKIPPED = "JobSkipped";
     public static final String JOB_START = "JobStartTimer";
     public static final String JOB_RUN = "JobRunTimer";
     public static final String JOB_COMMIT = "JobCommitTimer";
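Later in this commit, DagProcUtils emits this new launcher timing for each downstream job that will never run. A minimal sketch of the emission pattern, assuming an EventSubmitter (eventSubmitter) and a JobExecutionPlan (jobExecutionPlan) are already in scope at the call site:

// Sketch only: emitting the new JOB_SKIPPED launcher timing as a GobblinTrackingEvent.
// `eventSubmitter` and `jobExecutionPlan` are assumed to exist at the call site.
Map<String, String> jobMetadata = TimingEventUtils.getJobMetadata(Maps.newHashMap(), jobExecutionPlan);
eventSubmitter.getTimingEvent(TimingEvent.LauncherTimings.JOB_SKIPPED).stop(jobMetadata);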

gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java

Lines changed: 40 additions & 2 deletions
@@ -310,7 +310,7 @@ public void testProcessMessageForSkippedFlow() throws IOException, ReflectiveOpe
     ImmutableList.of(
         createFlowCompiledEvent(),
         createJobOrchestratedEvent(1, 2),
-        createJobSkippedEvent()
+        createJobSkippedTimeEvent()
     ).forEach(event -> {
       context.submitEvent(event);
       kafkaReporter.report();
@@ -838,6 +838,40 @@ public void testObservabilityEventFlowFailed() throws IOException, ReflectiveOpe
     jobStatusMonitor.shutDown();
   }

+  @Test
+  public void testProcessMessageForSkippedEvent() throws IOException, ReflectiveOperationException {
+    DagManagementStateStore dagManagementStateStore = mock(DagManagementStateStore.class);
+    KafkaEventReporter kafkaReporter = builder.build("localhost:0000", "topic8");
+
+    //Submit GobblinTrackingEvents to Kafka
+    ImmutableList.of(
+        createJobSkippedEvent()
+    ).forEach(event -> {
+      context.submitEvent(event);
+      kafkaReporter.report();
+    });
+
+    try {
+      Thread.sleep(1000);
+    } catch(InterruptedException ex) {
+      Thread.currentThread().interrupt();
+    }
+
+    MockKafkaAvroJobStatusMonitor jobStatusMonitor = createMockKafkaAvroJobStatusMonitor(new AtomicBoolean(false),
+        ConfigFactory.empty(), new NoopGaaSJobObservabilityEventProducer(), dagManagementStateStore);
+    jobStatusMonitor.buildMetricsContextAndMetrics();
+    Iterator<DecodeableKafkaRecord<byte[], byte[]>> recordIterator = Iterators.transform(
+        this.kafkaTestHelper.getIteratorForTopic(TOPIC),
+        this::convertMessageAndMetadataToDecodableKafkaRecord);
+
+    State state = getNextJobStatusState(jobStatusMonitor, recordIterator, this.jobGroup, this.jobName);
+    Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), ExecutionStatus.SKIPPED.name());
+    Mockito.verify(dagManagementStateStore, Mockito.times(1)).addJobDagAction(
+        any(), any(), anyLong(), any(), eq(DagActionStore.DagActionType.REEVALUATE));
+
+    jobStatusMonitor.shutDown();
+  }
+
   private State getNextJobStatusState(MockKafkaAvroJobStatusMonitor jobStatusMonitor, Iterator<DecodeableKafkaRecord<byte[], byte[]>> recordIterator,
       String jobGroup, String jobName) throws IOException {
     jobStatusMonitor.processMessage(recordIterator.next());
@@ -873,11 +907,15 @@ private GobblinTrackingEvent createJobOrchestratedEvent(int currentAttempt, int
     return createGTE(TimingEvent.LauncherTimings.JOB_ORCHESTRATED, metadata);
   }

+  private GobblinTrackingEvent createJobSkippedEvent() {
+    return createGTE(TimingEvent.LauncherTimings.JOB_SKIPPED, Maps.newHashMap());
+  }
+
   private GobblinTrackingEvent createJobStartEvent() {
     return createGTE(TimingEvent.LauncherTimings.JOB_START, Maps.newHashMap());
   }

-  private GobblinTrackingEvent createJobSkippedEvent() {
+  private GobblinTrackingEvent createJobSkippedTimeEvent() {
     return createGTE(TimingEvent.JOB_SKIPPED_TIME, Maps.newHashMap());
   }
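The new test expects the monitor to translate a GobblinTrackingEvent named TimingEvent.LauncherTimings.JOB_SKIPPED into ExecutionStatus.SKIPPED (and to trigger a REEVALUATE dag action). A rough sketch of that mapping, with a hypothetical helper name, since the monitor's real implementation is not part of this diff:

// Illustrative only; the monitor's actual method name and structure may differ.
private static ExecutionStatus statusFromEventName(String eventName) {
  if (TimingEvent.LauncherTimings.JOB_SKIPPED.equals(eventName)) {
    return ExecutionStatus.SKIPPED;      // new mapping exercised by testProcessMessageForSkippedEvent
  }
  if (TimingEvent.LauncherTimings.JOB_ORCHESTRATED.equals(eventName)) {
    return ExecutionStatus.ORCHESTRATED;
  }
  // ... other launcher timings map to their corresponding statuses
  return ExecutionStatus.PENDING;
}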

gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/pegasus/org/apache/gobblin/service/ExecutionStatus.pdl

Lines changed: 5 additions & 0 deletions
@@ -49,4 +49,9 @@ enum ExecutionStatus {
    * Flow cancelled.
    */
   CANCELLED
+
+  /**
+   * Flow or job is skipped
+   */
+  SKIPPED
 }

gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowexecutions.snapshot.json

Lines changed: 3 additions & 2 deletions
@@ -13,7 +13,7 @@
     "name" : "ExecutionStatus",
     "namespace" : "org.apache.gobblin.service",
     "doc" : "Execution status for a flow or job",
-    "symbols" : [ "COMPILED", "PENDING", "PENDING_RETRY", "PENDING_RESUME", "ORCHESTRATED", "RUNNING", "COMPLETE", "FAILED", "CANCELLED" ],
+    "symbols" : [ "COMPILED", "PENDING", "PENDING_RETRY", "PENDING_RESUME", "ORCHESTRATED", "RUNNING", "COMPLETE", "FAILED", "CANCELLED", "SKIPPED" ],
     "symbolDocs" : {
       "CANCELLED" : "Flow cancelled.",
       "COMPILED" : "Flow compiled to jobs.",
@@ -23,7 +23,8 @@
       "PENDING" : "Flow or job is in pending state.",
       "PENDING_RESUME" : "Flow or job is currently resuming.",
       "PENDING_RETRY" : "Flow or job is pending retry.",
-      "RUNNING" : "Flow or job is currently executing"
+      "RUNNING" : "Flow or job is currently executing.",
+      "SKIPPED" : "Flow or job is skipped."
     }
   }, {
     "type" : "record",

gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowstatuses.snapshot.json

Lines changed: 3 additions & 2 deletions
@@ -13,7 +13,7 @@
     "name" : "ExecutionStatus",
     "namespace" : "org.apache.gobblin.service",
     "doc" : "Execution status for a flow or job",
-    "symbols" : [ "COMPILED", "PENDING", "PENDING_RETRY", "PENDING_RESUME", "ORCHESTRATED", "RUNNING", "COMPLETE", "FAILED", "CANCELLED" ],
+    "symbols" : [ "COMPILED", "PENDING", "PENDING_RETRY", "PENDING_RESUME", "ORCHESTRATED", "RUNNING", "COMPLETE", "FAILED", "CANCELLED", "SKIPPED" ],
     "symbolDocs" : {
       "CANCELLED" : "Flow cancelled.",
       "COMPILED" : "Flow compiled to jobs.",
@@ -23,7 +23,8 @@
       "PENDING" : "Flow or job is in pending state.",
       "PENDING_RESUME" : "Flow or job is currently resuming.",
       "PENDING_RETRY" : "Flow or job is pending retry.",
-      "RUNNING" : "Flow or job is currently executing"
+      "RUNNING" : "Flow or job is currently executing.",
+      "SKIPPED" : "Flow or job is skipped."
     }
   }, {
     "type" : "record",

gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/FlowStatusGenerator.java

Lines changed: 1 addition & 1 deletion
@@ -41,7 +41,7 @@
 @Slf4j
 public class FlowStatusGenerator {
   public static final List<String> FINISHED_STATUSES = Lists.newArrayList(ExecutionStatus.FAILED.name(),
-      ExecutionStatus.COMPLETE.name(), ExecutionStatus.CANCELLED.name());
+      ExecutionStatus.COMPLETE.name(), ExecutionStatus.CANCELLED.name(), ExecutionStatus.SKIPPED.name());
   public static final int MAX_LOOKBACK = 100;

   private final JobStatusRetriever jobStatusRetriever;
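With SKIPPED added to FINISHED_STATUSES, a skipped execution counts as terminal wherever that list is consulted. A small illustrative check (the isTerminal helper is hypothetical, not part of this class):

// Hypothetical helper to show the effect of the change above.
static boolean isTerminal(String executionStatus) {
  return FlowStatusGenerator.FINISHED_STATUSES.contains(executionStatus);
}
// isTerminal(ExecutionStatus.SKIPPED.name()) now returns true.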

gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java

Lines changed: 0 additions & 1 deletion
@@ -99,7 +99,6 @@ public interface DagManagementStateStore {
    * {@link DagManagementStateStore#addDag}. This call is just an additional identifier which may be used
    * for DagNode level operations. In the future, it may be merged with checkpointDag.
    * @param dagNode dag node to be added
-   * @param dagId dag id of the dag this dag node belongs to
    */
   void updateDagNode(Dag.DagNode<JobExecutionPlan> dagNode) throws IOException;

gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java

Lines changed: 2 additions & 1 deletion
@@ -200,7 +200,8 @@ public static Set<DagNode<JobExecutionPlan>> getNext(Dag<JobExecutionPlan> dag)
       DagNode<JobExecutionPlan> node = nodesToExpand.poll();
       ExecutionStatus executionStatus = getExecutionStatus(node);
       boolean addFlag = true;
-      if (executionStatus == PENDING || executionStatus == PENDING_RETRY || executionStatus == PENDING_RESUME) {
+      if (executionStatus == PENDING || executionStatus == PENDING_RETRY || executionStatus == PENDING_RESUME ||
+          executionStatus == SKIPPED) {
         //Add a node to be executed next, only if all of its parent nodes are COMPLETE.
         List<DagNode<JobExecutionPlan>> parentNodes = dag.getParents(node);
         for (DagNode<JobExecutionPlan> parentNode : parentNodes) {

gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java

Lines changed: 28 additions & 6 deletions
@@ -37,7 +37,6 @@

 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.metrics.GobblinTrackingEvent;
-import org.apache.gobblin.metrics.event.EventSubmitter;
 import org.apache.gobblin.metrics.event.TimingEvent;
 import org.apache.gobblin.runtime.api.JobSpec;
 import org.apache.gobblin.runtime.api.Spec;
@@ -157,7 +156,7 @@ public static void submitJobToExecutor(DagManagementStateStore dagManagementStat
     log.info("Submitted job {} for dagId {}", DagManagerUtils.getJobName(dagNode), dagId);
   }

-  public static void cancelDagNode(Dag.DagNode<JobExecutionPlan> dagNodeToCancel, DagManagementStateStore dagManagementStateStore) throws IOException {
+  public static void cancelDagNode(Dag.DagNode<JobExecutionPlan> dagNodeToCancel) throws IOException {
     Properties cancelJobArgs = new Properties();
     String serializedFuture = null;

@@ -174,6 +173,7 @@ public static void cancelDagNode(Dag.DagNode<JobExecutionPlan> dagNodeToCancel,
     } else {
       log.warn("No Job future when canceling DAG node - {}", dagNodeToCancel.getValue().getId());
     }
+
     DagManagerUtils.getSpecProducer(dagNodeToCancel).cancelJob(dagNodeToCancel.getValue().getJobSpec().getUri(), cancelJobArgs).get();
     sendJobCancellationEvent(dagNodeToCancel);
     log.info("Cancelled dag node {}, spec_producer_future {}", dagNodeToCancel.getValue().getId(), serializedFuture);
@@ -182,12 +182,34 @@
     }
   }

-  public static void cancelDag(Dag<JobExecutionPlan> dag, DagManagementStateStore dagManagementStateStore) throws IOException {
+  /**
+   * Emits JOB_SKIPPED GTE for each of the dependent job.
+   */
+  public static void sendSkippedEventForDependentJobs(Dag<JobExecutionPlan> dag, Dag.DagNode<JobExecutionPlan> node) {
+    Set<Dag.DagNode<JobExecutionPlan>> dependentJobs = new HashSet<>();
+    findDependentJobs(dag, node, dependentJobs);
+    for (Dag.DagNode<JobExecutionPlan> dependentJob : dependentJobs) {
+      Map<String, String> jobMetadata = TimingEventUtils.getJobMetadata(Maps.newHashMap(), dependentJob.getValue());
+      DagProc.eventSubmitter.getTimingEvent(TimingEvent.LauncherTimings.JOB_SKIPPED).stop(jobMetadata);
+    }
+  }
+
+  private static void findDependentJobs(Dag<JobExecutionPlan> dag,
+      Dag.DagNode<JobExecutionPlan> node, Set<Dag.DagNode<JobExecutionPlan>> dependentJobs) {
+    for (Dag.DagNode<JobExecutionPlan> child : dag.getChildren(node)) {
+      if (!dependentJobs.contains(child)) {
+        dependentJobs.add(child);
+        findDependentJobs(dag, child, dependentJobs);
+      }
+    }
+  }
+
+  public static void cancelDag(Dag<JobExecutionPlan> dag) throws IOException {
     List<Dag.DagNode<JobExecutionPlan>> dagNodesToCancel = dag.getNodes();
     log.info("Found {} DagNodes to cancel (DagId {}).", dagNodesToCancel.size(), DagManagerUtils.generateDagId(dag));

     for (Dag.DagNode<JobExecutionPlan> dagNodeToCancel : dagNodesToCancel) {
-      DagProcUtils.cancelDagNode(dagNodeToCancel, dagManagementStateStore);
+      DagProcUtils.cancelDagNode(dagNodeToCancel);
     }
   }

@@ -201,7 +223,7 @@ private static void sendJobCancellationEvent(Dag.DagNode<JobExecutionPlan> dagNo
    * Sets {@link Dag#flowEvent} and emits a {@link GobblinTrackingEvent} of the provided
    * flow event type.
    */
-  public static void setAndEmitFlowEvent(EventSubmitter eventSubmitter, Dag<JobExecutionPlan> dag, String flowEvent) {
+  public static void setAndEmitFlowEvent(Dag<JobExecutionPlan> dag, String flowEvent) {
     if (!dag.isEmpty()) {
       // Every dag node will contain the same flow metadata
       Config config = DagManagerUtils.getDagJobConfig(dag);
@@ -212,7 +234,7 @@
       flowMetadata.put(TimingEvent.METADATA_MESSAGE, dag.getMessage());
     }

-    eventSubmitter.getTimingEvent(flowEvent).stop(flowMetadata);
+    DagProc.eventSubmitter.getTimingEvent(flowEvent).stop(flowMetadata);
   }
 }
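A plausible call site for the new helper (the surrounding DagProc logic is assumed, not shown in this commit): once a node finishes in a terminal failure state, its dependents can be marked SKIPPED before the dag is cleaned up.

// Illustrative only: `dag`, `dagNode`, and `executionStatus` come from the calling DagProc's own state.
if (executionStatus == ExecutionStatus.FAILED || executionStatus == ExecutionStatus.CANCELLED) {
  // Dependent jobs will never run, so emit a JOB_SKIPPED GTE for each of them.
  DagProcUtils.sendSkippedEventForDependentJobs(dag, dagNode);
}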
