Skip to content

Commit 58361b2

Browse files
committed
[SYSTEMDS-3893] Basic out-of-core binary-read and acquire primitive
This patch introduces a basic integration of the out-of-core backend. For reading, we use a dedicated reblock instruction which creates a queue of blocks, spawns a thread for reading and immediately returns. In addition, we extended the acquireRead functionality to collect such streams of blocks whenever an operations requires the full matrix. Based on these foundations, we can now add other OCC operations that directly work with the input stream of blocks and produce either results or created modified output streams.
1 parent 5aa0307 commit 58361b2

File tree

19 files changed

+318
-50
lines changed

19 files changed

+318
-50
lines changed

.github/workflows/javaTests.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ jobs:
7373
"**.functions.builtin.part1.**",
7474
"**.functions.builtin.part2.**",
7575
"**.functions.frame.**,**.functions.indexing.**,**.functions.io.**,**.functions.iogen.**",
76-
"**.functions.dnn.**",
76+
"**.functions.dnn.**,**.functions.ooc.**",
7777
"**.functions.paramserv.**",
7878
"**.functions.recompile.**,**.functions.misc.**",
7979
"**.functions.mlcontext.**",

src/main/java/org/apache/sysds/common/Opcodes.java

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -349,7 +349,7 @@ public enum Opcodes {
349349
MAPMIN("mapmin", InstructionType.Binary),
350350

351351
//REBLOCK Instruction Opcodes
352-
RBLK("rblk", null, InstructionType.Reblock),
352+
RBLK("rblk", null, InstructionType.Reblock, null, InstructionType.Reblock),
353353
CSVRBLK("csvrblk", InstructionType.CSVReblock),
354354
LIBSVMRBLK("libsvmrblk", InstructionType.LIBSVMReblock),
355355

@@ -398,31 +398,31 @@ public enum Opcodes {
398398

399399
// Constructors
400400
Opcodes(String name, InstructionType type) {
401-
this._name = name;
402-
this._type = type;
403-
this._spType=null;
404-
this._fedType=null;
401+
this(name, type, null, null, null);
405402
}
406403

407404
Opcodes(String name, InstructionType type, InstructionType spType){
408-
this._name=name;
409-
this._type=type;
410-
this._spType=spType;
411-
this._fedType=null;
405+
this(name, type, spType, null, null);
412406
}
413407

414408
Opcodes(String name, InstructionType type, InstructionType spType, InstructionType fedType){
409+
this(name, type, spType, fedType, null);
410+
}
411+
412+
Opcodes(String name, InstructionType type, InstructionType spType, InstructionType fedType, InstructionType oocType){
415413
this._name=name;
416414
this._type=type;
417415
this._spType=spType;
418416
this._fedType=fedType;
417+
this._oocType=oocType;
419418
}
420419

421420
// Fields
422421
private final String _name;
423422
private final InstructionType _type;
424423
private final InstructionType _spType;
425424
private final InstructionType _fedType;
425+
private final InstructionType _oocType;
426426

427427
private static final Map<String, Opcodes> _lookupMap = new HashMap<>();
428428

@@ -451,6 +451,10 @@ public InstructionType getSpType() {
451451
public InstructionType getFedType(){
452452
return _fedType != null ? _fedType : _type;
453453
}
454+
455+
public InstructionType getOocType(){
456+
return _oocType != null ? _oocType : _type;
457+
}
454458

455459
public static InstructionType getTypeByOpcode(String opcode, Types.ExecType type) {
456460
if (opcode == null || opcode.trim().isEmpty()) {
@@ -463,6 +467,8 @@ public static InstructionType getTypeByOpcode(String opcode, Types.ExecType type
463467
return (op.getSpType() != null) ? op.getSpType() : op.getType();
464468
case FED:
465469
return (op.getFedType() != null) ? op.getFedType() : op.getType();
470+
case OOC:
471+
return (op.getOocType() != null) ? op.getOocType() : op.getType();
466472
default:
467473
return op.getType();
468474
}

src/main/java/org/apache/sysds/hops/AggUnaryOp.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,7 @@ public Lop constructLops()
116116
ExecType et = optFindExecType();
117117
Hop input = getInput().get(0);
118118

119-
if ( et == ExecType.CP || et == ExecType.GPU || et == ExecType.FED )
119+
if ( et == ExecType.CP || et == ExecType.GPU || et == ExecType.FED || et == ExecType.OOC )
120120
{
121121
Lop agg1 = null;
122122
if( isTernaryAggregateRewriteApplicable() ) {
@@ -409,6 +409,9 @@ else if(getInput().get(0).areDimsBelowThreshold() || getInput().get(0).isVector(
409409
else
410410
setRequiresRecompileIfNecessary();
411411

412+
if( _etype == ExecType.OOC ) //TODO
413+
setExecType(ExecType.CP);
414+
412415
return _etype;
413416
}
414417

src/main/java/org/apache/sysds/hops/BinaryOp.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -854,6 +854,9 @@ else if( (op == OpOp2.CBIND && getDataType().isList())
854854
_etype = ExecType.CP;
855855
}
856856

857+
if( _etype == ExecType.OOC ) //TODO
858+
setExecType(ExecType.CP);
859+
857860
//mark for recompile (forever)
858861
setRequiresRecompileIfNecessary();
859862

src/main/java/org/apache/sysds/hops/DataOp.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424

2525
import org.apache.commons.logging.Log;
2626
import org.apache.commons.logging.LogFactory;
27+
import org.apache.sysds.api.DMLScript;
2728
import org.apache.sysds.common.Types.DataType;
2829
import org.apache.sysds.common.Types.FileFormat;
2930
import org.apache.sysds.common.Types.OpOpData;
@@ -465,6 +466,9 @@ else if ( getInput().get(0).areDimsBelowThreshold() )
465466
}
466467
else //READ
467468
{
469+
if( DMLScript.USE_OOC )
470+
checkAndSetForcedPlatform();
471+
468472
//mark for recompile (forever)
469473
if( ConfigurationManager.isDynamicRecompilation() && !dimsKnown(true) && letype==ExecType.SPARK
470474
&& (_recompileRead || _requiresCheckpoint) )
@@ -473,7 +477,7 @@ else if ( getInput().get(0).areDimsBelowThreshold() )
473477
}
474478

475479
_etype = letype;
476-
if ( _etypeForced == ExecType.FED )
480+
if ( _etypeForced == ExecType.FED || _etypeForced == ExecType.OOC )
477481
_etype = _etypeForced;
478482
}
479483

src/main/java/org/apache/sysds/hops/Hop.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -256,6 +256,8 @@ public void checkAndSetForcedPlatform()
256256
{
257257
if(DMLScript.USE_ACCELERATOR && DMLScript.FORCE_ACCELERATOR && isGPUEnabled())
258258
_etypeForced = ExecType.GPU; // enabled with -gpu force option
259+
else if (DMLScript.USE_OOC)
260+
_etypeForced = ExecType.OOC;
259261
else if ( DMLScript.getGlobalExecMode() == ExecMode.SINGLE_NODE && _etypeForced != ExecType.FED ) {
260262
if(OptimizerUtils.isMemoryBasedOptLevel() && DMLScript.USE_ACCELERATOR && isGPUEnabled()) {
261263
// enabled with -exec singlenode -gpu option
@@ -406,7 +408,7 @@ public void constructAndSetLopsDataFlowProperties() {
406408
private void constructAndSetReblockLopIfRequired()
407409
{
408410
//determine execution type
409-
ExecType et = ExecType.CP;
411+
ExecType et = DMLScript.USE_OOC ? ExecType.OOC : ExecType.CP;
410412
if( DMLScript.getGlobalExecMode() != ExecMode.SINGLE_NODE
411413
&& !(getDataType()==DataType.SCALAR) )
412414
{

src/main/java/org/apache/sysds/hops/rewrite/RewriteBlockSizeAndReblock.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import org.apache.sysds.hops.Hop;
3030
import org.apache.sysds.hops.HopsException;
3131
import org.apache.sysds.hops.OptimizerUtils;
32+
import org.apache.sysds.api.DMLScript;
3233
import org.apache.sysds.common.Types.DataType;
3334

3435
/**
@@ -80,8 +81,12 @@ private void rule_BlockSizeAndReblock(Hop hop, final int blocksize)
8081
{
8182
DataOp dop = (DataOp) hop;
8283

84+
if( DMLScript.USE_OOC && dop.getOp() == OpOpData.PERSISTENTREAD ) {
85+
dop.setRequiresReblock(true);
86+
dop.setBlocksize(blocksize);
87+
}
8388
// if block size does not match
84-
if( (dop.getDataType() == DataType.MATRIX && (dop.getBlocksize() != blocksize))
89+
else if( (dop.getDataType() == DataType.MATRIX && (dop.getBlocksize() != blocksize))
8590
||(dop.getDataType() == DataType.FRAME && OptimizerUtils.isSparkExecutionMode()
8691
&& (dop.getFileFormat()==FileFormat.TEXT || dop.getFileFormat()==FileFormat.CSV)) )
8792
{

src/main/java/org/apache/sysds/lops/ReBlock.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,8 +46,8 @@ public ReBlock(Lop input, int blen, DataType dt, ValueType vt, boolean outputEmp
4646
_blocksize = blen;
4747
_outputEmptyBlocks = outputEmptyBlocks;
4848

49-
if(et == ExecType.SPARK)
50-
lps.setProperties(inputs, ExecType.SPARK);
49+
if(et == ExecType.SPARK || et == ExecType.OOC)
50+
lps.setProperties(inputs, et);
5151
else
5252
throw new LopsException("Incorrect execution type for Reblock:" + et);
5353
}

src/main/java/org/apache/sysds/runtime/controlprogram/caching/CacheableData.java

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,12 +42,14 @@
4242
import org.apache.sysds.runtime.compress.CompressedMatrixBlock;
4343
import org.apache.sysds.runtime.controlprogram.caching.LazyWriteBuffer.RPolicy;
4444
import org.apache.sysds.runtime.controlprogram.federated.FederationMap;
45+
import org.apache.sysds.runtime.controlprogram.parfor.LocalTaskQueue;
4546
import org.apache.sysds.runtime.controlprogram.parfor.util.IDSequence;
4647
import org.apache.sysds.runtime.instructions.cp.Data;
4748
import org.apache.sysds.runtime.instructions.fed.InitFEDInstruction;
4849
import org.apache.sysds.runtime.instructions.gpu.context.GPUContext;
4950
import org.apache.sysds.runtime.instructions.gpu.context.GPUObject;
5051
import org.apache.sysds.runtime.instructions.spark.data.BroadcastObject;
52+
import org.apache.sysds.runtime.instructions.spark.data.IndexedMatrixValue;
5153
import org.apache.sysds.runtime.instructions.spark.data.RDDObject;
5254
import org.apache.sysds.runtime.io.FileFormatProperties;
5355
import org.apache.sysds.runtime.io.IOUtilFunctions;
@@ -210,13 +212,15 @@ public enum CacheStatus {
210212
private boolean _requiresLocalWrite = false; //flag if local write for read obj
211213
private boolean _isAcquireFromEmpty = false; //flag if read from status empty
212214

213-
//spark-specific handles
215+
//backend-specific handles
214216
//note: we use the abstraction of LineageObjects for two reasons: (1) to keep track of cleanup
215217
//for lazily evaluated RDDs, and (2) as abstraction for environments that do not necessarily have spark libraries available
216218
private RDDObject _rddHandle = null; //RDD handle
217219
private BroadcastObject<T> _bcHandle = null; //Broadcast handle
218220
protected HashMap<GPUContext, GPUObject> _gpuObjects = null; //Per GPUContext object allocated on GPU
219-
221+
//TODO generalize for frames
222+
private LocalTaskQueue<IndexedMatrixValue> _streamHandle = null;
223+
220224
private LineageItem _lineage = null;
221225

222226
/**
@@ -460,6 +464,10 @@ public BroadcastObject<T> getBroadcastHandle() {
460464
public boolean hasBroadcastHandle() {
461465
return _bcHandle != null && _bcHandle.hasBackReference();
462466
}
467+
468+
public LocalTaskQueue<IndexedMatrixValue> getStreamHandle() {
469+
return _streamHandle;
470+
}
463471

464472
@SuppressWarnings({ "rawtypes", "unchecked" })
465473
public void setBroadcastHandle( BroadcastObject bc ) {
@@ -490,6 +498,10 @@ public synchronized void setGPUObject(GPUContext gCtx, GPUObject gObj) {
490498
public synchronized void removeGPUObject(GPUContext gCtx) {
491499
_gpuObjects.remove(gCtx);
492500
}
501+
502+
public synchronized void setStreamHandle(LocalTaskQueue<IndexedMatrixValue> q) {
503+
_streamHandle = q;
504+
}
493505

494506
// *********************************************
495507
// *** ***
@@ -580,6 +592,9 @@ && getRDDHandle() == null) ) {
580592
//mark for initial local write despite read operation
581593
_requiresLocalWrite = false;
582594
}
595+
else if( getStreamHandle() != null ) {
596+
_data = readBlobFromStream( getStreamHandle() );
597+
}
583598
else if( getRDDHandle()==null || getRDDHandle().allowsShortCircuitRead() ) {
584599
if( DMLScript.STATISTICS )
585600
CacheStatistics.incrementHDFSHits();
@@ -1099,6 +1114,9 @@ protected abstract T readBlobFromHDFS(String fname, long[] dims)
10991114
protected abstract T readBlobFromRDD(RDDObject rdd, MutableBoolean status)
11001115
throws IOException;
11011116

1117+
protected abstract T readBlobFromStream(LocalTaskQueue<IndexedMatrixValue> stream)
1118+
throws IOException;
1119+
11021120
// Federated read
11031121
protected T readBlobFromFederated(FederationMap fedMap) throws IOException {
11041122
if( LOG.isDebugEnabled() ) //common if instructions keep federated outputs

src/main/java/org/apache/sysds/runtime/controlprogram/caching/FrameObject.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,9 @@
3333
import org.apache.sysds.runtime.controlprogram.federated.FederatedRange;
3434
import org.apache.sysds.runtime.controlprogram.federated.FederatedResponse;
3535
import org.apache.sysds.runtime.controlprogram.federated.FederationMap;
36+
import org.apache.sysds.runtime.controlprogram.parfor.LocalTaskQueue;
3637
import org.apache.sysds.runtime.frame.data.FrameBlock;
38+
import org.apache.sysds.runtime.instructions.spark.data.IndexedMatrixValue;
3739
import org.apache.sysds.runtime.instructions.spark.data.RDDObject;
3840
import org.apache.sysds.runtime.io.FileFormatProperties;
3941
import org.apache.sysds.runtime.io.FrameReaderFactory;
@@ -304,6 +306,12 @@ protected void writeBlobFromRDDtoHDFS(RDDObject rdd, String fname, String ofmt)
304306
//lazy evaluation of pending transformations.
305307
SparkExecutionContext.writeFrameRDDtoHDFS(rdd, fname, iimd.getFileFormat());
306308
}
309+
310+
@Override
311+
protected FrameBlock readBlobFromStream(LocalTaskQueue<IndexedMatrixValue> stream) throws IOException {
312+
// TODO Auto-generated method stub
313+
return null;
314+
}
307315

308316
@Override
309317
protected FrameBlock reconstructByLineage(LineageItem li) throws IOException {

0 commit comments

Comments
 (0)