diff --git a/mist-common/src/main/avro/master_to_task_msg.avpr b/mist-common/src/main/avro/master_to_task_msg.avpr index cd4c7633..6949b23a 100644 --- a/mist-common/src/main/avro/master_to_task_msg.avpr +++ b/mist-common/src/main/avro/master_to_task_msg.avpr @@ -28,6 +28,18 @@ { "request": [], "response": "null" + }, + "removeGroup": { + "request": [ + { + "name": "removedGroupList", + "type": { + "type": "array", + "items": "string" + } + } + ], + "response": "null" } } } \ No newline at end of file diff --git a/mist-common/src/main/avro/task_to_master_msg.avpr b/mist-common/src/main/avro/task_to_master_msg.avpr index 315bdcc1..608fe477 100644 --- a/mist-common/src/main/avro/task_to_master_msg.avpr +++ b/mist-common/src/main/avro/task_to_master_msg.avpr @@ -74,6 +74,10 @@ "name": "TaskLoad", "type": "double" }, + { + "name": "NumEventProcessors", + "type": "int" + }, { "name": "GroupStatsMap", "type": { diff --git a/mist-core/src/main/java/edu/snu/mist/core/master/TaskStatsMap.java b/mist-core/src/main/java/edu/snu/mist/core/master/TaskStatsMap.java index 00b6d25e..ee32b22c 100644 --- a/mist-core/src/main/java/edu/snu/mist/core/master/TaskStatsMap.java +++ b/mist-core/src/main/java/edu/snu/mist/core/master/TaskStatsMap.java @@ -68,6 +68,7 @@ public TaskStats addTask(final String taskId) { innerList.add(taskId); return innerMap.putIfAbsent(taskId, TaskStats.newBuilder() .setTaskLoad(0.0) + .setNumEventProcessors(1) .setGroupStatsMap(new HashMap<>()) .build()); } diff --git a/mist-core/src/main/java/edu/snu/mist/core/master/lb/AppTaskListMap.java b/mist-core/src/main/java/edu/snu/mist/core/master/lb/AppTaskListMap.java index dfc2784c..e3b48378 100644 --- a/mist-core/src/main/java/edu/snu/mist/core/master/lb/AppTaskListMap.java +++ b/mist-core/src/main/java/edu/snu/mist/core/master/lb/AppTaskListMap.java @@ -44,6 +44,13 @@ public synchronized void removeTask(final String removedTaskId) { } } + public synchronized void removeAppFromTask(final String removedTaskId, final String appId) { + final List taskIdList = innerMap.get(appId); + if (taskIdList != null) { + taskIdList.removeIf(taskId -> taskId.equals(removedTaskId)); + } + } + public synchronized void addTaskToApp(final String appId, final String taskId) { if (!innerMap.containsKey(appId)) { innerMap.putIfAbsent(appId, new ArrayList<>()); diff --git a/mist-core/src/main/java/edu/snu/mist/core/master/lb/scaling/PeriodicDynamicScalingManager.java b/mist-core/src/main/java/edu/snu/mist/core/master/lb/scaling/PeriodicDynamicScalingManager.java index 8588af2c..006bb9f2 100644 --- a/mist-core/src/main/java/edu/snu/mist/core/master/lb/scaling/PeriodicDynamicScalingManager.java +++ b/mist-core/src/main/java/edu/snu/mist/core/master/lb/scaling/PeriodicDynamicScalingManager.java @@ -120,6 +120,11 @@ public final class PeriodicDynamicScalingManager implements DynamicScalingManage */ private final ScaleInManager scaleInManager; + /** + * The scale-out manager. + */ + private final ScaleOutManager scaleOutManager; + /** * The shared lock for synchronizing recovery process. */ @@ -143,6 +148,7 @@ private PeriodicDynamicScalingManager( @Parameter(ScaleInIdleTaskRatio.class) final double scaleInIdleTaskRatio, @Parameter(ScaleOutOverloadedTaskRatio.class) final double scaleOutOverloadedTaskRatio, final ScaleInManager scaleInManager, + final ScaleOutManager scaleOutManager, final RecoveryLock recoveryLock, final TaskInfoRWLock taskInfoRWLock) { this.taskStatsMap = taskStatsMap; @@ -160,6 +166,7 @@ private PeriodicDynamicScalingManager( this.lastMeasuredTimestamp = System.currentTimeMillis(); this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(); this.scaleInManager = scaleInManager; + this.scaleOutManager = scaleOutManager; this.recoveryLock = recoveryLock; this.taskInfoRWLock = taskInfoRWLock; } @@ -211,7 +218,7 @@ public void run() { try { // Release the lock. taskInfoRWLock.readLock().unlock(); - // TODO: [MIST-1130] Perform automatic scale-out. + scaleOutManager.scaleOut(); overloadedTimeElapsed = 0; } catch (final Exception e) { e.printStackTrace(); @@ -259,6 +266,7 @@ public void startAutoScaling() { @Override public void close() throws Exception { scaleInManager.close(); + scaleOutManager.close(); scheduledExecutorService.shutdown(); scheduledExecutorService.awaitTermination(6000, TimeUnit.MILLISECONDS); } diff --git a/mist-core/src/main/java/edu/snu/mist/core/master/lb/scaling/RecoveryBasedScaleOutManager.java b/mist-core/src/main/java/edu/snu/mist/core/master/lb/scaling/RecoveryBasedScaleOutManager.java new file mode 100644 index 00000000..e5eb1136 --- /dev/null +++ b/mist-core/src/main/java/edu/snu/mist/core/master/lb/scaling/RecoveryBasedScaleOutManager.java @@ -0,0 +1,175 @@ +/* + * Copyright (C) 2018 Seoul National University + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package edu.snu.mist.core.master.lb.scaling; + +import edu.snu.mist.core.master.ProxyToTaskMap; +import edu.snu.mist.core.master.TaskRequestor; +import edu.snu.mist.core.master.TaskStatsMap; +import edu.snu.mist.core.master.lb.AppTaskListMap; +import edu.snu.mist.core.master.lb.parameters.OverloadedTaskLoadThreshold; +import edu.snu.mist.core.master.lb.parameters.UnderloadedTaskLoadThreshold; +import edu.snu.mist.core.master.recovery.RecoveryScheduler; +import edu.snu.mist.core.master.recovery.SingleNodeRecoveryScheduler; +import edu.snu.mist.formats.avro.GroupStats; +import edu.snu.mist.formats.avro.MasterToTaskMessage; +import edu.snu.mist.formats.avro.TaskStats; +import org.apache.reef.tang.Injector; +import org.apache.reef.tang.Tang; +import org.apache.reef.tang.annotations.Parameter; + +import javax.inject.Inject; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +/** + * The scale out manager which is implemented based on recovery process. + */ +public final class RecoveryBasedScaleOutManager implements ScaleOutManager { + + /** + * The shared task stats map. + */ + private final TaskStatsMap taskStatsMap; + + /** + * The shared task to proxy map. + */ + private final ProxyToTaskMap proxyToTaskMap; + + /** + * The app task list map. + */ + private final AppTaskListMap appTaskListMap; + + /** + * The shared task requestor to driver. + */ + private final TaskRequestor taskRequestor; + + /** + * The overloaded task load threshold. + */ + private final double overloadedTaskLoadThreshold; + + /** + * The underloaded task load threshold. + */ + private final double underloadedTaskLoadThreshold; + + /** + * The single executor thread for running recovery. + */ + private final ExecutorService singleThreadedExecutorService; + + @Inject + private RecoveryBasedScaleOutManager( + final TaskStatsMap taskStatsMap, + final ProxyToTaskMap proxyToTaskMap, + final AppTaskListMap appTaskListMap, + final TaskRequestor taskRequestor, + @Parameter(UnderloadedTaskLoadThreshold.class) final double underloadedTaskLoadThreshold, + @Parameter(OverloadedTaskLoadThreshold.class) final double overloadedTaskLoadThreshold) { + this.taskStatsMap = taskStatsMap; + this.proxyToTaskMap = proxyToTaskMap; + this.appTaskListMap = appTaskListMap; + this.taskRequestor = taskRequestor; + this.underloadedTaskLoadThreshold = underloadedTaskLoadThreshold; + this.overloadedTaskLoadThreshold = overloadedTaskLoadThreshold; + this.singleThreadedExecutorService = Executors.newSingleThreadExecutor(); + } + + @Override + public void scaleOut() throws Exception { + // Step 1: Request a new task and setup the connection. + taskRequestor.setupTaskAndConn(1); + // Step 2: Get the groups which will be moved from overloaded tasks to the newly allocated groups. + // Our goal is to move the queries from the overloaded tasks to the newly allocated task as much as we can + // without making the new task overloaded. + final List overloadedTaskList = new ArrayList<>(); + for (final Map.Entry entry: taskStatsMap.entrySet()) { + if (entry.getValue().getTaskLoad() > overloadedTaskLoadThreshold) { + overloadedTaskList.add(entry.getKey()); + } + } + // Calculate the maximum load of the new task. + final double maximumNewTaskLoad = (underloadedTaskLoadThreshold + overloadedTaskLoadThreshold) / 2.; + final double movableGroupLoadPerTask = maximumNewTaskLoad / overloadedTaskList.size(); + final Map movedGroupStatsMap = new HashMap<>(); + // Choose the moved groups from the overloaded tasks. + for (final String overloadedTaskId : overloadedTaskList) { + final TaskStats taskStats = taskStatsMap.get(overloadedTaskId); + final Map groupStatsMap = taskStats.getGroupStatsMap(); + + // Organize the group stats according to the applications. + final Map> appGroupStatsMap = new HashMap<>(); + for (final Map.Entry entry : groupStatsMap.entrySet()) { + final String appId = entry.getValue().getAppId(); + if (!appGroupStatsMap.containsKey(appId)) { + appGroupStatsMap.put(appId, new ArrayList<>()); + } + appGroupStatsMap.get(appId).add(entry.getValue()); + } + + // Select the moved group. + final List movedGroupList = new ArrayList<>(); + final int numEp = taskStats.getNumEventProcessors(); + double movedGroupLoad = 0.; + for (final Map.Entry> entry : appGroupStatsMap.entrySet()) { + final String appId = entry.getKey(); + final List groupStatsList = entry.getValue(); + boolean isAppCompletelyMoved = true; + for (final GroupStats groupStats : groupStatsList) { + final double effectiveGroupLoad = groupStats.getGroupLoad() / numEp; + if (movedGroupLoad + effectiveGroupLoad < movableGroupLoadPerTask) { + movedGroupStatsMap.put(groupStats.getGroupId(), groupStats); + movedGroupList.add(groupStats.getGroupId()); + movedGroupLoad += effectiveGroupLoad; + } else { + isAppCompletelyMoved = false; + } + } + if (isAppCompletelyMoved) { + // Update the AppTaskListMap if all the groups in the app are removed. + appTaskListMap.removeAppFromTask(overloadedTaskId, appId); + } else { + // No more groups to move. + break; + } + } + // Remove the stopped groups in the overloaded tasks. + final MasterToTaskMessage proxyToTask = proxyToTaskMap.get(overloadedTaskId); + proxyToTask.removeGroup(movedGroupList); + } + // Recover the moved groups into the new task using a single node recovery scheduler. + final Injector injector = Tang.Factory.getTang().newInjector(); + injector.bindVolatileInstance(TaskStatsMap.class, taskStatsMap); + injector.bindVolatileInstance(ProxyToTaskMap.class, proxyToTaskMap); + final RecoveryScheduler recoveryScheduler = injector.getInstance(SingleNodeRecoveryScheduler.class); + // Start the recovery process. + recoveryScheduler.recover(movedGroupStatsMap); + } + + @Override + public void close() throws Exception { + singleThreadedExecutorService.shutdown(); + singleThreadedExecutorService.awaitTermination(60000, TimeUnit.MILLISECONDS); + } +} diff --git a/mist-core/src/main/java/edu/snu/mist/core/master/lb/scaling/ScaleOutManager.java b/mist-core/src/main/java/edu/snu/mist/core/master/lb/scaling/ScaleOutManager.java new file mode 100644 index 00000000..c8d4ef0d --- /dev/null +++ b/mist-core/src/main/java/edu/snu/mist/core/master/lb/scaling/ScaleOutManager.java @@ -0,0 +1,31 @@ +/* + * Copyright (C) 2018 Seoul National University + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package edu.snu.mist.core.master.lb.scaling; + +import org.apache.reef.tang.annotations.DefaultImplementation; + +/** + * The interface for scaling out. + */ +@DefaultImplementation(RecoveryBasedScaleOutManager.class) +public interface ScaleOutManager extends AutoCloseable { + + /** + * Perform scale-out according to the given implementation. + * @throws Exception + */ + void scaleOut() throws Exception; +} diff --git a/mist-core/src/main/java/edu/snu/mist/core/rpc/DefaultMasterToTaskMessageImpl.java b/mist-core/src/main/java/edu/snu/mist/core/rpc/DefaultMasterToTaskMessageImpl.java index 63b01cf8..e9d775a7 100644 --- a/mist-core/src/main/java/edu/snu/mist/core/rpc/DefaultMasterToTaskMessageImpl.java +++ b/mist-core/src/main/java/edu/snu/mist/core/rpc/DefaultMasterToTaskMessageImpl.java @@ -15,11 +15,13 @@ */ package edu.snu.mist.core.rpc; +import edu.snu.mist.core.task.checkpointing.CheckpointManager; import edu.snu.mist.core.task.recovery.RecoveryManager; import edu.snu.mist.formats.avro.MasterToTaskMessage; import org.apache.avro.AvroRemoteException; import javax.inject.Inject; +import java.util.List; /** * The default master-to-task message implementation. @@ -31,9 +33,17 @@ public final class DefaultMasterToTaskMessageImpl implements MasterToTaskMessage */ private final RecoveryManager recoveryManager; + /** + * The checkpoint manager. + */ + private final CheckpointManager checkpointManager; + @Inject - private DefaultMasterToTaskMessageImpl(final RecoveryManager recoveryManager) { + private DefaultMasterToTaskMessageImpl( + final RecoveryManager recoveryManager, + final CheckpointManager checkpointManager) { this.recoveryManager = recoveryManager; + this.checkpointManager = checkpointManager; } @Override @@ -41,4 +51,12 @@ public Void startTaskSideRecovery() throws AvroRemoteException { recoveryManager.startRecovery(); return null; } + + @Override + public Void removeGroup(final List removedGroupList) throws AvroRemoteException { + for (final String removedGroupId : removedGroupList) { + checkpointManager.deleteGroup(removedGroupId); + } + return null; + } } \ No newline at end of file diff --git a/mist-core/src/main/java/edu/snu/mist/core/task/groupaware/DefaultTaskStatsUpdater.java b/mist-core/src/main/java/edu/snu/mist/core/task/groupaware/DefaultTaskStatsUpdater.java index 36d8bf95..e29daa03 100644 --- a/mist-core/src/main/java/edu/snu/mist/core/task/groupaware/DefaultTaskStatsUpdater.java +++ b/mist-core/src/main/java/edu/snu/mist/core/task/groupaware/DefaultTaskStatsUpdater.java @@ -86,6 +86,7 @@ public void updateTaskStatsToMaster(final GroupAllocationTable groupAllocationTa proxyToMaster.updateTaskStats(taskId, TaskStats.newBuilder() .setTaskLoad(taskCpuLoad) + .setNumEventProcessors(numEventProcessors) .setGroupStatsMap(groupStatsMap) .build()); }