Skip to content

Commit 7dbeebf

Browse files
authored
[GOBBLIN-2185] Recommend GoT Dynamic Auto-Scaling using heuristics based on WorkUnitsSizeSummary (#4087)
* Implement GoT Dynamic auto-scaling PoC of `WorkUnitsSizeSummary`-driven linear heuristic * Do not generate `@Setter`s for `@Data` POJOs, for which deserialization support prevents having `final` members * Align choice of directory between `FsScalingDirectivesRecipient` and `FsScalingDirectivesSource`, and ensure various handles get closed
1 parent faecb35 commit 7dbeebf

File tree

43 files changed

+1085
-112
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

43 files changed

+1085
-112
lines changed

gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -208,6 +208,8 @@ public class ConfigurationKeys {
208208
public static final String DEFAULT_FORK_OPERATOR_CLASS = "org.apache.gobblin.fork.IdentityForkOperator";
209209
public static final String JOB_COMMIT_POLICY_KEY = "job.commit.policy";
210210
public static final String DEFAULT_JOB_COMMIT_POLICY = "full";
211+
public static final String JOB_TARGET_COMPLETION_DURATION_IN_MINUTES_KEY = "job.duration.target.completion.in.minutes";
212+
public static final long DEFAULT_JOB_TARGET_COMPLETION_DURATION_IN_MINUTES = 360;
211213

212214
public static final String PARTIAL_FAIL_TASK_FAILS_JOB_COMMIT = "job.commit.partial.fail.task.fails.job.commit";
213215
// If true, commit of different datasets will be performed in parallel

gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1060,7 +1060,8 @@ private void cleanLeftoverStagingData(WorkUnitStream workUnits, JobState jobStat
10601060

10611061
try {
10621062
if (!canCleanStagingData(jobState)) {
1063-
LOG.error("Job " + jobState.getJobName() + " has unfinished commit sequences. Will not clean up staging data.");
1063+
// TODO: decide whether should be `.warn`, stay as `.info`, or change back to `.error`
1064+
LOG.info("Job " + jobState.getJobName() + " has unfinished commit sequences. Will not clean up staging data.");
10641065
return;
10651066
}
10661067
} catch (IOException e) {

gobblin-runtime/src/main/java/org/apache/gobblin/runtime/DatasetTaskSummary.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,12 @@
1717

1818
package org.apache.gobblin.runtime;
1919

20+
import lombok.AccessLevel;
2021
import lombok.Data;
2122
import lombok.NoArgsConstructor;
2223
import lombok.NonNull;
2324
import lombok.RequiredArgsConstructor;
25+
import lombok.Setter;
2426

2527
import org.apache.gobblin.metrics.DatasetMetric;
2628

@@ -30,6 +32,7 @@
3032
* that can be reported as a single event in the commit phase.
3133
*/
3234
@Data
35+
@Setter(AccessLevel.NONE) // NOTE: non-`final` members solely to enable deserialization
3336
@RequiredArgsConstructor
3437
@NoArgsConstructor // IMPORTANT: for jackson (de)serialization
3538
public class DatasetTaskSummary {

gobblin-temporal/src/main/java/org/apache/gobblin/temporal/GobblinTemporalConfigurationKeys.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,4 +69,7 @@ public interface GobblinTemporalConfigurationKeys {
6969
* Prefix for Gobblin-on-Temporal Dynamic Scaling
7070
*/
7171
String DYNAMIC_SCALING_PREFIX = PREFIX + "dynamic.scaling.";
72+
73+
String DYNAMIC_SCALING_POLLING_INTERVAL_SECS = DYNAMIC_SCALING_PREFIX + "polling.interval.seconds";
74+
int DEFAULT_DYNAMIC_SCALING_POLLING_INTERVAL_SECS = 60;
7275
}

gobblin-temporal/src/main/java/org/apache/gobblin/temporal/cluster/GobblinTemporalClusterManager.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,7 @@ public class GobblinTemporalClusterManager implements ApplicationLauncher, Stand
9292
@Getter
9393
protected final FileSystem fs;
9494

95+
@Getter
9596
protected final String applicationId;
9697

9798
@Getter
@@ -285,8 +286,11 @@ public Collection<StandardMetrics> getStandardMetricsCollection() {
285286
* comment lifted from {@link org.apache.gobblin.cluster.GobblinClusterManager}
286287
* TODO for now the cluster id is hardcoded to 1 both here and in the {@link GobblinTaskRunner}. In the future, the
287288
* cluster id should be created by the {@link GobblinTemporalClusterManager} and passed to each {@link GobblinTaskRunner}
289+
*
290+
* NOTE: renamed from `getApplicationId` to avoid shadowing the `@Getter`-generated instance method of that name
291+
* TODO: unravel what to make of the comment above. as it is, `GobblinTemporalApplicationMaster#main` is what runs, NOT `GobblinTemporalClusterManager#main`
288292
*/
289-
private static String getApplicationId() {
293+
private static String getApplicationIdStatic() {
290294
return "1";
291295
}
292296

@@ -332,7 +336,7 @@ public static void main(String[] args) throws Exception {
332336
}
333337

334338
try (GobblinTemporalClusterManager GobblinTemporalClusterManager = new GobblinTemporalClusterManager(
335-
cmd.getOptionValue(GobblinClusterConfigurationKeys.APPLICATION_NAME_OPTION_NAME), getApplicationId(),
339+
cmd.getOptionValue(GobblinClusterConfigurationKeys.APPLICATION_NAME_OPTION_NAME), getApplicationIdStatic(),
336340
config, Optional.<Path>absent())) {
337341
GobblinTemporalClusterManager.start();
338342
}
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.gobblin.temporal.ddm.activity;
19+
20+
import java.util.List;
21+
import java.util.Properties;
22+
23+
import io.temporal.activity.ActivityInterface;
24+
import io.temporal.activity.ActivityMethod;
25+
26+
import org.apache.gobblin.source.workunit.WorkUnit;
27+
import org.apache.gobblin.temporal.ddm.work.TimeBudget;
28+
import org.apache.gobblin.temporal.ddm.work.WorkUnitsSizeSummary;
29+
import org.apache.gobblin.temporal.dynamic.ScalingDirective;
30+
31+
32+
33+
/**
34+
* Activity to suggest the Dynamic Scaling warranted to complete processing of some amount of {@link org.apache.gobblin.source.workunit.WorkUnit}s
35+
* within {@link TimeBudget}, through a combination of Workforce auto-scaling and Worker right-sizing.
36+
*
37+
* As with all {@link ActivityInterface}s, this is stateless, so the {@link ScalingDirective}(s) returned "stand alone", presuming nothing of current
38+
* {@link org.apache.gobblin.temporal.dynamic.WorkforceStaffing}. It thus falls to the caller to coordinate whether to apply the directive(s) as-is,
39+
* or first to adjust in light of scaling levels already in the current {@link org.apache.gobblin.temporal.dynamic.WorkforcePlan}.
40+
*/
41+
@ActivityInterface
42+
public interface RecommendScalingForWorkUnits {
43+
44+
/**
45+
* Recommend the {@link ScalingDirective}s to process the {@link WorkUnit}s of {@link WorkUnitsSizeSummary} within {@link TimeBudget}.
46+
*
47+
* @param remainingWork may characterize a newly-generated batch of `WorkUnit`s for which no processing has yet begun - or be the sub-portion
48+
* of an in-progress job that still awaits processing
49+
* @param sourceClass contextualizes the `WorkUnitsSizeSummary` and should name a {@link org.apache.gobblin.source.Source}
50+
* @param timeBudget the remaining target duration for processing the summarized `WorkUnit`s
51+
* @param jobProps all job props, to either guide the recommendation or better contextualize the nature of the `remainingWork`
52+
* @return the {@link ScalingDirective}s to process the summarized {@link WorkUnit}s within {@link TimeBudget}
53+
*/
54+
@ActivityMethod
55+
List<ScalingDirective> recommendScaling(WorkUnitsSizeSummary remainingWork, String sourceClass, TimeBudget timeBudget, Properties jobProps);
56+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.gobblin.temporal.ddm.activity.impl;
19+
20+
import java.util.Arrays;
21+
import java.util.List;
22+
import java.util.Optional;
23+
import java.util.Properties;
24+
25+
import lombok.extern.slf4j.Slf4j;
26+
27+
import org.apache.gobblin.runtime.JobState;
28+
import org.apache.gobblin.temporal.ddm.activity.RecommendScalingForWorkUnits;
29+
import org.apache.gobblin.temporal.ddm.work.TimeBudget;
30+
import org.apache.gobblin.temporal.ddm.work.WorkUnitsSizeSummary;
31+
import org.apache.gobblin.temporal.dynamic.ProfileDerivation;
32+
import org.apache.gobblin.temporal.dynamic.ProfileOverlay;
33+
import org.apache.gobblin.temporal.dynamic.ScalingDirective;
34+
import org.apache.gobblin.temporal.dynamic.WorkforceProfiles;
35+
36+
37+
/**
38+
* Skeletal impl handling all foundational concerns, but leaving it to a concrete impl to actually choose the auto-scaling
39+
* {@link ScalingDirective#getSetPoint()} for the exactly one {@link ScalingDirective} being recommended.
40+
*/
41+
@Slf4j
42+
public abstract class AbstractRecommendScalingForWorkUnitsImpl implements RecommendScalingForWorkUnits {
43+
44+
// TODO: decide whether this name ought to be configurable - or instead a predictable name that callers may expect (and possibly adjust)
45+
public static final String DEFAULT_PROFILE_DERIVATION_NAME = "workUnitsProc";
46+
47+
@Override
48+
public List<ScalingDirective> recommendScaling(WorkUnitsSizeSummary remainingWork, String sourceClass, TimeBudget timeBudget, Properties jobProps) {
49+
// NOTE: no attempt to determine the current scaling - per `RecommendScalingForWorkUnits` javadoc, the `ScalingDirective`(s) returned must "stand alone",
50+
// presuming nothing of the current `WorkforcePlan`'s `WorkforceStaffing`
51+
JobState jobState = new JobState(jobProps);
52+
ScalingDirective procWUsWorkerScaling = new ScalingDirective(
53+
calcProfileDerivationName(jobState),
54+
calcDerivationSetPoint(remainingWork, sourceClass, timeBudget, jobState),
55+
System.currentTimeMillis(),
56+
Optional.of(calcProfileDerivation(calcBasisProfileName(jobState), remainingWork, sourceClass, jobState))
57+
);
58+
log.info("Recommended re-scaling to process work units: {}", procWUsWorkerScaling);
59+
return Arrays.asList(procWUsWorkerScaling);
60+
}
61+
62+
protected abstract int calcDerivationSetPoint(WorkUnitsSizeSummary remainingWork, String sourceClass, TimeBudget timeBudget, JobState jobState);
63+
64+
protected ProfileDerivation calcProfileDerivation(String basisProfileName, WorkUnitsSizeSummary remainingWork, String sourceClass, JobState jobState) {
65+
// TODO: implement right-sizing!!! (for now just return unchanged)
66+
return new ProfileDerivation(basisProfileName, ProfileOverlay.unchanged());
67+
}
68+
69+
protected String calcProfileDerivationName(JobState jobState) {
70+
// TODO: if we ever return > 1 directive, append a monotonically increasing number to avoid collisions
71+
return DEFAULT_PROFILE_DERIVATION_NAME;
72+
}
73+
74+
protected String calcBasisProfileName(JobState jobState) {
75+
return WorkforceProfiles.BASELINE_NAME; // always build upon baseline
76+
}
77+
}

gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,19 +28,20 @@
2828
import java.util.concurrent.Callable;
2929
import java.util.concurrent.ExecutionException;
3030
import java.util.stream.Collectors;
31+
import javax.annotation.Nullable;
3132

3233
import org.apache.hadoop.fs.FileSystem;
3334
import org.apache.hadoop.fs.Path;
3435

36+
import lombok.extern.slf4j.Slf4j;
37+
3538
import com.google.api.client.util.Lists;
3639
import com.google.common.base.Function;
3740
import com.google.common.base.Strings;
3841
import com.google.common.collect.ImmutableList;
3942
import com.google.common.collect.Iterables;
4043

4144
import io.temporal.failure.ApplicationFailure;
42-
import javax.annotation.Nullable;
43-
import lombok.extern.slf4j.Slf4j;
4445

4546
import org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes;
4647
import org.apache.gobblin.broker.iface.SharedResourcesBroker;
@@ -83,8 +84,7 @@ public CommitStats commit(WUProcessingSpec workSpec) {
8384
int numDeserializationThreads = DEFAULT_NUM_DESERIALIZATION_THREADS;
8485
Optional<String> optJobName = Optional.empty();
8586
AutomaticTroubleshooter troubleshooter = null;
86-
try {
87-
FileSystem fs = Help.loadFileSystem(workSpec);
87+
try (FileSystem fs = Help.loadFileSystem(workSpec)) {
8888
JobState jobState = Help.loadJobState(workSpec, fs);
8989
optJobName = Optional.ofNullable(jobState.getJobName());
9090
SharedResourcesBroker<GobblinScopeTypes> instanceBroker = JobStateUtils.getSharedResourcesBroker(jobState);

gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImpl.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@
2828
import org.apache.hadoop.fs.FileSystem;
2929
import org.apache.hadoop.fs.Path;
3030

31-
import io.temporal.failure.ApplicationFailure;
3231
import lombok.Data;
3332
import lombok.extern.slf4j.Slf4j;
3433

@@ -37,6 +36,7 @@
3736
import com.google.common.base.Preconditions;
3837
import com.google.common.io.Closer;
3938
import com.tdunning.math.stats.TDigest;
39+
import io.temporal.failure.ApplicationFailure;
4040

4141
import org.apache.gobblin.configuration.ConfigurationKeys;
4242
import org.apache.gobblin.configuration.State;
@@ -118,7 +118,7 @@ public GenerateWorkUnitsResult generateWorkUnits(Properties jobProps, EventSubmi
118118
troubleshooter.start();
119119
try (Closer closer = Closer.create()) {
120120
// before embarking on (potentially expensive) WU creation, first pre-check that the FS is available
121-
FileSystem fs = JobStateUtils.openFileSystem(jobState);
121+
FileSystem fs = closer.register(JobStateUtils.openFileSystem(jobState));
122122
fs.mkdirs(workDirRoot);
123123

124124
Set<String> pathsToCleanUp = new HashSet<>();
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.gobblin.temporal.ddm.activity.impl;
19+
20+
import lombok.extern.slf4j.Slf4j;
21+
22+
import org.apache.gobblin.runtime.JobState;
23+
import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys;
24+
import org.apache.gobblin.temporal.ddm.work.TimeBudget;
25+
import org.apache.gobblin.temporal.ddm.work.WorkUnitsSizeSummary;
26+
import org.apache.gobblin.temporal.ddm.worker.WorkFulfillmentWorker;
27+
28+
29+
/**
30+
* Simple config-driven linear recommendation for how many containers to use to complete the "remaining work" within a given {@link TimeBudget}, per:
31+
*
32+
* a. from {@link WorkUnitsSizeSummary}, find how many (remaining) "top-level" {@link org.apache.gobblin.source.workunit.MultiWorkUnit}s of some mean size
33+
* b. from the configured {@link #AMORTIZED_NUM_BYTES_PER_MINUTE}, find the expected "processing rate" in bytes / minute
34+
* 1. estimate the time required for processing a mean-sized `MultiWorkUnit` (MWU)
35+
* c. from {@link JobState}, find per-container `MultiWorkUnit` parallelism capacity (aka. "worker-slots") to base the recommendation upon
36+
* 2. calculate the per-container throughput of MWUs per minute
37+
* 3. estimate the total per-container-minutes required to process all MWUs
38+
* d. from the {@link TimeBudget}, find the target number of minutes in which to complete processing of all MWUs
39+
* 4. recommend the number of containers so all MWU processing should finish within the target number of minutes
40+
*/
41+
@Slf4j
42+
public class RecommendScalingForWorkUnitsLinearHeuristicImpl extends AbstractRecommendScalingForWorkUnitsImpl {
43+
44+
public static final String AMORTIZED_NUM_BYTES_PER_MINUTE = GobblinTemporalConfigurationKeys.DYNAMIC_SCALING_PREFIX + "heuristic.params.numBytesPerMinute";
45+
public static final long DEFAULT_AMORTIZED_NUM_BYTES_PER_MINUTE = 80 * 1000L * 1000L * 60L; // 80MB/sec
46+
47+
@Override
48+
protected int calcDerivationSetPoint(WorkUnitsSizeSummary remainingWork, String sourceClass, TimeBudget jobTimeBudget, JobState jobState) {
49+
// for simplicity, for now, consider only top-level work units (aka. `MultiWorkUnit`s - MWUs)
50+
long numMWUs = remainingWork.getTopLevelWorkUnitsCount();
51+
double meanBytesPerMWU = remainingWork.getTopLevelWorkUnitsMeanSize();
52+
int numSimultaneousMWUsPerContainer = calcPerContainerWUCapacity(jobState); // (a worker-thread is a slot for top-level (MWUs) - not constituent sub-WUs)
53+
long bytesPerMinuteProcRate = calcAmortizedBytesPerMinute(jobState);
54+
log.info("Calculating auto-scaling (for {} remaining work units within {}) using: bytesPerMinuteProcRate = {}; meanBytesPerMWU = {}",
55+
numMWUs, jobTimeBudget, bytesPerMinuteProcRate, meanBytesPerMWU);
56+
57+
// calc how many container*minutes to process all MWUs, based on mean MWU size
58+
double minutesProcTimeForMeanMWU = meanBytesPerMWU / bytesPerMinuteProcRate;
59+
double meanMWUsThroughputPerContainerMinute = numSimultaneousMWUsPerContainer / minutesProcTimeForMeanMWU;
60+
double estContainerMinutesForAllMWUs = numMWUs / meanMWUsThroughputPerContainerMinute;
61+
62+
long targetNumMinutesForAllMWUs = jobTimeBudget.getMaxTargetDurationMinutes();
63+
// TODO: take into account `jobTimeBudget.getPermittedOverageMinutes()` - e.g. to decide whether to use `Math.ceil` vs. `Math.floor`
64+
65+
// TODO: decide how to account for container startup; working est. for GoT-on-YARN ~ 3 mins (req to alloc ~ 30s; alloc to workers ready ~ 2.5m)
66+
// e.g. can we amortize away / ignore when `targetNumMinutesForAllMWUs >> workerRequestToReadyNumMinutes`?
67+
// TODO take into account that MWUs are quantized into discrete chunks; this est. uses avg and presumes to divide partial MWUs amongst workers
68+
// can we we mostly ignore if we keep MWU "chunk size" "small-ish", like maybe even just `duration(max(MWU)) <= targetNumMinutesForAllMWUs/2)`?
69+
70+
int recommendedNumContainers = (int) Math.floor(estContainerMinutesForAllMWUs / targetNumMinutesForAllMWUs);
71+
log.info("Recommended auto-scaling: {} containers, given: minutesToProc(mean(MWUs)) = {}; throughput = {} (MWUs / container*minute); "
72+
+ "est. container*minutes to complete ALL ({}) MWUs = {}",
73+
recommendedNumContainers, minutesProcTimeForMeanMWU, meanMWUsThroughputPerContainerMinute, numMWUs, estContainerMinutesForAllMWUs);
74+
return recommendedNumContainers;
75+
}
76+
77+
protected int calcPerContainerWUCapacity(JobState jobState) {
78+
int numWorkersPerContainer = jobState.getPropAsInt(GobblinTemporalConfigurationKeys.TEMPORAL_NUM_WORKERS_PER_CONTAINER,
79+
GobblinTemporalConfigurationKeys.DEFAULT_TEMPORAL_NUM_WORKERS_PER_CONTAINERS);
80+
int numThreadsPerWorker = WorkFulfillmentWorker.MAX_EXECUTION_CONCURRENCY; // TODO: get from config, once that's implemented
81+
return numWorkersPerContainer * numThreadsPerWorker;
82+
}
83+
84+
protected long calcAmortizedBytesPerMinute(JobState jobState) {
85+
return jobState.getPropAsLong(AMORTIZED_NUM_BYTES_PER_MINUTE, DEFAULT_AMORTIZED_NUM_BYTES_PER_MINUTE);
86+
}
87+
}

0 commit comments

Comments
 (0)