From 25c7e0415a13362f6252d1a0a49fe1551b6fcf79 Mon Sep 17 00:00:00 2001 From: DifferentSC Date: Mon, 21 May 2018 22:35:09 +0900 Subject: [PATCH 1/4] [MIST-1131] Implement recovery-based scale out --- .../src/main/avro/master_to_task_msg.avpr | 12 ++ .../src/main/avro/task_to_master_msg.avpr | 4 + .../snu/mist/core/master/TaskStatsMap.java | 1 + .../PeriodicDynamicScalingManager.java | 19 ++- .../scaling/RecoveryBasedScaleOutManager.java | 140 ++++++++++++++++++ .../master/lb/scaling/ScaleOutManager.java | 31 ++++ .../rpc/DefaultMasterToTaskMessageImpl.java | 20 ++- .../groupaware/DefaultTaskStatsUpdater.java | 1 + 8 files changed, 224 insertions(+), 4 deletions(-) create mode 100644 mist-core/src/main/java/edu/snu/mist/core/master/lb/scaling/RecoveryBasedScaleOutManager.java create mode 100644 mist-core/src/main/java/edu/snu/mist/core/master/lb/scaling/ScaleOutManager.java diff --git a/mist-common/src/main/avro/master_to_task_msg.avpr b/mist-common/src/main/avro/master_to_task_msg.avpr index cd4c7633..6949b23a 100644 --- a/mist-common/src/main/avro/master_to_task_msg.avpr +++ b/mist-common/src/main/avro/master_to_task_msg.avpr @@ -28,6 +28,18 @@ { "request": [], "response": "null" + }, + "removeGroup": { + "request": [ + { + "name": "removedGroupList", + "type": { + "type": "array", + "items": "string" + } + } + ], + "response": "null" } } } \ No newline at end of file diff --git a/mist-common/src/main/avro/task_to_master_msg.avpr b/mist-common/src/main/avro/task_to_master_msg.avpr index 2a0b2439..4e8b9609 100644 --- a/mist-common/src/main/avro/task_to_master_msg.avpr +++ b/mist-common/src/main/avro/task_to_master_msg.avpr @@ -52,6 +52,10 @@ "name": "TaskLoad", "type": "double" }, + { + "name": "NumEventProcessors", + "type": "int" + }, { "name": "GroupStatsMap", "type": { diff --git a/mist-core/src/main/java/edu/snu/mist/core/master/TaskStatsMap.java b/mist-core/src/main/java/edu/snu/mist/core/master/TaskStatsMap.java index bd0092ff..daa5d16f 100644 --- a/mist-core/src/main/java/edu/snu/mist/core/master/TaskStatsMap.java +++ b/mist-core/src/main/java/edu/snu/mist/core/master/TaskStatsMap.java @@ -59,6 +59,7 @@ public TaskStats addTask(final String taskHostname) { innerList.add(taskHostname); return innerMap.putIfAbsent(taskHostname, TaskStats.newBuilder() .setTaskLoad(0.0) + .setNumEventProcessors(1) .setGroupStatsMap(new HashMap<>()) .build()); } diff --git a/mist-core/src/main/java/edu/snu/mist/core/master/lb/scaling/PeriodicDynamicScalingManager.java b/mist-core/src/main/java/edu/snu/mist/core/master/lb/scaling/PeriodicDynamicScalingManager.java index c3c1665c..6c5d9cfc 100644 --- a/mist-core/src/main/java/edu/snu/mist/core/master/lb/scaling/PeriodicDynamicScalingManager.java +++ b/mist-core/src/main/java/edu/snu/mist/core/master/lb/scaling/PeriodicDynamicScalingManager.java @@ -118,6 +118,11 @@ public final class PeriodicDynamicScalingManager implements DynamicScalingManage */ private final ScaleInManager scaleInManager; + /** + * The scale-out manager. + */ + private final ScaleOutManager scaleOutManager; + @Inject private PeriodicDynamicScalingManager( final TaskStatsMap taskStatsMap, @@ -130,7 +135,8 @@ private PeriodicDynamicScalingManager( @Parameter(ScaleOutGracePeriod.class) final long scaleOutGracePeriod, @Parameter(ScaleInIdleTaskRatio.class) final double scaleInIdleTaskRatio, @Parameter(ScaleOutOverloadedTaskRatio.class) final double scaleOutOverloadedTaskRatio, - final ScaleInManager scaleInManager) { + final ScaleInManager scaleInManager, + final ScaleOutManager scaleOutManager) { this.taskStatsMap = taskStatsMap; this.dynamicScalingPeriod = dynamicScalingPeriod; this.maxTaskNum = maxTaskNum; @@ -146,6 +152,7 @@ private PeriodicDynamicScalingManager( this.lastMeasuredTimestamp = System.currentTimeMillis(); this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(); this.scaleInManager = scaleInManager; + this.scaleOutManager = scaleOutManager; } private boolean isClusterOverloaded() { @@ -188,8 +195,13 @@ public void run() { if (clusterOverloaded) { overloadedTimeElapsed += lastMeasuredTimestamp - oldTimeStamp; if (overloadedTimeElapsed > scaleOutGracePeriod && taskStatsMap.getTaskList().size() < maxTaskNum) { - // TODO: [MIST-1130] Perform automatic scale-out. - overloadedTimeElapsed = 0; + try { + scaleOutManager.scaleOut(); + overloadedTimeElapsed = 0; + } catch (final Exception e) { + LOG.log(Level.SEVERE, "An error occured while scaling-out!"); + e.printStackTrace(); + } return; } } else { @@ -224,6 +236,7 @@ public void startAutoScaling() { @Override public void close() throws Exception { scaleInManager.close(); + scaleOutManager.close(); scheduledExecutorService.shutdown(); scheduledExecutorService.awaitTermination(6000, TimeUnit.MILLISECONDS); } diff --git a/mist-core/src/main/java/edu/snu/mist/core/master/lb/scaling/RecoveryBasedScaleOutManager.java b/mist-core/src/main/java/edu/snu/mist/core/master/lb/scaling/RecoveryBasedScaleOutManager.java new file mode 100644 index 00000000..3c661b4f --- /dev/null +++ b/mist-core/src/main/java/edu/snu/mist/core/master/lb/scaling/RecoveryBasedScaleOutManager.java @@ -0,0 +1,140 @@ +/* + * Copyright (C) 2018 Seoul National University + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package edu.snu.mist.core.master.lb.scaling; + +import edu.snu.mist.core.master.ProxyToTaskMap; +import edu.snu.mist.core.master.TaskRequestor; +import edu.snu.mist.core.master.TaskStatsMap; +import edu.snu.mist.core.master.lb.parameters.OverloadedTaskLoadThreshold; +import edu.snu.mist.core.master.lb.parameters.UnderloadedTaskLoadThreshold; +import edu.snu.mist.core.master.recovery.RecoveryScheduler; +import edu.snu.mist.core.master.recovery.RecoveryStarter; +import edu.snu.mist.core.master.recovery.SingleNodeRecoveryScheduler; +import edu.snu.mist.formats.avro.AllocatedTask; +import edu.snu.mist.formats.avro.GroupStats; +import edu.snu.mist.formats.avro.MasterToTaskMessage; +import edu.snu.mist.formats.avro.TaskStats; +import org.apache.reef.tang.Injector; +import org.apache.reef.tang.Tang; +import org.apache.reef.tang.annotations.Parameter; + +import javax.inject.Inject; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +/** + * The scale out manager which is implemented based on recovery process. + */ +public final class RecoveryBasedScaleOutManager implements ScaleOutManager { + + /** + * The shared task stats map. + */ + private final TaskStatsMap taskStatsMap; + + /** + * The shared task to proxy map. + */ + private final ProxyToTaskMap proxyToTaskMap; + + /** + * The shared task requestor to driver. + */ + private final TaskRequestor taskRequestor; + + /** + * The overloaded task load threshold. + */ + private final double overloadedTaskLoadThreshold; + + /** + * The underloaded task load threshold. + */ + private final double underloadedTaskLoadThreshold; + + /** + * The single executor thread for running recovery. + */ + private final ExecutorService singleThreadedExecutorService; + + @Inject + private RecoveryBasedScaleOutManager( + final TaskStatsMap taskStatsMap, + final ProxyToTaskMap proxyToTaskMap, + final TaskRequestor taskRequestor, + @Parameter(UnderloadedTaskLoadThreshold.class) final double underloadedTaskLoadThreshold, + @Parameter(OverloadedTaskLoadThreshold.class) final double overloadedTaskLoadThreshold) { + this.taskStatsMap = taskStatsMap; + this.proxyToTaskMap = proxyToTaskMap; + this.taskRequestor = taskRequestor; + this.underloadedTaskLoadThreshold = underloadedTaskLoadThreshold; + this.overloadedTaskLoadThreshold = overloadedTaskLoadThreshold; + this.singleThreadedExecutorService = Executors.newSingleThreadExecutor(); + } + + @Override + public void scaleOut() throws Exception { + // Step 1: Request a new task and setup the connection. + final AllocatedTask allocatedTask = taskRequestor.setupTaskAndConn(1).iterator().next(); + // Step 2: Get the groups which will be moved from overloaded tasks to the newly allocated groups. + // Our goal is to move the queries from the overloaded tasks to the newly allocated task as much as we can. + final List overloadedTaskList = new ArrayList<>(); + for (final Map.Entry entry: taskStatsMap.entrySet()) { + if (entry.getValue().getTaskLoad() > overloadedTaskLoadThreshold) { + overloadedTaskList.add(entry.getKey()); + } + } + // Calculate the maximum load of the new task. + final double maximumNewTaskLoad = (underloadedTaskLoadThreshold + overloadedTaskLoadThreshold) / 2.; + final double movableGroupLoadPerTask = maximumNewTaskLoad / overloadedTaskList.size(); + final Map movedGroupStatsMap = new HashMap<>(); + // Choose the moved groups from the overloaded tasks. + for (final String overloadedTaskHostname : overloadedTaskList) { + final TaskStats taskStats = taskStatsMap.get(overloadedTaskHostname); + final List movedGroupList = new ArrayList<>(); + final int numEp = taskStats.getNumEventProcessors(); + double movedGroupLoad = 0.; + for (final Map.Entry entry : taskStats.getGroupStatsMap().entrySet()) { + final double effectiveGroupLoad = entry.getValue().getGroupLoad() / numEp; + if (movedGroupLoad + effectiveGroupLoad < movableGroupLoadPerTask) { + movedGroupStatsMap.put(entry.getKey(), entry.getValue()); + movedGroupList.add(entry.getKey()); + movedGroupLoad += effectiveGroupLoad; + } + } + // Remove the stopped groups in the overloaded tasks. + final MasterToTaskMessage proxyToTask = proxyToTaskMap.get(overloadedTaskHostname); + proxyToTask.removeGroup(movedGroupList); + } + // Recover the moved groups into the new task using a single node recovery scheduler. + final Injector injector = Tang.Factory.getTang().newInjector(); + injector.bindVolatileInstance(TaskStatsMap.class, taskStatsMap); + injector.bindVolatileInstance(ProxyToTaskMap.class, proxyToTaskMap); + final RecoveryScheduler recoveryScheduler = injector.getInstance(SingleNodeRecoveryScheduler.class); + singleThreadedExecutorService.submit(new RecoveryStarter(movedGroupStatsMap, recoveryScheduler)); + } + + @Override + public void close() throws Exception { + singleThreadedExecutorService.shutdown(); + singleThreadedExecutorService.awaitTermination(60000, TimeUnit.MILLISECONDS); + } +} diff --git a/mist-core/src/main/java/edu/snu/mist/core/master/lb/scaling/ScaleOutManager.java b/mist-core/src/main/java/edu/snu/mist/core/master/lb/scaling/ScaleOutManager.java new file mode 100644 index 00000000..c8d4ef0d --- /dev/null +++ b/mist-core/src/main/java/edu/snu/mist/core/master/lb/scaling/ScaleOutManager.java @@ -0,0 +1,31 @@ +/* + * Copyright (C) 2018 Seoul National University + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package edu.snu.mist.core.master.lb.scaling; + +import org.apache.reef.tang.annotations.DefaultImplementation; + +/** + * The interface for scaling out. + */ +@DefaultImplementation(RecoveryBasedScaleOutManager.class) +public interface ScaleOutManager extends AutoCloseable { + + /** + * Perform scale-out according to the given implementation. + * @throws Exception + */ + void scaleOut() throws Exception; +} diff --git a/mist-core/src/main/java/edu/snu/mist/core/rpc/DefaultMasterToTaskMessageImpl.java b/mist-core/src/main/java/edu/snu/mist/core/rpc/DefaultMasterToTaskMessageImpl.java index 63b01cf8..e9d775a7 100644 --- a/mist-core/src/main/java/edu/snu/mist/core/rpc/DefaultMasterToTaskMessageImpl.java +++ b/mist-core/src/main/java/edu/snu/mist/core/rpc/DefaultMasterToTaskMessageImpl.java @@ -15,11 +15,13 @@ */ package edu.snu.mist.core.rpc; +import edu.snu.mist.core.task.checkpointing.CheckpointManager; import edu.snu.mist.core.task.recovery.RecoveryManager; import edu.snu.mist.formats.avro.MasterToTaskMessage; import org.apache.avro.AvroRemoteException; import javax.inject.Inject; +import java.util.List; /** * The default master-to-task message implementation. @@ -31,9 +33,17 @@ public final class DefaultMasterToTaskMessageImpl implements MasterToTaskMessage */ private final RecoveryManager recoveryManager; + /** + * The checkpoint manager. + */ + private final CheckpointManager checkpointManager; + @Inject - private DefaultMasterToTaskMessageImpl(final RecoveryManager recoveryManager) { + private DefaultMasterToTaskMessageImpl( + final RecoveryManager recoveryManager, + final CheckpointManager checkpointManager) { this.recoveryManager = recoveryManager; + this.checkpointManager = checkpointManager; } @Override @@ -41,4 +51,12 @@ public Void startTaskSideRecovery() throws AvroRemoteException { recoveryManager.startRecovery(); return null; } + + @Override + public Void removeGroup(final List removedGroupList) throws AvroRemoteException { + for (final String removedGroupId : removedGroupList) { + checkpointManager.deleteGroup(removedGroupId); + } + return null; + } } \ No newline at end of file diff --git a/mist-core/src/main/java/edu/snu/mist/core/task/groupaware/DefaultTaskStatsUpdater.java b/mist-core/src/main/java/edu/snu/mist/core/task/groupaware/DefaultTaskStatsUpdater.java index 1bf6796f..d6592d22 100644 --- a/mist-core/src/main/java/edu/snu/mist/core/task/groupaware/DefaultTaskStatsUpdater.java +++ b/mist-core/src/main/java/edu/snu/mist/core/task/groupaware/DefaultTaskStatsUpdater.java @@ -86,6 +86,7 @@ public void updateTaskStatsToMaster(final GroupAllocationTable groupAllocationTa proxyToMaster.updateTaskStats(taskHostname, TaskStats.newBuilder() .setTaskLoad(taskCpuLoad) + .setNumEventProcessors(numEventProcessors) .setGroupStatsMap(groupStatsMap) .build()); } From 4b64010aa0b8c6c29c2a6abe6fe70c21e2be5c26 Mon Sep 17 00:00:00 2001 From: DifferentSC Date: Mon, 2 Jul 2018 18:30:31 +0900 Subject: [PATCH 2/4] Fix compilation error --- .../scaling/RecoveryBasedScaleOutManager.java | 3 +-- .../core/master/recovery/RecoveryStarter.java | 25 +++++++++++-------- 2 files changed, 16 insertions(+), 12 deletions(-) diff --git a/mist-core/src/main/java/edu/snu/mist/core/master/lb/scaling/RecoveryBasedScaleOutManager.java b/mist-core/src/main/java/edu/snu/mist/core/master/lb/scaling/RecoveryBasedScaleOutManager.java index 3c661b4f..54a0b1d3 100644 --- a/mist-core/src/main/java/edu/snu/mist/core/master/lb/scaling/RecoveryBasedScaleOutManager.java +++ b/mist-core/src/main/java/edu/snu/mist/core/master/lb/scaling/RecoveryBasedScaleOutManager.java @@ -23,7 +23,6 @@ import edu.snu.mist.core.master.recovery.RecoveryScheduler; import edu.snu.mist.core.master.recovery.RecoveryStarter; import edu.snu.mist.core.master.recovery.SingleNodeRecoveryScheduler; -import edu.snu.mist.formats.avro.AllocatedTask; import edu.snu.mist.formats.avro.GroupStats; import edu.snu.mist.formats.avro.MasterToTaskMessage; import edu.snu.mist.formats.avro.TaskStats; @@ -93,7 +92,7 @@ private RecoveryBasedScaleOutManager( @Override public void scaleOut() throws Exception { // Step 1: Request a new task and setup the connection. - final AllocatedTask allocatedTask = taskRequestor.setupTaskAndConn(1).iterator().next(); + taskRequestor.setupTaskAndConn(1); // Step 2: Get the groups which will be moved from overloaded tasks to the newly allocated groups. // Our goal is to move the queries from the overloaded tasks to the newly allocated task as much as we can. final List overloadedTaskList = new ArrayList<>(); diff --git a/mist-core/src/main/java/edu/snu/mist/core/master/recovery/RecoveryStarter.java b/mist-core/src/main/java/edu/snu/mist/core/master/recovery/RecoveryStarter.java index 07e7edb1..a4971c3d 100644 --- a/mist-core/src/main/java/edu/snu/mist/core/master/recovery/RecoveryStarter.java +++ b/mist-core/src/main/java/edu/snu/mist/core/master/recovery/RecoveryStarter.java @@ -17,6 +17,7 @@ import edu.snu.mist.core.master.TaskRequestor; import edu.snu.mist.formats.avro.GroupStats; +import org.apache.avro.AvroRemoteException; import java.util.Map; import java.util.logging.Level; @@ -60,16 +61,20 @@ public RecoveryStarter(final Map failedGroupMap, @Override public void run() { - if (taskRequestor != null) { - // Request an evaluator for recovery task if recoveryScheduler is not null. - // If recovery scheduler is null, it does not request a new task. - taskRequestor.setupTaskAndConn(1); + try { + if (taskRequestor != null) { + // Request an evaluator for recovery task if recoveryScheduler is not null. + // If recovery scheduler is null, it does not request a new task. + taskRequestor.setupTaskAndConn(1); + } + // Start recovering of the queries... + recoveryScheduler.startRecovery(failedGroupMap); + final long startTime = System.currentTimeMillis(); + // Blocks the thread until the recovery has been finished... + recoveryScheduler.awaitUntilRecoveryFinish(); + LOG.log(Level.INFO, "Recovery is finished in {0} ms...", System.currentTimeMillis() - startTime); + } catch (final AvroRemoteException | InterruptedException e) { + e.printStackTrace(); } - // Start recovering of the queries... - recoveryScheduler.startRecovery(failedGroupMap); - final long startTime = System.currentTimeMillis(); - // Blocks the thread until the recovery has been finished... - recoveryScheduler.awaitUntilRecoveryFinish(); - LOG.log(Level.INFO, "Recovery is finished in {0} ms...", System.currentTimeMillis() - startTime); } } \ No newline at end of file From 36edb6dccfcee3d672b2ee3647fc8085f43a7023 Mon Sep 17 00:00:00 2001 From: DifferentSC Date: Thu, 12 Jul 2018 12:33:10 +0900 Subject: [PATCH 3/4] Remove RecoveryStarter --- .../scaling/RecoveryBasedScaleOutManager.java | 4 +- .../core/master/recovery/RecoveryStarter.java | 80 ------------------- 2 files changed, 2 insertions(+), 82 deletions(-) delete mode 100644 mist-core/src/main/java/edu/snu/mist/core/master/recovery/RecoveryStarter.java diff --git a/mist-core/src/main/java/edu/snu/mist/core/master/lb/scaling/RecoveryBasedScaleOutManager.java b/mist-core/src/main/java/edu/snu/mist/core/master/lb/scaling/RecoveryBasedScaleOutManager.java index 54a0b1d3..ecfe0765 100644 --- a/mist-core/src/main/java/edu/snu/mist/core/master/lb/scaling/RecoveryBasedScaleOutManager.java +++ b/mist-core/src/main/java/edu/snu/mist/core/master/lb/scaling/RecoveryBasedScaleOutManager.java @@ -21,7 +21,6 @@ import edu.snu.mist.core.master.lb.parameters.OverloadedTaskLoadThreshold; import edu.snu.mist.core.master.lb.parameters.UnderloadedTaskLoadThreshold; import edu.snu.mist.core.master.recovery.RecoveryScheduler; -import edu.snu.mist.core.master.recovery.RecoveryStarter; import edu.snu.mist.core.master.recovery.SingleNodeRecoveryScheduler; import edu.snu.mist.formats.avro.GroupStats; import edu.snu.mist.formats.avro.MasterToTaskMessage; @@ -128,7 +127,8 @@ public void scaleOut() throws Exception { injector.bindVolatileInstance(TaskStatsMap.class, taskStatsMap); injector.bindVolatileInstance(ProxyToTaskMap.class, proxyToTaskMap); final RecoveryScheduler recoveryScheduler = injector.getInstance(SingleNodeRecoveryScheduler.class); - singleThreadedExecutorService.submit(new RecoveryStarter(movedGroupStatsMap, recoveryScheduler)); + // Start the recovery process. + recoveryScheduler.recover(movedGroupStatsMap); } @Override diff --git a/mist-core/src/main/java/edu/snu/mist/core/master/recovery/RecoveryStarter.java b/mist-core/src/main/java/edu/snu/mist/core/master/recovery/RecoveryStarter.java deleted file mode 100644 index a4971c3d..00000000 --- a/mist-core/src/main/java/edu/snu/mist/core/master/recovery/RecoveryStarter.java +++ /dev/null @@ -1,80 +0,0 @@ -/* - * Copyright (C) 2018 Seoul National University - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package edu.snu.mist.core.master.recovery; - -import edu.snu.mist.core.master.TaskRequestor; -import edu.snu.mist.formats.avro.GroupStats; -import org.apache.avro.AvroRemoteException; - -import java.util.Map; -import java.util.logging.Level; -import java.util.logging.Logger; - -/** - * The runnable class for asynchronous task reallocation & query recovery. - */ -public final class RecoveryStarter implements Runnable { - - private static final Logger LOG = Logger.getLogger(RecoveryStarter.class.getName()); - - /** - * The failed group map. - */ - private final Map failedGroupMap; - - /** - * The task requestor. - */ - private final TaskRequestor taskRequestor; - - /** - * The recovery scheduler. - */ - private final RecoveryScheduler recoveryScheduler; - - public RecoveryStarter(final Map failedGroupMap, - final RecoveryScheduler recoveryScheduler, - final TaskRequestor taskRequestor) { - this.failedGroupMap = failedGroupMap; - this.taskRequestor = taskRequestor; - this.recoveryScheduler = recoveryScheduler; - } - - // This constructor is used when the recovery without task requesting is necessary. - public RecoveryStarter(final Map failedGroupMap, - final RecoveryScheduler recoveryScheduler) { - this(failedGroupMap, recoveryScheduler, null); - } - - @Override - public void run() { - try { - if (taskRequestor != null) { - // Request an evaluator for recovery task if recoveryScheduler is not null. - // If recovery scheduler is null, it does not request a new task. - taskRequestor.setupTaskAndConn(1); - } - // Start recovering of the queries... - recoveryScheduler.startRecovery(failedGroupMap); - final long startTime = System.currentTimeMillis(); - // Blocks the thread until the recovery has been finished... - recoveryScheduler.awaitUntilRecoveryFinish(); - LOG.log(Level.INFO, "Recovery is finished in {0} ms...", System.currentTimeMillis() - startTime); - } catch (final AvroRemoteException | InterruptedException e) { - e.printStackTrace(); - } - } -} \ No newline at end of file From c3a011b6b485aa75cf7b36d197f6cfb144444e3a Mon Sep 17 00:00:00 2001 From: DifferentSC Date: Thu, 12 Jul 2018 15:47:27 +0900 Subject: [PATCH 4/4] Update app task list map when scaling-out --- .../mist/core/master/lb/AppTaskListMap.java | 7 +++ .../scaling/RecoveryBasedScaleOutManager.java | 56 +++++++++++++++---- 2 files changed, 53 insertions(+), 10 deletions(-) diff --git a/mist-core/src/main/java/edu/snu/mist/core/master/lb/AppTaskListMap.java b/mist-core/src/main/java/edu/snu/mist/core/master/lb/AppTaskListMap.java index dfc2784c..e3b48378 100644 --- a/mist-core/src/main/java/edu/snu/mist/core/master/lb/AppTaskListMap.java +++ b/mist-core/src/main/java/edu/snu/mist/core/master/lb/AppTaskListMap.java @@ -44,6 +44,13 @@ public synchronized void removeTask(final String removedTaskId) { } } + public synchronized void removeAppFromTask(final String removedTaskId, final String appId) { + final List taskIdList = innerMap.get(appId); + if (taskIdList != null) { + taskIdList.removeIf(taskId -> taskId.equals(removedTaskId)); + } + } + public synchronized void addTaskToApp(final String appId, final String taskId) { if (!innerMap.containsKey(appId)) { innerMap.putIfAbsent(appId, new ArrayList<>()); diff --git a/mist-core/src/main/java/edu/snu/mist/core/master/lb/scaling/RecoveryBasedScaleOutManager.java b/mist-core/src/main/java/edu/snu/mist/core/master/lb/scaling/RecoveryBasedScaleOutManager.java index ecfe0765..e5eb1136 100644 --- a/mist-core/src/main/java/edu/snu/mist/core/master/lb/scaling/RecoveryBasedScaleOutManager.java +++ b/mist-core/src/main/java/edu/snu/mist/core/master/lb/scaling/RecoveryBasedScaleOutManager.java @@ -18,6 +18,7 @@ import edu.snu.mist.core.master.ProxyToTaskMap; import edu.snu.mist.core.master.TaskRequestor; import edu.snu.mist.core.master.TaskStatsMap; +import edu.snu.mist.core.master.lb.AppTaskListMap; import edu.snu.mist.core.master.lb.parameters.OverloadedTaskLoadThreshold; import edu.snu.mist.core.master.lb.parameters.UnderloadedTaskLoadThreshold; import edu.snu.mist.core.master.recovery.RecoveryScheduler; @@ -53,6 +54,11 @@ public final class RecoveryBasedScaleOutManager implements ScaleOutManager { */ private final ProxyToTaskMap proxyToTaskMap; + /** + * The app task list map. + */ + private final AppTaskListMap appTaskListMap; + /** * The shared task requestor to driver. */ @@ -77,11 +83,13 @@ public final class RecoveryBasedScaleOutManager implements ScaleOutManager { private RecoveryBasedScaleOutManager( final TaskStatsMap taskStatsMap, final ProxyToTaskMap proxyToTaskMap, + final AppTaskListMap appTaskListMap, final TaskRequestor taskRequestor, @Parameter(UnderloadedTaskLoadThreshold.class) final double underloadedTaskLoadThreshold, @Parameter(OverloadedTaskLoadThreshold.class) final double overloadedTaskLoadThreshold) { this.taskStatsMap = taskStatsMap; this.proxyToTaskMap = proxyToTaskMap; + this.appTaskListMap = appTaskListMap; this.taskRequestor = taskRequestor; this.underloadedTaskLoadThreshold = underloadedTaskLoadThreshold; this.overloadedTaskLoadThreshold = overloadedTaskLoadThreshold; @@ -93,7 +101,8 @@ public void scaleOut() throws Exception { // Step 1: Request a new task and setup the connection. taskRequestor.setupTaskAndConn(1); // Step 2: Get the groups which will be moved from overloaded tasks to the newly allocated groups. - // Our goal is to move the queries from the overloaded tasks to the newly allocated task as much as we can. + // Our goal is to move the queries from the overloaded tasks to the newly allocated task as much as we can + // without making the new task overloaded. final List overloadedTaskList = new ArrayList<>(); for (final Map.Entry entry: taskStatsMap.entrySet()) { if (entry.getValue().getTaskLoad() > overloadedTaskLoadThreshold) { @@ -105,21 +114,48 @@ public void scaleOut() throws Exception { final double movableGroupLoadPerTask = maximumNewTaskLoad / overloadedTaskList.size(); final Map movedGroupStatsMap = new HashMap<>(); // Choose the moved groups from the overloaded tasks. - for (final String overloadedTaskHostname : overloadedTaskList) { - final TaskStats taskStats = taskStatsMap.get(overloadedTaskHostname); + for (final String overloadedTaskId : overloadedTaskList) { + final TaskStats taskStats = taskStatsMap.get(overloadedTaskId); + final Map groupStatsMap = taskStats.getGroupStatsMap(); + + // Organize the group stats according to the applications. + final Map> appGroupStatsMap = new HashMap<>(); + for (final Map.Entry entry : groupStatsMap.entrySet()) { + final String appId = entry.getValue().getAppId(); + if (!appGroupStatsMap.containsKey(appId)) { + appGroupStatsMap.put(appId, new ArrayList<>()); + } + appGroupStatsMap.get(appId).add(entry.getValue()); + } + + // Select the moved group. final List movedGroupList = new ArrayList<>(); final int numEp = taskStats.getNumEventProcessors(); double movedGroupLoad = 0.; - for (final Map.Entry entry : taskStats.getGroupStatsMap().entrySet()) { - final double effectiveGroupLoad = entry.getValue().getGroupLoad() / numEp; - if (movedGroupLoad + effectiveGroupLoad < movableGroupLoadPerTask) { - movedGroupStatsMap.put(entry.getKey(), entry.getValue()); - movedGroupList.add(entry.getKey()); - movedGroupLoad += effectiveGroupLoad; + for (final Map.Entry> entry : appGroupStatsMap.entrySet()) { + final String appId = entry.getKey(); + final List groupStatsList = entry.getValue(); + boolean isAppCompletelyMoved = true; + for (final GroupStats groupStats : groupStatsList) { + final double effectiveGroupLoad = groupStats.getGroupLoad() / numEp; + if (movedGroupLoad + effectiveGroupLoad < movableGroupLoadPerTask) { + movedGroupStatsMap.put(groupStats.getGroupId(), groupStats); + movedGroupList.add(groupStats.getGroupId()); + movedGroupLoad += effectiveGroupLoad; + } else { + isAppCompletelyMoved = false; + } + } + if (isAppCompletelyMoved) { + // Update the AppTaskListMap if all the groups in the app are removed. + appTaskListMap.removeAppFromTask(overloadedTaskId, appId); + } else { + // No more groups to move. + break; } } // Remove the stopped groups in the overloaded tasks. - final MasterToTaskMessage proxyToTask = proxyToTaskMap.get(overloadedTaskHostname); + final MasterToTaskMessage proxyToTask = proxyToTaskMap.get(overloadedTaskId); proxyToTask.removeGroup(movedGroupList); } // Recover the moved groups into the new task using a single node recovery scheduler.