
Commit 3af1620

Implemented splitting of edge generation into multiple disjoint files and then merging them
1 parent 4b50106 commit 3af1620

File tree

5 files changed: +174 -63 lines changed

src/main/java/ldbc/snb/datagen/generator/DistanceKnowsGenerator.java

Lines changed: 6 additions & 6 deletions
@@ -18,21 +18,21 @@ public DistanceKnowsGenerator() {
         this.randomFarm = new RandomGeneratorFarm();
     }
 
-    public void generateKnows( ArrayList<Person> persons, int seed, float upperBound ) {
+    public void generateKnows( ArrayList<Person> persons, int seed, float percentage ) {
         randomFarm.resetRandomGenerators(seed);
         for( int i = 0; i < persons.size(); ++i ) {
             Person p = persons.get(i);
-            for( int j = i+1; (p.maxNumKnows()*upperBound > p.knows().size()) /*&& (j < (i + 1000))*/ && (j < persons.size()); ++j ) {
-                if( know(p, persons.get(j), j - i, upperBound)) {
+            for( int j = i+1; (p.maxNumKnows()*percentage > p.knows().size()) && (j < persons.size()); ++j ) {
+                if( know(p, persons.get(j), j - i, percentage)) {
                     createKnow(p, persons.get(j));
                 }
             }
         }
     }
 
-    boolean know( Person personA, Person personB, int dist, float upperBound ) {
-        if((float)(personA.knows().size()) >= (float)(personA.maxNumKnows())*upperBound ||
-           personB.knows().size() >= (float)(personB.maxNumKnows())*upperBound ) return false;
+    boolean know( Person personA, Person personB, int dist, float percentage ) {
+        if((float)(personA.knows().size()) >= (float)(personA.maxNumKnows())*percentage ||
+           personB.knows().size() >= (float)(personB.maxNumKnows())*percentage ) return false;
         double randProb = randomFarm.get(RandomGeneratorFarm.Aspect.UNIFORM).nextDouble();
         double prob = Math.pow(DatagenParams.baseProbCorrelated, dist);
         if ((randProb < prob) || (randProb < DatagenParams.limitProCorrelated)) {
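The third argument of generateKnows, now named percentage, caps how much of a person's total degree budget a single pass may fill: a pass stops adding knows edges for a person once knows().size() reaches maxNumKnows() * percentage. A minimal, standalone sketch of that cap (the class and variable names below are illustrative, not taken from the generator):

    // Illustrative sketch of the per-pass degree cap used in generateKnows/know.
    public class DegreeBudgetSketch {
        // Mirrors the check p.maxNumKnows()*percentage > p.knows().size().
        static boolean hasBudget(int currentDegree, int targetDegree, float percentage) {
            return currentDegree < targetDegree * percentage;
        }

        public static void main(String[] args) {
            int targetDegree = 20;   // the person's maxNumKnows()
            int currentDegree = 8;   // knows edges already created for this person
            // University pass (0.45f): 8 < 20 * 0.45 = 9, so one more edge fits.
            System.out.println(hasBudget(currentDegree, targetDegree, 0.45f));     // true
            // After that edge: 9 < 9 is false, so this pass stops for the person.
            System.out.println(hasBudget(currentDegree + 1, targetDegree, 0.45f)); // false
        }
    }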

src/main/java/ldbc/snb/datagen/generator/LDBCDatagen.java

Lines changed: 65 additions & 55 deletions
@@ -48,6 +48,7 @@
 import org.apache.hadoop.io.Text;
 
 import java.io.OutputStream;
+import java.util.ArrayList;
 import java.util.Properties;
 
 public class LDBCDatagen {
@@ -70,39 +71,45 @@ private void printProgress(String message) {
 
     public int runGenerateJob(Configuration conf) throws Exception {
 
-        String personsFileName1 = conf.get("ldbc.snb.datagen.serializer.hadoopDir") + "/persons1";
-        String personsFileName2 = conf.get("ldbc.snb.datagen.serializer.hadoopDir") + "/persons2";
+        String hadoopPrefix = conf.get("ldbc.snb.datagen.serializer.hadoopDir");
         FileSystem fs = FileSystem.get(conf);
 
         long start = System.currentTimeMillis();
         printProgress("Starting: Person generation");
         long startPerson = System.currentTimeMillis();
         HadoopPersonGenerator personGenerator = new HadoopPersonGenerator( conf );
-        personGenerator.run(personsFileName1, "ldbc.snb.datagen.hadoop.UniversityKeySetter");
+        personGenerator.run(hadoopPrefix+"/persons", "ldbc.snb.datagen.hadoop.UniversityKeySetter");
         long endPerson = System.currentTimeMillis();
 
-
         printProgress("Creating university location correlated edges");
         long startUniversity = System.currentTimeMillis();
-        HadoopKnowsGenerator knowsGenerator = new HadoopKnowsGenerator(conf,null, "ldbc.snb.datagen.hadoop.InterestKeySetter", 0.45f);
-        knowsGenerator.run(personsFileName1,personsFileName2);
-        fs.delete(new Path(personsFileName1), true);
+        HadoopKnowsGenerator knowsGenerator = new HadoopKnowsGenerator(conf,"ldbc.snb.datagen.hadoop.UniversityKeySetter", "ldbc.snb.datagen.hadoop.RandomKeySetter", 0.45f);
+        knowsGenerator.run(hadoopPrefix+"/persons",hadoopPrefix+"/universityEdges");
         long endUniversity = System.currentTimeMillis();
 
         printProgress("Creating main interest correlated edges");
         long startInterest= System.currentTimeMillis();
-        knowsGenerator = new HadoopKnowsGenerator(conf,null, "ldbc.snb.datagen.hadoop.RandomKeySetter", 0.90f);
-        knowsGenerator.run(personsFileName2,personsFileName1);
-        fs.delete(new Path(personsFileName2), true);
+        knowsGenerator = new HadoopKnowsGenerator(conf,"ldbc.snb.datagen.hadoop.InterestKeySetter", "ldbc.snb.datagen.hadoop.RandomKeySetter", 0.45f);
+        knowsGenerator.run(hadoopPrefix+"/persons",hadoopPrefix+"/interestEdges");
        long endInterest = System.currentTimeMillis();
 
         printProgress("Creating random correlated edges");
         long startRandom= System.currentTimeMillis();
-        knowsGenerator = new HadoopKnowsGenerator(conf,null, "ldbc.snb.datagen.hadoop.RandomKeySetter", 1.0f);
-        knowsGenerator.run(personsFileName1,personsFileName2);
-        fs.delete(new Path(personsFileName1), true);
+        knowsGenerator = new HadoopKnowsGenerator(conf,"ldbc.snb.datagen.hadoop.RandomKeySetter", "ldbc.snb.datagen.hadoop.RandomKeySetter", 0.1f);
+        knowsGenerator.run(hadoopPrefix+"/persons",hadoopPrefix+"/randomEdges");
         long endRandom= System.currentTimeMillis();
 
+
+        fs.delete(new Path(DatagenParams.hadoopDir + "/persons"), true);
+        printProgress("Merging the different edge files");
+        ArrayList<String> edgeFileNames = new ArrayList<String>();
+        edgeFileNames.add(hadoopPrefix+"/universityEdges");
+        edgeFileNames.add(hadoopPrefix+"/interestEdges");
+        edgeFileNames.add(hadoopPrefix+"/randomEdges");
+        long startMerge = System.currentTimeMillis();
+        HadoopMergeFriendshipFiles merger = new HadoopMergeFriendshipFiles(conf,"ldbc.snb.datagen.hadoop.RandomKeySetter");
+        merger.run(hadoopPrefix+"/mergedPersons", edgeFileNames);
+        long endMerge = System.currentTimeMillis();
         /*printProgress("Creating edges to fill the degree gap");
         long startGap = System.currentTimeMillis();
         knowsGenerator = new HadoopKnowsGenerator(conf,null, "ldbc.snb.datagen.hadoop.DegreeGapKeySetter", 1.0f);
@@ -114,14 +121,14 @@ public int runGenerateJob(Configuration conf) throws Exception {
         printProgress("Serializing persons");
         long startPersonSerializing= System.currentTimeMillis();
         HadoopPersonSerializer serializer = new HadoopPersonSerializer(conf);
-        serializer.run(personsFileName2);
+        serializer.run(hadoopPrefix+"/mergedPersons");
         long endPersonSerializing= System.currentTimeMillis();
 
         long startPersonActivity= System.currentTimeMillis();
         if(conf.getBoolean("ldbc.snb.datagen.generator.activity", true)) {
             printProgress("Generating and serializing person activity");
             HadoopPersonActivityGenerator activityGenerator = new HadoopPersonActivityGenerator(conf);
-            activityGenerator.run(personsFileName2);
+            activityGenerator.run(hadoopPrefix+"/mergedPersons");
 
             int numThreads = DatagenParams.numThreads;
             int blockSize = DatagenParams.blockSize;
@@ -134,41 +141,41 @@ public int runGenerateJob(Configuration conf) throws Exception {
                 }
             }
         }
-        fs.delete(new Path(personsFileName2), true);
         long endPersonActivity= System.currentTimeMillis();
 
         long startSortingUpdateStreams= System.currentTimeMillis();
-        if(conf.getBoolean("ldbc.snb.datagen.serializer.updateStreams", false)) {
-            printProgress("Sorting update streams ");
 
-            int blockSize = DatagenParams.blockSize;
-            int numBlocks = (int)Math.ceil(DatagenParams.numPersons / (double)blockSize);
-
-            for( int i = 0; i < DatagenParams.numThreads; ++i) {
-                int numPartitions = conf.getInt("ldbc.snb.datagen.serializer.numUpdatePartitions", 1);
-                if( i < numBlocks ) {
-                    for (int j = 0; j < numPartitions; ++j) {
-                        HadoopFileSorter updateStreamSorter = new HadoopFileSorter(conf, LongWritable.class, Text.class);
-                        updateStreamSorter.run(DatagenParams.hadoopDir + "/temp_updateStream_person_" + i + "_" + j, DatagenParams.hadoopDir + "/updateStream_person_" + i + "_" + j);
-                        updateStreamSorter.run(DatagenParams.hadoopDir + "/temp_updateStream_forum_" + i + "_" + j, DatagenParams.hadoopDir + "/updateStream_forum_" + i + "_" + j);
+        if(conf.getBoolean("ldbc.snb.datagen.serializer.updateStreams", false)) {
 
-                        fs.delete(new Path(DatagenParams.hadoopDir + "/temp_updateStream_person_" + i + "_" + j), true);
-                        fs.delete(new Path(DatagenParams.hadoopDir + "/temp_updateStream_forum_" + i + "_" + j), true);
+            printProgress("Sorting update streams ");
 
-                        HadoopUpdateStreamSerializer updateSerializer = new HadoopUpdateStreamSerializer(conf);
-                        updateSerializer.run(DatagenParams.hadoopDir + "/updateStream_person_" + i + "_" + j, i, j, "person");
-                        updateSerializer.run(DatagenParams.hadoopDir + "/updateStream_forum_" + i + "_" + j, i, j, "forum");
+            int blockSize = DatagenParams.blockSize;
+            int numBlocks = (int)Math.ceil(DatagenParams.numPersons / (double)blockSize);
 
-                        fs.delete(new Path(DatagenParams.hadoopDir + "/updateStream_person_" + i + "_" + j), true);
-                        fs.delete(new Path(DatagenParams.hadoopDir + "/updateStream_forum_" + i + "_" + j), true);
-                    }
-                } else {
-                    for (int j = 0; j < numPartitions; ++j) {
-                        fs.delete(new Path(DatagenParams.hadoopDir + "/temp_updateStream_person_" + i + "_" + j), true);
-                        fs.delete(new Path(DatagenParams.hadoopDir + "/temp_updateStream_forum_" + i + "_" + j), true);
+            for( int i = 0; i < DatagenParams.numThreads; ++i) {
+                int numPartitions = conf.getInt("ldbc.snb.datagen.serializer.numUpdatePartitions", 1);
+                if( i < numBlocks ) {
+                    for (int j = 0; j < numPartitions; ++j) {
+                        HadoopFileSorter updateStreamSorter = new HadoopFileSorter(conf, LongWritable.class, Text.class);
+                        HadoopUpdateStreamSerializer updateSerializer = new HadoopUpdateStreamSerializer(conf);
+                        updateStreamSorter.run(DatagenParams.hadoopDir + "/temp_updateStream_person_" + i + "_" + j, DatagenParams.hadoopDir + "/updateStream_person_" + i + "_" + j);
+                        fs.delete(new Path(DatagenParams.hadoopDir + "/temp_updateStream_person_" + i + "_" + j), true);
+                        updateSerializer.run(DatagenParams.hadoopDir + "/updateStream_person_" + i + "_" + j, i, j, "person");
+                        fs.delete(new Path(DatagenParams.hadoopDir + "/updateStream_person_" + i + "_" + j), true);
+                        if( conf.getBoolean("ldbc.snb.datagen.generator.activity", false)) {
+                            updateStreamSorter.run(DatagenParams.hadoopDir + "/temp_updateStream_forum_" + i + "_" + j, DatagenParams.hadoopDir + "/updateStream_forum_" + i + "_" + j);
+                            fs.delete(new Path(DatagenParams.hadoopDir + "/temp_updateStream_forum_" + i + "_" + j), true);
+                            updateSerializer.run(DatagenParams.hadoopDir + "/updateStream_forum_" + i + "_" + j, i, j, "forum");
+                            fs.delete(new Path(DatagenParams.hadoopDir + "/updateStream_forum_" + i + "_" + j), true);
+                        }
+                    }
+                } else {
+                    for (int j = 0; j < numPartitions; ++j) {
+                        fs.delete(new Path(DatagenParams.hadoopDir + "/temp_updateStream_person_" + i + "_" + j), true);
+                        fs.delete(new Path(DatagenParams.hadoopDir + "/temp_updateStream_forum_" + i + "_" + j), true);
+                    }
                 }
             }
-        }
 
         long minDate = Long.MAX_VALUE;
         long maxDate = Long.MIN_VALUE;
@@ -188,18 +195,20 @@ public int runGenerateJob(Configuration conf) throws Exception {
                 file.close();
                 fs.delete(propertiesFile,true);
 
-                propertiesFile = new Path(DatagenParams.hadoopDir+"/temp_updateStream_forum_"+i+".properties");
-                file = fs.open(propertiesFile);
-                properties = new Properties();
-                properties.load(file);
-                aux = Long.parseLong(properties.getProperty("ldbc.snb.interactive.min_write_event_start_time"));
-                minDate = aux < minDate ? aux : minDate;
-                aux = Long.parseLong(properties.getProperty("ldbc.snb.interactive.max_write_event_start_time"));
-                maxDate = aux > maxDate ? aux : maxDate;
-                aux = Long.parseLong(properties.getProperty("ldbc.snb.interactive.num_events"));
-                count += aux;
-                file.close();
-                fs.delete(propertiesFile,true);
+                if( conf.getBoolean("ldbc.snb.datagen.generator.activity", false)) {
+                    propertiesFile = new Path(DatagenParams.hadoopDir + "/temp_updateStream_forum_" + i + ".properties");
+                    file = fs.open(propertiesFile);
+                    properties = new Properties();
+                    properties.load(file);
+                    aux = Long.parseLong(properties.getProperty("ldbc.snb.interactive.min_write_event_start_time"));
+                    minDate = aux < minDate ? aux : minDate;
+                    aux = Long.parseLong(properties.getProperty("ldbc.snb.interactive.max_write_event_start_time"));
+                    maxDate = aux > maxDate ? aux : maxDate;
+                    aux = Long.parseLong(properties.getProperty("ldbc.snb.interactive.num_events"));
+                    count += aux;
+                    file.close();
+                    fs.delete(propertiesFile, true);
+                }
             }
 
             OutputStream output = fs.create(new Path(DatagenParams.socialNetworkDir+"/updateStream"+".properties"),true);
@@ -229,8 +238,9 @@ public int runGenerateJob(Configuration conf) throws Exception {
             + " total seconds");
         System.out.println("Person generation time: "+((endPerson - startPerson) / 1000));
         System.out.println("University correlated edge generation time: "+((endUniversity - startUniversity) / 1000));
-        // System.out.println("Interest correlated edge generation time: "+((endInterest - startInterest) / 1000));
-        // System.out.println("Random correlated edge generation time: "+((endRandom - startRandom) / 1000));
+        System.out.println("Interest correlated edge generation time: "+((endInterest - startInterest) / 1000));
+        System.out.println("Random correlated edge generation time: "+((endRandom - startRandom) / 1000));
+        System.out.println("Edges merge time: "+((endMerge - startMerge) / 1000));
         System.out.println("Person serialization time: "+((endPersonSerializing - startPersonSerializing) / 1000));
         System.out.println("Person activity generation and serialization time: "+((endPersonActivity - startPersonActivity) / 1000));
         System.out.println("Sorting update streams time: "+((endSortingUpdateStreams - startSortingUpdateStreams) / 1000));

src/main/java/ldbc/snb/datagen/hadoop/HadoopKnowsGenerator.java

Lines changed: 3 additions & 1 deletion
@@ -88,7 +88,9 @@ public void run( String inputFileName, String outputFileName ) throws Exception
         String rankedFileName = conf.get("ldbc.snb.datagen.serializer.hadoopDir") + "/ranked";
         HadoopFileRanker hadoopFileRanker = new HadoopFileRanker( conf, TupleKey.class, Person.class );
         hadoopFileRanker.run(keyChangedFileName,rankedFileName);
-        fs.delete(new Path(keyChangedFileName), true);
+        if(preKeySetterName != null ) {
+            fs.delete(new Path(keyChangedFileName), true);
+        }
         System.out.println("... Time to rank persons: "+ (System.currentTimeMillis() - start)+" ms");
 
         conf.set("upperBound",Double.toString(upperBound));
src/main/java/ldbc/snb/datagen/hadoop/HadoopMergeFriendshipFiles.java (new file)

Lines changed: 99 additions & 0 deletions
@@ -0,0 +1,99 @@
+package ldbc.snb.datagen.hadoop;
+
+import ldbc.snb.datagen.generator.KnowsGenerator;
+import ldbc.snb.datagen.generator.LDBCDatagen;
+import ldbc.snb.datagen.objects.Knows;
+import ldbc.snb.datagen.objects.Person;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+
+import java.io.IOException;
+import java.util.ArrayList;
+
+/**
+ * Created by aprat on 29/07/15.
+ */
+public class HadoopMergeFriendshipFiles {
+
+    public static class HadoopMergeFriendshipFilesReducer extends Reducer<TupleKey, Person, TupleKey, Person> {
+
+        private Configuration conf;
+        private HadoopFileKeyChanger.KeySetter<TupleKey> keySetter = null;
+
+        protected void setup(Context context) {
+            this.conf = context.getConfiguration();
+            try {
+                this.keySetter = (HadoopFileKeyChanger.KeySetter) Class.forName(conf.get("postKeySetterName")).newInstance();
+            }catch(Exception e) {
+                System.out.println(e.getMessage());
+            }
+            LDBCDatagen.init(conf);
+        }
+
+        @Override
+        public void reduce(TupleKey key, Iterable<Person> valueSet,Context context)
+            throws IOException, InterruptedException {
+
+            Person person = null;
+            int index = 0;
+            for ( Person p : valueSet) {
+                if( index == 0 ) {
+                    person = new Person(p);
+                } else {
+                    for ( Knows k : p.knows() ) {
+                        person.knows().add(k);
+                    }
+                }
+                index++;
+            }
+            //System.out.println("Num persons "+index);
+            context.write(keySetter.getKey(person),person);
+        }
+    }
+
+    private Configuration conf;
+    private String postKeySetterName;
+
+    public HadoopMergeFriendshipFiles( Configuration conf, String postKeySetterName ) {
+
+        this.conf = conf;
+        this.postKeySetterName = postKeySetterName;
+    }
+
+    public void run( String outputFileName, ArrayList<String> friendshipFileNames ) throws Exception {
+
+        conf.set("postKeySetterName",postKeySetterName);
+        int numThreads = Integer.parseInt(conf.get("ldbc.snb.datagen.generator.numThreads"));
+        Job job = Job.getInstance(conf, "Edges merger generator");
+        job.setMapOutputKeyClass(TupleKey.class);
+        job.setMapOutputValueClass(Person.class);
+        job.setOutputKeyClass(TupleKey.class);
+        job.setOutputValueClass(Person.class);
+        //job.setJarByClass(HadoopBlockMapper.class);
+        //job.setMapperClass(HadoopBlockMapper.class);
+        job.setReducerClass(HadoopMergeFriendshipFilesReducer.class);
+        job.setNumReduceTasks(numThreads);
+        job.setInputFormatClass(SequenceFileInputFormat.class);
+        job.setOutputFormatClass(SequenceFileOutputFormat.class);
+
+        for ( String s : friendshipFileNames ) {
+            FileInputFormat.addInputPath(job, new Path(s));
+        }
+        FileOutputFormat.setOutputPath(job, new Path(outputFileName));
+
+        System.out.println("Merging edges");
+        long start = System.currentTimeMillis();
+        if(!job.waitForCompletion(true) ){
+            throw new Exception();
+        }
+        System.out.println("... time to merge edges: "+ (System.currentTimeMillis() - start)+" ms");
+
+
+    }
+}
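Within HadoopMergeFriendshipFilesReducer.reduce, the merge rule for one key group is simple: the first Person record becomes the base (copied via new Person(p)) and the knows() edges of every other record in the group are appended to it. A minimal, Hadoop-free sketch of that fold (PersonStub is a hypothetical stand-in for ldbc.snb.datagen.objects.Person, used only for illustration):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    // Hypothetical stand-in for Person: just an id and a knows list.
    class PersonStub {
        final long id;
        final List<Long> knows = new ArrayList<Long>();
        PersonStub(long id) { this.id = id; }
    }

    public class MergeRuleSketch {
        // Same fold as the reducer: first record wins, the others only contribute edges.
        static PersonStub merge(Iterable<PersonStub> recordsForOnePerson) {
            PersonStub merged = null;
            for (PersonStub p : recordsForOnePerson) {
                if (merged == null) {
                    merged = p;                    // base copy of the person
                } else {
                    merged.knows.addAll(p.knows);  // append edges from the other files
                }
            }
            return merged;
        }

        public static void main(String[] args) {
            PersonStub fromUniversityFile = new PersonStub(42);
            fromUniversityFile.knows.add(7L);
            PersonStub fromInterestFile = new PersonStub(42);
            fromInterestFile.knows.add(13L);
            PersonStub merged = merge(Arrays.asList(fromUniversityFile, fromInterestFile));
            System.out.println(merged.knows); // [7, 13]
        }
    }

Run standalone, the sketch prints [7, 13]: the university-pass and interest-pass edges of the same person end up on a single merged record, which is what the merge job writes to /mergedPersons.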

src/main/java/ldbc/snb/datagen/hadoop/HadoopPersonSerializer.java

Lines changed: 1 addition & 1 deletion
@@ -51,7 +51,7 @@ public void reduce(BlockKey key, Iterable<Person> valueSet,Context context)
         throws IOException, InterruptedException {
             personSerializer_.reset();
             for( Person p : valueSet ) {
-                if(p.creationDate()< Dictionaries.dates.getUpdateThreshold() || !DatagenParams.updateStreams ) {
+                if(p.creationDate()< Dictionaries.dates.getUpdateThreshold() || !DatagenParams.updateStreams ) {
                     personSerializer_.export(p);
                 } else {
                     updateSerializer_.export(p);