Skip to content

Commit 8e1642e

Browse files
committed
Added an option to sort the persons prior to serialization
1 parent 7cf0a8e commit 8e1642e

File tree

3 files changed

+135
-2
lines changed

3 files changed

+135
-2
lines changed

src/main/java/ldbc/snb/datagen/generator/LDBCDatagen.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -162,8 +162,13 @@ public int runGenerateJob(Configuration conf) throws Exception {
162162

163163
printProgress("Serializing persons");
164164
long startPersonSerializing= System.currentTimeMillis();
165-
HadoopPersonSerializer serializer = new HadoopPersonSerializer(conf);
166-
serializer.run(hadoopPrefix+"/mergedPersons");
165+
if(conf.getBoolean("ldbc.snb.datagen.serializer.persons.sort",false) == false) {
166+
HadoopPersonSerializer serializer = new HadoopPersonSerializer(conf);
167+
serializer.run(hadoopPrefix + "/mergedPersons");
168+
} else {
169+
HadoopPersonSortAndSerializer serializer = new HadoopPersonSortAndSerializer(conf);
170+
serializer.run(hadoopPrefix + "/mergedPersons");
171+
}
167172
long endPersonSerializing= System.currentTimeMillis();
168173

169174
long startPersonActivity= System.currentTimeMillis();
Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,127 @@
1+
package ldbc.snb.datagen.hadoop;
2+
3+
import ldbc.snb.datagen.dictionary.Dictionaries;
4+
import ldbc.snb.datagen.generator.DatagenParams;
5+
import ldbc.snb.datagen.generator.LDBCDatagen;
6+
import ldbc.snb.datagen.objects.Knows;
7+
import ldbc.snb.datagen.objects.Person;
8+
import ldbc.snb.datagen.serializer.PersonSerializer;
9+
import ldbc.snb.datagen.serializer.UpdateEventSerializer;
10+
import ldbc.snb.datagen.vocabulary.SN;
11+
import org.apache.hadoop.conf.Configuration;
12+
import org.apache.hadoop.fs.FileSystem;
13+
import org.apache.hadoop.fs.Path;
14+
import org.apache.hadoop.io.LongWritable;
15+
import org.apache.hadoop.mapreduce.Job;
16+
import org.apache.hadoop.mapreduce.Reducer;
17+
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
18+
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
19+
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
20+
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
21+
22+
import java.io.IOException;
23+
24+
/**
25+
* Created by aprat on 10/15/14.
26+
*/
27+
public class HadoopPersonSortAndSerializer {
28+
29+
public static class HadoopPersonSerializerReducer extends Reducer<BlockKey, Person, LongWritable, Person> {
30+
31+
private int reducerId; /** The id of the reducer.**/
32+
private PersonSerializer personSerializer_; /** The person serializer **/
33+
private UpdateEventSerializer updateSerializer_;
34+
35+
protected void setup(Context context) {
36+
Configuration conf = context.getConfiguration();
37+
reducerId = context.getTaskAttemptID().getTaskID().getId();
38+
LDBCDatagen.init(conf);
39+
try {
40+
personSerializer_ = (PersonSerializer) Class.forName(conf.get("ldbc.snb.datagen.serializer.personSerializer")).newInstance();
41+
personSerializer_.initialize(conf,reducerId);
42+
if (DatagenParams.updateStreams) {
43+
updateSerializer_ = new UpdateEventSerializer(conf, DatagenParams.hadoopDir + "/temp_updateStream_person_" + reducerId, reducerId, DatagenParams.numUpdatePartitions);
44+
}
45+
} catch( Exception e ) {
46+
System.err.println(e.getMessage());
47+
}
48+
}
49+
50+
@Override
51+
public void reduce(BlockKey key, Iterable<Person> valueSet,Context context)
52+
throws IOException, InterruptedException {
53+
SN.machineId = key.block;
54+
personSerializer_.reset();
55+
for( Person p : valueSet ) {
56+
if(p.creationDate()< Dictionaries.dates.getUpdateThreshold() || !DatagenParams.updateStreams ) {
57+
personSerializer_.export(p);
58+
} else {
59+
updateSerializer_.export(p);
60+
updateSerializer_.changePartition();
61+
}
62+
63+
for( Knows k : p.knows() ) {
64+
if( k.creationDate() < Dictionaries.dates.getUpdateThreshold() || !DatagenParams.updateStreams ) {
65+
personSerializer_.export(p, k);
66+
}
67+
}
68+
}
69+
70+
}
71+
protected void cleanup(Context context){
72+
personSerializer_.close();
73+
if (DatagenParams.updateStreams) {
74+
updateSerializer_.close();
75+
}
76+
}
77+
}
78+
79+
80+
private Configuration conf;
81+
82+
public HadoopPersonSortAndSerializer(Configuration conf ) {
83+
this.conf = new Configuration(conf);
84+
}
85+
86+
public void run( String inputFileName ) throws Exception {
87+
88+
FileSystem fs = FileSystem.get(conf);
89+
90+
String rankedFileName = conf.get("ldbc.snb.datagen.serializer.hadoopDir") + "/ranked";
91+
HadoopFileRanker hadoopFileRanker = new HadoopFileRanker( conf, TupleKey.class, Person.class, null );
92+
hadoopFileRanker.run(inputFileName,rankedFileName);
93+
94+
int numThreads = Integer.parseInt(conf.get("ldbc.snb.datagen.generator.numThreads"));
95+
Job job = Job.getInstance(conf, "Person Serializer");
96+
job.setMapOutputKeyClass(BlockKey.class);
97+
job.setMapOutputValueClass(Person.class);
98+
job.setOutputKeyClass(LongWritable.class);
99+
job.setOutputValueClass(Person.class);
100+
job.setJarByClass(HadoopBlockMapper.class);
101+
job.setMapperClass(HadoopBlockMapper.class);
102+
job.setReducerClass(HadoopPersonSerializerReducer.class);
103+
job.setNumReduceTasks(numThreads);
104+
job.setInputFormatClass(SequenceFileInputFormat.class);
105+
job.setOutputFormatClass(SequenceFileOutputFormat.class);
106+
107+
job.setPartitionerClass(HadoopTuplePartitioner.class);
108+
109+
job.setSortComparatorClass(BlockKeyComparator.class);
110+
job.setGroupingComparatorClass(BlockKeyGroupComparator.class);
111+
job.setPartitionerClass(HadoopBlockPartitioner.class);
112+
113+
FileInputFormat.setInputPaths(job, new Path(rankedFileName));
114+
FileOutputFormat.setOutputPath(job, new Path(conf.get("ldbc.snb.datagen.serializer.hadoopDir")+"/aux"));
115+
if(!job.waitForCompletion(true)) {
116+
throw new Exception();
117+
}
118+
119+
120+
try{
121+
fs.delete(new Path(rankedFileName), true);
122+
fs.delete(new Path(conf.get("ldbc.snb.datagen.serializer.hadoopDir")+"/aux"),true);
123+
} catch(IOException e) {
124+
System.err.println(e.getMessage());
125+
}
126+
}
127+
}

src/main/java/ldbc/snb/datagen/util/ConfigParser.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ public static Configuration initialize() {
5454
conf.set("ldbc.snb.datagen.generator.person.similarity", "ldbc.snb.datagen.objects.similarity.GeoDistanceSimilarity");
5555
conf.set("ldbc.snb.datagen.parametergenerator.python", "python");
5656
conf.set("ldbc.snb.datagen.parametergenerator.parameters", "true");
57+
conf.set("ldbc.snb.datagen.serializer.persons.sort", "false");
5758

5859
/** Loading predefined Scale Factors **/
5960

0 commit comments

Comments
 (0)