Skip to content

Commit 51e167f

Browse files
committed
output buffer test
1 parent 751dabb commit 51e167f

File tree

6 files changed

+56
-35
lines changed

6 files changed

+56
-35
lines changed

src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorkerHandler.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -651,8 +651,7 @@ private FederatedResponse execUDF(FederatedRequest request, ExecutionContextMap
651651
// get function and input parameters
652652
try {
653653
FederatedUDF udf = (FederatedUDF) request.getParam(0);
654-
if(LOG.isDebugEnabled())
655-
LOG.debug(udf);
654+
LOG.debug(udf);
656655

657656
eventStage.operation = udf.getClass().getSimpleName();
658657

src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederationMap.java

Lines changed: 23 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -644,12 +644,17 @@ public void forEachParallel(BiFunction<FederatedRange, FederatedData, Void> forE
644644
ArrayList<MappingTask> mappingTasks = new ArrayList<>();
645645
for(Pair<FederatedRange, FederatedData> fedMap : _fedMap)
646646
mappingTasks.add(new MappingTask(fedMap.getKey(), fedMap.getValue(), forEachFunction, _ID));
647+
647648
try {
648-
pool.invokeAll(mappingTasks, ConfigurationManager.getFederatedTimeout(), TimeUnit.SECONDS);
649+
for(Future<?> t:pool.invokeAll(mappingTasks, ConfigurationManager.getFederatedTimeout(), TimeUnit.SECONDS)){
650+
if(!t.isDone())
651+
throw new RuntimeException("Timeout");
652+
else if(t.isCancelled())
653+
throw new RuntimeException("Failed");
654+
}
649655
}
650656
catch(InterruptedException e) {
651-
// TODO Auto-generated catch block
652-
e.printStackTrace();
657+
throw new RuntimeException(e);
653658
}
654659
finally{
655660
pool.shutdown();
@@ -667,15 +672,25 @@ public void forEachParallel(BiFunction<FederatedRange, FederatedData, Void> forE
667672
* @return the new <code>FederationMap</code>
668673
*/
669674
public FederationMap mapParallel(long newVarID, BiFunction<FederatedRange, FederatedData, Void> mappingFunction) {
670-
ExecutorService pool = CommonThreadPool.get(_fedMap.size());
671-
675+
ExecutorService pool = Executors.newFixedThreadPool(_fedMap.size());
672676
FederationMap fedMapCopy = copyWithNewID(_ID);
673677
ArrayList<MappingTask> mappingTasks = new ArrayList<>();
674678
for(Pair<FederatedRange, FederatedData> fedMap : fedMapCopy._fedMap)
675679
mappingTasks.add(new MappingTask(fedMap.getKey(), fedMap.getValue(), mappingFunction, newVarID));
676-
CommonThreadPool.invokeAndShutdown(pool, mappingTasks);
677-
fedMapCopy._ID = newVarID;
678-
return fedMapCopy;
680+
try{
681+
for(Future<?> t : pool.invokeAll(mappingTasks, ConfigurationManager.getFederatedTimeout(), TimeUnit.SECONDS)){
682+
if(!t.isDone())
683+
throw new RuntimeException("Timeout");
684+
else if(t.isCancelled()){
685+
throw new RuntimeException("Failed");
686+
}
687+
}
688+
fedMapCopy._ID = newVarID;
689+
return fedMapCopy;
690+
}
691+
catch(Exception e){
692+
throw new RuntimeException(e);
693+
}
679694
}
680695

681696
public FederationMap filter(IndexRange ixrange) {

src/main/java/org/apache/sysds/runtime/instructions/fed/MultiReturnParameterizedBuiltinFEDInstruction.java

Lines changed: 27 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@
2525
import java.util.List;
2626
import java.util.Map;
2727
import java.util.concurrent.Future;
28-
import java.util.concurrent.TimeUnit;
2928
import java.util.concurrent.atomic.LongAdder;
3029
import java.util.stream.Stream;
3130
import java.util.zip.Adler32;
@@ -36,8 +35,7 @@
3635
import org.apache.sysds.common.Types;
3736
import org.apache.sysds.common.Types.DataType;
3837
import org.apache.sysds.common.Types.ValueType;
39-
import org.apache.sysds.conf.ConfigurationManager;
40-
import org.apache.sysds.conf.DMLConfig;
38+
import org.apache.sysds.hops.OptimizerUtils;
4139
import org.apache.sysds.hops.fedplanner.FTypes;
4240
import org.apache.sysds.hops.fedplanner.FTypes.FType;
4341
import org.apache.sysds.lops.PickByCount;
@@ -50,10 +48,10 @@
5048
import org.apache.sysds.runtime.controlprogram.federated.FederatedRequest.RequestType;
5149
import org.apache.sysds.runtime.controlprogram.federated.FederatedResponse;
5250
import org.apache.sysds.runtime.controlprogram.federated.FederatedResponse.ResponseType;
53-
import org.apache.sysds.runtime.frame.data.FrameBlock;
5451
import org.apache.sysds.runtime.controlprogram.federated.FederatedUDF;
5552
import org.apache.sysds.runtime.controlprogram.federated.FederationMap;
5653
import org.apache.sysds.runtime.controlprogram.federated.FederationUtils;
54+
import org.apache.sysds.runtime.frame.data.FrameBlock;
5755
import org.apache.sysds.runtime.instructions.InstructionUtils;
5856
import org.apache.sysds.runtime.instructions.cp.CPOperand;
5957
import org.apache.sysds.runtime.instructions.cp.Data;
@@ -176,8 +174,9 @@ public void processInstruction(ExecutionContext ec) {
176174
new CreateFrameEncoder(data.getVarID(), spec, columnOffset + 1)));
177175
// collect responses with encoders
178176
try {
179-
FederatedResponse response = responseFuture.get(ConfigurationManager.getFederatedTimeout(), TimeUnit.SECONDS);
177+
FederatedResponse response = responseFuture.get();
180178
MultiColumnEncoder encoder = (MultiColumnEncoder) response.getData()[0];
179+
181180
// merge this encoder into a composite encoder
182181
synchronized(finalGlobalEncoder) {
183182
finalGlobalEncoder.mergeAt(encoder, columnOffset, (int) (range.getBeginDims()[0] + 1));
@@ -381,24 +380,30 @@ public ExecuteFrameEncoder(long input, long output, MultiColumnEncoder encoder)
381380

382381
@Override
383382
public FederatedResponse execute(ExecutionContext ec, Data... data) {
384-
FrameBlock fb = ((FrameObject) data[0]).acquireReadAndRelease();
385-
386-
// offset is applied on the Worker to shift the local encoders to their respective column
387-
_encoder.applyColumnOffset();
388-
// apply transformation
389-
//MatrixBlock mbout = _encoder.apply(fb, OptimizerUtils.getTransformNumThreads());
390-
// FIXME: Enabling multithreading intermittently hangs
391-
MatrixBlock mbout = _encoder.apply(fb, 1);
392-
393-
// create output matrix object
394-
MatrixObject mo = ExecutionContext.createMatrixObject(mbout);
395-
396-
// add it to the list of variables
397-
ec.setVariable(String.valueOf(_outputID), mo);
383+
try{
398384

399-
// return id handle
400-
return new FederatedResponse(
401-
ResponseType.SUCCESS_EMPTY, mbout.getNonZeros());
385+
FrameBlock fb = ((FrameObject) data[0]).acquireReadAndRelease();
386+
387+
// offset is applied on the Worker to shift the local encoders to their respective column
388+
_encoder.applyColumnOffset();
389+
// apply transformation
390+
MatrixBlock mbout = _encoder.apply(fb, OptimizerUtils.getTransformNumThreads());
391+
// FIXME: Enabling multithreading intermittently hangs
392+
// MatrixBlock mbout = _encoder.apply(fb, 1);
393+
394+
// create output matrix object
395+
MatrixObject mo = ExecutionContext.createMatrixObject(mbout);
396+
397+
// add it to the list of variables
398+
ec.setVariable(String.valueOf(_outputID), mo);
399+
400+
// return id handle
401+
return new FederatedResponse(
402+
ResponseType.SUCCESS_EMPTY, mbout.getNonZeros());
403+
}
404+
catch(Exception e){
405+
return new FederatedResponse(ResponseType.ERROR);
406+
}
402407
}
403408

404409
@Override

src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderRecode.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -333,7 +333,7 @@ public void writeExternal(ObjectOutput out) throws IOException {
333333

334334
for(Entry<Object, Integer> e : _rcdMap.entrySet()) {
335335
out.writeUTF(e.getKey().toString());
336-
out.writeLong(e.getValue());
336+
out.writeInt(e.getValue());
337337
}
338338
}
339339

src/main/java/org/apache/sysds/runtime/transform/encode/MultiColumnEncoder.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -364,10 +364,11 @@ public MatrixBlock apply(CacheBlock<?> in, MatrixBlock out, int outputCol, int k
364364
// There should be a encoder for every column
365365
if(hasLegacyEncoder() && !(in instanceof FrameBlock))
366366
throw new DMLRuntimeException("LegacyEncoders do not support non FrameBlock Inputs");
367-
int numEncoders = getFromAll(ColumnEncoderComposite.class, ColumnEncoder::getColID).size();
367+
int numEncoders = getEncoders().size();
368+
// getFromAll(ColumnEncoderComposite.class, ColumnEncoder::getColID).size();
368369
if(in.getNumColumns() != numEncoders)
369370
throw new DMLRuntimeException("Not every column in has a CompositeEncoder. Please make sure every column "
370-
+ "has a encoder or slice the input accordingly");
371+
+ "has a encoder or slice the input accordingly: num encoders: " + getEncoders() + " vs columns " + in.getNumColumns());
371372
// TODO smart checks
372373
// Block allocation for MT access
373374
if(in.getNumRows() == 0)

src/test/java/org/apache/sysds/test/functions/federated/primitives/part3/FederatedTokenizeTest.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@ public void testTokenizeFullDenseFrameCP() {
7979

8080
private void runAggregateOperationTest(ExecMode execMode) {
8181
setExecMode(execMode);
82+
setOutputBuffering(true);
8283

8384
String TEST_NAME = TEST_NAME1;
8485

0 commit comments

Comments
 (0)