Skip to content

Commit 6744bf8

Browse files
committed
Binary readers update
1 parent 0f45fe0 commit 6744bf8

File tree

3 files changed

+151
-16
lines changed

3 files changed

+151
-16
lines changed

src/main/java/org/apache/sysds/runtime/io/FrameReaderBinaryBlock.java

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@
3131
import org.apache.sysds.conf.ConfigurationManager;
3232
import org.apache.sysds.runtime.DMLRuntimeException;
3333
import org.apache.sysds.runtime.frame.data.FrameBlock;
34+
import org.apache.sysds.runtime.frame.data.columns.ArrayWrapper;
35+
import org.apache.sysds.runtime.frame.data.columns.DDCArray;
3436

3537
/**
3638
* Single-threaded frame binary block reader.
@@ -58,6 +60,9 @@ public final FrameBlock readFrameFromHDFS(String fname, ValueType[] schema, Stri
5860

5961
// core read (sequential/parallel)
6062
readBinaryBlockFrameFromHDFS(path, job, fs, ret, rlen, clen);
63+
64+
readBinaryDictionariesFromHDFS(new Path(fname + ".dict"), job, fs, ret);
65+
6166
return ret;
6267
}
6368

@@ -114,6 +119,29 @@ protected static void readBinaryBlockFrameFromSequenceFile(Path path, JobConf jo
114119
}
115120
}
116121

122+
protected static void readBinaryDictionariesFromHDFS(Path path, JobConf job, FileSystem fs, FrameBlock ret) {
123+
try{
124+
if(fs.exists(path)){
125+
LongWritable key = new LongWritable();
126+
ArrayWrapper value = new ArrayWrapper(null);
127+
SequenceFile.Reader reader = new SequenceFile.Reader(job, SequenceFile.Reader.file(path));
128+
try{
129+
while(reader.next(key,value)){
130+
int colId = (int)key.get();
131+
DDCArray<?> a = (DDCArray<?>) ret.getColumn(colId);
132+
ret.setColumn(colId, a.setDict(value._a));
133+
}
134+
}
135+
finally{
136+
IOUtilFunctions.closeSilently(reader);
137+
}
138+
}
139+
}
140+
catch(IOException e){
141+
throw new DMLRuntimeException("Failed to read Frame Dictionaries", e);
142+
}
143+
}
144+
117145
/**
118146
* Specific functionality of FrameReaderBinaryBlock, mostly used for testing.
119147
*
@@ -143,4 +171,7 @@ public FrameBlock readFirstBlock(String fname) throws IOException {
143171

144172
return value;
145173
}
174+
175+
176+
146177
}

src/main/java/org/apache/sysds/runtime/io/FrameWriterBinaryBlock.java

Lines changed: 63 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020
package org.apache.sysds.runtime.io;
2121

2222
import java.io.IOException;
23+
import java.util.ArrayList;
24+
import java.util.List;
2325

2426
import org.apache.hadoop.fs.FileSystem;
2527
import org.apache.hadoop.fs.Path;
@@ -29,6 +31,10 @@
2931
import org.apache.sysds.conf.ConfigurationManager;
3032
import org.apache.sysds.runtime.DMLRuntimeException;
3133
import org.apache.sysds.runtime.frame.data.FrameBlock;
34+
import org.apache.sysds.runtime.frame.data.columns.Array;
35+
import org.apache.sysds.runtime.frame.data.columns.ArrayWrapper;
36+
import org.apache.sysds.runtime.frame.data.columns.DDCArray;
37+
import org.apache.sysds.runtime.matrix.data.Pair;
3238
import org.apache.sysds.runtime.util.HDFSTool;
3339

3440
/**
@@ -43,30 +49,67 @@ public final void writeFrameToHDFS(FrameBlock src, String fname, long rlen, long
4349
// prepare file access
4450
JobConf job = new JobConf(ConfigurationManager.getCachedJobConf());
4551
Path path = new Path(fname);
46-
52+
4753
// if the file already exists on HDFS, remove it.
4854
HDFSTool.deleteFileIfExistOnHDFS(fname);
49-
55+
HDFSTool.deleteFileIfExistOnHDFS(fname + ".dict");
56+
5057
// bound check for src block
5158
if(src.getNumRows() > rlen || src.getNumColumns() > clen) {
5259
throw new IOException("Frame block [1:" + src.getNumRows() + ",1:" + src.getNumColumns() + "] "
5360
+ "out of overall frame range [1:" + rlen + ",1:" + clen + "].");
5461
}
5562

63+
Pair<List<Pair<Integer,Array<?>>>, FrameBlock> prep = extractDictionaries(src);
64+
src = prep.getValue();
65+
5666
// write binary block to hdfs (sequential/parallel)
57-
writeBinaryBlockFrameToHDFS(path, job, src, rlen, clen);
67+
writeBinaryBlockFrameToHDFS(path, job, prep.getValue(), rlen, clen);
68+
69+
if(prep.getKey().size() > 0)
70+
writeBinaryBlockDictsToSequenceFile(new Path(fname + ".dict"), job, prep.getKey());
71+
72+
}
73+
74+
protected Pair<List<Pair<Integer,Array<?>>>, FrameBlock> extractDictionaries(FrameBlock src){
75+
List<Pair<Integer,Array<?>>> dicts = new ArrayList<>();
76+
int blen = ConfigurationManager.getBlocksize();
77+
if(src.getNumRows() < blen )
78+
return new Pair<>(dicts, src);
79+
boolean modified = false;
80+
for(int i = 0; i < src.getNumColumns(); i++){
81+
Array<?> a = src.getColumn(i);
82+
if(a instanceof DDCArray){
83+
DDCArray<?> d = (DDCArray<?>)a;
84+
dicts.add(new Pair<>(i, d.getDict()));
85+
if(modified == false){
86+
modified = true;
87+
// make sure other users of this frame does not get effected
88+
src = src.copyShallow();
89+
}
90+
src.setColumn(i, d.nullDict());
91+
}
92+
}
93+
return new Pair<>(dicts, src);
5894
}
5995

6096
protected void writeBinaryBlockFrameToHDFS(Path path, JobConf job, FrameBlock src, long rlen, long clen)
6197
throws IOException, DMLRuntimeException {
6298
FileSystem fs = IOUtilFunctions.getFileSystem(path);
6399
int blen = ConfigurationManager.getBlocksize();
64-
100+
65101
// sequential write to single file
66102
writeBinaryBlockFrameToSequenceFile(path, job, fs, src, blen, 0, (int) rlen);
67103
IOUtilFunctions.deleteCrcFilesFromLocalFileSystem(fs, path);
68104
}
69105

106+
protected void writeBinaryBlockDictsToSequenceFile(Path path, JobConf job, List<Pair<Integer, Array<?>>> dicts)
107+
throws IOException, DMLRuntimeException {
108+
FileSystem fs = IOUtilFunctions.getFileSystem(path);
109+
writeBinaryBlockDictsToSequenceFile(path, job, fs, dicts);
110+
IOUtilFunctions.deleteCrcFilesFromLocalFileSystem(fs, path);
111+
}
112+
70113
/**
71114
* Internal primitive to write a block-aligned row range of a frame to a single sequence file, which is used for both
72115
* single- and multi-threaded writers (for consistency).
@@ -111,4 +154,20 @@ protected static void writeBinaryBlockFrameToSequenceFile(Path path, JobConf job
111154
IOUtilFunctions.closeSilently(writer);
112155
}
113156
}
157+
158+
protected static void writeBinaryBlockDictsToSequenceFile(Path path, JobConf job, FileSystem fs, List<Pair<Integer,Array<?>>> dicts) throws IOException{
159+
final Writer writer = IOUtilFunctions.getSeqWriterArray(path, job, 1);
160+
try{
161+
LongWritable index = new LongWritable();
162+
163+
for(int i = 0; i < dicts.size(); i++){
164+
Pair<Integer, Array<?>> p = dicts.get(i);
165+
index.set(p.getKey());
166+
writer.append(index, new ArrayWrapper(p.getValue()));
167+
}
168+
}
169+
finally {
170+
IOUtilFunctions.closeSilently(writer);
171+
}
172+
}
114173
}

src/main/java/org/apache/sysds/runtime/io/IOUtilFunctions.java

Lines changed: 57 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -72,10 +72,10 @@
7272
import org.apache.sysds.runtime.data.TensorBlock;
7373
import org.apache.sysds.runtime.data.TensorIndexes;
7474
import org.apache.sysds.runtime.frame.data.FrameBlock;
75+
import org.apache.sysds.runtime.frame.data.columns.ArrayWrapper;
7576
import org.apache.sysds.runtime.matrix.data.MatrixBlock;
7677
import org.apache.sysds.runtime.matrix.data.MatrixCell;
7778
import org.apache.sysds.runtime.matrix.data.MatrixIndexes;
78-
import org.apache.sysds.runtime.transform.TfUtils;
7979
import org.apache.sysds.runtime.util.LocalFileUtils;
8080

8181
import io.airlift.compress.lzo.LzoCodec;
@@ -242,6 +242,29 @@ public static String[] splitCSV(String str, String delim){
242242
return tokens.toArray(new String[0]);
243243
}
244244

245+
public static String[] splitCSV(String str, String delim, int clen){
246+
if(str == null || str.isEmpty())
247+
return new String[] {""};
248+
249+
int from = 0, to = 0;
250+
final int len = str.length();
251+
final int delimLen = delim.length();
252+
253+
final String[] tokens = new String[clen];
254+
int c = 0;
255+
while(from < len) { // for all tokens
256+
to = getTo(str, from, delim, len, delimLen);
257+
tokens[c++] = str.substring(from, to);
258+
from = to + delimLen;
259+
}
260+
261+
// handle empty string at end
262+
if(from == len)
263+
tokens[c++] = "";
264+
265+
return tokens;
266+
}
267+
245268
/**
246269
* Splits a string by a specified delimiter into all tokens, including empty
247270
* while respecting the rules for quotes and escapes defined in RFC4180,
@@ -346,7 +369,7 @@ private static boolean isEmptyMatch(final String str, final int from, final Stri
346369
* @param dLen The length of the delimiter string
347370
* @return The next index.
348371
*/
349-
private static int getTo(final String str, final int from, final String delim,
372+
public static int getTo(final String str, final int from, final String delim,
350373
final int len, final int dLen) {
351374
final char cq = CSV_QUOTE_CHAR;
352375
final int fromP1 = from + 1;
@@ -404,17 +427,32 @@ private static int getToNoQuoteCharDelim(final String str, final int from, final
404427
}
405428

406429
public static String trim(String str) {
430+
final int len = str.length();
431+
if(len == 0)
432+
return str;
433+
return trim(str, len);
434+
}
435+
436+
/**
437+
* Caller must have a string of at least 1 character length.
438+
*
439+
* @param str string to trim
440+
* @param len length of string
441+
* @return the trimmed string.
442+
*/
443+
public static String trim(final String str, final int len) {
407444
try{
408-
final int len = str.length();
409-
if(len == 0)
410-
return str;
411445
// short the call to return input if not whitespace in ends.
412-
else if(str.charAt(0) <= ' ' || str.charAt(len -1) <= ' ')
446+
if(str.charAt(0) <= ' ' || str.charAt(len -1) <= ' ')
413447
return str.trim();
414448
else
415449
return str;
416-
}catch(Exception e){
417-
throw new RuntimeException("failed trimming: " + str + " " + str.length(),e);
450+
}
451+
catch(NullPointerException e){
452+
return null;
453+
}
454+
catch(Exception e){
455+
throw new RuntimeException("failed trimming: " + str + " " + str.length(), e);
418456
}
419457
}
420458

@@ -657,10 +695,10 @@ public static int countNumColumnsCSV(InputSplit[] splits, InputFormat informat,
657695
try {
658696
if( reader.next(key, value) ) {
659697
boolean hasValue = true;
660-
if( value.toString().startsWith(TfUtils.TXMTD_MVPREFIX) )
661-
hasValue = reader.next(key, value);
662-
if( value.toString().startsWith(TfUtils.TXMTD_NDPREFIX) )
663-
hasValue = reader.next(key, value);
698+
// if( value.toString().startsWith(TfUtils.TXMTD_MVPREFIX) )
699+
// hasValue = reader.next(key, value);
700+
// if( value.toString().startsWith(TfUtils.TXMTD_NDPREFIX) )
701+
// hasValue = reader.next(key, value);
664702
String row = value.toString().trim();
665703
if( hasValue && !row.isEmpty() ) {
666704
ncol = IOUtilFunctions.countTokensCSV(row, delim);
@@ -901,6 +939,13 @@ public static Writer getSeqWriterFrame(Path path, Configuration job, int replica
901939
Writer.replication((short) (replication > 0 ? replication : 1)));
902940
}
903941

942+
public static Writer getSeqWriterArray(Path path, Configuration job, int replication) throws IOException {
943+
return SequenceFile.createWriter(job, Writer.file(path), Writer.bufferSize(4096),
944+
Writer.keyClass(LongWritable.class), Writer.valueClass(ArrayWrapper.class),
945+
Writer.compression(getCompressionEncodingType(), getCompressionCodec()),
946+
Writer.replication((short) (replication > 0 ? replication : 1)));
947+
}
948+
904949
public static Writer getSeqWriterTensor(Path path, Configuration job, int replication) throws IOException {
905950
return SequenceFile.createWriter(job, Writer.file(path), Writer.bufferSize(4096),
906951
Writer.replication((short) (replication > 0 ? replication : 1)),

0 commit comments

Comments
 (0)