Skip to content

Commit 744d98a

Browse files
committed
[SYSTEMDS-3886] Federated Worker JVM Codecoverage
This commit adds jacoco coverage for the federated worker processes. Closes #2261
1 parent e2373e9 commit 744d98a

File tree

5 files changed

+81
-33
lines changed

5 files changed

+81
-33
lines changed

docker/entrypoint.sh

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,10 @@ mvn -ntp -B test -D maven.test.skip=false -D automatedtestbase.outputbuffering=t
3636
[ -f target/jacoco.exec ] && mv target/jacoco.exec target/jacoco_main.exec
3737
mvn -ntp -B jacoco:merge
3838

39+
# Merge Federated test runs.
40+
[ -f target/jacoco.exec ] && mv target/jacoco.exec target/jacoco_main.exec
41+
mvn -ntp -B jacoco:merge
42+
3943
grep_args="SUCCESS"
4044
grepvals="$( tail -n 100 $log | grep $grep_args)"
4145

pom.xml

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -459,7 +459,19 @@
459459
<fileSet>
460460
<directory>${project.build.directory}</directory>
461461
<includes>
462-
<include>*/jacoco.exec</include>
462+
<include>*.exec</include>
463+
</includes>
464+
</fileSet>
465+
<fileSet>
466+
<directory>${project.build.directory}</directory>
467+
<includes>
468+
<include>transient_jacoco**/*.exec</include>
469+
</includes>
470+
</fileSet>
471+
<fileSet>
472+
<directory>${project.build.directory}</directory>
473+
<includes>
474+
<include>federated_jacoco/*.exec</include>
463475
</includes>
464476
</fileSet>
465477
</fileSets>

src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorker.java

Lines changed: 19 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -28,13 +28,6 @@
2828

2929
import javax.net.ssl.SSLException;
3030

31-
import io.netty.channel.ChannelFuture;
32-
import io.netty.channel.ChannelHandlerContext;
33-
import io.netty.channel.ChannelInboundHandlerAdapter;
34-
import io.netty.channel.ChannelInitializer;
35-
import io.netty.channel.ChannelOption;
36-
import io.netty.channel.ChannelOutboundHandlerAdapter;
37-
import io.netty.channel.ChannelPipeline;
3831
import org.apache.commons.lang3.tuple.ImmutablePair;
3932
import org.apache.log4j.Logger;
4033
import org.apache.sysds.api.DMLScript;
@@ -50,25 +43,32 @@
5043
import org.apache.sysds.runtime.lineage.LineageCache;
5144
import org.apache.sysds.runtime.lineage.LineageCacheConfig;
5245
import org.apache.sysds.runtime.lineage.LineageCacheConfig.ReuseCacheType;
46+
import org.apache.sysds.runtime.lineage.LineageItem;
5347
import org.apache.sysds.utils.stats.InfrastructureAnalyzer;
5448
import org.apache.sysds.utils.stats.Timing;
55-
import org.apache.sysds.runtime.lineage.LineageItem;
5649

5750
import io.netty.bootstrap.ServerBootstrap;
5851
import io.netty.buffer.ByteBuf;
52+
import io.netty.channel.ChannelFuture;
53+
import io.netty.channel.ChannelHandlerContext;
54+
import io.netty.channel.ChannelInboundHandlerAdapter;
55+
import io.netty.channel.ChannelInitializer;
56+
import io.netty.channel.ChannelOption;
57+
import io.netty.channel.ChannelOutboundHandlerAdapter;
58+
import io.netty.channel.ChannelPipeline;
5959
import io.netty.channel.nio.NioEventLoopGroup;
6060
import io.netty.channel.socket.SocketChannel;
6161
import io.netty.channel.socket.nio.NioServerSocketChannel;
62+
import io.netty.handler.codec.serialization.ClassResolvers;
63+
import io.netty.handler.codec.serialization.ObjectDecoder;
6264
import io.netty.handler.codec.serialization.ObjectEncoder;
6365
import io.netty.handler.ssl.SslContext;
6466
import io.netty.handler.ssl.SslContextBuilder;
6567
import io.netty.handler.ssl.util.SelfSignedCertificate;
66-
import io.netty.handler.codec.serialization.ObjectDecoder;
67-
import io.netty.handler.codec.serialization.ClassResolvers;
6868

6969
@SuppressWarnings("deprecation")
7070
public class FederatedWorker {
71-
protected static Logger log = Logger.getLogger(FederatedWorker.class);
71+
protected static Logger LOG = Logger.getLogger(FederatedWorker.class);
7272

7373
private final int _port;
7474
private final FederatedLookupTable _flt;
@@ -96,7 +96,7 @@ public FederatedWorker(int port, boolean debug) {
9696
}
9797

9898
private void run() {
99-
log.info("Setting up Federated Worker on port " + _port);
99+
LOG.info("Setting up Federated Worker on port " + _port);
100100
int par_conn = ConfigurationManager.getDMLConfig().getIntValue(DMLConfig.FEDERATED_PAR_CONN);
101101
final int EVENT_LOOP_THREADS = (par_conn > 0) ? par_conn : InfrastructureAnalyzer.getLocalParallelism();
102102
NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
@@ -113,22 +113,23 @@ private void run() {
113113
b.option(ChannelOption.SO_BACKLOG, 128);
114114
b.childOption(ChannelOption.SO_KEEPALIVE, true);
115115

116-
log.info("Starting Federated Worker server at port: " + _port);
116+
LOG.info("Starting Federated Worker server at port: " + _port);
117117
ChannelFuture f = b.bind(_port).sync();
118-
log.info("Started Federated Worker at port: " + _port);
118+
LOG.info("Started Federated Worker at port: " + _port);
119119
f.channel().closeFuture().sync();
120-
}
120+
}
121121
catch(Exception e) {
122-
log.info("Federated worker interrupted");
122+
LOG.info("Federated worker interrupted");
123123
if(_debug) {
124-
log.error(e.getMessage());
124+
LOG.error(e.getMessage());
125125
e.printStackTrace();
126126
}
127127
}
128128
finally {
129-
log.info("Federated Worker Shutting down.");
129+
LOG.info("Federated Worker Shutting down.");
130130
workerGroup.shutdownGracefully();
131131
bossGroup.shutdownGracefully();
132+
132133
}
133134
}
134135

src/test/java/org/apache/sysds/test/AutomatedTestBase.java

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@
2828
import java.io.File;
2929
import java.io.IOException;
3030
import java.io.PrintStream;
31+
import java.lang.management.ManagementFactory;
32+
import java.lang.management.RuntimeMXBean;
3133
import java.net.InetSocketAddress;
3234
import java.net.ServerSocket;
3335
import java.nio.charset.Charset;
@@ -1663,9 +1665,21 @@ protected static Process startLocalFedWorker(int port, String[] addArgs, int sle
16631665
"--add-opens=java.base/java.lang=ALL-UNNAMED" ,
16641666
"--add-opens=java.base/java.lang.ref=ALL-UNNAMED" ,
16651667
"--add-opens=java.base/java.util.concurrent=ALL-UNNAMED" ,
1666-
"--add-opens=java.base/sun.nio.ch=ALL-UNNAMED",
1668+
"--add-opens=java.base/sun.nio.ch=ALL-UNNAMED",};
1669+
1670+
RuntimeMXBean runtimeMxBean = ManagementFactory.getRuntimeMXBean();
1671+
List<String> jvmArgs = runtimeMxBean.getInputArguments();
1672+
1673+
for(String arg : jvmArgs) {
1674+
// forward the JaCoCo agent argument to the worker JVM, redirecting its output to a per-port exec file
1675+
if(arg.contains("org.jacoco.agent"))
1676+
args = ArrayUtils.addAll(args,
1677+
new String[] {arg.replace("target/jacoco.exec", String.format("target/federated_jacoco/jacoco-%d.exec", port))});
1678+
}
1679+
1680+
args = ArrayUtils.addAll(args, new String[]{
16671681
"-cp", classpath,
1668-
DMLScript.class.getName(), "-w", Integer.toString(port), "-stats"};
1682+
DMLScript.class.getName(), "-w", Integer.toString(port), "-stats"});
16691683
if(addArgs != null)
16701684
args = ArrayUtils.addAll(args, addArgs);
16711685

src/test/java/org/apache/sysds/test/TestUtils.java

Lines changed: 29 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -19,18 +19,24 @@
1919

2020
package org.apache.sysds.test;
2121

22+
import static org.junit.Assert.assertEquals;
23+
import static org.junit.Assert.assertFalse;
24+
import static org.junit.Assert.assertNotNull;
25+
import static org.junit.Assert.assertTrue;
26+
import static org.junit.Assert.fail;
27+
2228
import java.io.BufferedReader;
2329
import java.io.BufferedWriter;
2430
import java.io.DataOutputStream;
2531
import java.io.File;
2632
import java.io.FileInputStream;
2733
import java.io.FileOutputStream;
2834
import java.io.FileReader;
29-
import java.io.RandomAccessFile;
3035
import java.io.IOException;
3136
import java.io.InputStreamReader;
3237
import java.io.OutputStreamWriter;
3338
import java.io.PrintWriter;
39+
import java.io.RandomAccessFile;
3440
import java.text.NumberFormat;
3541
import java.util.ArrayList;
3642
import java.util.Collections;
@@ -44,12 +50,8 @@
4450
import java.util.Random;
4551
import java.util.Set;
4652
import java.util.StringTokenizer;
53+
import java.util.concurrent.TimeUnit;
4754

48-
import static org.junit.Assert.assertEquals;
49-
import static org.junit.Assert.assertFalse;
50-
import static org.junit.Assert.assertNotNull;
51-
import static org.junit.Assert.assertTrue;
52-
import static org.junit.Assert.fail;
5355
import org.apache.commons.io.FileUtils;
5456
import org.apache.commons.io.IOUtils;
5557
import org.apache.commons.lang3.NotImplementedException;
@@ -3489,13 +3491,28 @@ public static void shutdownThread(Thread t) {
34893491
public static void shutdownThread(Process t) {
34903492
// kill the worker
34913493
if( t != null ) {
3492-
Process d = t.destroyForcibly();
3494+
sendSigInt(t);// Attempt graceful termination
34933495
try {
3494-
d.waitFor();
3495-
}
3496-
catch (InterruptedException e) {
3497-
e.printStackTrace();
3498-
}
3496+
// Wait up to 10 seconds for the process to exit
3497+
if (!t.waitFor(10, TimeUnit.SECONDS)) {
3498+
// If still alive after 10 seconds, force kill
3499+
Process forciblyDestroyed = t.destroyForcibly();
3500+
forciblyDestroyed.waitFor(); // Wait until it's definitely terminated
3501+
}
3502+
} catch (InterruptedException e) {
3503+
e.printStackTrace();
3504+
}
3505+
}
3506+
}
3507+
3508+
public static void sendSigInt(Process process) {
3509+
long pid = process.pid();
3510+
ProcessBuilder pb = new ProcessBuilder("kill", "-SIGINT", Long.toString(pid));
3511+
try {
3512+
pb.inheritIO().start().waitFor();
3513+
}
3514+
catch(IOException | InterruptedException e) {
3515+
e.printStackTrace();
34993516
}
35003517
}
35013518

0 commit comments

Comments
 (0)