@@ -247,19 +247,15 @@ public DelayedScaleDown getDelayedScaleDown(Context jobContext) {
 
     @Override
     public void clearAll(Context jobContext) {
-        jdbcStateStore.clearAll(getSerializeKey(jobContext));
+        var serializedKey = getSerializeKey(jobContext);
+        jdbcStateStore.clearAll(serializedKey);
     }
 
     @Override
     public void flush(Context jobContext) throws Exception {
         jdbcStateStore.flush(getSerializeKey(jobContext));
     }
 
-    @Override
-    public void removeInfoFromCache(KEY jobKey) {
-        jdbcStateStore.removeInfoFromCache(getSerializeKey(jobKey));
-    }
-
     private String getSerializeKey(Context jobContext) {
         return getSerializeKey(jobContext.getJobKey());
     }
@@ -40,12 +40,15 @@
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
 import java.util.stream.Collectors;
 
 import static org.apache.flink.autoscaler.standalone.config.AutoscalerStandaloneOptions.CONTROL_LOOP_INTERVAL;
@@ -74,10 +77,10 @@ public class StandaloneAutoscalerExecutor<KEY, Context extends JobAutoScalerContext<KEY>>
     private final Set<KEY> scalingJobKeys;
 
     /**
-     * Maintain a set of scaling job keys for the last control loop, it should be accessed at {@link
+     * Maintain a map from job key to job context for the last control loop; only accessed from the {@link
      * #scheduledExecutorService} thread.
      */
-    private Set<KEY> lastScalingKeys;
+    private Map<KEY, Context> lastScaling;
 
     public StandaloneAutoscalerExecutor(
             @Nonnull Configuration conf,
@@ -151,12 +154,12 @@ protected List<CompletableFuture<Void>> scaling() {
                                         throwable);
                             }
                             scalingJobKeys.remove(jobKey);
-                            if (!lastScalingKeys.contains(jobKey)) {
-                                // Current job has been stopped. lastScalingKeys doesn't
-                                // contain jobKey means current job key was scaled in a
-                                // previous control loop, and current job is stopped in
-                                // the latest control loop.
-                                autoScaler.cleanup(jobKey);
+                            if (!lastScaling.containsKey(jobKey)) {
+                                // Current job has been stopped. lastScaling not containing
+                                // jobKey means the current job was scaled in a
+                                // previous control loop, and the job was stopped in
+                                // the latest control loop.
+                                autoScaler.cleanup(jobContext);
                             }
                         },
                         scheduledExecutorService));
@@ -165,18 +168,21 @@ protected List<CompletableFuture<Void>> scaling() {
     }
 
     private void cleanupStoppedJob(Collection<Context> jobList) {
-        var currentScalingKeys =
-                jobList.stream().map(JobAutoScalerContext::getJobKey).collect(Collectors.toSet());
-        if (lastScalingKeys != null) {
-            lastScalingKeys.removeAll(currentScalingKeys);
-            for (KEY jobKey : lastScalingKeys) {
+        var jobs =
+                jobList.stream()
+                        .collect(
+                                Collectors.toMap(
+                                        JobAutoScalerContext::getJobKey, Function.identity()));
+        if (lastScaling != null) {
+            jobs.keySet().forEach(lastScaling::remove);
+            for (Map.Entry<KEY, Context> job : lastScaling.entrySet()) {
                 // Current job may be scaling, and cleanup should happen after scaling.
-                if (!scalingJobKeys.contains(jobKey)) {
-                    autoScaler.cleanup(jobKey);
+                if (!scalingJobKeys.contains(job.getKey())) {
+                    autoScaler.cleanup(job.getValue());
                 }
             }
         }
-        lastScalingKeys = currentScalingKeys;
+        lastScaling = new ConcurrentHashMap<>(jobs);
    }
 
     @VisibleForTesting
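The heart of this change is in cleanupStoppedJob: the executor now remembers the full job context for every job seen in a control loop, not just its key, so that when a job disappears it can hand autoScaler.cleanup(...) the context required to reach the state store. A minimal, self-contained sketch of the same map-difference idea (the class and method names below are illustrative, not part of the PR):

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    class StoppedJobTracker<K, C> {
        // Jobs observed in the previous control loop, keyed by job key.
        private Map<K, C> lastSeen;

        // Returns the contexts of jobs that were present in the previous
        // loop but are absent from the current one, i.e. stopped jobs.
        List<C> rotate(Map<K, C> currentJobs) {
            List<C> stopped = new ArrayList<>();
            if (lastSeen != null) {
                currentJobs.keySet().forEach(lastSeen::remove);
                stopped.addAll(lastSeen.values());
            }
            lastSeen = new HashMap<>(currentJobs);
            return stopped;
        }
    }

The PR's version adds one refinement on top of this: a stopped job that is still mid-scaling (tracked in scalingJobKeys) is skipped here, and its cleanup instead runs in the scaling callback from the previous hunk once the scaling future completes.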
@@ -80,7 +80,7 @@ public void scale(JobAutoScalerContext<JobID> context) {
                     }
 
                     @Override
-                    public void cleanup(JobID jobKey) {
+                    public void cleanup(JobAutoScalerContext<JobID> context) {
                         fail("Should not be called.");
                     }
                 };
@@ -126,7 +126,7 @@ public void scale(JobAutoScalerContext<JobID> context) {
                     }
 
                     @Override
-                    public void cleanup(JobID jobID) {
+                    public void cleanup(JobAutoScalerContext<JobID> context) {
                         fail("Should not be called.");
                     }
                 })) {
@@ -164,7 +164,7 @@ public void scale(JobAutoScalerContext<JobID> context)
                     }
 
                     @Override
-                    public void cleanup(JobID jobID) {
+                    public void cleanup(JobAutoScalerContext<JobID> context) {
                         fail("Should not be called.");
                     }
                 })) {
@@ -221,7 +221,7 @@ public void scale(JobAutoScalerContext<JobID> context)
                     }
 
                     @Override
-                    public void cleanup(JobID jobID) {
+                    public void cleanup(JobAutoScalerContext<JobID> context) {
                         fail("Should not be called.");
                     }
                 })) {
@@ -255,7 +255,8 @@ public void scale(JobAutoScalerContext<JobID> context) {
                     }
 
                     @Override
-                    public void cleanup(JobID jobID) {
+                    public void cleanup(JobAutoScalerContext<JobID> context) {
+                        var jobID = context.getJobKey();
                         cleanupCounter.put(
                                 jobID, cleanupCounter.getOrDefault(jobID, 0) + 1);
                     }
@@ -346,7 +347,8 @@ public void scale(JobAutoScalerContext<JobID> context)
                     }
 
                     @Override
-                    public void cleanup(JobID jobID) {
+                    public void cleanup(JobAutoScalerContext<JobID> context) {
+                        var jobID = context.getJobKey();
                         cleanupCounter.put(
                                 jobID, cleanupCounter.getOrDefault(jobID, 0) + 1);
                     }
@@ -39,7 +39,7 @@ public interface JobAutoScaler<KEY, Context extends JobAutoScalerContext<KEY>> {
     /**
      * Called when the job is deleted.
      *
-     * @param jobKey Job key.
+     * @param context Job context.
      */
-    void cleanup(KEY jobKey);
+    void cleanup(Context context);
 }
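Because cleanup now receives the whole context, implementations can reach context-scoped resources such as the configuration or a state store handle instead of only the job key. A minimal sketch of an implementer under the new signature (the class and its behaviour are hypothetical):

    import org.apache.flink.autoscaler.JobAutoScaler;
    import org.apache.flink.autoscaler.JobAutoScalerContext;

    public class LoggingAutoScaler<KEY, Context extends JobAutoScalerContext<KEY>>
            implements JobAutoScaler<KEY, Context> {

        @Override
        public void scale(Context context) throws Exception {
            // No scaling decision; this implementation only logs.
        }

        @Override
        public void cleanup(Context context) {
            // The key is still available through the context when that is all that is needed.
            KEY jobKey = context.getJobKey();
            System.out.println("Cleaning up autoscaler state for job " + jobKey);
        }
    }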
@@ -118,12 +118,17 @@ public void scale(Context ctx) throws Exception {
     }
 
     @Override
-    public void cleanup(KEY jobKey) {
+    public void cleanup(Context ctx) {
         LOG.info("Cleaning up autoscaling meta data");
-        metricsCollector.cleanup(jobKey);
-        lastEvaluatedMetrics.remove(jobKey);
-        flinkMetrics.remove(jobKey);
-        stateStore.removeInfoFromCache(jobKey);
+        metricsCollector.cleanup(ctx.getJobKey());
+        lastEvaluatedMetrics.remove(ctx.getJobKey());
+        flinkMetrics.remove(ctx.getJobKey());
+        try {
+            stateStore.clearAll(ctx);
+            stateStore.flush(ctx);
+        } catch (Exception e) {
+            LOG.error("Error cleaning up autoscaling meta data for {}", ctx.getJobKey(), e);
+        }
     }
 
     @VisibleForTesting
@@ -24,5 +24,5 @@ public class NoopJobAutoscaler<KEY, Context extends JobAutoScalerContext<KEY>>
     public void scale(Context context) throws Exception {}
 
     @Override
-    public void cleanup(KEY jobKey) {}
+    public void cleanup(Context context) {}
 }
@@ -93,7 +93,4 @@ void storeDelayedScaleDown(Context jobContext, DelayedScaleDown delayedScaleDown
      * was changed through this interface.
      */
     void flush(Context jobContext) throws Exception;
-
-    /** Clean up all information related to the current job. */
-    void removeInfoFromCache(KEY jobKey);
 }
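With removeInfoFromCache gone, a job is now cleaned up through the two methods that remain on this interface: clearAll(Context) drops every piece of state for the job, and flush(Context) persists the deletion in the backing store (a no-op for the in-memory store, but what makes the removal durable for the JDBC and ConfigMap stores). A condensed sketch of the calling pattern, mirroring the cleanup implementation earlier in this diff (the helper class is illustrative):

    import org.apache.flink.autoscaler.JobAutoScalerContext;
    import org.apache.flink.autoscaler.state.AutoScalerStateStore;

    final class JobStateCleanup {
        static <K, C extends JobAutoScalerContext<K>> void cleanUpJob(
                AutoScalerStateStore<K, C> stateStore, C ctx) {
            try {
                stateStore.clearAll(ctx); // drop all autoscaler state for this job
                stateStore.flush(ctx);    // persist the deletion in the backing store
            } catch (Exception e) {
                // Cleanup is best-effort: log and continue rather than propagate.
                System.err.println("Error cleaning up state for " + ctx.getJobKey() + ": " + e);
            }
        }
    }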
@@ -17,6 +17,7 @@
 
 package org.apache.flink.autoscaler.state;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.autoscaler.DelayedScaleDown;
 import org.apache.flink.autoscaler.JobAutoScalerContext;
 import org.apache.flink.autoscaler.ScalingSummary;
@@ -34,6 +35,7 @@
 import java.util.SortedMap;
 import java.util.TreeMap;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Stream;
 
 /**
  * State store based on the Java Heap, the state will be discarded after process restarts.
@@ -177,10 +179,16 @@ public void flush(Context jobContext) {
         // The InMemory state store doesn't persist data.
     }
 
-    @Override
-    public void removeInfoFromCache(KEY jobKey) {
-        scalingHistoryStore.remove(jobKey);
-        collectedMetricsStore.remove(jobKey);
-        parallelismOverridesStore.remove(jobKey);
+    @VisibleForTesting
+    public boolean hasDataFor(Context jobContext) {
+        var k = jobContext.getJobKey();
+        return Stream.of(
+                        scalingHistoryStore,
+                        parallelismOverridesStore,
+                        collectedMetricsStore,
+                        tmConfigOverrides,
+                        scalingTrackingStore,
+                        delayedScaleDownStore)
+                .anyMatch(m -> m.containsKey(k));
     }
 }
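hasDataFor is a test-only probe: it reports whether any of the six internal maps still holds an entry for the job, which lets tests assert that clearAll really wiped everything. It is used exactly that way in JobAutoScalerImplTest later in this diff:

    assertTrue(stateStore.hasDataFor(context));  // state exists before cleanup
    autoscaler.cleanup(context);                 // clearAll + flush via the context
    assertFalse(stateStore.hasDataFor(context)); // every internal map is now empty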
@@ -27,7 +27,6 @@
 import org.apache.flink.autoscaler.metrics.TestMetrics;
 import org.apache.flink.autoscaler.realizer.ScalingRealizer;
 import org.apache.flink.autoscaler.realizer.TestingScalingRealizer;
-import org.apache.flink.autoscaler.state.AutoScalerStateStore;
 import org.apache.flink.autoscaler.state.InMemoryAutoScalerStateStore;
 import org.apache.flink.autoscaler.topology.JobTopology;
 import org.apache.flink.autoscaler.topology.VertexInfo;
@@ -72,7 +71,7 @@ public class JobAutoScalerImplTest {
     private JobAutoScalerContext<JobID> context;
     private TestingScalingRealizer<JobID, JobAutoScalerContext<JobID>> scalingRealizer;
     private TestingEventCollector<JobID, JobAutoScalerContext<JobID>> eventCollector;
-    private AutoScalerStateStore<JobID, JobAutoScalerContext<JobID>> stateStore;
+    private InMemoryAutoScalerStateStore<JobID, JobAutoScalerContext<JobID>> stateStore;
 
     @BeforeEach
     public void setup() {
@@ -232,7 +231,12 @@ public void realizeParallelismOverrides(
     void testParallelismOverrides() throws Exception {
         var autoscaler =
                 new JobAutoScalerImpl<>(
-                        null, null, null, eventCollector, scalingRealizer, stateStore);
+                        new TestingMetricsCollector<>(new JobTopology()),
+                        null,
+                        null,
+                        eventCollector,
+                        scalingRealizer,
+                        stateStore);
 
         // Initially we should return empty overrides, do not create any state
         assertThat(autoscaler.getParallelismOverrides(context)).isEmpty();
@@ -282,6 +286,17 @@ void testParallelismOverrides() throws Exception {
         context.getConfiguration().setString(AUTOSCALER_ENABLED.key(), "asd");
         autoscaler.scale(context);
         assertParallelismOverrides(Map.of(v1, "1", v2, "2"));
+
+        context.getConfiguration().setString(AUTOSCALER_ENABLED.key(), "true");
+        autoscaler.scale(context);
+        assertParallelismOverrides(Map.of(v1, "1", v2, "2"));
+
+        // Make sure cleanup removes everything
+        assertTrue(stateStore.hasDataFor(context));
+        autoscaler.cleanup(context);
+        assertFalse(stateStore.hasDataFor(context));
+        autoscaler.scale(context);
+        assertParallelismOverrides(null);
     }
 
     @Test
@@ -266,11 +266,6 @@ public void flush(KubernetesJobAutoScalerContext jobContext) {
         configMapStore.flush(jobContext);
     }
 
-    @Override
-    public void removeInfoFromCache(ResourceID resourceID) {
-        configMapStore.removeInfoFromCache(resourceID);
-    }
-
     @SneakyThrows
     protected static String serializeScalingHistory(
             Map<JobVertexID, SortedMap<Instant, ScalingSummary>> scalingHistory) {
@@ -276,7 +276,7 @@ protected abstract boolean reconcileSpecChange(
 
     @Override
     public DeleteControl cleanup(FlinkResourceContext<CR> ctx) {
-        autoscaler.cleanup(ResourceID.fromResource(ctx.getResource()));
+        autoscaler.cleanup(ctx.getJobAutoScalerContext());
         return cleanupInternal(ctx);
     }
 