Commit 3ebbedd

Fully parallelized update stream sorting and serialization

1 parent 4bd1dfd · commit 3ebbedd

10 files changed: +259 −13 lines changed

src/main/java/ldbc/snb/datagen/generator/LDBCDatagen.java

Lines changed: 31 additions & 1 deletion

@@ -49,6 +49,7 @@
 
 import java.io.OutputStream;
 import java.util.ArrayList;
+import java.util.List;
 import java.util.Properties;
 
 public class LDBCDatagen {
@@ -157,7 +158,36 @@ public int runGenerateJob(Configuration conf) throws Exception {
         int blockSize = DatagenParams.blockSize;
         int numBlocks = (int)Math.ceil(DatagenParams.numPersons / (double)blockSize);
 
+        List<String> personStreamsFileNames = new ArrayList<String>();
+        List<String> forumStreamsFileNames = new ArrayList<String>();
         for( int i = 0; i < DatagenParams.numThreads; ++i) {
+            int numPartitions = conf.getInt("ldbc.snb.datagen.serializer.numUpdatePartitions", 1);
+            if( i < numBlocks ) {
+                for (int j = 0; j < numPartitions; ++j) {
+                    personStreamsFileNames.add(DatagenParams.hadoopDir + "/temp_updateStream_person_" + i + "_" + j);
+                    if( conf.getBoolean("ldbc.snb.datagen.generator.activity", false)) {
+                        forumStreamsFileNames.add(DatagenParams.hadoopDir + "/temp_updateStream_forum_" + i + "_" + j);
+                    }
+                }
+            } else {
+                for (int j = 0; j < numPartitions; ++j) {
+                    fs.delete(new Path(DatagenParams.hadoopDir + "/temp_updateStream_person_" + i + "_" + j), true);
+                    fs.delete(new Path(DatagenParams.hadoopDir + "/temp_updateStream_forum_" + i + "_" + j), true);
+                }
+            }
+        }
+        HadoopUpdateStreamSorterAndSerializer updateSorterAndSerializer = new HadoopUpdateStreamSorterAndSerializer(conf);
+        updateSorterAndSerializer.run(personStreamsFileNames, "person");
+        updateSorterAndSerializer.run(forumStreamsFileNames, "forum");
+        for(String file : personStreamsFileNames) {
+            fs.delete(new Path(file), true);
+        }
+
+        for(String file : forumStreamsFileNames) {
+            fs.delete(new Path(file), true);
+        }
+
+        /*for( int i = 0; i < DatagenParams.numThreads; ++i) {
             int numPartitions = conf.getInt("ldbc.snb.datagen.serializer.numUpdatePartitions", 1);
             if( i < numBlocks ) {
                 for (int j = 0; j < numPartitions; ++j) {
@@ -180,7 +210,7 @@ public int runGenerateJob(Configuration conf) throws Exception {
                     fs.delete(new Path(DatagenParams.hadoopDir + "/temp_updateStream_forum_" + i + "_" + j), true);
                 }
             }
-        }
+        }*/
 
         long minDate = Long.MAX_VALUE;
         long maxDate = Long.MIN_VALUE;
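
The driver itself no longer sorts or writes the final update streams: it only collects the per-reducer, per-partition temp SequenceFile names (deleting those belonging to unused reducer slots), hands them to the new HadoopUpdateStreamSorterAndSerializer once for the person streams and once for the forum streams, and removes the temp files afterwards; the old in-driver loop is left commented out. A minimal standalone sketch of driving that job: the paths and values are hypothetical, only the property keys and the API calls come from this commit.

package ldbc.snb.datagen.example;   // hypothetical package, illustration only

import ldbc.snb.datagen.hadoop.HadoopUpdateStreamSorterAndSerializer;
import org.apache.hadoop.conf.Configuration;

import java.util.ArrayList;
import java.util.List;

public class UpdateStreamJobSketch {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Property keys read by the new job; the values here are made up.
        conf.set("ldbc.snb.datagen.serializer.hadoopDir", "/tmp/ldbc/hadoop");
        conf.set("ldbc.snb.datagen.serializer.socialNetworkDir", "/tmp/ldbc/social_network");
        conf.setInt("ldbc.snb.datagen.generator.numThreads", 2);

        // Temp SequenceFiles written by the person serializer, one per (reducer, partition).
        List<String> personStreams = new ArrayList<String>();
        personStreams.add("/tmp/ldbc/hadoop/temp_updateStream_person_0_0");
        personStreams.add("/tmp/ldbc/hadoop/temp_updateStream_person_1_0");

        // One MapReduce job sorts and serializes all person update streams in parallel.
        new HadoopUpdateStreamSorterAndSerializer(conf).run(personStreams, "person");
    }
}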

src/main/java/ldbc/snb/datagen/hadoop/BlockKey.java

Lines changed: 0 additions & 3 deletions

@@ -28,19 +28,16 @@ public BlockKey( long block, TupleKey tk) {
         this.tk = tk;
     }
 
-    @Override
     public void write(DataOutput out) throws IOException {
         out.writeLong(block);
         tk.write(out);
     }
 
-    @Override
     public void readFields(DataInput in) throws IOException {
         block = in.readLong();
         tk.readFields(in);
     }
 
-    @Override
     public int compareTo( BlockKey mpk) {
         if (block < mpk.block) return -1;
         if (block > mpk.block) return 1;

src/main/java/ldbc/snb/datagen/hadoop/HadoopPersonActivityGenerator.java

Lines changed: 1 addition & 1 deletion

@@ -54,7 +54,7 @@ protected void setup(Context context) {
             personActivitySerializer_ = (PersonActivitySerializer) Class.forName(conf.get("ldbc.snb.datagen.serializer.personActivitySerializer")).newInstance();
             personActivitySerializer_.initialize(conf,reducerId);
             if(DatagenParams.updateStreams) {
-                updateSerializer_ = new UpdateEventSerializer(conf, DatagenParams.hadoopDir + "/temp_updateStream_forum_" + reducerId, DatagenParams.numUpdatePartitions);
+                updateSerializer_ = new UpdateEventSerializer(conf, DatagenParams.hadoopDir + "/temp_updateStream_forum_" + reducerId, reducerId, DatagenParams.numUpdatePartitions);
             }
             personActivityGenerator_ = new PersonActivityGenerator(personActivitySerializer_, updateSerializer_);
 

src/main/java/ldbc/snb/datagen/hadoop/HadoopPersonSerializer.java

Lines changed: 1 addition & 1 deletion

@@ -39,7 +39,7 @@ protected void setup(Context context) {
             personSerializer_ = (PersonSerializer) Class.forName(conf.get("ldbc.snb.datagen.serializer.personSerializer")).newInstance();
             personSerializer_.initialize(conf,reducerId);
             if (DatagenParams.updateStreams) {
-                updateSerializer_ = new UpdateEventSerializer(conf, DatagenParams.hadoopDir + "/temp_updateStream_person_" + reducerId, DatagenParams.numUpdatePartitions);
+                updateSerializer_ = new UpdateEventSerializer(conf, DatagenParams.hadoopDir + "/temp_updateStream_person_" + reducerId, reducerId, DatagenParams.numUpdatePartitions);
             }
         } catch( Exception e ) {
             System.err.println(e.getMessage());
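
Both reducers change in the same way: they now pass their own reducerId into UpdateEventSerializer, and the serializer stamps that id into every event key it writes (see the UpdateEventSerializer diff below). A hedged sketch of the new call, wrapped in a hypothetical helper:

package ldbc.snb.datagen.example;   // hypothetical illustration, not part of the commit

import ldbc.snb.datagen.generator.DatagenParams;
import ldbc.snb.datagen.serializer.UpdateEventSerializer;
import org.apache.hadoop.conf.Configuration;

public class SerializerSetupSketch {
    // Mirrors what the reducers' setup() now does: the extra reducerId argument
    // identifies which reducer produced the events in this temp stream.
    static UpdateEventSerializer createPersonStreamSerializer(Configuration conf, int reducerId) {
        return new UpdateEventSerializer(
                conf,
                DatagenParams.hadoopDir + "/temp_updateStream_person_" + reducerId,
                reducerId,
                DatagenParams.numUpdatePartitions);
    }
}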

src/main/java/ldbc/snb/datagen/hadoop/HadoopUpdateEventKeyPartitioner.java (new file)

Lines changed: 20 additions & 0 deletions

@@ -0,0 +1,20 @@
+package ldbc.snb.datagen.hadoop;
+
+import ldbc.snb.datagen.objects.Person;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Partitioner;
+
+/**
+ * Created by aprat on 25/08/15.
+ */
+public class HadoopUpdateEventKeyPartitioner extends Partitioner<UpdateEventKey, Text> {
+
+    public HadoopUpdateEventKeyPartitioner() {
+        super();
+    }
+
+    @Override
+    public int getPartition(UpdateEventKey key, Text text, int numReduceTasks) {
+        return (int)(key.reducerId);
+    }
+}
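
The partitioner does not hash the key: it returns the reducerId stored inside the key, so every event produced by one generator reducer lands on the same reduce task of the sorting job (which runs one reduce task per generator thread). A small sketch with made-up values, assuming the reducerId is smaller than the number of reduce tasks:

package ldbc.snb.datagen.example;   // hypothetical illustration, not part of the commit

import ldbc.snb.datagen.hadoop.HadoopUpdateEventKeyPartitioner;
import ldbc.snb.datagen.hadoop.UpdateEventKey;
import org.apache.hadoop.io.Text;

public class PartitionerSketch {
    public static void main(String[] args) {
        HadoopUpdateEventKeyPartitioner partitioner = new HadoopUpdateEventKeyPartitioner();
        // (date, reducerId, partition): the values are made up.
        UpdateEventKey key = new UpdateEventKey(1325376000000L, 2, 0);
        // With 4 reduce tasks the event goes to task 2, i.e. to its original reducer's slot.
        System.out.println(partitioner.getPartition(key, new Text("event data"), 4));   // prints 2
    }
}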

src/main/java/ldbc/snb/datagen/hadoop/HadoopUpdateStreamSorterAndSerializer.java (new file)

Lines changed: 119 additions & 0 deletions

@@ -0,0 +1,119 @@
+package ldbc.snb.datagen.hadoop;
+
+import ldbc.snb.datagen.serializer.PersonSerializer;
+import ldbc.snb.datagen.serializer.UpdateEventSerializer;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.zip.GZIPOutputStream;
+
+/**
+ * Created by aprat on 10/15/14.
+ */
+public class HadoopUpdateStreamSorterAndSerializer {
+
+    public static class HadoopUpdateStreamSorterAndSerializerReducer extends Reducer<UpdateEventKey, Text, UpdateEventKey, Text> {
+
+        private int reducerId; /** The id of the reducer.**/
+        private PersonSerializer personSerializer_; /** The person serializer **/
+        private UpdateEventSerializer updateSerializer_;
+        private boolean compressed = false;
+        private Configuration conf;
+        private String streamType;
+
+        protected void setup(Context context) {
+            conf = context.getConfiguration();
+            streamType = conf.get("streamType");
+            try {
+                compressed = Boolean.parseBoolean(conf.get("ldbc.snb.datagen.serializer.compressed"));
+            } catch( Exception e) {
+                System.err.println(e.getMessage());
+            }
+        }
+
+        @Override
+        public void reduce(UpdateEventKey key, Iterable<Text> valueSet,Context context)
+                throws IOException, InterruptedException {
+            OutputStream out;
+            try {
+                FileSystem fs = FileSystem.get(conf);
+                if( compressed ) {
+                    Path outFile = new Path(context.getConfiguration().get("ldbc.snb.datagen.serializer.socialNetworkDir")+"/updateStream_"+reducerId+"_"+key.partition+"_"+streamType+".csv.gz");
+                    out = new GZIPOutputStream( fs.create(outFile));
+                } else {
+                    Path outFile = new Path(context.getConfiguration().get("ldbc.snb.datagen.serializer.socialNetworkDir")+"/updateStream_"+reducerId+"_"+key.partition+"_"+streamType+".csv");
+                    out = fs.create(outFile);
+                }
+                for( Text t : valueSet ) {
+                    out.write(t.toString().getBytes("UTF8"));
+                }
+                out.close();
+            } catch( Exception e ) {
+                System.err.println(e.getMessage());
+            }
+        }
+        protected void cleanup(Context context){
+            try {
+            } catch( Exception e ) {
+                System.err.println(e.getMessage());
+            }
+        }
+    }
+
+
+    private Configuration conf;
+
+    public HadoopUpdateStreamSorterAndSerializer(Configuration conf ) {
+        this.conf = new Configuration(conf);
+    }
+
+    public void run(List<String> inputFileNames, String type ) throws Exception {
+
+        int numThreads = conf.getInt("ldbc.snb.datagen.generator.numThreads",1);
+        conf.set("streamType", type);
+
+        Job job = Job.getInstance(conf, "Update Stream Serializer");
+        job.setMapOutputKeyClass(UpdateEventKey.class);
+        job.setMapOutputValueClass(Text.class);
+        job.setOutputKeyClass(UpdateEventKey.class);
+        job.setOutputValueClass(Text.class);
+        job.setJarByClass(HadoopUpdateStreamSorterAndSerializerReducer.class);
+        job.setReducerClass(HadoopUpdateStreamSorterAndSerializerReducer.class);
+        job.setNumReduceTasks(numThreads);
+        job.setInputFormatClass(SequenceFileInputFormat.class);
+        job.setOutputFormatClass(SequenceFileOutputFormat.class);
+        job.setPartitionerClass(HadoopUpdateEventKeyPartitioner.class);
+        job.setGroupingComparatorClass(UpdateEventKeyGroupComparator.class);
+
+        for(String s : inputFileNames) {
+            FileInputFormat.addInputPath(job, new Path(s));
+        }
+        FileOutputFormat.setOutputPath(job, new Path(conf.get("ldbc.snb.datagen.serializer.hadoopDir")+"/aux"));
+        if(!job.waitForCompletion(true)) {
+            throw new Exception();
+        }
+
+
+        try{
+            FileSystem fs = FileSystem.get(conf);
+            fs.delete(new Path(conf.get("ldbc.snb.datagen.serializer.hadoopDir")+"/aux"),true);
+        } catch(IOException e) {
+            System.err.println(e.getMessage());
+        }
+    }
+}
+
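
Each reduce() call corresponds to one (reducerId, partition) group and writes exactly one final CSV into the social network directory; compression only changes the extension to .csv.gz. A small sketch of the file name the reducer builds, with a hypothetical output directory:

package ldbc.snb.datagen.example;   // hypothetical illustration, not part of the commit

public class UpdateStreamFileNameSketch {
    public static void main(String[] args) {
        // Made-up value of ldbc.snb.datagen.serializer.socialNetworkDir.
        String socialNetworkDir = "/tmp/ldbc/social_network";
        int reducerId = 0;
        int partition = 1;
        String streamType = "person";   // "person" or "forum", set by the driver per run()
        String outFile = socialNetworkDir + "/updateStream_" + reducerId + "_" + partition
                + "_" + streamType + ".csv";
        System.out.println(outFile);    // /tmp/ldbc/social_network/updateStream_0_1_person.csv
    }
}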

src/main/java/ldbc/snb/datagen/hadoop/TupleKey.java

Lines changed: 0 additions & 3 deletions

@@ -26,19 +26,16 @@ public TupleKey( long key, long id) {
         this.id = id;
     }
 
-    @Override
     public void write(DataOutput out) throws IOException {
         out.writeLong(key);
         out.writeLong(id);
     }
 
-    @Override
     public void readFields(DataInput in) throws IOException {
         key = in.readLong();
         id = in.readLong();
     }
 
-    @Override
     public int compareTo( TupleKey tk) {
         if (key < tk.key) return -1;
         if (key > tk.key) return 1;

src/main/java/ldbc/snb/datagen/hadoop/UpdateEventKey.java (new file)

Lines changed: 55 additions & 0 deletions

@@ -0,0 +1,55 @@
+package ldbc.snb.datagen.hadoop;
+
+import ldbc.snb.datagen.objects.UpdateEvent;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * Created by aprat on 5/01/16.
+ */
+public class UpdateEventKey implements WritableComparable<UpdateEventKey> {
+
+    public long date;
+    public int reducerId;
+    public int partition;
+
+    public UpdateEventKey( ) {
+    }
+
+    public UpdateEventKey(UpdateEventKey key) {
+        this.date = key.date;
+        this.reducerId = key.reducerId;
+        this.partition = key.partition;
+    }
+
+    public UpdateEventKey( long date, int reducerId, int partition) {
+
+        this.date = date;
+        this.reducerId = reducerId;
+        this.partition = partition;
+    }
+
+    public void write(DataOutput out) throws IOException {
+        out.writeLong(date);
+        out.writeInt(reducerId);
+        out.writeInt(partition);
+    }
+
+    public void readFields(DataInput in) throws IOException {
+        date = in.readLong();
+        reducerId = in.readInt();
+        partition = in.readInt();
+    }
+
+    public int compareTo( UpdateEventKey key) {
+        if( date < key.date) return -1;
+        if( date > key.date) return 1;
+        if (reducerId != key.reducerId) return reducerId - key.reducerId;
+        if (partition != key.partition) return partition - key.partition;
+        return 0;
+    }
+}
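
compareTo sorts primarily by date, which is what orders the events chronologically during the shuffle of the new job; reducerId and partition only break ties. A short sketch sorting a few made-up keys:

package ldbc.snb.datagen.example;   // hypothetical illustration, not part of the commit

import ldbc.snb.datagen.hadoop.UpdateEventKey;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class UpdateEventKeySortSketch {
    public static void main(String[] args) {
        List<UpdateEventKey> keys = new ArrayList<UpdateEventKey>();
        keys.add(new UpdateEventKey(30L, 0, 1));   // (date, reducerId, partition), made-up values
        keys.add(new UpdateEventKey(10L, 1, 0));
        keys.add(new UpdateEventKey(10L, 0, 0));

        // Natural key order: date first, then reducerId, then partition.
        Collections.sort(keys);
        for (UpdateEventKey k : keys) {
            System.out.println(k.date + " " + k.reducerId + " " + k.partition);
        }
        // Prints: 10 0 0, then 10 1 0, then 30 0 1.
    }
}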

src/main/java/ldbc/snb/datagen/hadoop/UpdateEventKeyGroupComparator.java (new file)

Lines changed: 24 additions & 0 deletions

@@ -0,0 +1,24 @@
+package ldbc.snb.datagen.hadoop;
+
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableComparator;
+
+/**
+ * Created by aprat on 11/17/14.
+ */
+
+public class UpdateEventKeyGroupComparator extends WritableComparator {
+
+    protected UpdateEventKeyGroupComparator() {
+        super(UpdateEventKey.class,true);
+    }
+
+    @Override
+    public int compare(WritableComparable a, WritableComparable b) {
+        UpdateEventKey keyA = (UpdateEventKey)a;
+        UpdateEventKey keyB = (UpdateEventKey)b;
+        if (keyA.reducerId != keyB.reducerId) return keyA.reducerId - keyB.reducerId;
+        if (keyA.partition != keyB.partition) return keyA.partition - keyB.partition;
+        return 0;
+    }
+}
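
Unlike the sort order above, the grouping comparator ignores the date, so all keys sharing a (reducerId, partition) pair are folded into a single reduce() call, and the values arrive there already date-sorted. A sketch, placed in the same package only because the constructor is protected:

package ldbc.snb.datagen.hadoop;   // same package so the protected constructor is visible; illustration only

public class GroupComparatorSketch {
    public static void main(String[] args) {
        UpdateEventKeyGroupComparator grouping = new UpdateEventKeyGroupComparator();
        UpdateEventKey a = new UpdateEventKey(10L, 1, 0);
        UpdateEventKey b = new UpdateEventKey(99L, 1, 0);   // later date, same reducer and partition
        UpdateEventKey c = new UpdateEventKey(10L, 1, 1);   // same date, different partition

        System.out.println(grouping.compare(a, b));   // 0: a and b end up in the same reduce() call
        System.out.println(grouping.compare(a, c));   // non-zero: different partition, separate output file
    }
}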

src/main/java/ldbc/snb/datagen/serializer/UpdateEventSerializer.java

Lines changed: 8 additions & 4 deletions

@@ -38,6 +38,8 @@
 
 import ldbc.snb.datagen.dictionary.Dictionaries;
 import ldbc.snb.datagen.generator.DatagenParams;
+import ldbc.snb.datagen.hadoop.TupleKey;
+import ldbc.snb.datagen.hadoop.UpdateEventKey;
 import ldbc.snb.datagen.objects.*;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.*;
@@ -75,9 +77,11 @@ private class UpdateStreamStats {
     private Configuration conf_;
     private UpdateStreamStats stats_;
     private String fileNamePrefix_;
+    private int reducerId_;
 
-    public UpdateEventSerializer(Configuration conf, String fileNamePrefix, int numPartitions ) {
+    public UpdateEventSerializer(Configuration conf, String fileNamePrefix, int reducerId, int numPartitions ) {
         conf_ = conf;
+        reducerId_ = reducerId;
         stringBuffer_ = new StringBuffer(512);
         data_ = new ArrayList<String>();
         list_ = new ArrayList<String>();
@@ -90,7 +94,7 @@ public UpdateEventSerializer(Configuration conf, String fileNamePrefix, int numP
         FileContext fc = FileContext.getFileContext(conf);
         for( int i = 0; i < numPartitions_; ++i ) {
             Path outFile = new Path(fileNamePrefix_+"_"+i);
-            streamWriter_[i] = SequenceFile.createWriter(fc, conf, outFile, LongWritable.class, Text.class, CompressionType.NONE, new DefaultCodec(),new SequenceFile.Metadata(), EnumSet.of(CreateFlag.CREATE), Options.CreateOpts.checksumParam(Options.ChecksumOpt.createDisabled()));
+            streamWriter_[i] = SequenceFile.createWriter(fc, conf, outFile, UpdateEventKey.class, Text.class, CompressionType.NONE, new DefaultCodec(),new SequenceFile.Metadata(), EnumSet.of(CreateFlag.CREATE), Options.CreateOpts.checksumParam(Options.ChecksumOpt.createDisabled()));
             FileSystem fs = FileSystem.get(conf);
             Path propertiesFile = new Path(fileNamePrefix+".properties");
             if(fs.exists(propertiesFile)){
@@ -116,7 +120,7 @@ public void changePartition() {
 
     public void writeKeyValue( UpdateEvent event ) {
         try{
-            StringBuffer string = new StringBuffer();
+            StringBuilder string = new StringBuilder();
             string.append(Long.toString(event.date));
            string.append("|");
             string.append(Long.toString(event.dependantDate));
@@ -125,7 +129,7 @@ public void writeKeyValue( UpdateEvent event ) {
             string.append("|");
             string.append(event.eventData);
             string.append("\n");
-            streamWriter_[nextPartition_].append(new LongWritable(event.date),new Text(string.toString()));
+            streamWriter_[nextPartition_].append(new UpdateEventKey(event.date, reducerId_, nextPartition_),new Text(string.toString()));
         } catch(IOException e){
             System.err.println(e.getMessage());
             System.exit(-1);
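
With this change the temp SequenceFiles are keyed by UpdateEventKey instead of LongWritable, which is what lets the new job partition by reducer, group by partition and sort by date without re-parsing the serialized event text. A hedged sketch of reading such a temp file back, with a hypothetical path:

package ldbc.snb.datagen.example;   // hypothetical illustration, not part of the commit

import ldbc.snb.datagen.hadoop.UpdateEventKey;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

public class TempStreamReaderSketch {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Path tempFile = new Path("/tmp/ldbc/hadoop/temp_updateStream_person_0_0");   // hypothetical
        SequenceFile.Reader reader = new SequenceFile.Reader(conf, SequenceFile.Reader.file(tempFile));
        UpdateEventKey key = new UpdateEventKey();   // keys are now UpdateEventKey, not LongWritable
        Text value = new Text();
        while (reader.next(key, value)) {
            System.out.println(key.date + " " + key.reducerId + " " + key.partition + " | " + value.toString().trim());
        }
        reader.close();
    }
}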
