Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docker/build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
# Build the docker containers

# The first build is for running systemds through docker.
docker image build -f docker/sysds.Dockerfile -t apache/systemds:latest .
# docker image build -f docker/sysds.Dockerfile -t apache/systemds:latest .

# The second build is for testing systemds. This image installs the R dependencies needed to run the tests.
docker image build -f docker/testsysds.Dockerfile -t apache/systemds:testing-latest .
Expand Down
4 changes: 4 additions & 0 deletions docker/entrypoint.sh
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ log="/tmp/sysdstest.log"
mvn -ntp -B test-compile 2>&1 | grep -E "BUILD|Total time:|---|Building SystemDS"
mvn -ntp -B test -D maven.test.skip=false -D automatedtestbase.outputbuffering=true -D test=$1 2>&1 | grep -v "already exists in destination." | tee $log

# Merge Federated test runs.
[ -f target/jacoco.exec ] && mv target/jacoco.exec target/jacoco_main.exec
mvn -ntp -B jacoco:merge

grep_args="SUCCESS"
grepvals="$( tail -n 100 $log | grep $grep_args)"

Expand Down
2 changes: 1 addition & 1 deletion docker/sysds.Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
#
#-------------------------------------------------------------

FROM ubuntu:24.04
FROM ubuntu:24.04@sha256:6015f66923d7afbc53558d7ccffd325d43b4e249f41a6e93eef074c9505d2233

WORKDIR /usr/src/

Expand Down
13 changes: 8 additions & 5 deletions docker/testsysds.Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,7 @@
#
#-------------------------------------------------------------

FROM ubuntu:24.04

FROM ubuntu:24.04@sha256:6015f66923d7afbc53558d7ccffd325d43b4e249f41a6e93eef074c9505d2233

WORKDIR /usr/src/
ENV MAVEN_VERSION=3.9.9
Expand All @@ -34,9 +33,6 @@ ENV LC_ALL=en_US.UTF-8
ENV LANG=en_US.UTF-8
ENV LD_LIBRARY_PATH=/usr/local/lib/

COPY ./src/test/scripts/installDependencies.R installDependencies.R
COPY ./docker/entrypoint.sh /entrypoint.sh

RUN apt-get update -qq \
&& apt-get upgrade -y \
&& apt-get install -y --no-install-recommends \
Expand Down Expand Up @@ -74,7 +70,9 @@ RUN apt-get install -y --no-install-recommends \
r-base-dev \
r-base-core


# Install R packages
COPY ./src/test/scripts/installDependencies.R installDependencies.R
RUN Rscript installDependencies.R \
&& rm -rf installDependencies.R \
&& rm -rf /var/lib/apt/lists/*
Expand All @@ -86,4 +84,9 @@ RUN wget -qO- https://github.com/microsoft/SEAL/archive/refs/tags/v3.7.0.tar.gz
&& cmake --build build \
&& cmake --install build

# Finally copy the entrypoint script
# This is last to enable quick updates to the script after initial local build.
COPY ./docker/entrypoint.sh /entrypoint.sh


ENTRYPOINT ["/entrypoint.sh"]
14 changes: 13 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -459,7 +459,19 @@
<fileSet>
<directory>${project.build.directory}</directory>
<includes>
<include>*/jacoco.exec</include>
<include>*.exec</include>
</includes>
</fileSet>
<fileSet>
<directory>${project.build.directory}</directory>
<includes>
<include>transient_jacoco**/*.exec</include>
</includes>
</fileSet>
<fileSet>
<directory>${project.build.directory}</directory>
<includes>
<include>federated_jacoco/*.exec</include>
</includes>
</fileSet>
</fileSets>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,6 @@

import javax.net.ssl.SSLException;

import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPipeline;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.log4j.Logger;
import org.apache.sysds.api.DMLScript;
Expand All @@ -50,25 +43,32 @@
import org.apache.sysds.runtime.lineage.LineageCache;
import org.apache.sysds.runtime.lineage.LineageCacheConfig;
import org.apache.sysds.runtime.lineage.LineageCacheConfig.ReuseCacheType;
import org.apache.sysds.runtime.lineage.LineageItem;
import org.apache.sysds.utils.stats.InfrastructureAnalyzer;
import org.apache.sysds.utils.stats.Timing;
import org.apache.sysds.runtime.lineage.LineageItem;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.util.SelfSignedCertificate;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ClassResolvers;

@SuppressWarnings("deprecation")
public class FederatedWorker {
protected static Logger log = Logger.getLogger(FederatedWorker.class);
protected static Logger LOG = Logger.getLogger(FederatedWorker.class);

private final int _port;
private final FederatedLookupTable _flt;
Expand Down Expand Up @@ -96,7 +96,7 @@ public FederatedWorker(int port, boolean debug) {
}

private void run() {
log.info("Setting up Federated Worker on port " + _port);
LOG.info("Setting up Federated Worker on port " + _port);
int par_conn = ConfigurationManager.getDMLConfig().getIntValue(DMLConfig.FEDERATED_PAR_CONN);
final int EVENT_LOOP_THREADS = (par_conn > 0) ? par_conn : InfrastructureAnalyzer.getLocalParallelism();
NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
Expand All @@ -113,22 +113,23 @@ private void run() {
b.option(ChannelOption.SO_BACKLOG, 128);
b.childOption(ChannelOption.SO_KEEPALIVE, true);

log.info("Starting Federated Worker server at port: " + _port);
LOG.info("Starting Federated Worker server at port: " + _port);
ChannelFuture f = b.bind(_port).sync();
log.info("Started Federated Worker at port: " + _port);
LOG.info("Started Federated Worker at port: " + _port);
f.channel().closeFuture().sync();
}
}
catch(Exception e) {
log.info("Federated worker interrupted");
LOG.info("Federated worker interrupted");
if(_debug) {
log.error(e.getMessage());
LOG.error(e.getMessage());
e.printStackTrace();
}
}
finally {
log.info("Federated Worker Shutting down.");
LOG.info("Federated Worker Shutting down.");
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();

}
}

Expand Down
18 changes: 16 additions & 2 deletions src/test/java/org/apache/sysds/test/AutomatedTestBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
import java.io.File;
import java.io.IOException;
import java.io.PrintStream;
import java.lang.management.ManagementFactory;
import java.lang.management.RuntimeMXBean;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.charset.Charset;
Expand Down Expand Up @@ -1663,9 +1665,21 @@ protected static Process startLocalFedWorker(int port, String[] addArgs, int sle
"--add-opens=java.base/java.lang=ALL-UNNAMED" ,
"--add-opens=java.base/java.lang.ref=ALL-UNNAMED" ,
"--add-opens=java.base/java.util.concurrent=ALL-UNNAMED" ,
"--add-opens=java.base/sun.nio.ch=ALL-UNNAMED",
"--add-opens=java.base/sun.nio.ch=ALL-UNNAMED",};

RuntimeMXBean runtimeMxBean = ManagementFactory.getRuntimeMXBean();
List<String> jvmArgs = runtimeMxBean.getInputArguments();

for(String arg : jvmArgs) {
// add code coverage report
if(arg.contains("org.jacoco.agent"))
args = ArrayUtils.addAll(args,
new String[] {arg.replace("target/jacoco.exec", String.format("target/federated_jacoco/jacoco-%d.exec", port))});
}

args = ArrayUtils.addAll(args, new String[]{
"-cp", classpath,
DMLScript.class.getName(), "-w", Integer.toString(port), "-stats"};
DMLScript.class.getName(), "-w", Integer.toString(port), "-stats"});
if(addArgs != null)
args = ArrayUtils.addAll(args, addArgs);

Expand Down
41 changes: 29 additions & 12 deletions src/test/java/org/apache/sysds/test/TestUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,24 @@

package org.apache.sysds.test;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.FileReader;
import java.io.RandomAccessFile;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.io.RandomAccessFile;
import java.text.NumberFormat;
import java.util.ArrayList;
import java.util.Collections;
Expand All @@ -44,12 +50,8 @@
import java.util.Random;
import java.util.Set;
import java.util.StringTokenizer;
import java.util.concurrent.TimeUnit;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.NotImplementedException;
Expand Down Expand Up @@ -3489,13 +3491,28 @@ public static void shutdownThread(Thread t) {
/**
 * Stops the given worker process; does nothing when {@code t} is null.
 *
 * NOTE(review): this span looks like a rendered diff in which removed and
 * added lines are shown together (both an immediate destroyForcibly() and a
 * sendSigInt() call are present, and the try/catch nesting does not balance).
 * Confirm the statement order against the actual file before relying on it.
 *
 * @param t the worker process to stop, may be null
 */
public static void shutdownThread(Process t) {
// kill the worker
if( t != null ) {
Process d = t.destroyForcibly();
sendSigInt(t);// Attempt graceful termination
try {
d.waitFor();
}
catch (InterruptedException e) {
e.printStackTrace();
}
// Wait up to 10 seconds for the process to exit
if (!t.waitFor(10, TimeUnit.SECONDS)) {
// If still alive after 10 seconds, force kill
Process forciblyDestroyed = t.destroyForcibly();
forciblyDestroyed.waitFor(); // Wait until it's definitely terminated
}
} catch (InterruptedException e) {
// Interrupted while waiting for the process; only the stack trace is reported.
e.printStackTrace();
}
}
}

/**
 * Sends an interrupt signal (SIGINT) to the given process by invoking the
 * external {@code kill} command with the process id, and waits for that
 * command to finish.
 *
 * This is best effort: failures to launch {@code kill} are reported on
 * stderr and otherwise ignored, and the exit code of {@code kill} is not
 * inspected. The method does not wait for the target process to exit.
 *
 * @param process the process to signal; null or already terminated
 *                processes are ignored
 */
public static void sendSigInt(Process process) {
	// Never signal a process that is gone: its pid may already have been
	// reused by an unrelated process.
	if(process == null || !process.isAlive())
		return;

	String pid = Long.toString(process.pid());
	ProcessBuilder pb = new ProcessBuilder("kill", "-SIGINT", pid);
	try {
		pb.inheritIO().start().waitFor();
	}
	catch(IOException e) {
		// e.g. no kill executable on this platform
		e.printStackTrace();
	}
	catch(InterruptedException e) {
		e.printStackTrace();
		// Keep the interrupt visible to the caller instead of swallowing it.
		Thread.currentThread().interrupt();
	}
}

Expand Down
Loading