Skip to content

Commit 7a377af

Browse files
thisisArjitWill-Lo
authored andcommitted
Emit job metrics at RM level (#4143)
1 parent 794124d commit 7a377af

File tree

4 files changed

+46
-0
lines changed

4 files changed

+46
-0
lines changed

gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -184,6 +184,7 @@ public class ConfigurationKeys {
184184
public static final String FLOW_UNSCHEDULE_KEY = "flow.unschedule";
185185
public static final String FLOW_OWNING_GROUP_KEY = "flow.owningGroup";
186186
public static final String FLOW_SPEC_EXECUTOR = "flow.edge.specExecutors";
187+
public static final String RM_HOST_KEY = "hadoop.resource.manager.rpc";
187188

188189
/**
189190
* Common topology configuration properties.

gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,7 @@ public static class FlowEventConstants {
9292
public static final String JOB_TAG_FIELD = "jobTag";
9393
public static final String JOB_EXECUTION_ID_FIELD = "jobExecutionId";
9494
public static final String SPEC_EXECUTOR_FIELD = "specExecutor";
95+
public static final String RM_HOST_FIELD = "rmHost";
9596
public static final String LOW_WATERMARK_FIELD = "lowWatermark";
9697
public static final String HIGH_WATERMARK_FIELD = "highWatermark";
9798
public static final String PROCESSED_COUNT_FIELD = "processedCount";

gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@ public class ServiceMetricNames {
3030
public static final String DATA_QUALITY_NON_EVALUATED_FILE_COUNT = "dataQualityNonEvaluatedFileCount";
3131
public static final String DATA_QUALITY_BYTES_READ = "dataQualityBytesRead";
3232
public static final String DATA_QUALITY_BYTES_WRITTEN = "dataQualityBytesWritten";
33+
// RM metric names
34+
public static final String RM_JOB_OBSERVED_COUNT = "rmJobObservedCount";
3335

3436
// Flow Compilation Meters and Timer
3537
public static final String FLOW_COMPILATION_SUCCESSFUL_METER = GOBBLIN_SERVICE_PREFIX_WITH_DELIMITER + "flowCompilation.successful";

gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImpl.java

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@
3232
import org.apache.hadoop.fs.FileSystem;
3333
import org.apache.hadoop.fs.Path;
3434

35+
import io.opentelemetry.api.common.Attributes;
36+
import io.opentelemetry.api.metrics.Meter;
3537
import lombok.Data;
3638
import lombok.extern.slf4j.Slf4j;
3739

@@ -53,6 +55,9 @@
5355
import org.apache.gobblin.converter.initializer.ConverterInitializerFactory;
5456
import org.apache.gobblin.initializer.Initializer;
5557
import org.apache.gobblin.destination.DestinationDatasetHandlerService;
58+
import org.apache.gobblin.metrics.OpenTelemetryMetrics;
59+
import org.apache.gobblin.metrics.OpenTelemetryMetricsBase;
60+
import org.apache.gobblin.metrics.ServiceMetricNames;
5661
import org.apache.gobblin.metrics.event.EventSubmitter;
5762
import org.apache.gobblin.metrics.event.TimingEvent;
5863
import org.apache.gobblin.runtime.AbstractJobLauncher;
@@ -80,6 +85,8 @@
8085
import org.apache.gobblin.writer.initializer.WriterInitializer;
8186
import org.apache.gobblin.writer.initializer.WriterInitializerFactory;
8287

88+
import static org.apache.gobblin.runtime.JobState.GAAS_OBSERVABILITY_METRICS_GROUPNAME;
89+
8390

8491

8592
@Slf4j
@@ -142,6 +149,8 @@ public GenerateWorkUnitsResult generateWorkUnits(Properties jobProps, EventSubmi
142149
// TODO: provide for job cancellation (unless handling at the temporal-level of parent workflows)!
143150
JobState jobState = new JobState(jobProps);
144151
log.info("Created jobState: {}", jobState.toJsonString(true));
152+
// emit jobs observed at RM level
153+
emitMetrics(jobState);
145154

146155
int heartBeatInterval = JobStateUtils.getHeartBeatInterval(jobState);
147156
heartBeatExecutor.scheduleAtFixedRate(() -> activityExecutionContext.heartbeat("Running GenerateWorkUnits"),
@@ -354,4 +363,37 @@ protected static EventTimer createWorkPreparedSizeDistillationTimer(
354363
public static int getConfiguredNumSizeSummaryQuantiles(State state) {
355364
return state.getPropAsInt(GenerateWorkUnits.NUM_WORK_UNITS_SIZE_SUMMARY_QUANTILES, GenerateWorkUnits.DEFAULT_NUM_WORK_UNITS_SIZE_SUMMARY_QUANTILES);
356365
}
366+
367+
/**
368+
* Emit metrics to indicate jobs observed at RM level
369+
* @param jobState job state
370+
*/
371+
private void emitMetrics(JobState jobState) {
372+
try {
373+
OpenTelemetryMetricsBase otelMetrics = OpenTelemetryMetrics.getInstance(jobState);
374+
if (otelMetrics == null) {
375+
log.warn("OpenTelemetry metrics instance is null, skipping metrics emission");
376+
return;
377+
}
378+
379+
Meter meter = otelMetrics.getMeter(GAAS_OBSERVABILITY_METRICS_GROUPNAME);
380+
Attributes tags = getEventAttributes(jobState);
381+
log.info("Emitting metrics for job: {}", jobState.getJobName());
382+
String jobMetricDescription = "Number of Jobs observed on RM";
383+
String jobMetricName = ServiceMetricNames.RM_JOB_OBSERVED_COUNT;
384+
meter.counterBuilder(jobMetricName).setDescription(jobMetricDescription).build().add(1, tags);
385+
} catch (Exception e) {
386+
log.error("Error in emitMetrics for job: {}", jobState.getJobName(), e);
387+
}
388+
}
389+
390+
private Attributes getEventAttributes(JobState jobState) {
391+
return Attributes.builder()
392+
.put(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD, jobState.getProp(ConfigurationKeys.FLOW_NAME_KEY, "NA"))
393+
.put(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD, jobState.getProp(ConfigurationKeys.FLOW_GROUP_KEY, "NA"))
394+
.put(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD, jobState.getProp(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, "NA"))
395+
.put(TimingEvent.FlowEventConstants.FLOW_FABRIC, jobState.getProp(ConfigurationKeys.METRICS_REPORTING_OPENTELEMETRY_FABRIC, "NA"))
396+
.put(TimingEvent.FlowEventConstants.RM_HOST_FIELD, jobState.getProp(ConfigurationKeys.RM_HOST_KEY, "NA"))
397+
.build();
398+
}
357399
}

0 commit comments

Comments
 (0)