Skip to content

Commit 8f5a42c

Browse files
committed
[MINOR] Federated Timeout
This commit reduce the timeout for federated tests, and enforce the timeout on federated requests. Previously we had some test cases that would infinitely run, and therefore we would not be able to decipher the log messages (because nothing would be written). This commit change it by enforcing a strict 16 seconds execution time for a single federated requests and a 1 day timeout for a default federated requests. Previously some operations did use the federated timeout. However, it was not enforced in critical places. Closes #2179
1 parent cc56144 commit 8f5a42c

File tree

10 files changed

+76
-34
lines changed

10 files changed

+76
-34
lines changed

src/main/java/org/apache/sysds/conf/DMLConfig.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -201,7 +201,7 @@ public class DMLConfig
201201
_defaultVals.put(FLOATING_POINT_PRECISION, "double" );
202202
_defaultVals.put(USE_SSL_FEDERATED_COMMUNICATION, "false");
203203
_defaultVals.put(DEFAULT_FEDERATED_INITIALIZATION_TIMEOUT, "10");
204-
_defaultVals.put(FEDERATED_TIMEOUT, "-1");
204+
_defaultVals.put(FEDERATED_TIMEOUT, "86400"); // default 1 day compute timeout.
205205
_defaultVals.put(FEDERATED_PLANNER, FederatedPlanner.RUNTIME.name());
206206
_defaultVals.put(FEDERATED_PAR_CONN, "-1"); // vcores
207207
_defaultVals.put(FEDERATED_PAR_INST, "-1"); // vcores

src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedStatistics.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -503,7 +503,8 @@ public static List<DataObjectModel> getWorkerDataObjects() {
503503
return new ArrayList<>(workerDataObjects.values());
504504
}
505505

506-
public static void addEvent(EventModel event) {
506+
public synchronized static void addEvent(EventModel event) {
507+
// synchronized, because multiple requests can be handled concurrently
507508
workerEvents.add(event);
508509
}
509510

src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorkerHandler.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -651,8 +651,7 @@ private FederatedResponse execUDF(FederatedRequest request, ExecutionContextMap
651651
// get function and input parameters
652652
try {
653653
FederatedUDF udf = (FederatedUDF) request.getParam(0);
654-
if(LOG.isDebugEnabled())
655-
LOG.debug(udf);
654+
LOG.debug(udf);
656655

657656
eventStage.operation = udf.getClass().getSimpleName();
658657

src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederationMap.java

Lines changed: 34 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -26,14 +26,17 @@
2626
import java.util.Map.Entry;
2727
import java.util.concurrent.Callable;
2828
import java.util.concurrent.ExecutorService;
29+
import java.util.concurrent.Executors;
2930
import java.util.concurrent.Future;
31+
import java.util.concurrent.TimeUnit;
3032
import java.util.function.BiFunction;
3133
import java.util.stream.IntStream;
3234
import java.util.stream.Stream;
3335

3436
import org.apache.commons.lang3.tuple.Pair;
3537
import org.apache.sysds.common.Types.DataType;
3638
import org.apache.sysds.common.Types.ValueType;
39+
import org.apache.sysds.conf.ConfigurationManager;
3740
import org.apache.sysds.hops.fedplanner.FTypes.AlignType;
3841
import org.apache.sysds.hops.fedplanner.FTypes.FType;
3942
import org.apache.sysds.lops.RightIndex;
@@ -637,11 +640,25 @@ public long getMaxIndexInRange(int dim) {
637640
* @param forEachFunction function to execute for each pair
638641
*/
639642
public void forEachParallel(BiFunction<FederatedRange, FederatedData, Void> forEachFunction) {
640-
ExecutorService pool = CommonThreadPool.get(_fedMap.size());
643+
ExecutorService pool = Executors.newFixedThreadPool(_fedMap.size());
641644
ArrayList<MappingTask> mappingTasks = new ArrayList<>();
642645
for(Pair<FederatedRange, FederatedData> fedMap : _fedMap)
643646
mappingTasks.add(new MappingTask(fedMap.getKey(), fedMap.getValue(), forEachFunction, _ID));
644-
CommonThreadPool.invokeAndShutdown(pool, mappingTasks);
647+
648+
try {
649+
for(Future<?> t:pool.invokeAll(mappingTasks, ConfigurationManager.getFederatedTimeout(), TimeUnit.SECONDS)){
650+
if(!t.isDone())
651+
throw new RuntimeException("Timeout");
652+
else if(t.isCancelled())
653+
throw new RuntimeException("Failed");
654+
}
655+
}
656+
catch(InterruptedException e) {
657+
throw new RuntimeException(e);
658+
}
659+
finally{
660+
pool.shutdown();
661+
}
645662
}
646663

647664
/**
@@ -655,15 +672,25 @@ public void forEachParallel(BiFunction<FederatedRange, FederatedData, Void> forE
655672
* @return the new <code>FederationMap</code>
656673
*/
657674
public FederationMap mapParallel(long newVarID, BiFunction<FederatedRange, FederatedData, Void> mappingFunction) {
658-
ExecutorService pool = CommonThreadPool.get(_fedMap.size());
659-
675+
ExecutorService pool = Executors.newFixedThreadPool(_fedMap.size());
660676
FederationMap fedMapCopy = copyWithNewID(_ID);
661677
ArrayList<MappingTask> mappingTasks = new ArrayList<>();
662678
for(Pair<FederatedRange, FederatedData> fedMap : fedMapCopy._fedMap)
663679
mappingTasks.add(new MappingTask(fedMap.getKey(), fedMap.getValue(), mappingFunction, newVarID));
664-
CommonThreadPool.invokeAndShutdown(pool, mappingTasks);
665-
fedMapCopy._ID = newVarID;
666-
return fedMapCopy;
680+
try{
681+
for(Future<?> t : pool.invokeAll(mappingTasks, ConfigurationManager.getFederatedTimeout(), TimeUnit.SECONDS)){
682+
if(!t.isDone())
683+
throw new RuntimeException("Timeout");
684+
else if(t.isCancelled()){
685+
throw new RuntimeException("Failed");
686+
}
687+
}
688+
fedMapCopy._ID = newVarID;
689+
return fedMapCopy;
690+
}
691+
catch(Exception e){
692+
throw new RuntimeException(e);
693+
}
667694
}
668695

669696
public FederationMap filter(IndexRange ixrange) {

src/main/java/org/apache/sysds/runtime/instructions/cp/ParamservBuiltinCPInstruction.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@
5151
import java.util.concurrent.ExecutorService;
5252
import java.util.concurrent.Executors;
5353
import java.util.concurrent.Future;
54+
import java.util.concurrent.TimeUnit;
5455
import java.util.stream.Collectors;
5556
import java.util.stream.IntStream;
5657

@@ -61,6 +62,7 @@
6162
import org.apache.spark.util.LongAccumulator;
6263
import org.apache.sysds.api.DMLScript;
6364
import org.apache.sysds.common.Types.ExecType;
65+
import org.apache.sysds.conf.ConfigurationManager;
6466
import org.apache.sysds.hops.recompile.Recompiler;
6567
import org.apache.sysds.parser.Statement.FederatedPSScheme;
6668
import org.apache.sysds.parser.Statement.PSFrequency;
@@ -241,13 +243,16 @@ model, aggServiceEC, getValFunction(), getNumBatchesPerEpoch(runtimeBalancing, r
241243

242244
try {
243245
// Launch the worker threads and wait for completion
244-
for (Future<Void> ret : es.invokeAll(threads))
245-
ret.get(); //error handling
246+
for (Future<Void> ret : es.invokeAll(threads, ConfigurationManager.getFederatedTimeout(), TimeUnit.SECONDS)){
247+
if(!ret.isDone())
248+
throw new RuntimeException("Failed federated execution");
249+
// ret.get(); //error handling
250+
}
246251
// Fetch the final model from ps
247252
ec.setVariable(output.getName(), ps.getResult());
248253
if (DMLScript.STATISTICS)
249254
ParamServStatistics.accExecutionTime((long) ParamServStatistics.getExecutionTimer().stop());
250-
} catch (InterruptedException | ExecutionException e) {
255+
} catch (Exception e) {
251256
throw new DMLRuntimeException("ParamservBuiltinCPInstruction: unknown error: ", e);
252257
} finally {
253258
es.shutdownNow();

src/main/java/org/apache/sysds/runtime/instructions/fed/MultiReturnParameterizedBuiltinFEDInstruction.java

Lines changed: 26 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import org.apache.sysds.common.Types;
3636
import org.apache.sysds.common.Types.DataType;
3737
import org.apache.sysds.common.Types.ValueType;
38+
import org.apache.sysds.hops.OptimizerUtils;
3839
import org.apache.sysds.hops.fedplanner.FTypes;
3940
import org.apache.sysds.hops.fedplanner.FTypes.FType;
4041
import org.apache.sysds.lops.PickByCount;
@@ -47,10 +48,10 @@
4748
import org.apache.sysds.runtime.controlprogram.federated.FederatedRequest.RequestType;
4849
import org.apache.sysds.runtime.controlprogram.federated.FederatedResponse;
4950
import org.apache.sysds.runtime.controlprogram.federated.FederatedResponse.ResponseType;
50-
import org.apache.sysds.runtime.frame.data.FrameBlock;
5151
import org.apache.sysds.runtime.controlprogram.federated.FederatedUDF;
5252
import org.apache.sysds.runtime.controlprogram.federated.FederationMap;
5353
import org.apache.sysds.runtime.controlprogram.federated.FederationUtils;
54+
import org.apache.sysds.runtime.frame.data.FrameBlock;
5455
import org.apache.sysds.runtime.instructions.InstructionUtils;
5556
import org.apache.sysds.runtime.instructions.cp.CPOperand;
5657
import org.apache.sysds.runtime.instructions.cp.Data;
@@ -175,6 +176,7 @@ public void processInstruction(ExecutionContext ec) {
175176
try {
176177
FederatedResponse response = responseFuture.get();
177178
MultiColumnEncoder encoder = (MultiColumnEncoder) response.getData()[0];
179+
178180
// merge this encoder into a composite encoder
179181
synchronized(finalGlobalEncoder) {
180182
finalGlobalEncoder.mergeAt(encoder, columnOffset, (int) (range.getBeginDims()[0] + 1));
@@ -378,24 +380,30 @@ public ExecuteFrameEncoder(long input, long output, MultiColumnEncoder encoder)
378380

379381
@Override
380382
public FederatedResponse execute(ExecutionContext ec, Data... data) {
381-
FrameBlock fb = ((FrameObject) data[0]).acquireReadAndRelease();
382-
383-
// offset is applied on the Worker to shift the local encoders to their respective column
384-
_encoder.applyColumnOffset();
385-
// apply transformation
386-
//MatrixBlock mbout = _encoder.apply(fb, OptimizerUtils.getTransformNumThreads());
387-
// FIXME: Enabling multithreading intermittently hangs
388-
MatrixBlock mbout = _encoder.apply(fb, 1);
389-
390-
// create output matrix object
391-
MatrixObject mo = ExecutionContext.createMatrixObject(mbout);
392-
393-
// add it to the list of variables
394-
ec.setVariable(String.valueOf(_outputID), mo);
383+
try{
395384

396-
// return id handle
397-
return new FederatedResponse(
398-
ResponseType.SUCCESS_EMPTY, mbout.getNonZeros());
385+
FrameBlock fb = ((FrameObject) data[0]).acquireReadAndRelease();
386+
387+
// offset is applied on the Worker to shift the local encoders to their respective column
388+
_encoder.applyColumnOffset();
389+
// apply transformation
390+
MatrixBlock mbout = _encoder.apply(fb, OptimizerUtils.getTransformNumThreads());
391+
// FIXME: Enabling multithreading intermittently hangs
392+
// MatrixBlock mbout = _encoder.apply(fb, 1);
393+
394+
// create output matrix object
395+
MatrixObject mo = ExecutionContext.createMatrixObject(mbout);
396+
397+
// add it to the list of variables
398+
ec.setVariable(String.valueOf(_outputID), mo);
399+
400+
// return id handle
401+
return new FederatedResponse(
402+
ResponseType.SUCCESS_EMPTY, mbout.getNonZeros());
403+
}
404+
catch(Exception e){
405+
return new FederatedResponse(ResponseType.ERROR);
406+
}
399407
}
400408

401409
@Override

src/test/config/SystemDS-MultiTenant-config.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,6 @@
2121
<!-- The timeout of the federated tests to initialize the federated matrixes -->
2222
<sysds.federated.initialization.timeout>30</sysds.federated.initialization.timeout>
2323
<!-- The timeout of each instruction sent to federated workers -->
24-
<sysds.federated.timeout>128</sysds.federated.timeout>
24+
<sysds.federated.timeout>16</sysds.federated.timeout>
2525
<sysds.local.spark>true</sysds.local.spark>
2626
</root>

src/test/config/SystemDS-config.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,5 +23,5 @@
2323
<!-- The timeout of the federated tests to initialize the federated matrixes -->
2424
<sysds.federated.initialization.timeout>2</sysds.federated.initialization.timeout>
2525
<!-- The timeout of each instruction sent to federated workers -->
26-
<sysds.federated.timeout>128</sysds.federated.timeout>
26+
<sysds.federated.timeout>16</sysds.federated.timeout>
2727
</root>

src/test/java/org/apache/sysds/test/functions/codegen/RowVectorComparisonTest.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,7 @@ private void testCodegenIntegration( String testname, boolean rewrites, ExecType
128128
{
129129
boolean oldFlag = OptimizerUtils.ALLOW_ALGEBRAIC_SIMPLIFICATION;
130130
ExecMode platformOld = setExecMode(instType);
131+
setOutputBuffering(true);
131132

132133
try
133134
{

src/test/java/org/apache/sysds/test/functions/federated/primitives/part3/FederatedTokenizeTest.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@ public void testTokenizeFullDenseFrameCP() {
7979

8080
private void runAggregateOperationTest(ExecMode execMode) {
8181
setExecMode(execMode);
82+
setOutputBuffering(true);
8283

8384
String TEST_NAME = TEST_NAME1;
8485

0 commit comments

Comments
 (0)