Skip to content

Commit 674ac00

Browse files
LIVY-1030 cleanup of context and clients (#506) (edwardcapriolo via lmccay)
1 parent 284467b commit 674ac00

File tree

7 files changed

+74
-88
lines changed

7 files changed

+74
-88
lines changed

api/src/main/java/org/apache/livy/LivyClientBuilder.java

Lines changed: 15 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -23,11 +23,8 @@
2323
import java.net.URI;
2424
import java.net.URISyntaxException;
2525
import java.net.URL;
26-
import java.util.ArrayList;
27-
import java.util.List;
28-
import java.util.Map;
29-
import java.util.Properties;
30-
import java.util.ServiceLoader;
26+
import java.util.*;
27+
3128
import static java.nio.charset.StandardCharsets.UTF_8;
3229

3330
/**
@@ -134,20 +131,24 @@ public LivyClient build() {
134131

135132
LivyClient client = null;
136133
if (CLIENT_FACTORIES.isEmpty()) {
137-
throw new IllegalStateException("No LivyClientFactory implementation was found.");
134+
throw new IllegalStateException("No LivyClientFactory implementations were found.");
138135
}
139136

140137
for (LivyClientFactory factory : CLIENT_FACTORIES) {
141138
try {
142-
client = factory.createClient(uri, config);
143-
} catch (Exception e) {
144-
if (!(e instanceof RuntimeException)) {
145-
e = new RuntimeException(e);
139+
Optional<LivyClient> found = factory.createClient(uri, config);
140+
if (found.isPresent()){
141+
client = found.get();
142+
break;
143+
}
144+
} catch (Exception e){
145+
//Note: the compiler identifies this as impossible. The initial author might
146+
// believe the factories sneaky throw other exceptions
147+
if (e instanceof RuntimeException){
148+
throw e;
149+
} else {
150+
throw new RuntimeException(e);
146151
}
147-
throw (RuntimeException) e;
148-
}
149-
if (client != null) {
150-
break;
151152
}
152153
}
153154

api/src/main/java/org/apache/livy/LivyClientFactory.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.livy;
1919

2020
import java.net.URI;
21+
import java.util.Optional;
2122
import java.util.Properties;
2223

2324
import org.apache.livy.annotations.Private;
@@ -39,8 +40,9 @@ public interface LivyClientFactory {
3940
*
4041
* @param uri URI pointing at the livy backend to use.
4142
* @param config Livy client configs.
42-
* @return The newly created LivyClient or null if an unsupported URI
43+
* @return Some if the factory understands the URI
44+
* @throws RuntimeException if they supports the scheme but fail to create a client
4345
*/
44-
LivyClient createClient(URI uri, Properties config);
46+
Optional<LivyClient> createClient(URI uri, Properties config);
4547

4648
}

api/src/test/java/org/apache/livy/TestClientFactory.java

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,14 @@
1919

2020
import java.io.File;
2121
import java.net.URI;
22+
import java.util.Optional;
2223
import java.util.Properties;
2324
import java.util.concurrent.Future;
2425
import java.util.concurrent.atomic.AtomicLong;
2526

2627
public class TestClientFactory implements LivyClientFactory {
2728

28-
private static AtomicLong instanceCount = new AtomicLong();
29+
private static final AtomicLong instanceCount = new AtomicLong();
2930
public static long getInstanceCount() {
3031
return instanceCount.get();
3132
}
@@ -35,16 +36,14 @@ public TestClientFactory() {
3536
}
3637

3738
@Override
38-
public LivyClient createClient(URI uri, Properties config) {
39+
public Optional<LivyClient> createClient(URI uri, Properties config) {
3940
switch (uri.getPath()) {
4041
case "match":
41-
return new Client(config);
42-
42+
return Optional.of(new Client(config));
4343
case "error":
4444
throw new IllegalStateException("error");
45-
4645
default:
47-
return null;
46+
return Optional.empty();
4847
}
4948
}
5049

client-http/src/main/java/org/apache/livy/client/http/HttpClientFactory.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.livy.client.http;
1919

2020
import java.net.URI;
21+
import java.util.Optional;
2122
import java.util.Properties;
2223

2324
import org.apache.livy.LivyClient;
@@ -29,12 +30,11 @@
2930
public final class HttpClientFactory implements LivyClientFactory {
3031

3132
@Override
32-
public LivyClient createClient(URI uri, Properties config) {
33+
public Optional<LivyClient> createClient(URI uri, Properties config) {
3334
if (!"http".equals(uri.getScheme()) && !"https".equals(uri.getScheme())) {
34-
return null;
35+
return Optional.empty();
3536
}
36-
37-
return new HttpClient(uri, new HttpConf(config));
37+
return Optional.of(new HttpClient(uri, new HttpConf(config)));
3838
}
3939

4040
}

rsc/src/main/java/org/apache/livy/rsc/ContextLauncher.java

Lines changed: 36 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,9 @@
1717

1818
package org.apache.livy.rsc;
1919

20-
import java.io.BufferedReader;
2120
import java.io.File;
22-
import java.io.FileInputStream;
2321
import java.io.FileOutputStream;
2422
import java.io.IOException;
25-
import java.io.InputStream;
2623
import java.io.InputStreamReader;
2724
import java.io.OutputStreamWriter;
2825
import java.io.Reader;
@@ -66,7 +63,6 @@ class ContextLauncher {
6663
private static final Logger LOG = LoggerFactory.getLogger(ContextLauncher.class);
6764
private static final AtomicInteger CHILD_IDS = new AtomicInteger();
6865

69-
private static final String SPARK_DEPLOY_MODE = "spark.submit.deployMode";
7066
private static final String SPARK_JARS_KEY = "spark.jars";
7167
private static final String SPARK_ARCHIVES_KEY = "spark.yarn.dist.archives";
7268
private static final String SPARK_HOME_ENV = "SPARK_HOME";
@@ -95,9 +91,6 @@ private ContextLauncher(RSCClientFactory factory, RSCConf conf) throws IOExcepti
9591
final RegistrationHandler handler = new RegistrationHandler();
9692
try {
9793
factory.getServer().registerClient(clientId, secret, handler);
98-
String replMode = conf.get("repl");
99-
boolean repl = replMode != null && replMode.equals("true");
100-
10194
// In some scenarios the user may need to configure this endpoint setting explicitly.
10295
String address = conf.get(LAUNCHER_ADDRESS);
10396
// If not specified, use the RPC server address; otherwise use the specified address.
@@ -112,7 +105,7 @@ private ContextLauncher(RSCClientFactory factory, RSCConf conf) throws IOExcepti
112105
Utils.addListener(promise, new FutureListener<ContextInfo>() {
113106
@Override
114107
public void onFailure(Throwable error) throws Exception {
115-
// If promise is cancelled or failed, make sure spark-submit is not leaked.
108+
// If promise is canceled or failed, make sure spark-submit is not leaked.
116109
if (child != null) {
117110
child.kill();
118111
}
@@ -181,13 +174,16 @@ private static ChildProcess startDriver(final RSCConf conf, Promise<?> promise)
181174
}
182175

183176
Utils.checkState(rscJars.isDirectory(),
184-
"Cannot find rsc jars directory under LIVY_HOME.");
177+
"Cannot find rsc jars directory: " + rscJars.getAbsolutePath());
185178
allJars.add(rscJars);
186179

187180
List<String> jars = new ArrayList<>();
188181
for (File dir : allJars) {
189-
for (File f : dir.listFiles()) {
190-
jars.add(f.getAbsolutePath());
182+
File [] list = dir.listFiles();
183+
if (list != null) {
184+
for (File f : list) {
185+
jars.add(f.getAbsolutePath());
186+
}
191187
}
192188
}
193189
livyJars = Utils.join(jars, ",");
@@ -226,14 +222,11 @@ private static ChildProcess startDriver(final RSCConf conf, Promise<?> promise)
226222
} else if (conf.getBoolean(CLIENT_IN_PROCESS)) {
227223
// Mostly for testing things quickly. Do not do this in production.
228224
LOG.warn("!!!! Running remote driver in-process. !!!!");
229-
Runnable child = new Runnable() {
230-
@Override
231-
public void run() {
232-
try {
233-
RSCDriverBootstrapper.main(new String[] { confFile.getAbsolutePath() });
234-
} catch (Exception e) {
235-
throw Utils.propagate(e);
236-
}
225+
Runnable child = () -> {
226+
try {
227+
RSCDriverBootstrapper.main(new String[] { confFile.getAbsolutePath() });
228+
} catch (Exception e) {
229+
throw Utils.propagate(e);
237230
}
238231
};
239232
return new ChildProcess(conf, promise, child, confFile);
@@ -261,7 +254,7 @@ private static void merge(RSCConf conf, String key, String livyConf, String sep)
261254
* Write the configuration to a file readable only by the process's owner. Livy properties
262255
* are written with an added prefix so that they can be loaded using SparkConf on the driver
263256
* side.
264-
*
257+
* <br>
265258
* The default Spark configuration (from either SPARK_HOME or SPARK_CONF_DIR) is merged into
266259
* the user configuration, so that defaults set by Livy's admin take effect when not overridden
267260
* by the user.
@@ -286,13 +279,10 @@ private static File writeConfToFile(RSCConf conf) throws IOException {
286279
File sparkDefaults = new File(confDir + File.separator + "spark-defaults.conf");
287280
if (sparkDefaults.isFile()) {
288281
Properties sparkConf = new Properties();
289-
Reader r = new InputStreamReader(new FileInputStream(sparkDefaults), UTF_8);
290-
try {
291-
sparkConf.load(r);
292-
} finally {
293-
r.close();
282+
try (Reader r = new InputStreamReader(
283+
Files.newInputStream(sparkDefaults.toPath()), UTF_8)) {
284+
sparkConf.load(r);
294285
}
295-
296286
for (String key : sparkConf.stringPropertyNames()) {
297287
if (!confView.containsKey(key)) {
298288
confView.put(key, sparkConf.getProperty(key));
@@ -303,15 +293,9 @@ private static File writeConfToFile(RSCConf conf) throws IOException {
303293

304294
File file = File.createTempFile("livyConf", ".properties");
305295
Files.setPosixFilePermissions(file.toPath(), EnumSet.of(OWNER_READ, OWNER_WRITE));
306-
//file.deleteOnExit();
307-
308-
Writer writer = new OutputStreamWriter(new FileOutputStream(file), UTF_8);
309-
try {
310-
confView.store(writer, "Livy App Context Configuration");
311-
} finally {
312-
writer.close();
296+
try (Writer writer = new OutputStreamWriter(new FileOutputStream(file), UTF_8)) {
297+
confView.store(writer, "Livy App Context Configuration");
313298
}
314-
315299
return file;
316300
}
317301

@@ -340,14 +324,16 @@ void dispose() {
340324
}
341325
}
342326

343-
private void handle(ChannelHandlerContext ctx, RemoteDriverAddress msg) {
327+
//Note. Your compiler or IDE may identify this method as unused
328+
//tests fail without it
329+
public void handle(ChannelHandlerContext ctx, RemoteDriverAddress msg) {
344330
InetSocketAddress insocket = (InetSocketAddress) ctx.channel().remoteAddress();
345331
String ip = insocket.getAddress().getHostAddress();
346332
ContextInfo info = new ContextInfo(ip, msg.port, clientId, secret);
347333
if (promise.trySuccess(info)) {
348334
timeout.cancel(true);
349335
LOG.debug("Received driver info for client {}: {}/{}.", client.getChannel(),
350-
msg.host, msg.port);
336+
msg.host, msg.port);
351337
} else {
352338
LOG.warn("Connection established but promise is already finalized.");
353339
}
@@ -398,7 +384,7 @@ public void run() {
398384
}
399385
} catch (InterruptedException ie) {
400386
LOG.warn("Waiting thread interrupted, killing child process.");
401-
Thread.interrupted();
387+
boolean ignored = Thread.interrupted();
402388
child.destroy();
403389
} catch (Exception e) {
404390
LOG.warn("Exception while waiting for child process.", e);
@@ -436,36 +422,34 @@ public void detach() {
436422
try {
437423
monitor.join(conf.getTimeAsMs(CLIENT_SHUTDOWN_TIMEOUT));
438424
} catch (InterruptedException ie) {
439-
LOG.debug("Interrupted before driver thread was finished.");
425+
LOG.debug("Interrupted before driver thread was finished.", ie);
440426
}
441427
}
442428

443429
private Thread monitor(final Runnable task, int childId) {
444-
Runnable wrappedTask = new Runnable() {
445-
@Override
446-
public void run() {
447-
try {
448-
task.run();
449-
} finally {
450-
confFile.delete();
451-
}
430+
Runnable wrappedTask = () -> {
431+
try {
432+
task.run();
433+
} finally {
434+
boolean ignored = confFile.delete();
452435
}
453436
};
454437
Thread thread = new Thread(wrappedTask);
455438
thread.setDaemon(true);
456439
thread.setName("ContextLauncher-" + childId);
457-
thread.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
458-
@Override
459-
public void uncaughtException(Thread t, Throwable e) {
460-
LOG.warn("Child task threw exception.", e);
461-
fail(e);
462-
}
440+
thread.setUncaughtExceptionHandler((t, e) -> {
441+
LOG.warn("Child task threw exception.", e);
442+
fail(e);
463443
});
464444
thread.start();
465445
return thread;
466446
}
467447
}
468448

449+
public RSCConf getConf() {
450+
return conf;
451+
}
452+
469453
// Just for testing.
470454
static Process mockSparkSubmit;
471455

rsc/src/main/java/org/apache/livy/rsc/RSCClientFactory.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import java.io.IOException;
2121
import java.net.URI;
22+
import java.util.Optional;
2223
import java.util.Properties;
2324
import java.util.concurrent.atomic.AtomicInteger;
2425

@@ -37,7 +38,7 @@ public final class RSCClientFactory implements LivyClientFactory {
3738
private final AtomicInteger refCount = new AtomicInteger();
3839
private RpcServer server = null;
3940
// interactive session child processes number
40-
private static AtomicInteger iscpn = new AtomicInteger();
41+
private static final AtomicInteger iscpn = new AtomicInteger();
4142

4243
public static AtomicInteger childProcesses() {
4344
return iscpn;
@@ -53,13 +54,12 @@ public static AtomicInteger childProcesses() {
5354
* Otherwise, a new Spark context will be started with the given configuration.
5455
*/
5556
@Override
56-
public LivyClient createClient(URI uri, Properties config) {
57+
public Optional<LivyClient> createClient(URI uri, Properties config) {
5758
if (!"rsc".equals(uri.getScheme())) {
58-
return null;
59+
return Optional.empty();
5960
}
6061

6162
RSCConf lconf = new RSCConf(config);
62-
6363
boolean needsServer = false;
6464
try {
6565
Promise<ContextInfo> info;
@@ -73,7 +73,7 @@ public LivyClient createClient(URI uri, Properties config) {
7373
info = processInfo.getContextInfo();
7474
driverProcess = processInfo.getDriverProcess();
7575
}
76-
return new RSCClient(lconf, info, driverProcess);
76+
return Optional.of(new RSCClient(lconf, info, driverProcess));
7777
} catch (Exception e) {
7878
if (needsServer) {
7979
unref();

0 commit comments

Comments
 (0)