Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@
import java.net.UnknownHostException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

import backtype.storm.utils.Utils;
import org.apache.thrift7.TException;
import org.json.simple.JSONValue;
import org.slf4j.Logger;
Expand All @@ -38,12 +40,14 @@ public class StormMasterServerHandler implements StormMaster.Iface {
Map _storm_conf;
StormAMRMClient _client;
MasterServer _masterServer;
AtomicInteger curNumNeedSupervisors;

StormMasterServerHandler(@SuppressWarnings("rawtypes") Map storm_conf, StormAMRMClient client) {
_storm_conf = storm_conf;
setStormHostConf();
Util.rmNulls(_storm_conf);
_client = client;
curNumNeedSupervisors = new AtomicInteger(0);
}

void init(MasterServer masterServer) {
Expand Down Expand Up @@ -102,6 +106,7 @@ public void setStormConf(String storm_conf) throws TException {
@Override
public void addSupervisors(int number) throws TException {
LOG.info("adding "+number+" supervisors...");
curNumNeedSupervisors.addAndGet(number);
_client.addSupervisors(number);
}

Expand Down Expand Up @@ -215,7 +220,10 @@ public void stopUI() throws TException {
@Override
public void startSupervisors() throws TException {
LOG.info("starting supervisors...");
_client.startAllSupervisors();
if (!_client.supervisorsAreToRun()) {
_client.addSupervisors(curNumNeedSupervisors.get());
_client.startAllSupervisors();
}
}

@Override
Expand Down