diff --git a/pom.xml b/pom.xml
index 0992424..4824136 100644
--- a/pom.xml
+++ b/pom.xml
@@ -63,7 +63,7 @@
<storm.version>0.9.0-wip21</storm.version>
- <hadoop.version>2.1.0-beta</hadoop.version>
+ <hadoop.version>2.2.0</hadoop.version>
diff --git a/src/main/java/com/yahoo/storm/yarn/Config.java b/src/main/java/com/yahoo/storm/yarn/Config.java
index 0d1d6df..f69e38e 100644
--- a/src/main/java/com/yahoo/storm/yarn/Config.java
+++ b/src/main/java/com/yahoo/storm/yarn/Config.java
@@ -36,7 +36,12 @@ public class Config {
//# of milliseconds to wait for YARN report on Storm Master host/port
final public static String YARN_REPORT_WAIT_MILLIS = "yarn.report.wait.millis";
final public static String MASTER_HEARTBEAT_INTERVAL_MILLIS = "master.heartbeat.interval.millis";
-
+
+ //size (in MB) of the supervisor container to request from YARN. This covers
+ // the supervisor daemon and all of its workers
+ final public static String SUPERVISOR_SIZE_MB = "supervisor.container.size-mb";
+ final public static int DEFAULT_SUPERVISOR_SIZE = 1024;
+
@SuppressWarnings("rawtypes")
static public Map readStormConfig() {
return readStormConfig(null);
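A rough sizing note for the new knob: the container has to hold the supervisor daemon plus every worker it launches, so the value is typically supervisor overhead plus slots times per-worker heap. A minimal sketch of that arithmetic with made-up numbers; only the supervisor.container.size-mb key and its 1024 MB default come from this patch:

    // Hypothetical sizing for supervisor.container.size-mb; all figures are assumptions.
    public class SupervisorSizing {
        public static void main(String[] args) {
            int workerSlots = 4;          // length of supervisor.slots.ports
            int workerHeapMB = 768;       // assumed per-worker heap (worker.childopts -Xmx)
            int supervisorHeapMB = 256;   // assumed supervisor daemon overhead
            int containerMB = supervisorHeapMB + workerSlots * workerHeapMB;
            // a storm.yaml override would then read: supervisor.container.size-mb: 3328
            System.out.println("supervisor.container.size-mb: " + containerMB);
        }
    }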
diff --git a/src/main/java/com/yahoo/storm/yarn/MasterServer.java b/src/main/java/com/yahoo/storm/yarn/MasterServer.java
index 27b8cae..3c89c56 100644
--- a/src/main/java/com/yahoo/storm/yarn/MasterServer.java
+++ b/src/main/java/com/yahoo/storm/yarn/MasterServer.java
@@ -80,10 +80,16 @@ public void run() {
if (allocatedContainers.size() > 0) {
// Add newly allocated containers to the client.
LOG.info("HB: Received allocated containers (" + allocatedContainers.size() + ")");
- client.addAllocatedContainers(allocatedContainers);
if (client.supervisorsAreToRun()) {
LOG.info("HB: Supervisors are to run, so queueing (" + allocatedContainers.size() + ") containers...");
- launcherQueue.addAll(allocatedContainers);
+ for(Container allocatedContainer : allocatedContainers) {
+ if(client.addAllocatedContainer(allocatedContainer)){
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("HB: Queuing supervisor container["+allocatedContainer+"]");
+ }
+ launcherQueue.add(allocatedContainer);
+ }
+ }
} else {
LOG.info("HB: Supervisors are to stop, so releasing all containers...");
client.stopAllSupervisors();
@@ -95,6 +101,9 @@ public void run() {
if (completedContainers.size() > 0 && client.supervisorsAreToRun()) {
LOG.debug("HB: Containers completed (" + completedContainers.size() + "), so releasing them.");
+ for(ContainerStatus containerStatus : completedContainers) {
+ client.stopSupervisors(containerStatus.getContainerId());
+ }
client.startAllSupervisors();
}
@@ -162,8 +171,6 @@ public static void main(String[] args) throws Exception {
RegisterApplicationMasterResponse resp =
rmClient.registerApplicationMaster(addr.getHostName(), port, null);
LOG.info("Got a registration response "+resp);
- LOG.info("Max Capability "+resp.getMaximumResourceCapability());
- rmClient.setMaxResource(resp.getMaximumResourceCapability());
LOG.info("Starting HB thread");
server.initAndStartHeartbeat(rmClient, launcherQueue,
(Integer) storm_conf
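With the max-capability plumbing removed from AM registration, the master is expected to size its own container requests. A minimal sketch of such a request, assuming StormAMRMClient (below) builds it from its new supervisorResource field; the wrapper class is illustrative, the ContainerRequest constructor is the stock Hadoop 2.2 API:

    import org.apache.hadoop.yarn.api.records.Priority;
    import org.apache.hadoop.yarn.api.records.Resource;
    import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
    import org.apache.hadoop.yarn.util.Records;

    class SupervisorRequestSketch {
        // Build one supervisor-sized request; no node or rack constraints,
        // so YARN may place the supervisor container anywhere.
        static ContainerRequest newSupervisorRequest(Resource supervisorResource, int priority) {
            Priority pri = Records.newRecord(Priority.class);
            pri.setPriority(priority);
            return new ContainerRequest(supervisorResource, null, null, pri);
        }
    }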
diff --git a/src/main/java/com/yahoo/storm/yarn/StormAMRMClient.java b/src/main/java/com/yahoo/storm/yarn/StormAMRMClient.java
index 1a357ec..cb9c01d 100644
--- a/src/main/java/com/yahoo/storm/yarn/StormAMRMClient.java
+++ b/src/main/java/com/yahoo/storm/yarn/StormAMRMClient.java
@@ -16,43 +16,31 @@
package com.yahoo.storm.yarn;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeSet;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.hadoop.security.UserGroupInformation;
+import backtype.storm.utils.Utils;
+import com.google.common.collect.BiMap;
+import com.google.common.collect.HashBiMap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.ApplicationConstants;
-import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
-import org.apache.hadoop.yarn.api.records.LocalResource;
-import org.apache.hadoop.yarn.api.records.LocalResourceType;
-import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
-import org.apache.hadoop.yarn.api.records.Priority;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.util.Records;
-
+import org.apache.hadoop.yarn.api.records.*;
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
import org.apache.hadoop.yarn.client.api.NMClient;
import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl;
import org.apache.hadoop.yarn.client.api.impl.NMClientImpl;
-
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.util.Records;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import backtype.storm.utils.Utils;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.*;
+import java.util.concurrent.atomic.AtomicInteger;
class StormAMRMClient extends AMRMClientImpl<ContainerRequest> {
private static final Logger LOG = LoggerFactory.getLogger(StormAMRMClient.class);
@@ -61,10 +49,10 @@ class StormAMRMClient extends AMRMClientImpl {
private final Map storm_conf;
private final YarnConfiguration hadoopConf;
private final Priority DEFAULT_PRIORITY = Records.newRecord(Priority.class);
- private final Set<Container> containers;
+ private final BiMap<NodeId, ContainerId> runningSupervisors;
+ private final Resource supervisorResource;
private volatile boolean supervisorsAreToRun = false;
private AtomicInteger numSupervisors;
- private Resource maxResourceCapability;
private ApplicationAttemptId appAttemptId;
private NMClientImpl nmClient;
@@ -76,13 +64,22 @@ public StormAMRMClient(ApplicationAttemptId appAttemptId,
this.hadoopConf = hadoopConf;
Integer pri = Utils.getInt(storm_conf.get(Config.MASTER_CONTAINER_PRIORITY));
this.DEFAULT_PRIORITY.setPriority(pri);
- this.containers = new TreeSet<Container>();
numSupervisors = new AtomicInteger(0);
+ runningSupervisors = Maps.synchronizedBiMap(HashBiMap.<NodeId, ContainerId>create());
// start am nm client
nmClient = (NMClientImpl) NMClient.createNMClient();
nmClient.init(hadoopConf);
nmClient.start();
+
+ //get number of slots for supervisor
+ int numWorkersPerSupervisor = Util.getNumWorkers(storm_conf);
+ int supervisorSizeMB = Util.getSupervisorSizeMB(storm_conf);
+ //one vcore per worker slot, plus 1 for the supervisor itself
+ supervisorResource =
+ Resource.newInstance(supervisorSizeMB, numWorkersPerSupervisor + 1);
+ LOG.info("Supervisors will allocate Yarn Resource["+supervisorResource+"]");
}
public synchronized void startAllSupervisors() {
@@ -90,7 +87,33 @@ public synchronized void startAllSupervisors() {
this.supervisorsAreToRun = true;
this.addSupervisorsRequest();
}
-
+
+ /**
+ * Stop the supervisors running on the given nodes and release their containers.
+ * @param nodeIds nodes whose supervisor containers should be released
+ */
+ public synchronized void stopSupervisors(NodeId... nodeIds) {
+ if(LOG.isDebugEnabled()){
+ LOG.debug(
+ "Stopping supervisors at nodes[" + Arrays.toString(nodeIds) + "], " +
+ "releasing all containers.");
+ }
+ releaseSupervisors(nodeIds);
+ }
+
+ /**
+ * Stop the supervisors running in the given containers and release those containers.
+ * @param containerIds supervisor containers to stop
+ */
+ public synchronized void stopSupervisors(ContainerId... containerIds) {
+ if(LOG.isDebugEnabled()){
+ LOG.debug("Stopping supervisors in containers[" +
+ Arrays.toString(containerIds) + "], " +
+ "releasing all containers.");
+ }
+ releaseSupervisors(containerIds);
+ }
+
public synchronized void stopAllSupervisors() {
LOG.debug("Stopping all supervisors, releasing all containers...");
this.supervisorsAreToRun = false;
@@ -100,36 +123,106 @@ public synchronized void stopAllSupervisors() {
private void addSupervisorsRequest() {
int num = numSupervisors.getAndSet(0);
for (int i=0; i<num; i++) {
-   super.addContainerRequest(new ContainerRequest(this.maxResourceCapability, null, null, DEFAULT_PRIORITY));
+   super.addContainerRequest(new ContainerRequest(supervisorResource, null, null, DEFAULT_PRIORITY));
}
}

- public synchronized void addAllocatedContainers(List<Container> containers) {
-   for (int i=0; i<containers.size(); i++) {
-     super.removeContainerRequest(new ContainerRequest(this.maxResourceCapability, null, null, DEFAULT_PRIORITY));
-   }
-   this.containers.addAll(containers);
- }

private void releaseAllSupervisorsRequest() {
-   Iterator<Container> it = this.containers.iterator();
-   ContainerId id;
-   while (it.hasNext()) {
-     id = it.next().getId();
-     LOG.debug("Releasing container (id:"+id+")");
-     releaseAssignedContainer(id);
-     it.remove();
-   }
+   Set<NodeId> nodeIds = runningSupervisors.keySet();
+   this.releaseSupervisors(nodeIds.toArray(new NodeId[nodeIds.size()]));
+ }
+
+ /**
+ * This is the main entry point to release a supervisor.
+ * @param nodeIds
+ */
+ private synchronized void releaseSupervisors(NodeId... nodeIds) {
+ for(NodeId nodeId : nodeIds) {
+ //remove from running supervisors list
+ ContainerId containerId = removeRunningSupervisor(nodeId);
+ if(containerId != null) {
+ LOG.debug("Releasing container (id:"+containerId+")");
+ //release the containers on the specified nodes
+ super.releaseAssignedContainer(containerId);
+ //increase the number of supervisors to request on the next heartbeat
+ numSupervisors.incrementAndGet();
+ }
+ }
+ }
+
+ private synchronized void releaseSupervisors(ContainerId... containerIds) {
+ BiMap<ContainerId, NodeId> inverse = runningSupervisors.inverse();
+ for(ContainerId containerId : containerIds) {
+ NodeId nodeId = inverse.get(containerId);
+ if(nodeId != null) {
+ this.releaseSupervisors(nodeId);
+ }
}
}
-
+
public synchronized boolean supervisorsAreToRun() {
return this.supervisorsAreToRun;
}
@@ -205,9 +298,4 @@ else if (vis.equals("APPLICATION"))
System.exit(-1);
}
}
-
- public void setMaxResource(Resource maximumResourceCapability) {
- this.maxResourceCapability = maximumResourceCapability;
- LOG.info("Max Capability is now "+this.maxResourceCapability);
- }
}
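The StormAMRMClient hunk above is truncated where the new per-container bookkeeping is added, so the addAllocatedContainer(Container) that MasterServer now calls and the removeRunningSupervisor(NodeId) used by releaseSupervisors(NodeId...) are not shown. A sketch of what they presumably do with the NodeId-to-ContainerId BiMap; the method bodies and the one-supervisor-per-node behavior are assumptions, only the names and the map type come from the diff:

    import com.google.common.collect.BiMap;
    import com.google.common.collect.HashBiMap;
    import com.google.common.collect.Maps;
    import org.apache.hadoop.yarn.api.records.Container;
    import org.apache.hadoop.yarn.api.records.ContainerId;
    import org.apache.hadoop.yarn.api.records.NodeId;

    class RunningSupervisorsSketch {
        // NodeId <-> ContainerId, mirroring the runningSupervisors field in the patch
        private final BiMap<NodeId, ContainerId> runningSupervisors =
            Maps.synchronizedBiMap(HashBiMap.<NodeId, ContainerId>create());

        // Track a newly allocated container; refuse it if the node already
        // hosts a supervisor so the caller can skip launching on it.
        boolean addAllocatedContainer(Container container) {
            NodeId nodeId = container.getNodeId();
            if (runningSupervisors.containsKey(nodeId)) {
                return false;
            }
            runningSupervisors.put(nodeId, container.getId());
            return true;
        }

        // Counterpart used when releasing by node: drop the mapping and hand
        // back the container id so releaseAssignedContainer can be called on it.
        ContainerId removeRunningSupervisor(NodeId nodeId) {
            return runningSupervisors.remove(nodeId);
        }
    }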
diff --git a/src/main/java/com/yahoo/storm/yarn/Util.java b/src/main/java/com/yahoo/storm/yarn/Util.java
index a2817f7..353b012 100644
--- a/src/main/java/com/yahoo/storm/yarn/Util.java
+++ b/src/main/java/com/yahoo/storm/yarn/Util.java
@@ -41,6 +41,7 @@
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;
+import backtype.storm.utils.Utils;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
@@ -61,6 +62,8 @@
class Util {
private static final String STORM_CONF_PATH_STRING = "conf" + Path.SEPARATOR + "storm.yaml";
+ private static final String STORM_CONF_SUPERVISOR_SLOTS_PORTS =
+ "supervisor.slots.ports";
static String getStormHome() {
String ret = System.getProperty("storm.home");
@@ -139,6 +142,28 @@ static void rmNulls(Map map) {
}
}
+ /**
+ * Get the number of workers for a supervisor, counted from the slots listed
+ * under the supervisor.slots.ports storm configuration.
+ *
+ * @param stormConf storm configuration map
+ * @return the number of configured worker slots, or 1 if none are configured
+ */
+ @SuppressWarnings("rawtypes")
+ static int getNumWorkers(Map stormConf) {
+ List slots = (List) stormConf.get(STORM_CONF_SUPERVISOR_SLOTS_PORTS);
+ if(slots == null || slots.size() == 0) {
+ return 1; //default to one worker
+ }
+ return slots.size();
+ }
+
+ static int getSupervisorSizeMB(Map stormConf) {
+ Object sizeObj = stormConf.get(Config.SUPERVISOR_SIZE_MB);
+ return sizeObj != null ? Utils.getInt(sizeObj) :
+ Config.DEFAULT_SUPERVISOR_SIZE;
+ }
+
@SuppressWarnings("rawtypes")
static Path createConfigurationFileInFs(FileSystem fs,
String appHome, Map stormConf, YarnConfiguration yarnConf)
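Taken together, the new Util helpers determine the YARN resource the master requests per supervisor; a small usage sketch, assuming it sits in com.yahoo.storm.yarn so the package-private helpers are visible (the slot list and size value are made up):

    package com.yahoo.storm.yarn;

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.Map;
    import org.apache.hadoop.yarn.api.records.Resource;

    class SupervisorResourceExample {
        @SuppressWarnings({"rawtypes", "unchecked"})
        public static void main(String[] args) {
            Map stormConf = new HashMap();
            stormConf.put("supervisor.slots.ports", Arrays.asList(6700, 6701, 6702, 6703));
            stormConf.put(Config.SUPERVISOR_SIZE_MB, 4096);

            int workers = Util.getNumWorkers(stormConf);       // 4 slots -> 4 workers
            int sizeMB = Util.getSupervisorSizeMB(stormConf);  // 4096 MB
            // same shape as the StormAMRMClient constructor: one vcore per
            // worker slot, plus one for the supervisor daemon itself
            Resource supervisorResource = Resource.newInstance(sizeMB, workers + 1);
            System.out.println("Supervisor container resource: " + supervisorResource);
        }
    }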