Skip to content

Commit a92e014

Browse files
author
jiarunying-it@360.cn
committed
update the port reserve of am and container based on the pre commit
1 parent 2b5b620 commit a92e014

File tree

3 files changed

+39
-22
lines changed

3 files changed

+39
-22
lines changed

src/main/java/net/qihoo/xlearning/AM/ApplicationMaster.java

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,9 @@ public class ApplicationMaster extends CompositeService {
111111

112112
private int outputIndex;
113113

114+
private int reservePortBegin = 0;
115+
private int reservePortEnd = 0;
116+
114117
/**
115118
* Constructor, connect to Resource Manager
116119
*
@@ -155,6 +158,10 @@ private ApplicationMaster() {
155158
tfEvaluatorContainerId = "";
156159
inputPath = new StringBuilder();
157160
outputIndex = -1;
161+
this.reservePortBegin = this.conf.getInt(XLearningConfiguration.XLEARNING_RESERVE_PORT_BEGIN,
162+
XLearningConfiguration.DEFAULT_XLEARNING_RESERVE_PORT_BEGIN);
163+
this.reservePortEnd = this.conf.getInt(XLearningConfiguration.XLEARNING_RESERVE_PORT_END,
164+
XLearningConfiguration.DEFAULT_XLEARNING_RESERVE_PORT_END);
158165

159166
if (envs.containsKey(ApplicationConstants.Environment.CONTAINER_ID.toString())) {
160167
ContainerId containerId = ConverterUtils
@@ -1151,7 +1158,7 @@ private boolean run() throws IOException, NoSuchAlgorithmException {
11511158
dmlcPsRootUri = applicationMasterHostname;
11521159
Socket schedulerReservedSocket = new Socket();
11531160
try {
1154-
schedulerReservedSocket.bind(new InetSocketAddress("127.0.0.1", 0));
1161+
Utilities.getReservePort(schedulerReservedSocket, InetAddress.getByName(applicationMasterHostname).getHostAddress(), reservePortBegin, reservePortEnd);
11551162
} catch (IOException e) {
11561163
LOG.error("Can not get available port");
11571164
}
@@ -1256,7 +1263,7 @@ public void run() {
12561263
dmlcTrackerUri = applicationMasterHostname;
12571264
Socket schedulerReservedSocket = new Socket();
12581265
try {
1259-
schedulerReservedSocket.bind(new InetSocketAddress("127.0.0.1", 0));
1266+
Utilities.getReservePort(schedulerReservedSocket, InetAddress.getByName(applicationMasterHostname).getHostAddress(), reservePortBegin, reservePortEnd);
12601267
} catch (IOException e) {
12611268
LOG.error("Can not get available port");
12621269
}
@@ -1364,7 +1371,7 @@ public void run() {
13641371
}
13651372
Socket schedulerReservedSocket = new Socket();
13661373
try {
1367-
schedulerReservedSocket.bind(new InetSocketAddress("127.0.0.1", 0));
1374+
Utilities.getReservePort(schedulerReservedSocket, InetAddress.getByName(applicationMasterHostname).getHostAddress(), reservePortBegin, reservePortEnd);
13681375
} catch (IOException e) {
13691376
LOG.error("Can not get available port");
13701377
}

src/main/java/net/qihoo/xlearning/container/XLearningContainer.java

Lines changed: 9 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,8 @@ public class XLearningContainer {
8585

8686
private int outputIndex;
8787

88+
private String localHost;
89+
8890
private XLearningContainer() {
8991
this.conf = new XLearningConfiguration();
9092
conf.addResource(new Path(XLearningConstants.XLEARNING_JOB_CONFIGURATION));
@@ -93,6 +95,11 @@ private XLearningContainer() {
9395
.getenv(ApplicationConstants.Environment.CONTAINER_ID.name())));
9496
this.downloadRetry = conf.getInt(XLearningConfiguration.XLEARNING_DOWNLOAD_FILE_RETRY, XLearningConfiguration.DEFAULT_XLEARNING_DOWNLOAD_FILE_RETRY);
9597
this.envs = System.getenv();
98+
if (envs.containsKey(ApplicationConstants.Environment.NM_HOST.toString())) {
99+
localHost = envs.get(ApplicationConstants.Environment.NM_HOST.toString());
100+
} else {
101+
localHost = "127.0.0.1";
102+
}
96103
this.xlearningAppType = envs.get(XLearningConstants.Environment.XLEARNING_APP_TYPE.toString()).toUpperCase();
97104
this.role = envs.get(XLearningConstants.Environment.XLEARNING_TF_ROLE.toString());
98105
this.index = Integer.valueOf(envs.get(XLearningConstants.Environment.XLEARNING_TF_INDEX.toString()));
@@ -183,7 +190,7 @@ private void init() {
183190

184191
if ((("TENSORFLOW".equals(xlearningAppType) || "LIGHTLDA".equals(xlearningAppType)) && !single) || xlearningAppType.equals("DISTLIGHTGBM")) {
185192
try {
186-
getReservePort(reservedSocket);
193+
Utilities.getReservePort(reservedSocket, InetAddress.getByName(localHost).getHostAddress(), reservePortBegin, reservePortEnd);
187194
} catch (IOException e) {
188195
LOG.error("Can not get available port");
189196
reportFailedAndExit();
@@ -832,7 +839,7 @@ public void run() {
832839
if (boardEnable && this.role.equals(XLearningConstants.WORKER) && boardIndex == this.index) {
833840
Socket boardReservedSocket = new Socket();
834841
try {
835-
boardReservedSocket.bind(new InetSocketAddress("127.0.0.1", 0));
842+
Utilities.getReservePort(boardReservedSocket, InetAddress.getByName(localHost).getHostAddress(), reservePortBegin, reservePortEnd);
836843
} catch (IOException e) {
837844
LOG.error("Can not get available port");
838845
reportFailedAndExit();
@@ -969,23 +976,6 @@ private void reportSucceededAndExit() {
969976
System.exit(0);
970977
}
971978

972-
private void getReservePort(Socket socket) throws IOException {
973-
int i = 0;
974-
Random random = new Random(System.currentTimeMillis());
975-
while (i < 1000) {
976-
int rand = random.nextInt(reservePortEnd - reservePortBegin);
977-
try {
978-
socket.bind(new InetSocketAddress("127.0.0.1", reservePortBegin + rand));
979-
return;
980-
} catch (IOException e) {
981-
try {
982-
Thread.sleep(1000);
983-
} catch (InterruptedException e2) {}
984-
}
985-
}
986-
throw new IOException("couldn't allocate a unused port");
987-
}
988-
989979
public static void main(String[] args) {
990980
XLearningContainer container = new XLearningContainer();
991981
try {

src/main/java/net/qihoo/xlearning/util/Utilities.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,12 @@
1515

1616
import java.io.File;
1717
import java.io.IOException;
18+
import java.net.InetSocketAddress;
19+
import java.net.Socket;
1820
import java.util.ArrayList;
1921
import java.util.List;
2022
import java.util.Map;
23+
import java.util.Random;
2124

2225
public final class Utilities {
2326
private static Log LOG = LogFactory.getLog(Utilities.class);
@@ -136,4 +139,21 @@ public static void addPathToEnvironment(Map<String, String> env, String userEnvK
136139
}
137140
}
138141

142+
public static void getReservePort(Socket socket, String localHost, int reservePortBegin, int reservePortEnd) throws IOException {
143+
int i = 0;
144+
Random random = new Random(System.currentTimeMillis());
145+
while (i < 1000) {
146+
int rand = random.nextInt(reservePortEnd - reservePortBegin);
147+
try {
148+
socket.bind(new InetSocketAddress(localHost, reservePortBegin + rand));
149+
return;
150+
} catch (IOException e) {
151+
try {
152+
Thread.sleep(1000);
153+
} catch (InterruptedException e2) {}
154+
}
155+
}
156+
throw new IOException("couldn't allocate a unused port");
157+
}
158+
139159
}

0 commit comments

Comments
 (0)