Skip to content

Commit 95270e9

Browse files
janniklinde authored and mboehm7 committed
[SYSTEMDS-3891] OOC Performance Improvements and Statistics
Closes #2387.
1 parent d88519a commit 95270e9

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

45 files changed

+3288
-393
lines changed

src/main/java/org/apache/sysds/api/DMLOptions.java

Lines changed: 49 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,10 @@
1919

2020
package org.apache.sysds.api;
2121

22+
import java.nio.file.Files;
23+
import java.nio.file.InvalidPathException;
24+
import java.nio.file.Path;
25+
import java.nio.file.Paths;
2226
import java.util.HashMap;
2327
import java.util.Map;
2428

@@ -66,6 +70,10 @@ public class DMLOptions {
6670
public boolean gpu = false; // Whether to use the GPU
6771
public boolean forceGPU = false; // Whether to ignore memory & estimates and always use the GPU
6872
public boolean ooc = false; // Whether to use the OOC backend
73+
public boolean oocLogEvents = false; // Whether to record I/O and task compute events (fine grained, may impact performance on many small tasks)
74+
public String oocLogPath = "./"; // The directory where to save the recorded event logs (csv)
75+
public boolean oocStats = false; // Whether to record and print coarse grained ooc statistics
76+
public int oocStatsCount = 10; // Default ooc statistics count
6977
public boolean debug = false; // to go into debug mode to be able to step through a program
7078
public String filePath = null; // path to script
7179
public String script = null; // the script itself
@@ -105,7 +113,11 @@ public String toString() {
105113
", fedStats=" + fedStats +
106114
", fedStatsCount=" + fedStatsCount +
107115
", fedMonitoring=" + fedMonitoring +
108-
", fedMonitoringAddress" + fedMonitoringAddress +
116+
", fedMonitoringAddress=" + fedMonitoringAddress +
117+
", oocStats=" + oocStats +
118+
", oocStatsCount=" + oocStatsCount +
119+
", oocLogEvents=" + oocLogEvents +
120+
", oocLogPath=" + oocLogPath +
109121
", memStats=" + memStats +
110122
", explainType=" + explainType +
111123
", execMode=" + execMode +
@@ -193,7 +205,7 @@ else if (lineageType.equalsIgnoreCase("debugger"))
193205
else if (execMode.equalsIgnoreCase("hybrid")) dmlOptions.execMode = ExecMode.HYBRID;
194206
else if (execMode.equalsIgnoreCase("spark")) dmlOptions.execMode = ExecMode.SPARK;
195207
else throw new org.apache.commons.cli.ParseException("Invalid argument specified for -exec option, must be one of [hadoop, singlenode, hybrid, HYBRID, spark]");
196-
}
208+
}
197209
if (line.hasOption("explain")) {
198210
dmlOptions.explainType = ExplainType.RUNTIME;
199211
String explainType = line.getOptionValue("explain");
@@ -259,6 +271,33 @@ else if (lineageType.equalsIgnoreCase("debugger"))
259271
}
260272
}
261273

274+
dmlOptions.oocStats = line.hasOption("oocStats");
275+
if (dmlOptions.oocStats) {
276+
String oocStatsCount = line.getOptionValue("oocStats");
277+
if (oocStatsCount != null) {
278+
try {
279+
dmlOptions.oocStatsCount = Integer.parseInt(oocStatsCount);
280+
} catch (NumberFormatException e) {
281+
throw new org.apache.commons.cli.ParseException("Invalid argument specified for -oocStats option, must be a valid integer");
282+
}
283+
}
284+
}
285+
286+
dmlOptions.oocLogEvents = line.hasOption("oocLogEvents");
287+
if (dmlOptions.oocLogEvents) {
288+
String eventLogPath = line.getOptionValue("oocLogEvents");
289+
if (eventLogPath != null) {
290+
try {
291+
Path p = Paths.get(eventLogPath);
292+
if (!Files.isDirectory(p))
293+
throw new org.apache.commons.cli.ParseException("Invalid argument specified for -oocLogEvents option, must be valid directory");
294+
} catch (InvalidPathException e) {
295+
throw new org.apache.commons.cli.ParseException("Invalid argument specified for -oocLogEvents option, must be a valid path");
296+
}
297+
dmlOptions.oocLogPath = eventLogPath;
298+
}
299+
}
300+
262301
dmlOptions.memStats = line.hasOption("mem");
263302

264303
dmlOptions.clean = line.hasOption("clean");
@@ -387,6 +426,12 @@ private static Options createCLIOptions() {
387426
Option fedStatsOpt = OptionBuilder.withArgName("count")
388427
.withDescription("monitors and reports summary execution statistics of federated workers; heavy hitter <count> is 10 unless overridden; default off")
389428
.hasOptionalArg().create("fedStats");
429+
Option oocStatsOpt = OptionBuilder
430+
.withDescription("monitors and reports summary execution statistics of ooc operators and tasks; heavy hitter <count> is 10 unless overriden; default off")
431+
.hasOptionalArg().create("oocStats");
432+
Option oocLogEventsOpt = OptionBuilder
433+
.withDescription("records fine grained events of compute tasks, I/O, and cache; -oocLogEvents [dir='./']")
434+
.hasOptionalArg().create("oocLogEvents");
390435
Option memOpt = OptionBuilder.withDescription("monitors and reports max memory consumption in CP; default off")
391436
.create("mem");
392437
Option explainOpt = OptionBuilder.withArgName("level")
@@ -452,6 +497,8 @@ private static Options createCLIOptions() {
452497
options.addOption(statsOpt);
453498
options.addOption(ngramsOpt);
454499
options.addOption(fedStatsOpt);
500+
options.addOption(oocStatsOpt);
501+
options.addOption(oocLogEventsOpt);
455502
options.addOption(memOpt);
456503
options.addOption(explainOpt);
457504
options.addOption(execOpt);

src/main/java/org/apache/sysds/api/DMLScript.java

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -71,11 +71,11 @@
7171
import org.apache.sysds.runtime.controlprogram.federated.monitoring.models.CoordinatorModel;
7272
import org.apache.sysds.runtime.controlprogram.parfor.util.IDHandler;
7373
import org.apache.sysds.runtime.instructions.gpu.context.GPUContextPool;
74-
import org.apache.sysds.runtime.instructions.ooc.OOCEvictionManager;
7574
import org.apache.sysds.runtime.io.IOUtilFunctions;
7675
import org.apache.sysds.runtime.lineage.LineageCacheConfig;
7776
import org.apache.sysds.runtime.lineage.LineageCacheConfig.LineageCachePolicy;
7877
import org.apache.sysds.runtime.lineage.LineageCacheConfig.ReuseCacheType;
78+
import org.apache.sysds.runtime.ooc.stats.OOCEventLog;
7979
import org.apache.sysds.runtime.util.CommonThreadPool;
8080
import org.apache.sysds.runtime.util.HDFSTool;
8181
import org.apache.sysds.runtime.util.LocalFileUtils;
@@ -150,6 +150,12 @@ public class DMLScript
150150
public static boolean SYNCHRONIZE_GPU = true;
151151
// Set OOC backend
152152
public static boolean USE_OOC = DMLOptions.defaultOptions.ooc;
153+
// Record and print OOC statistics
154+
public static boolean OOC_STATISTICS = DMLOptions.defaultOptions.oocStats;
155+
public static int OOC_STATISTICS_COUNT = DMLOptions.defaultOptions.oocStatsCount;
156+
// Record and save fine grained OOC event logs as csv to the specified dir
157+
public static boolean OOC_LOG_EVENTS = DMLOptions.defaultOptions.oocLogEvents;
158+
public static String OOC_LOG_PATH = DMLOptions.defaultOptions.oocLogPath;
153159
// Enable eager CUDA free on rmvar
154160
public static boolean EAGER_CUDA_FREE = false;
155161

@@ -273,6 +279,10 @@ public static boolean executeScript( String[] args )
273279
USE_ACCELERATOR = dmlOptions.gpu;
274280
FORCE_ACCELERATOR = dmlOptions.forceGPU;
275281
USE_OOC = dmlOptions.ooc;
282+
OOC_STATISTICS = dmlOptions.oocStats;
283+
OOC_STATISTICS_COUNT = dmlOptions.oocStatsCount;
284+
OOC_LOG_EVENTS = dmlOptions.oocLogEvents;
285+
OOC_LOG_PATH = dmlOptions.oocLogPath;
276286
EXPLAIN = dmlOptions.explainType;
277287
EXEC_MODE = dmlOptions.execMode;
278288
LINEAGE = dmlOptions.lineage;
@@ -324,6 +334,9 @@ public static boolean executeScript( String[] args )
324334
LineageCacheConfig.setCachePolicy(LINEAGE_POLICY);
325335
LineageCacheConfig.setEstimator(LINEAGE_ESTIMATE);
326336

337+
if (dmlOptions.oocLogEvents)
338+
OOCEventLog.setup(100000);
339+
327340
String dmlScriptStr = readDMLScript(isFile, fileOrScript);
328341
Map<String, String> argVals = dmlOptions.argVals;
329342

@@ -498,8 +511,6 @@ private static void execute(String dmlScriptStr, String fnameOptConfig, Map<Stri
498511
ScriptExecutorUtils.executeRuntimeProgram(rtprog, ec, ConfigurationManager.getDMLConfig(), STATISTICS ? STATISTICS_COUNT : 0, null);
499512
}
500513
finally {
501-
//cleanup OOC streams and cache
502-
OOCEvictionManager.reset();
503514
//cleanup scratch_space and all working dirs
504515
cleanupHadoopExecution(ConfigurationManager.getDMLConfig());
505516
FederatedData.clearWorkGroup();

src/main/java/org/apache/sysds/api/ScriptExecutorUtils.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import org.apache.sysds.runtime.instructions.gpu.context.GPUObject;
3737
import org.apache.sysds.runtime.lineage.LineageEstimatorStatistics;
3838
import org.apache.sysds.runtime.lineage.LineageGPUCacheEviction;
39+
import org.apache.sysds.runtime.ooc.cache.OOCCacheManager;
3940
import org.apache.sysds.utils.Statistics;
4041

4142
public class ScriptExecutorUtils {
@@ -127,6 +128,9 @@ public static void executeRuntimeProgram(Program rtprog, ExecutionContext ec, DM
127128

128129
if (DMLScript.LINEAGE_ESTIMATE)
129130
System.out.println(LineageEstimatorStatistics.displayLineageEstimates());
131+
132+
if (DMLScript.USE_OOC)
133+
OOCCacheManager.reset();
130134
}
131135
}
132136

src/main/java/org/apache/sysds/hops/rewrite/ProgramRewriter.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -69,8 +69,8 @@ public ProgramRewriter(boolean staticRewrites, boolean dynamicRewrites)
6969

7070
//initialize StatementBlock rewrite ruleSet (with fixed rewrite order)
7171
_sbRuleSet = new ArrayList<>();
72-
73-
72+
73+
7474
//STATIC REWRITES (which do not rely on size information)
7575
if( staticRewrites )
7676
{

src/main/java/org/apache/sysds/runtime/compress/io/WriterCompressed.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@
4646
import org.apache.sysds.runtime.compress.lib.CLALibSeparator;
4747
import org.apache.sysds.runtime.compress.lib.CLALibSeparator.SeparatedGroups;
4848
import org.apache.sysds.runtime.compress.lib.CLALibSlice;
49-
import org.apache.sysds.runtime.controlprogram.parfor.LocalTaskQueue;
49+
import org.apache.sysds.runtime.instructions.ooc.OOCStream;
5050
import org.apache.sysds.runtime.instructions.spark.CompressionSPInstruction.CompressionFunction;
5151
import org.apache.sysds.runtime.instructions.spark.data.IndexedMatrixValue;
5252
import org.apache.sysds.runtime.instructions.spark.utils.RDDConverterUtils;
@@ -410,7 +410,7 @@ public Object call() throws Exception {
410410
}
411411

412412
@Override
413-
public long writeMatrixFromStream(String fname, LocalTaskQueue<IndexedMatrixValue> stream, long rlen, long clen, int blen) {
413+
public long writeMatrixFromStream(String fname, OOCStream<IndexedMatrixValue> stream, long rlen, long clen, int blen) {
414414
throw new UnsupportedOperationException("Writing from an OOC stream is not supported for the HDF5 format.");
415415
};
416416

src/main/java/org/apache/sysds/runtime/controlprogram/caching/CacheableData.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -633,7 +633,7 @@ && getRDDHandle() == null) ) {
633633
_requiresLocalWrite = false;
634634
}
635635
else if( hasStreamHandle() ) {
636-
_data = readBlobFromStream( getStreamHandle().toLocalTaskQueue() );
636+
_data = readBlobFromStream( getStreamHandle() );
637637
}
638638
else if( getRDDHandle()==null || getRDDHandle().allowsShortCircuitRead() ) {
639639
if( DMLScript.STATISTICS )
@@ -1168,7 +1168,7 @@ protected abstract T readBlobFromHDFS(String fname, long[] dims)
11681168
protected abstract T readBlobFromRDD(RDDObject rdd, MutableBoolean status)
11691169
throws IOException;
11701170

1171-
protected abstract T readBlobFromStream(LocalTaskQueue<IndexedMatrixValue> stream)
1171+
protected abstract T readBlobFromStream(OOCStream<IndexedMatrixValue> stream)
11721172
throws IOException;
11731173

11741174
// Federated read

src/main/java/org/apache/sysds/runtime/controlprogram/caching/FrameObject.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import org.apache.sysds.runtime.controlprogram.federated.FederationMap;
3636
import org.apache.sysds.runtime.controlprogram.parfor.LocalTaskQueue;
3737
import org.apache.sysds.runtime.frame.data.FrameBlock;
38+
import org.apache.sysds.runtime.instructions.ooc.OOCStream;
3839
import org.apache.sysds.runtime.instructions.spark.data.IndexedMatrixValue;
3940
import org.apache.sysds.runtime.instructions.spark.data.RDDObject;
4041
import org.apache.sysds.runtime.io.FileFormatProperties;
@@ -316,7 +317,7 @@ protected void writeBlobFromRDDtoHDFS(RDDObject rdd, String fname, String ofmt)
316317
}
317318

318319
@Override
319-
protected FrameBlock readBlobFromStream(LocalTaskQueue<IndexedMatrixValue> stream) throws IOException {
320+
protected FrameBlock readBlobFromStream(OOCStream<IndexedMatrixValue> stream) throws IOException {
320321
// TODO Auto-generated method stub
321322
return null;
322323
}

src/main/java/org/apache/sysds/runtime/controlprogram/caching/MatrixObject.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
import org.apache.sysds.runtime.controlprogram.federated.FederationUtils;
4646
import org.apache.sysds.runtime.controlprogram.parfor.LocalTaskQueue;
4747
import org.apache.sysds.runtime.instructions.fed.InitFEDInstruction;
48+
import org.apache.sysds.runtime.instructions.ooc.OOCStream;
4849
import org.apache.sysds.runtime.instructions.spark.data.IndexedMatrixValue;
4950
import org.apache.sysds.runtime.instructions.spark.data.RDDObject;
5051
import org.apache.sysds.runtime.io.FileFormatProperties;
@@ -528,17 +529,16 @@ protected MatrixBlock readBlobFromRDD(RDDObject rdd, MutableBoolean writeStatus)
528529

529530

530531
@Override
531-
protected MatrixBlock readBlobFromStream(LocalTaskQueue<IndexedMatrixValue> stream) throws IOException {
532+
protected MatrixBlock readBlobFromStream(OOCStream<IndexedMatrixValue> stream) throws IOException {
532533
boolean dimsUnknown = getNumRows() < 0 || getNumColumns() < 0;
533534
int nrows = (int)getNumRows();
534535
int ncols = (int)getNumColumns();
535536
MatrixBlock ret = dimsUnknown ? null : new MatrixBlock((int)getNumRows(), (int)getNumColumns(), false);
536-
// TODO if stream is CachingStream, block parts might be evicted resulting in null pointer exceptions
537537
List<IndexedMatrixValue> blockCache = dimsUnknown ? new ArrayList<>() : null;
538538
IndexedMatrixValue tmp = null;
539539
try {
540540
int blen = getBlocksize(), lnnz = 0;
541-
while( (tmp = stream.dequeueTask()) != LocalTaskQueue.NO_MORE_TASKS ) {
541+
while( (tmp = stream.dequeue()) != LocalTaskQueue.NO_MORE_TASKS ) {
542542
// compute row/column block offsets
543543
final int row_offset = (int) (tmp.getIndexes().getRowIndex() - 1) * blen;
544544
final int col_offset = (int) (tmp.getIndexes().getColumnIndex() - 1) * blen;
@@ -636,7 +636,7 @@ protected long writeStreamToHDFS(String fname, String ofmt, int rep, FileFormatP
636636
MetaDataFormat iimd = (MetaDataFormat) _metaData;
637637
FileFormat fmt = (ofmt != null ? FileFormat.safeValueOf(ofmt) : iimd.getFileFormat());
638638
MatrixWriter writer = MatrixWriterFactory.createMatrixWriter(fmt, rep, fprop);
639-
return writer.writeMatrixFromStream(fname, getStreamHandle().toLocalTaskQueue(),
639+
return writer.writeMatrixFromStream(fname, getStreamHandle(),
640640
getNumRows(), getNumColumns(), ConfigurationManager.getBlocksize());
641641
}
642642

src/main/java/org/apache/sysds/runtime/controlprogram/caching/TensorObject.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,9 @@
3030
import org.apache.sysds.runtime.DMLRuntimeException;
3131
import org.apache.sysds.runtime.controlprogram.context.SparkExecutionContext;
3232
import org.apache.sysds.runtime.controlprogram.federated.FederationMap;
33-
import org.apache.sysds.runtime.controlprogram.parfor.LocalTaskQueue;
3433
import org.apache.sysds.runtime.data.TensorBlock;
3534
import org.apache.sysds.runtime.data.TensorIndexes;
35+
import org.apache.sysds.runtime.instructions.ooc.OOCStream;
3636
import org.apache.sysds.runtime.instructions.spark.data.IndexedMatrixValue;
3737
import org.apache.sysds.runtime.instructions.spark.data.RDDObject;
3838
import org.apache.sysds.runtime.io.FileFormatProperties;
@@ -210,7 +210,7 @@ protected void writeBlobFromRDDtoHDFS(RDDObject rdd, String fname, String ofmt)
210210

211211

212212
@Override
213-
protected TensorBlock readBlobFromStream(LocalTaskQueue<IndexedMatrixValue> stream) throws IOException {
213+
protected TensorBlock readBlobFromStream(OOCStream<IndexedMatrixValue> stream) throws IOException {
214214
// TODO Auto-generated method stub
215215
return null;
216216
}

src/main/java/org/apache/sysds/runtime/controlprogram/parfor/LocalTaskQueue.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,10 @@ public synchronized T dequeueTask()
103103
return t;
104104
}
105105

106+
/**
 * Reports whether this queue currently holds at least one element or its
 * input has already been closed.
 *
 * NOTE(review): presumably this indicates that a subsequent dequeue would
 * not have to wait for new input -- confirm against dequeueTask().
 *
 * @return true if the internal data is non-empty or the closed-input flag is set
 */
public synchronized boolean hasNext() {
107+
return !_data.isEmpty() || _closedInput;
108+
}
109+
106110
/**
107111
* Synchronized (logical) insert of a NO_MORE_TASKS symbol at the end of the FIFO queue in order to
108112
* mark that no more tasks will be inserted into the queue.

0 commit comments

Comments
 (0)