Skip to content

Commit d0d09dc

Browse files
committed
dict writable
1 parent 9bf572f commit d0d09dc

File tree

2 files changed

+59
-14
lines changed

2 files changed

+59
-14
lines changed

src/main/java/org/apache/sysds/runtime/compress/io/DictWritable.java

Lines changed: 54 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,14 +24,19 @@
2424
import java.io.IOException;
2525
import java.io.Serializable;
2626
import java.util.ArrayList;
27+
import java.util.HashMap;
28+
import java.util.HashSet;
2729
import java.util.List;
30+
import java.util.Map;
31+
import java.util.Set;
2832

2933
import org.apache.hadoop.io.Writable;
3034
import org.apache.sysds.runtime.compress.colgroup.dictionary.DictionaryFactory;
3135
import org.apache.sysds.runtime.compress.colgroup.dictionary.IDictionary;
3236

3337
public class DictWritable implements Writable, Serializable {
3438
private static final long serialVersionUID = 731937201435558L;
39+
3540
public List<IDictionary> dicts;
3641

3742
public DictWritable() {
@@ -44,26 +49,73 @@ protected DictWritable(List<IDictionary> dicts) {
4449

4550
@Override
4651
public void write(DataOutput out) throws IOException {
52+
// the dicts can contain duplicates.
53+
// to avoid writing duplicates we run though once to detect them
54+
Set<IDictionary> ud = new HashSet<>();
55+
for(IDictionary d: dicts){
56+
if(ud.contains(d)){
57+
writeWithDuplicates(out);
58+
return;
59+
}
60+
ud.add(d);
61+
}
62+
4763
out.writeInt(dicts.size());
4864
for(int i = 0; i < dicts.size(); i++)
4965
dicts.get(i).write(out);
5066
}
5167

68+
private void writeWithDuplicates(DataOutput out) throws IOException {
69+
// indicate that we use duplicate detection
70+
out.writeInt(dicts.size() * -1);
71+
Map<IDictionary, Integer> m = new HashMap<>();
72+
73+
for(int i = 0; i < dicts.size(); i++){
74+
int id = m.getOrDefault(dicts.get(i), m.size() );
75+
out.writeInt(id);
76+
77+
if(!m.containsKey(dicts.get(i))){
78+
m.put(dicts.get(i), m.size());
79+
dicts.get(i).write(out);
80+
}
81+
82+
}
83+
}
84+
5285
@Override
5386
public void readFields(DataInput in) throws IOException {
5487
int s = in.readInt();
88+
if( s < 0){
89+
readFieldsWithDuplicates(Math.abs(s), in);
90+
}
91+
else{
92+
dicts = new ArrayList<>(s);
93+
for(int i = 0; i < s; i++)
94+
dicts.add(DictionaryFactory.read(in));
95+
}
96+
}
97+
98+
private void readFieldsWithDuplicates(int s, DataInput in) throws IOException {
99+
55100
dicts = new ArrayList<>(s);
56-
for(int i = 0; i < s; i++)
57-
dicts.add(DictionaryFactory.read(in));
101+
for(int i = 0; i < s; i++){
102+
int id = in.readInt();
103+
if(id < i)
104+
dicts.set(i, dicts.get(id));
105+
else
106+
dicts.add(DictionaryFactory.read(in));
107+
}
58108
}
59109

110+
60111
@Override
61112
public String toString() {
62113
StringBuilder sb = new StringBuilder();
63114
sb.append("Written dictionaries:\n");
64115
for(IDictionary d : dicts) {
65116
sb.append(d);
66117
sb.append("\n");
118+
67119
}
68120
return sb.toString();
69121
}

src/main/java/org/apache/sysds/runtime/compress/io/WriterCompressed.java

Lines changed: 5 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@
5757
import org.apache.sysds.runtime.meta.MatrixCharacteristics;
5858
import org.apache.sysds.runtime.util.CommonThreadPool;
5959
import org.apache.sysds.utils.stats.InfrastructureAnalyzer;
60+
import org.apache.sysds.runtime.util.HDFSTool;
6061

6162
public final class WriterCompressed extends MatrixWriter {
6263

@@ -146,7 +147,7 @@ private void write(MatrixBlock src, final String fname, final int blen) throws I
146147
}
147148

148149
fs = IOUtilFunctions.getFileSystem(new Path(fname), job);
149-
150+
150151
int k = OptimizerUtils.getParallelBinaryWriteParallelism();
151152

152153
k = Math.min(k, (int)(src.getInMemorySize() / InfrastructureAnalyzer.getBlockSize(fs)));
@@ -213,8 +214,6 @@ private void writeMultiBlockCompressedSingleThread(MatrixBlock mb, final int rle
213214
throws IOException {
214215
try {
215216
final CompressedMatrixBlock cmb = (CompressedMatrixBlock) mb;
216-
217-
setupWrite();
218217
final Path path = new Path(fname);
219218
Writer w = generateWriter(job, path, fs);
220219
for(int bc = 0; bc * blen < clen; bc++) {// column blocks
@@ -244,7 +243,6 @@ private void writeMultiBlockCompressedSingleThread(MatrixBlock mb, final int rle
244243
private void writeMultiBlockCompressedParallel(MatrixBlock b, final int rlen, final int clen, final int blen, int k)
245244
throws IOException {
246245

247-
setupWrite();
248246
final ExecutorService pool = CommonThreadPool.get(k);
249247
try {
250248
final ArrayList<Callable<Object>> tasks = new ArrayList<>();
@@ -265,7 +263,8 @@ private void writeMultiBlockCompressedParallel(MatrixBlock b, final int rlen, fi
265263
final int colBlocks = (int) Math.ceil((double) clen / blen );
266264
final int nBlocks = (int) Math.ceil((double) rlen / blen);
267265
final int blocksPerThread = Math.max(1, nBlocks * colBlocks / k );
268-
266+
HDFSTool.deleteFileIfExistOnHDFS(new Path(fname + ".dict"), job);
267+
269268
int i = 0;
270269
for(int bc = 0; bc * blen < clen; bc++) {// column blocks
271270
final int sC = bc * blen;
@@ -307,13 +306,6 @@ private void writeMultiBlockCompressedParallel(MatrixBlock b, final int rlen, fi
307306
}
308307
}
309308

310-
private void setupWrite() throws IOException {
311-
// final Path path = new Path(fname);
312-
// final JobConf job = ConfigurationManager.getCachedJobConf();
313-
// HDFSTool.deleteFileIfExistOnHDFS(path, job);
314-
// HDFSTool.createDirIfNotExistOnHDFS(path, DMLConfig.DEFAULT_SHARED_DIR_PERMISSION);
315-
}
316-
317309
private Path getPath(int id) {
318310
return new Path(fname, IOUtilFunctions.getPartFileName(id));
319311
}
@@ -397,6 +389,7 @@ protected DictWriteTask(String fname, List<IDictionary> dicts, int id) {
397389
public Object call() throws Exception {
398390

399391
Path p = new Path(fname + ".dict", IOUtilFunctions.getPartFileName(id));
392+
HDFSTool.deleteFileIfExistOnHDFS(p, job);
400393
try(Writer w = SequenceFile.createWriter(job, Writer.file(p), //
401394
Writer.bufferSize(4096), //
402395
Writer.keyClass(DictWritable.K.class), //

0 commit comments

Comments
 (0)