Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -149,10 +149,10 @@ public Iterator<IcebergDataset> getDatasetsIterator() throws IOException {
*/
protected IcebergDataset createIcebergDataset(IcebergCatalog sourceIcebergCatalog, String srcDbName, String srcTableName, IcebergCatalog destinationIcebergCatalog, String destDbName, String destTableName, Properties properties, FileSystem fs) throws IOException {
IcebergTable srcIcebergTable = sourceIcebergCatalog.openTable(srcDbName, srcTableName);
Preconditions.checkArgument(sourceIcebergCatalog.tableAlreadyExists(srcIcebergTable), String.format("Missing Source Iceberg Table: {%s}.{%s}", srcDbName, srcTableName));
Preconditions.checkArgument(sourceIcebergCatalog.tableAlreadyExists(srcIcebergTable), String.format("Source Iceberg Table not found: {%s}.{%s}", srcDbName, srcTableName));
IcebergTable destIcebergTable = destinationIcebergCatalog.openTable(destDbName, destTableName);
// TODO: Rethink strategy to enforce dest iceberg table
Preconditions.checkArgument(destinationIcebergCatalog.tableAlreadyExists(destIcebergTable), String.format("Missing Destination Iceberg Table: {%s}.{%s}", destDbName, destTableName));
Preconditions.checkArgument(destinationIcebergCatalog.tableAlreadyExists(destIcebergTable), String.format("Destination Iceberg Table not found: {%s}.{%s}", destDbName, destTableName));
return createSpecificDataset(srcIcebergTable, destIcebergTable, properties, fs, getConfigShouldCopyMetadataPath(properties));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ public static class FlowEventConstants {
public static final String JOB_SKIPPED_TIME = "jobSkippedTime";
public static final String WORKUNIT_PLAN_START_TIME = "workunitPlanStartTime";
public static final String WORKUNIT_PLAN_END_TIME = "workunitPlanEndTime";
public static final String WORKUNITS_GENERATED_SUMMARY = "workUnitsGeneratedSummary";
public static final String JOB_END_TIME = "jobEndTime";
public static final String JOB_LAST_PROGRESS_EVENT_TIME = "jobLastProgressEventTime";
public static final String JOB_COMPLETION_PERCENTAGE = "jobCompletionPercentage";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ public Future<? extends List<Pair<SpecExecutor.Verb, Spec>>> changedSpecs() {
fileStatuses = this.fs.listStatus(this.specDirPath,
new AndPathFilter(new HiddenFilter(), new AvroUtils.AvroPathFilter()));
} catch (IOException e) {
log.error("Error when listing files at path: {}", this.specDirPath.toString(), e);
log.error("Error when listing files at path: " + this.specDirPath.toString(), e);
return null;
}
log.info("Found {} files at path {}", fileStatuses.length, this.specDirPath.toString());
Expand All @@ -102,42 +102,53 @@ public Future<? extends List<Pair<SpecExecutor.Verb, Spec>>> changedSpecs() {
try {
dataFileReader = new DataFileReader<>(new FsInput(fileStatus.getPath(), this.fs.getConf()), new SpecificDatumReader<>());
} catch (IOException e) {
log.error("Error creating DataFileReader for: {}", fileStatus.getPath().toString(), e);
log.error("Error creating DataFileReader for: " + fileStatus.getPath().toString(), e);
continue;
}

AvroJobSpec avroJobSpec = null;
while (dataFileReader.hasNext()) {
avroJobSpec = dataFileReader.next();
break;
}
try { // ensure `dataFileReader` is always closed!
AvroJobSpec avroJobSpec = null;
while (dataFileReader.hasNext()) {
avroJobSpec = dataFileReader.next();
break;
}

if (avroJobSpec != null) {
JobSpec.Builder jobSpecBuilder = new JobSpec.Builder(avroJobSpec.getUri());
Properties props = new Properties();
props.putAll(avroJobSpec.getProperties());
jobSpecBuilder.withJobCatalogURI(avroJobSpec.getUri())
.withVersion(avroJobSpec.getVersion())
.withDescription(avroJobSpec.getDescription())
.withConfigAsProperties(props)
.withConfig(ConfigUtils.propertiesToConfig(props));

try {
if (!avroJobSpec.getTemplateUri().isEmpty()) {
jobSpecBuilder.withTemplate(new URI(avroJobSpec.getTemplateUri()));
}
} catch (URISyntaxException u) {
log.error("Error building a job spec: ", u);
continue;
}

if (avroJobSpec != null) {
JobSpec.Builder jobSpecBuilder = new JobSpec.Builder(avroJobSpec.getUri());
Properties props = new Properties();
props.putAll(avroJobSpec.getProperties());
jobSpecBuilder.withJobCatalogURI(avroJobSpec.getUri())
.withVersion(avroJobSpec.getVersion())
.withDescription(avroJobSpec.getDescription())
.withConfigAsProperties(props)
.withConfig(ConfigUtils.propertiesToConfig(props));
String verbName = avroJobSpec.getMetadata().get(SpecExecutor.VERB_KEY);
SpecExecutor.Verb verb = SpecExecutor.Verb.valueOf(verbName);

JobSpec jobSpec = jobSpecBuilder.build();
log.debug("Successfully built jobspec: {}", jobSpec.getUri().toString());
specList.add(new ImmutablePair<SpecExecutor.Verb, Spec>(verb, jobSpec));
this.specToPathMap.put(jobSpec.getUri(), fileStatus.getPath());
}
} finally {
try {
if (!avroJobSpec.getTemplateUri().isEmpty()) {
jobSpecBuilder.withTemplate(new URI(avroJobSpec.getTemplateUri()));
if (dataFileReader != null) {
dataFileReader.close();
dataFileReader = null;
}
} catch (URISyntaxException u) {
log.error("Error building a job spec: ", u);
continue;
} catch (IOException e) {
log.warn("Unable to close DataFileReader for: {} - {}", fileStatus.getPath().toString(), e.getMessage());
}

String verbName = avroJobSpec.getMetadata().get(SpecExecutor.VERB_KEY);
SpecExecutor.Verb verb = SpecExecutor.Verb.valueOf(verbName);

JobSpec jobSpec = jobSpecBuilder.build();
log.debug("Successfully built jobspec: {}", jobSpec.getUri().toString());
specList.add(new ImmutablePair<SpecExecutor.Verb, Spec>(verb, jobSpec));
this.specToPathMap.put(jobSpec.getUri(), fileStatus.getPath());
}
}
return new CompletedFuture<>(specList, null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,6 @@ public abstract class BaseFlowToJobSpecCompiler implements SpecCompiler {

// Since {@link SpecCompiler} is an {@link SpecCatalogListener}, it is expected that any Spec change should be reflected
// to these data structures.
@Getter
protected final Map<URI, TopologySpec> topologySpecMap;

protected final Config config;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@ public class StaticFlowTemplate implements FlowTemplate {
private String description;
@Getter
private transient FlowCatalogWithTemplates catalog;
@Getter
private List<JobTemplate> jobTemplates;

private transient Config rawConfig;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.apache.gobblin.converter.initializer.ConverterInitializerFactory;
import org.apache.gobblin.destination.DestinationDatasetHandlerService;
import org.apache.gobblin.metrics.event.EventSubmitter;
import org.apache.gobblin.metrics.event.TimingEvent;
import org.apache.gobblin.runtime.AbstractJobLauncher;
import org.apache.gobblin.runtime.JobState;
import org.apache.gobblin.runtime.troubleshooter.AutomaticTroubleshooter;
Expand All @@ -60,6 +61,8 @@
import org.apache.gobblin.temporal.ddm.work.WorkUnitsSizeSummary;
import org.apache.gobblin.temporal.ddm.work.assistance.Help;
import org.apache.gobblin.temporal.workflows.metrics.EventSubmitterContext;
import org.apache.gobblin.temporal.workflows.metrics.EventTimer;
import org.apache.gobblin.temporal.workflows.metrics.TemporalEventTimer;
import org.apache.gobblin.writer.initializer.WriterInitializerFactory;


Expand Down Expand Up @@ -127,6 +130,9 @@ public GenerateWorkUnitsResult generateWorkUnits(Properties jobProps, EventSubmi
int numSizeSummaryQuantiles = getConfiguredNumSizeSummaryQuantiles(jobState);
WorkUnitsSizeSummary wuSizeSummary = digestWorkUnitsSize(workUnits).asSizeSummary(numSizeSummaryQuantiles);
log.info("Discovered WorkUnits: {}", wuSizeSummary);
// IMPORTANT: send prior to `writeWorkUnits`, so the volume of work discovered (and bin packed) gets durably measured. even if serialization were to
// exceed available memory and this activity execution were to fail, a subsequent re-attempt would know the amount of work, to guide re-config/attempt
createWorkPreparedSizeDistillationTimer(wuSizeSummary, eventSubmitterContext).stop();
Comment on lines +133 to +135
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think, if serialization takes time and if it accounts for more OOM issue then should we consider these both in separate activity and launched through one parent workflow as IIUC activity retry will do everything from beginning and discovered workunits will be generated again which can also lead to duplication of GTE, whats your opinion on this ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

having WU planning separate from WU serialization is worth considering. that would allow for a re-attempt of only serialization w/o needing to rerun the planning. there's no concern in sending the GTE again on the re-attempt - that's not the motivation. instead the benefit would be to expedite the re-attempt by subsequently skipping a repeat of successful WU planning.

to separate into two activities we'd need to succeed in persisting some intermediate form of WU planning, so the WU planning activity could "pass input" to the WU serialization activity, as the two won't execute together - or possibly even on the same host. that intermediate form clearly ought to be more likely to succeed than serializing all the WUs themselves - the failure we're trying to address.

this major design choice awaits if we decide to pursue such larger rework.


JobStateUtils.writeWorkUnits(workUnits, workDirRoot, jobState, fs);
JobStateUtils.writeJobState(jobState, workDirRoot, fs); // ATTENTION: the writing of `JobState` after all WUs signifies WU gen+serialization now complete
Expand All @@ -150,26 +156,28 @@ public GenerateWorkUnitsResult generateWorkUnits(Properties jobProps, EventSubmi
protected List<WorkUnit> generateWorkUnitsForJobStateAndCollectCleanupPaths(JobState jobState, EventSubmitterContext eventSubmitterContext, Closer closer,
Set<String> pathsToCleanUp)
throws ReflectiveOperationException {
// report (timer) metrics for "Work Discovery", *planning only* - NOT including WU prep, like serialization, `DestinationDatasetHandlerService`ing, etc.
// IMPORTANT: for accurate timing, SEPARATELY emit `.createWorkPreparationTimer`, to record time prior to measuring the WU size required for that one
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you mean here one GTE event for serialization step or GTE event for everything discovery serialization and all ?
and also would that GTE help us in anyway if we had , wdyt ?

Copy link
Contributor Author

@phet phet Jan 2, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

originally, in AbstractJobLauncher the "WU creation timer" measured only the planning -

workUnitsCreationTimer.stop(this.multiEventMetadataGenerator.getMetadata(this.jobContext,

that is what's included in the GaaSJobObservabilityEvent.

the timer for WU prep happens a bit later -

so in this comment:

"Work Discovery", planning only - NOT including WU prep, like serialization, ...

I just meant that we're timing only planning/creation, not the preparation such as serialization.

as for WU serialization, there is no existing, historical event strictly for that. typically that only takes a long time when memory-constrained and GC-bound. although we could consider adding a new event to time that, for purposes of right-sizing, GC stats are more interesting than the duration it happens to take. if anything, the former is what I'd prioritize.

TemporalEventTimer.Factory timerFactory = new TemporalEventTimer.WithinActivityFactory(eventSubmitterContext);
EventTimer workDiscoveryTimer = timerFactory.createWorkDiscoveryTimer();
Source<?, ?> source = JobStateUtils.createSource(jobState);
WorkUnitStream workUnitStream = source instanceof WorkUnitStreamSource
? ((WorkUnitStreamSource) source).getWorkunitStream(jobState)
: new BasicWorkUnitStream.Builder(source.getWorkunits(jobState)).build();

// TODO: report (timer) metrics for workunits creation
if (workUnitStream == null || workUnitStream.getWorkUnits() == null) { // indicates a problem getting the WUs
String errMsg = "Failure in getting work units for job " + jobState.getJobId();
log.error(errMsg);
// TODO: decide whether a non-retryable failure is too severe... (in most circumstances, it's likely what we want)
// TODO: decide whether a non-retryable failure is too severe... (some sources may merit retry)
throw ApplicationFailure.newNonRetryableFailure(errMsg, "Failure: Source.getWorkUnits()");
}
workDiscoveryTimer.stop();

if (!workUnitStream.getWorkUnits().hasNext()) { // no work unit to run: entirely normal result (not a failure)
log.warn("No work units created for job " + jobState.getJobId());
return Lists.newArrayList();
}

// TODO: count total bytes for progress tracking!

boolean canCleanUpTempDirs = false; // unlike `AbstractJobLauncher` running the job end-to-end, this is Work Discovery only, so WAY TOO SOON for cleanup
DestinationDatasetHandlerService datasetHandlerService = closer.register(
new DestinationDatasetHandlerService(jobState, canCleanUpTempDirs, eventSubmitterContext.create()));
Expand Down Expand Up @@ -264,6 +272,19 @@ protected static WorkUnitsSizeDigest digestWorkUnitsSize(List<WorkUnit> workUnit
return new WorkUnitsSizeDigest(totalSize.get(), topLevelWorkUnitsDigest, constituentWorkUnitsDigest);
}

protected static EventTimer createWorkPreparedSizeDistillationTimer(
WorkUnitsSizeSummary wuSizeSummary, EventSubmitterContext eventSubmitterContext) {
// Inspired by a pair of log messages produced within `CopySource::getWorkUnits`:
// 1. Statistics for ConcurrentBoundedPriorityIterable: {ResourcePool: {softBound: [ ... ], hardBound: [ ...]},totalResourcesUsed: [ ... ], \
// maxRequirementPerDimension: [entities: 231943.0, bytesCopied: 1.22419622769628E14], ... }
// 2. org.apache.gobblin.data.management.copy.CopySource - Bin packed work units. Initial work units: 27252, packed work units: 13175, \
// max weight per bin: 500000000, max work units per bin: 100.
// rather than merely logging, durably emit this info, to inform re-config for any potential re-attempt (should WU serialization OOM)
TemporalEventTimer.Factory timerFactory = new TemporalEventTimer.WithinActivityFactory(eventSubmitterContext);
return timerFactory.createWorkPreparationTimer()
.withMetadataAsJson(TimingEvent.WORKUNITS_GENERATED_SUMMARY, wuSizeSummary.distill());
}

public static int getConfiguredNumSizeSummaryQuantiles(State state) {
return state.getPropAsInt(GenerateWorkUnits.NUM_WORK_UNITS_SIZE_SUMMARY_QUANTILES, GenerateWorkUnits.DEFAULT_NUM_WORK_UNITS_SIZE_SUMMARY_QUANTILES);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,29 @@ public class WorkUnitsSizeSummary {
@NonNull private List<Double> topLevelQuantilesMinSizes;
@NonNull private List<Double> constituentQuantilesMinSizes;

/** Total size, counts, means, and medians: the most telling measurements packaged for ready consumption / observability */
@Data
@Setter(AccessLevel.NONE) // NOTE: non-`final` members solely to enable deserialization
@NoArgsConstructor // IMPORTANT: for jackson (de)serialization
@RequiredArgsConstructor
public static class Distillation {
@NonNull private long totalSize;
@NonNull private long topLevelWorkUnitsCount;
@NonNull private long constituentWorkUnitsCount;
@NonNull private double topLevelWorkUnitsMeanSize;
@NonNull private double constituentWorkUnitsMeanSize;
@NonNull private double topLevelWorkUnitsMedianSize;
@NonNull private double constituentWorkUnitsMedianSize;
}

@JsonIgnore // (because no-arg method resembles 'java bean property')
public Distillation distill() {
return new Distillation(this.totalSize, this.topLevelWorkUnitsCount, this.constituentWorkUnitsCount,
this.getTopLevelWorkUnitsMeanSize(), this.getConstituentWorkUnitsMeanSize(),
this.getTopLevelWorkUnitsMedianSize(), this.getConstituentWorkUnitsMedianSize()
);
}

@JsonIgnore // (because no-arg method resembles 'java bean property')
public double getTopLevelWorkUnitsMeanSize() {
return this.totalSize * 1.0 / this.topLevelWorkUnitsCount;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,20 @@

package org.apache.gobblin.temporal.ddm.workflow.impl;

import io.temporal.activity.ActivityOptions;
import io.temporal.common.RetryOptions;
import io.temporal.failure.ApplicationFailure;
import io.temporal.workflow.Workflow;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import io.temporal.activity.ActivityOptions;
import io.temporal.common.RetryOptions;
import io.temporal.failure.ApplicationFailure;
import io.temporal.workflow.Workflow;

import lombok.extern.slf4j.Slf4j;

import org.apache.gobblin.metrics.event.TimingEvent;
import org.apache.gobblin.runtime.DatasetTaskSummary;
import org.apache.gobblin.runtime.util.GsonUtils;
import org.apache.gobblin.temporal.ddm.activity.CommitActivity;
import org.apache.gobblin.temporal.ddm.work.CommitStats;
import org.apache.gobblin.temporal.ddm.work.DatasetStats;
Expand Down Expand Up @@ -60,12 +59,10 @@ public class CommitStepWorkflowImpl implements CommitStepWorkflow {
@Override
public CommitStats commit(WUProcessingSpec workSpec) {
CommitStats commitGobblinStats = activityStub.commit(workSpec);

if (!commitGobblinStats.getOptFailure().isPresent() || commitGobblinStats.getNumCommittedWorkUnits() > 0) {
TemporalEventTimer.Factory timerFactory = new TemporalEventTimer.Factory(workSpec.getEventSubmitterContext());
TemporalEventTimer.Factory timerFactory = new TemporalEventTimer.WithinWorkflowFactory(workSpec.getEventSubmitterContext());
timerFactory.create(TimingEvent.LauncherTimings.JOB_SUMMARY)
.withMetadata(TimingEvent.DATASET_TASK_SUMMARIES, GsonUtils.GSON_WITH_DATE_HANDLING.toJson(
convertDatasetStatsToTaskSummaries(commitGobblinStats.getDatasetStats())))
.withMetadataAsJson(TimingEvent.DATASET_TASK_SUMMARIES, convertDatasetStatsToTaskSummaries(commitGobblinStats.getDatasetStats()))
.submit();// emit job summary info on both full and partial commit (ultimately for `GaaSJobObservabilityEvent.datasetsMetrics`)
}
if (commitGobblinStats.getOptFailure().isPresent()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,14 +119,14 @@ public class ExecuteGobblinWorkflowImpl implements ExecuteGobblinWorkflow {
.build();

private static final ActivityOptions DELETE_WORK_DIRS_ACTIVITY_OPTS = ActivityOptions.newBuilder()
.setStartToCloseTimeout(Duration.ofHours(1))
.setStartToCloseTimeout(Duration.ofMinutes(10))
.setRetryOptions(DELETE_WORK_DIRS_RETRY_OPTS)
.build();
private final DeleteWorkDirsActivity deleteWorkDirsActivityStub = Workflow.newActivityStub(DeleteWorkDirsActivity.class, DELETE_WORK_DIRS_ACTIVITY_OPTS);

@Override
public ExecGobblinStats execute(Properties jobProps, EventSubmitterContext eventSubmitterContext) {
TemporalEventTimer.Factory timerFactory = new TemporalEventTimer.Factory(eventSubmitterContext);
TemporalEventTimer.Factory timerFactory = new TemporalEventTimer.WithinWorkflowFactory(eventSubmitterContext);
timerFactory.create(TimingEvent.LauncherTimings.JOB_PREPARE).submit(); // update GaaS: `TimingEvent.JOB_START_TIME`
EventTimer jobSuccessTimer = timerFactory.createJobTimer();
Optional<GenerateWorkUnitsResult> optGenerateWorkUnitResult = Optional.empty();
Expand Down Expand Up @@ -207,7 +207,7 @@ protected TimeBudget calcWUProcTimeBudget(Instant jobStartTime, WorkUnitsSizeSum
ConfigurationKeys.JOB_TARGET_COMPLETION_DURATION_IN_MINUTES_KEY,
ConfigurationKeys.DEFAULT_JOB_TARGET_COMPLETION_DURATION_IN_MINUTES));
double permittedOveragePercentage = .2;
Duration genWUsDuration = Duration.between(jobStartTime, TemporalEventTimer.getCurrentTime());
Duration genWUsDuration = Duration.between(jobStartTime, TemporalEventTimer.WithinWorkflowFactory.getCurrentInstant());
long remainingMins = totalTargetTimeMins - Math.min(genWUsDuration.toMinutes(), maxGenWUsMins) - commitStepMins;
return TimeBudget.withOveragePercentage(remainingMins, permittedOveragePercentage);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ private CommitStats performCommitIfAnyWorkUnitsProcessed(WUProcessingSpec workSp
private Optional<EventTimer> createOptJobEventTimer(WUProcessingSpec workSpec) {
if (workSpec.isToDoJobLevelTiming()) {
EventSubmitterContext eventSubmitterContext = workSpec.getEventSubmitterContext();
TemporalEventTimer.Factory timerFactory = new TemporalEventTimer.Factory(eventSubmitterContext);
TemporalEventTimer.Factory timerFactory = new TemporalEventTimer.WithinWorkflowFactory(eventSubmitterContext);
return Optional.of(timerFactory.createJobTimer());
} else {
return Optional.empty();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public String getGreeting(String name, EventSubmitterContext eventSubmitterConte
/**
* Example of the {@link TemporalEventTimer.Factory} invoking child activity for instrumentation.
*/
TemporalEventTimer.Factory timerFactory = new TemporalEventTimer.Factory(eventSubmitterContext);
TemporalEventTimer.Factory timerFactory = new TemporalEventTimer.WithinWorkflowFactory(eventSubmitterContext);
try (TemporalEventTimer timer = timerFactory.create("getGreetingTime")) {
LOG.info("Executing getGreeting");
timer.withMetadata("name", name);
Expand Down
Loading
Loading