Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions mist-common/src/main/avro/master_to_task_msg.avpr
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,18 @@
{
"request": [],
"response": "null"
},
"removeGroup": {
"request": [
{
"name": "removedGroupList",
"type": {
"type": "array",
"items": "string"
}
}
],
"response": "null"
}
}
}
4 changes: 4 additions & 0 deletions mist-common/src/main/avro/task_to_master_msg.avpr
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,10 @@
"name": "TaskLoad",
"type": "double"
},
{
"name": "NumEventProcessors",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need this information?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can't we send the group load info from Task by dividing the load by the number of event processors?

"type": "int"
},
{
"name": "GroupStatsMap",
"type": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ public TaskStats addTask(final String taskId) {
innerList.add(taskId);
return innerMap.putIfAbsent(taskId, TaskStats.newBuilder()
.setTaskLoad(0.0)
.setNumEventProcessors(1)
.setGroupStatsMap(new HashMap<>())
.build());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,13 @@ public synchronized void removeTask(final String removedTaskId) {
}
}

public synchronized void removeAppFromTask(final String removedTaskId, final String appId) {
final List<String> taskIdList = innerMap.get(appId);
if (taskIdList != null) {
taskIdList.removeIf(taskId -> taskId.equals(removedTaskId));
}
}

public synchronized void addTaskToApp(final String appId, final String taskId) {
if (!innerMap.containsKey(appId)) {
innerMap.putIfAbsent(appId, new ArrayList<>());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,11 @@ public final class PeriodicDynamicScalingManager implements DynamicScalingManage
*/
private final ScaleInManager scaleInManager;

/**
* The scale-out manager.
*/
private final ScaleOutManager scaleOutManager;

/**
* The shared lock for synchronizing recovery process.
*/
Expand All @@ -143,6 +148,7 @@ private PeriodicDynamicScalingManager(
@Parameter(ScaleInIdleTaskRatio.class) final double scaleInIdleTaskRatio,
@Parameter(ScaleOutOverloadedTaskRatio.class) final double scaleOutOverloadedTaskRatio,
final ScaleInManager scaleInManager,
final ScaleOutManager scaleOutManager,
final RecoveryLock recoveryLock,
final TaskInfoRWLock taskInfoRWLock) {
this.taskStatsMap = taskStatsMap;
Expand All @@ -160,6 +166,7 @@ private PeriodicDynamicScalingManager(
this.lastMeasuredTimestamp = System.currentTimeMillis();
this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
this.scaleInManager = scaleInManager;
this.scaleOutManager = scaleOutManager;
this.recoveryLock = recoveryLock;
this.taskInfoRWLock = taskInfoRWLock;
}
Expand Down Expand Up @@ -211,7 +218,7 @@ public void run() {
try {
// Release the lock.
taskInfoRWLock.readLock().unlock();
// TODO: [MIST-1130] Perform automatic scale-out.
scaleOutManager.scaleOut();
overloadedTimeElapsed = 0;
} catch (final Exception e) {
e.printStackTrace();
Expand Down Expand Up @@ -259,6 +266,7 @@ public void startAutoScaling() {
@Override
public void close() throws Exception {
scaleInManager.close();
scaleOutManager.close();
scheduledExecutorService.shutdown();
scheduledExecutorService.awaitTermination(6000, TimeUnit.MILLISECONDS);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
/*
* Copyright (C) 2018 Seoul National University
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package edu.snu.mist.core.master.lb.scaling;

import edu.snu.mist.core.master.ProxyToTaskMap;
import edu.snu.mist.core.master.TaskRequestor;
import edu.snu.mist.core.master.TaskStatsMap;
import edu.snu.mist.core.master.lb.AppTaskListMap;
import edu.snu.mist.core.master.lb.parameters.OverloadedTaskLoadThreshold;
import edu.snu.mist.core.master.lb.parameters.UnderloadedTaskLoadThreshold;
import edu.snu.mist.core.master.recovery.RecoveryScheduler;
import edu.snu.mist.core.master.recovery.SingleNodeRecoveryScheduler;
import edu.snu.mist.formats.avro.GroupStats;
import edu.snu.mist.formats.avro.MasterToTaskMessage;
import edu.snu.mist.formats.avro.TaskStats;
import org.apache.reef.tang.Injector;
import org.apache.reef.tang.Tang;
import org.apache.reef.tang.annotations.Parameter;

import javax.inject.Inject;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
* The scale out manager which is implemented based on recovery process.
*/
public final class RecoveryBasedScaleOutManager implements ScaleOutManager {

/**
* The shared task stats map.
*/
private final TaskStatsMap taskStatsMap;

/**
* The shared task to proxy map.
*/
private final ProxyToTaskMap proxyToTaskMap;

/**
* The app task list map.
*/
private final AppTaskListMap appTaskListMap;

/**
* The shared task requestor to driver.
*/
private final TaskRequestor taskRequestor;

/**
* The overloaded task load threshold.
*/
private final double overloadedTaskLoadThreshold;

/**
* The underloaded task load threshold.
*/
private final double underloadedTaskLoadThreshold;

/**
* The single executor thread for running recovery.
*/
private final ExecutorService singleThreadedExecutorService;

@Inject
private RecoveryBasedScaleOutManager(
final TaskStatsMap taskStatsMap,
final ProxyToTaskMap proxyToTaskMap,
final AppTaskListMap appTaskListMap,
final TaskRequestor taskRequestor,
@Parameter(UnderloadedTaskLoadThreshold.class) final double underloadedTaskLoadThreshold,
@Parameter(OverloadedTaskLoadThreshold.class) final double overloadedTaskLoadThreshold) {
this.taskStatsMap = taskStatsMap;
this.proxyToTaskMap = proxyToTaskMap;
this.appTaskListMap = appTaskListMap;
this.taskRequestor = taskRequestor;
this.underloadedTaskLoadThreshold = underloadedTaskLoadThreshold;
this.overloadedTaskLoadThreshold = overloadedTaskLoadThreshold;
this.singleThreadedExecutorService = Executors.newSingleThreadExecutor();
}

@Override
public void scaleOut() throws Exception {
// Step 1: Request a new task and setup the connection.
taskRequestor.setupTaskAndConn(1);
// Step 2: Get the groups which will be moved from overloaded tasks to the newly allocated groups.
// Our goal is to move the queries from the overloaded tasks to the newly allocated task as much as we can
// without making the new task overloaded.
final List<String> overloadedTaskList = new ArrayList<>();
for (final Map.Entry<String, TaskStats> entry: taskStatsMap.entrySet()) {
if (entry.getValue().getTaskLoad() > overloadedTaskLoadThreshold) {
overloadedTaskList.add(entry.getKey());
}
}
// Calculate the maximum load of the new task.
final double maximumNewTaskLoad = (underloadedTaskLoadThreshold + overloadedTaskLoadThreshold) / 2.;
final double movableGroupLoadPerTask = maximumNewTaskLoad / overloadedTaskList.size();
final Map<String, GroupStats> movedGroupStatsMap = new HashMap<>();
// Choose the moved groups from the overloaded tasks.
for (final String overloadedTaskId : overloadedTaskList) {
final TaskStats taskStats = taskStatsMap.get(overloadedTaskId);
final Map<String, GroupStats> groupStatsMap = taskStats.getGroupStatsMap();

// Organize the group stats according to the applications.
final Map<String, List<GroupStats>> appGroupStatsMap = new HashMap<>();
for (final Map.Entry<String, GroupStats> entry : groupStatsMap.entrySet()) {
final String appId = entry.getValue().getAppId();
if (!appGroupStatsMap.containsKey(appId)) {
appGroupStatsMap.put(appId, new ArrayList<>());
}
appGroupStatsMap.get(appId).add(entry.getValue());
}

// Select the moved group.
final List<String> movedGroupList = new ArrayList<>();
final int numEp = taskStats.getNumEventProcessors();
double movedGroupLoad = 0.;
for (final Map.Entry<String, List<GroupStats>> entry : appGroupStatsMap.entrySet()) {
final String appId = entry.getKey();
final List<GroupStats> groupStatsList = entry.getValue();
boolean isAppCompletelyMoved = true;
for (final GroupStats groupStats : groupStatsList) {
final double effectiveGroupLoad = groupStats.getGroupLoad() / numEp;
if (movedGroupLoad + effectiveGroupLoad < movableGroupLoadPerTask) {
movedGroupStatsMap.put(groupStats.getGroupId(), groupStats);
movedGroupList.add(groupStats.getGroupId());
movedGroupLoad += effectiveGroupLoad;
} else {
isAppCompletelyMoved = false;
}
}
if (isAppCompletelyMoved) {
// Update the AppTaskListMap if all the groups in the app are removed.
appTaskListMap.removeAppFromTask(overloadedTaskId, appId);
} else {
// No more groups to move.
break;
}
}
// Remove the stopped groups in the overloaded tasks.
final MasterToTaskMessage proxyToTask = proxyToTaskMap.get(overloadedTaskId);
proxyToTask.removeGroup(movedGroupList);
}
// Recover the moved groups into the new task using a single node recovery scheduler.
final Injector injector = Tang.Factory.getTang().newInjector();
injector.bindVolatileInstance(TaskStatsMap.class, taskStatsMap);
injector.bindVolatileInstance(ProxyToTaskMap.class, proxyToTaskMap);
final RecoveryScheduler recoveryScheduler = injector.getInstance(SingleNodeRecoveryScheduler.class);
// Start the recovery process.
recoveryScheduler.recover(movedGroupStatsMap);
}

@Override
public void close() throws Exception {
singleThreadedExecutorService.shutdown();
singleThreadedExecutorService.awaitTermination(60000, TimeUnit.MILLISECONDS);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright (C) 2018 Seoul National University
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package edu.snu.mist.core.master.lb.scaling;

import org.apache.reef.tang.annotations.DefaultImplementation;

/**
* The interface for scaling out.
*/
@DefaultImplementation(RecoveryBasedScaleOutManager.class)
public interface ScaleOutManager extends AutoCloseable {

/**
* Perform scale-out according to the given implementation.
* @throws Exception
*/
void scaleOut() throws Exception;
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,13 @@
*/
package edu.snu.mist.core.rpc;

import edu.snu.mist.core.task.checkpointing.CheckpointManager;
import edu.snu.mist.core.task.recovery.RecoveryManager;
import edu.snu.mist.formats.avro.MasterToTaskMessage;
import org.apache.avro.AvroRemoteException;

import javax.inject.Inject;
import java.util.List;

/**
* The default master-to-task message implementation.
Expand All @@ -31,14 +33,30 @@ public final class DefaultMasterToTaskMessageImpl implements MasterToTaskMessage
*/
private final RecoveryManager recoveryManager;

/**
* The checkpoint manager.
*/
private final CheckpointManager checkpointManager;

@Inject
private DefaultMasterToTaskMessageImpl(final RecoveryManager recoveryManager) {
private DefaultMasterToTaskMessageImpl(
final RecoveryManager recoveryManager,
final CheckpointManager checkpointManager) {
this.recoveryManager = recoveryManager;
this.checkpointManager = checkpointManager;
}

@Override
public Void startTaskSideRecovery() throws AvroRemoteException {
recoveryManager.startRecovery();
return null;
}

@Override
public Void removeGroup(final List<String> removedGroupList) throws AvroRemoteException {
for (final String removedGroupId : removedGroupList) {
checkpointManager.deleteGroup(removedGroupId);
}
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ public void updateTaskStatsToMaster(final GroupAllocationTable groupAllocationTa
proxyToMaster.updateTaskStats(taskId,
TaskStats.newBuilder()
.setTaskLoad(taskCpuLoad)
.setNumEventProcessors(numEventProcessors)
.setGroupStatsMap(groupStatsMap)
.build());
}
Expand Down