Skip to content

Commit 5d30e9b

Browse files
committed
Unified update stream property files
1 parent 9cd4a1a commit 5d30e9b

File tree

1 file changed

+43
-2
lines changed

1 file changed

+43
-2
lines changed

src/main/java/ldbc/socialnet/dbgen/generator/MRGenerateUsers.java

Lines changed: 43 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
package ldbc.socialnet.dbgen.generator;
3838

3939
import java.io.*;
40+
import java.util.Properties;
4041
import java.util.TreeSet;
4142
import java.util.zip.GZIPOutputStream;
4243

@@ -131,7 +132,11 @@ protected void cleanup(Context context){
131132
System.out.println("Number of events reduced "+numEvents);
132133
if (numEvents > 0) {
133134
long updateDistance = (max-min)/numEvents;
134-
String propertiesStr = new String("gctdeltaduration:"+context.getConfiguration().get("deltaTime")+"\nmin_write_event_start_time:"+min+"\nmax_write_event_start_time:"+max+"\nupdate_interleave:"+updateDistance);
135+
String propertiesStr = new String("gctdeltaduration:"+context.getConfiguration().get("deltaTime")+
136+
"\nmin_write_event_start_time:"+min+
137+
"\nmax_write_event_start_time:"+max+
138+
"\nupdate_interleave:"+updateDistance+
139+
"\nnum_events:"+numEvents);
135140
properties.write(propertiesStr.getBytes("UTF8"));
136141
properties.flush();
137142
properties.close();
@@ -538,6 +543,10 @@ public int runGenerateJob(Configuration conf) throws Exception {
538543
int resUpdateStreams = job4.waitForCompletion(true) ? 0 : 1;
539544
fs.delete(new Path(hadoopDir + "/sib4"),true);
540545

546+
int numEvents = 0;
547+
long min = Long.MAX_VALUE;
548+
long max = Long.MIN_VALUE;
549+
541550
for( int i =0; i < numThreads; ++i ) {
542551
int numPartitions = conf.getInt("numUpdatePartitions", 1);
543552
for( int j = 0; j < numPartitions; ++j ) {
@@ -585,15 +594,47 @@ public int runGenerateJob(Configuration conf) throws Exception {
585594
jobPerson.waitForCompletion(true);
586595
fs.delete(new Path(socialNetDir + "/temp_updateStream_" + i+"_"+j+"_person"), false);
587596
fs.delete(new Path(hadoopDir + "/sibEnd"), true);
597+
598+
if(conf.getBoolean("updateStreams",false)) {
599+
Properties properties = new Properties();
600+
properties.load(fs.open(new Path(conf.get("outputDir") + "/social_network/updateStream_" + i + "_" + j + "_person.properties")));
601+
Long auxMin = Long.parseLong(properties.getProperty("min_write_event_start_time"));
602+
min = auxMin < min ? auxMin : min;
603+
Long auxMax = Long.parseLong(properties.getProperty("max_write_event_start_time"));
604+
max = auxMax > max ? auxMax : max;
605+
numEvents += Long.parseLong(properties.getProperty("num_events"));
606+
607+
properties.load(fs.open(new Path(conf.get("outputDir") + "/social_network/updateStream_" + i + "_" + j + "_forum.properties")));
608+
609+
auxMin = Long.parseLong(properties.getProperty("min_write_event_start_time"));
610+
min = auxMin < min ? auxMin : min;
611+
auxMax = Long.parseLong(properties.getProperty("max_write_event_start_time"));
612+
max = auxMax > max ? auxMax : max;
613+
numEvents += Long.parseLong(properties.getProperty("num_events"));
614+
615+
fs.delete(new Path(conf.get("outputDir") + "/social_network/updateStream_" + i + "_" + j + "_person.properties"),true);
616+
fs.delete(new Path(conf.get("outputDir") + "/social_network/updateStream_" + i + "_" + j + "_forum.properties"),true);
617+
}
588618
}
589619
}
590620

621+
if(conf.getBoolean("updateStreams",false)) {
622+
OutputStream output = fs.create(new Path(conf.get("outputDir") + "/social_network/updateStream.properties"));
623+
output.write(new String("gctdeltaduration:"+conf.get("deltaTime")+"\n").getBytes());
624+
output.write(new String("min_write_event_start_time:"+min+"\n").getBytes());
625+
output.write(new String("max_write_event_start_time:"+max+"\n").getBytes());
626+
output.write(new String("update_interleave:"+(max-min)/numEvents+"\n").getBytes());
627+
output.write(new String("num_events:"+numEvents).getBytes());
628+
output.close();
629+
}
630+
631+
632+
591633
printProgress("Starting: Materialize friends for substitution parameters");
592634
int resMaterializeFriends = job6.waitForCompletion(true) ? 0 : 1;
593635
fs.delete(new Path(hadoopDir + "/sibSorting3"),true);
594636

595637

596-
597638
long end = System.currentTimeMillis();
598639
System.out.println(((end - start) / 1000)
599640
+ " total seconds");

0 commit comments

Comments
 (0)