Commit 1c70242

[MINOR] Improved parfor test coverage (new tests, removed old code)
This patch improves the parfor test coverage with several new tests for ID handling, result merging, and task queues. It also removes old code such as redundant cell indexes and the data partitioning path for the text-cell format, which is no longer reachable. Writing these tests led to two minor improvements: faster retrieval of process IDs and a correct reset of ID sequences (restarting at 0 instead of 1).
1 parent cebf845 commit 1c70242
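
The commit message mentions a corrected reset of ID sequences (restarting at 0 instead of 1). Below is a minimal, hypothetical JUnit-style check of that behavior: getNextID() appears in the diff, but reset() and the exact starting value are assumptions, and this is not one of the tests added by the commit.

import org.apache.sysds.runtime.controlprogram.parfor.util.IDSequence;
import org.junit.Assert;
import org.junit.Test;

public class IDSequenceResetSketch {
	@Test
	public void testResetRestartsAtZero() {
		IDSequence seq = new IDSequence();
		Assert.assertEquals(0, seq.getNextID()); // first ID expected to be 0
		Assert.assertEquals(1, seq.getNextID());
		seq.reset();                             // assumed API
		Assert.assertEquals(0, seq.getNextID()); // restarts at 0 after this patch
	}
}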

10 files changed: +164 additions, -520 deletions

src/main/java/org/apache/sysds/api/DMLScript.java

Lines changed: 3 additions & 3 deletions
@@ -624,11 +624,11 @@ private static void registerForMonitoring() {
 			// TODO fix and replace localhost identifyer with hostname in federated instructions SYSTEMDS-3440
 			// https://issues.apache.org/jira/browse/SYSTEMDS-3440
 			model.host = "localhost";
-			model.processId = Long.parseLong(IDHandler.getProcessID());
+			model.processId = IDHandler.getProcessID();
 
 			String requestBody = objectMapper
-					.writerWithDefaultPrettyPrinter()
-					.writeValueAsString(model);
+				.writerWithDefaultPrettyPrinter()
+				.writeValueAsString(model);
 
 			var client = HttpClient.newHttpClient();
 			var request = HttpRequest.newBuilder(URI.create(uriString))
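
Only the call site is visible in the hunk above; the changed assignment suggests IDHandler.getProcessID() now returns a numeric ID directly instead of a string that had to be parsed. A plausible (assumed, not taken from this commit) string-free implementation on Java 9+ uses ProcessHandle:

// Hypothetical sketch: obtain the JVM's process ID without parsing
// ManagementFactory.getRuntimeMXBean().getName(). Whether IDHandler is
// implemented this way is not shown in this diff.
public class ProcessIdSketch {
	public static long getProcessID() {
		return ProcessHandle.current().pid(); // Java 9+
	}

	public static void main(String[] args) {
		System.out.println("pid = " + getProcessID());
	}
}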

src/main/java/org/apache/sysds/runtime/controlprogram/parfor/DataPartitionerLocal.java

Lines changed: 24 additions & 200 deletions
@@ -19,36 +19,26 @@
 
 package org.apache.sysds.runtime.controlprogram.parfor;
 
-import java.io.BufferedWriter;
+import java.io.BufferedReader;
 import java.io.File;
+import java.io.FileInputStream;
 import java.io.IOException;
-import java.io.OutputStreamWriter;
-import java.util.HashMap;
+import java.io.InputStreamReader;
 import java.util.LinkedList;
-import java.util.Map.Entry;
 
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.SequenceFile.Writer;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.FileInputFormat;
-import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.RecordReader;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapred.TextInputFormat;
 import org.apache.sysds.common.Types.FileFormat;
 import org.apache.sysds.conf.ConfigurationManager;
 import org.apache.sysds.runtime.DMLRuntimeException;
 import org.apache.sysds.runtime.controlprogram.ParForProgramBlock.PDataPartitionFormat;
 import org.apache.sysds.runtime.controlprogram.ParForProgramBlock.PartitionFormat;
 import org.apache.sysds.runtime.controlprogram.caching.MatrixObject;
-import org.apache.sysds.runtime.controlprogram.parfor.util.Cell;
-import org.apache.sysds.runtime.controlprogram.parfor.util.IDSequence;
-import org.apache.sysds.runtime.controlprogram.parfor.util.StagingFileUtils;
 import org.apache.sysds.runtime.io.IOUtilFunctions;
+import org.apache.sysds.runtime.matrix.data.IJV;
 import org.apache.sysds.runtime.matrix.data.MatrixBlock;
 import org.apache.sysds.runtime.matrix.data.MatrixCell;
 import org.apache.sysds.runtime.matrix.data.MatrixIndexes;
@@ -78,7 +68,6 @@ public class DataPartitionerLocal extends DataPartitioner
 {
 	private static final boolean PARALLEL = true;
 
-	private IDSequence _seq = null;
 	private MatrixBlock _reuseBlk = null;
 
 	private int _par = -1;
@@ -93,7 +82,6 @@ public DataPartitionerLocal(PartitionFormat dpf, int par) {
 		super(dpf._dpf, dpf._N);
 		if( dpf.isBlockwise() )
 			throw new DMLRuntimeException("Data partitioning formt '"+dpf+"' not supported by DataPartitionerLocal" );
-		_seq = new IDSequence();
 		_par = (par > 0) ? par : 1;
 	}
 
@@ -107,107 +95,14 @@ protected void partitionMatrix(MatrixObject in, String fnameNew, FileFormat fmt,
 		String fnameStaging = LocalFileUtils.getUniqueWorkingDir( LocalFileUtils.CATEGORY_PARTITIONING );
 
 		//reblock input matrix
-		if( fmt == FileFormat.TEXT )
-			partitionTextCell( fname, fnameStaging, fnameNew, rlen, clen, blen );
-		else if( fmt == FileFormat.BINARY )
+		if( fmt == FileFormat.BINARY )
 			partitionBinaryBlock( fname, fnameStaging, fnameNew, rlen, clen, blen );
 		else
 			throw new DMLRuntimeException("Cannot create data partitions of format: "+fmt.toString());
 
 		LocalFileUtils.cleanupWorkingDirectory(fnameStaging);
 	}
 
-	private void partitionTextCell( String fname, String fnameStaging, String fnameNew, long rlen, long clen, int blen )
-	{
-		long row = -1;
-		long col = -1;
-
-		try
-		{
-			//STEP 1: read matrix from HDFS and write blocks to local staging area
-			//check and add input path
-			JobConf job = new JobConf(ConfigurationManager.getCachedJobConf());
-			Path path = new Path(fname);
-			FileInputFormat.addInputPath(job, path);
-			TextInputFormat informat = new TextInputFormat();
-			informat.configure(job);
-			InputSplit[] splits = informat.getSplits(job, 1);
-
-			LinkedList<Cell> buffer = new LinkedList<>();
-			LongWritable key = new LongWritable();
-			Text value = new Text();
-			FastStringTokenizer st = new FastStringTokenizer(' ');
-
-			for(InputSplit split: splits)
-			{
-				RecordReader<LongWritable,Text> reader = informat.getRecordReader(split, job, Reporter.NULL);
-				try
-				{
-					while(reader.next(key, value))
-					{
-						st.reset( value.toString() ); //reset tokenizer
-						row = st.nextLong();
-						col = st.nextLong();
-						double lvalue = st.nextDouble();
-						Cell tmp = new Cell( row, col, lvalue );
-
-						buffer.addLast( tmp );
-						if( buffer.size() > StagingFileUtils.CELL_BUFFER_SIZE ) //periodic flush
-						{
-							appendCellBufferToStagingArea(fnameStaging, buffer, blen);
-							buffer.clear();
-						}
-					}
-
-					//final flush
-					if( !buffer.isEmpty() )
-					{
-						appendCellBufferToStagingArea(fnameStaging, buffer, blen);
-						buffer.clear();
-					}
-				}
-				finally {
-					IOUtilFunctions.closeSilently(reader);
-				}
-			}
-
-			//STEP 2: read matrix blocks from staging area and write matrix to HDFS
-			String[] fnamesPartitions = new File(fnameStaging).list();
-			if(PARALLEL)
-			{
-				int len = Math.min(fnamesPartitions.length, _par);
-				Thread[] threads = new Thread[len];
-				for( int i=0;i<len;i++ )
-				{
-					int start = i*(int)Math.ceil(((double)fnamesPartitions.length)/len);
-					int end = (i+1)*(int)Math.ceil(((double)fnamesPartitions.length)/len)-1;
-					end = Math.min(end, fnamesPartitions.length-1);
-					threads[i] = new Thread(new DataPartitionerWorkerTextCell(job, fnameNew, fnameStaging, fnamesPartitions, start, end));
-					threads[i].start();
-				}
-
-				for( Thread t : threads )
-					t.join();
-			}
-			else
-			{
-				for( String pdir : fnamesPartitions )
-					writeTextCellFileToHDFS( job, fnameNew, fnameStaging+"/"+pdir );
-			}
-		}
-		catch (Exception e)
-		{
-			//post-mortem error handling and bounds checking
-			if( row < 1 || row > rlen || col < 1 || col > clen )
-			{
-				throw new DMLRuntimeException("Matrix cell ["+(row)+","+(col)+"] " +
-					"out of overall matrix range [1:"+rlen+",1:"+clen+"].");
-			}
-			else
-				throw new DMLRuntimeException("Unable to partition text cell matrix.", e);
-		}
-	}
-
 	@SuppressWarnings("deprecation")
 	private void partitionBinaryBlock( String fname, String fnameStaging, String fnameNew, long rlen, long clen, int blen )
 	{
@@ -323,52 +218,6 @@ else if( _format == PDataPartitionFormat.COLUMN_BLOCK_WISE )
 			LocalFileUtils.writeMatrixBlockToLocal(pfname, mb);
 		}
 	}
-
-	private void appendCellBufferToStagingArea( String dir, LinkedList<Cell> buffer, int blen )
-		throws IOException
-	{
-		HashMap<Long,LinkedList<Cell>> sortedBuffer = new HashMap<>();
-
-		//sort cells in buffer wrt key
-		long key = -1;
-		for( Cell c : buffer )
-		{
-			switch(_format)
-			{
-				case ROW_WISE:
-					key = c.getRow();
-					c.setRow(1);
-					break;
-				case ROW_BLOCK_WISE:
-					key = (c.getRow()-1)/blen+1;
-					c.setRow((c.getRow()-1)%blen+1);
-					break;
-				case COLUMN_WISE:
-					key = c.getCol();
-					c.setCol(1);
-					break;
-				case COLUMN_BLOCK_WISE:
-					key = (c.getCol()-1)/blen+1;
-					c.setCol((c.getCol()-1)%blen+1);
-					break;
-				default:
-					//do nothing
-			}
-
-			if( !sortedBuffer.containsKey(key) )
-				sortedBuffer.put(key, new LinkedList<Cell>());
-			sortedBuffer.get(key).addLast(c);
-		}
-
-		//write lists of cells to local files
-		for( Entry<Long,LinkedList<Cell>> e : sortedBuffer.entrySet() )
-		{
-			String pdir = LocalFileUtils.checkAndCreateStagingDir(dir+"/"+e.getKey());
-			String pfname = pdir+"/"+"block_"+_seq.getNextID();
-			StagingFileUtils.writeCellListToLocal(pfname, e.getValue());
-		}
-	}
-
 
 	/////////////////////////////////////
 	// Helper methods for HDFS         //
@@ -425,11 +274,11 @@ public void writeBinaryCellSequenceFileToHDFS( JobConf job, String dir, String l
 			String[] fnameBlocks = new File( lpdir ).list();
 			for( String fnameBlock : fnameBlocks )
 			{
-				LinkedList<Cell> tmp = StagingFileUtils.readCellListFromLocal(lpdir+"/"+fnameBlock);
-				for( Cell c : tmp )
+				LinkedList<IJV> tmp = readCellListFromLocal(lpdir+"/"+fnameBlock);
+				for( IJV c : tmp )
 				{
-					indexes.setIndexes(c.getRow(), c.getCol());
-					cell.setValue(c.getValue());
+					indexes.setIndexes(c.getI(), c.getJ());
+					cell.setValue(c.getV());
 					writer.append(indexes, cell);
 				}
 			}
@@ -438,38 +287,27 @@ public void writeBinaryCellSequenceFileToHDFS( JobConf job, String dir, String l
 			IOUtilFunctions.closeSilently(writer);
 		}
 	}
-
-	public void writeTextCellFileToHDFS( JobConf job, String dir, String lpdir )
+
+	private static LinkedList<IJV> readCellListFromLocal( String fname )
 		throws IOException
 	{
-		long key = getKeyFromFilePath(lpdir);
-		Path path = new Path(dir+"/"+key);
-		FileSystem fs = IOUtilFunctions.getFileSystem(path, job);
-		try(BufferedWriter out = new BufferedWriter(new OutputStreamWriter(fs.create(path,true))))
-		{
-			//for obj reuse and preventing repeated buffer re-allocations
-			StringBuilder sb = new StringBuilder();
-
-			String[] fnameBlocks = new File( lpdir ).list();
-			for( String fnameBlock : fnameBlocks )
-			{
-				LinkedList<Cell> tmp = StagingFileUtils.readCellListFromLocal(lpdir+"/"+fnameBlock);
-				for( Cell c : tmp )
-				{
-					sb.append(c.getRow());
-					sb.append(' ');
-					sb.append(c.getCol());
-					sb.append(' ');
-					sb.append(c.getValue());
-					sb.append('\n');
-					out.write( sb.toString() );
-					sb.setLength(0);
-				}
+		FileInputStream fis = new FileInputStream( fname );
+		LinkedList<IJV> buffer = new LinkedList<>();
+		try(BufferedReader in = new BufferedReader(new InputStreamReader(fis))) {
+			String value = null;
+			FastStringTokenizer st = new FastStringTokenizer(' ');
+			while( (value=in.readLine())!=null ) {
+				st.reset( value ); //reset tokenizer
+				int row = (int)st.nextLong();
+				int col = (int)st.nextLong();
+				double lvalue = st.nextDouble();
+				IJV c = new IJV().set( row, col, lvalue );
+				buffer.addLast( c );
			}
 		}
+		return buffer;
 	}
 
-
 	/////////////////////////////////
 	// Helper methods for local fs //
 	// read/write                  //
@@ -525,21 +363,7 @@ public void run()
 		public abstract void writeFileToHDFS( JobConf job, String fnameNew, String stagingDir )
 			throws IOException;
 	}
-
-	private class DataPartitionerWorkerTextCell extends DataPartitionerWorker
-	{
-		public DataPartitionerWorkerTextCell(JobConf job, String fnameNew, String fnameStaging, String[] fnamesPartitions, int start, int end) {
-			super(job, fnameNew, fnameStaging, fnamesPartitions, start, end);
-		}
 
-		@Override
-		public void writeFileToHDFS(JobConf job, String fnameNew, String stagingDir)
-			throws IOException
-		{
-			writeTextCellFileToHDFS( job, fnameNew, stagingDir );
-		}
-	}
-
 	private class DataPartitionerWorkerBinaryBlock extends DataPartitionerWorker
 	{
 		public DataPartitionerWorkerBinaryBlock(JobConf job, String fnameNew, String fnameStaging, String[] fnamesPartitions, int start, int end) {
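
For context, the staging files consumed by the new private readCellListFromLocal helper hold one cell per line in text-cell format: a 1-based row index, a 1-based column index, and a value separated by single spaces. A minimal JDK-only sketch of that parsing (using String.split instead of SystemDS' FastStringTokenizer, and printing instead of building IJV objects):

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;

public class TextCellReadSketch {
	public static void main(String[] args) throws IOException {
		// each line: "<row> <col> <value>" with 1-based indexes
		try( BufferedReader in = new BufferedReader(new FileReader(args[0])) ) {
			String line;
			while( (line = in.readLine()) != null ) {
				String[] parts = line.trim().split(" ");
				int row = Integer.parseInt(parts[0]);
				int col = Integer.parseInt(parts[1]);
				double val = Double.parseDouble(parts[2]);
				System.out.println("(" + row + "," + col + ") = " + val);
			}
		}
	}
}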

src/main/java/org/apache/sysds/runtime/controlprogram/parfor/TaskPartitionerFactory.java

Lines changed: 1 addition & 1 deletion
@@ -23,7 +23,7 @@
 import org.apache.sysds.runtime.controlprogram.ParForProgramBlock.PTaskPartitioner;
 import org.apache.sysds.runtime.instructions.cp.IntObject;
 
-public abstract class TaskPartitionerFactory
+public class TaskPartitionerFactory
 {
 	public static TaskPartitioner createTaskPartitioner(PTaskPartitioner type,
 		IntObject from, IntObject to, IntObject incr, long taskSize, int numThreads, String iterPredVar)
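
The commit message mentions new tests around task queues; the factory above is their entry point and is no longer declared abstract. A minimal usage sketch with hypothetical arguments (it assumes PTaskPartitioner.FIXED is a valid constant and that TaskPartitioner exposes a createTasks() method, neither of which is shown in this diff):

import org.apache.sysds.runtime.controlprogram.ParForProgramBlock.PTaskPartitioner;
import org.apache.sysds.runtime.controlprogram.parfor.TaskPartitioner;
import org.apache.sysds.runtime.controlprogram.parfor.TaskPartitionerFactory;
import org.apache.sysds.runtime.instructions.cp.IntObject;

public class TaskPartitionerSketch {
	public static void main(String[] args) {
		// partition the parfor iteration range 1..100 (increment 1) into
		// fixed-size tasks of 10 iterations for 4 threads over variable "i"
		TaskPartitioner tp = TaskPartitionerFactory.createTaskPartitioner(
			PTaskPartitioner.FIXED, new IntObject(1), new IntObject(100),
			new IntObject(1), 10, 4, "i");
		System.out.println("#tasks = " + tp.createTasks().size());
	}
}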
