import ldbc.socialnet.dbgen.util.*;

import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -98,17 +99,18 @@ protected void setup(Context context){
            FileSystem fs = FileSystem.get(conf);
            String strTaskId = context.getTaskAttemptID().getTaskID().toString();
            int attempTaskId = Integer.parseInt(strTaskId.substring(strTaskId.length() - 3));
+           int reducerId = conf.getInt("reducerId", 0);
            int partitionId = conf.getInt("partitionId", 0);
            String streamType = conf.get("streamType");
            if (Boolean.parseBoolean(conf.get("compressed")) == true) {
-               Path outFile = new Path(context.getConfiguration().get("outputDir") + "/social_network/updateStream_" + attempTaskId + "_" + partitionId + "_" + streamType + ".csv.gz");
+               Path outFile = new Path(context.getConfiguration().get("outputDir") + "/social_network/updateStream_" + reducerId + "_" + partitionId + "_" + streamType + ".csv.gz");
                out = new GZIPOutputStream(fs.create(outFile));
            } else {
-               Path outFile = new Path(context.getConfiguration().get("outputDir") + "/social_network/updateStream_" + attempTaskId + "_" + partitionId + "_" + streamType + ".csv");
+               Path outFile = new Path(context.getConfiguration().get("outputDir") + "/social_network/updateStream_" + reducerId + "_" + partitionId + "_" + streamType + ".csv");
                out = fs.create(outFile);
            }
            if (conf.getBoolean("updateStreams", false)) {
-               properties = fs.create(new Path(context.getConfiguration().get("outputDir") + "/social_network/updateStream_" + attempTaskId + "_" + partitionId + "_" + streamType + ".properties"));
+               properties = fs.create(new Path(context.getConfiguration().get("outputDir") + "/social_network/updateStream_" + reducerId + "_" + partitionId + "_" + streamType + ".properties"));
            }
        } catch (IOException e) {
            e.printStackTrace();
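
For context, a minimal hedged sketch of the naming scheme this hunk switches to: the output file name is derived from configuration values (reducerId, partitionId, streamType) rather than the task attempt id, so reruns and speculative attempts map to the same file. The class and method names below are illustrative, not part of the patch:

// Hypothetical sketch; ReducerIdNamingExample/openStream are invented for illustration.
import java.io.IOException;
import java.io.OutputStream;
import java.util.zip.GZIPOutputStream;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class ReducerIdNamingExample {
    public static OutputStream openStream(Configuration conf) throws IOException {
        FileSystem fs = FileSystem.get(conf);
        // Deterministic ids taken from the job configuration.
        int reducerId = conf.getInt("reducerId", 0);
        int partitionId = conf.getInt("partitionId", 0);
        String streamType = conf.get("streamType", "forum");
        String base = conf.get("outputDir") + "/social_network/updateStream_"
                + reducerId + "_" + partitionId + "_" + streamType;
        if (conf.getBoolean("compressed", false)) {
            return new GZIPOutputStream(fs.create(new Path(base + ".csv.gz")));
        }
        return fs.create(new Path(base + ".csv"));
    }
}
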
@@ -552,6 +554,7 @@ public int runGenerateJob(Configuration conf) throws Exception {
        for (int j = 0; j < numPartitions; ++j) {
            /// --------------- Fifth job: Sort update streams ----------------
            conf.setInt("mapred.line.input.format.linespermap", 1000000);
+           conf.setInt("reducerId", i);
            conf.setInt("partitionId", j);
            conf.set("streamType", "forum");
            Job jobForum = new Job(conf, "Soring update streams " + j + " of reducer " + i);
@@ -574,6 +577,7 @@ public int runGenerateJob(Configuration conf) throws Exception {
            fs.delete(new Path(hadoopDir + "/sibEnd"), true);

            conf.setInt("mapred.line.input.format.linespermap", 1000000);
+           conf.setInt("reducerId", i);
            conf.setInt("partitionId", j);
            conf.set("streamType", "person");
            Job jobPerson = new Job(conf, "Soring update streams " + j + " of reducer " + i);
@@ -597,21 +601,26 @@ public int runGenerateJob(Configuration conf) throws Exception {

            if (conf.getBoolean("updateStreams", false)) {
                Properties properties = new Properties();
-               properties.load(fs.open(new Path(conf.get("outputDir") + "/social_network/updateStream_" + i + "_" + j + "_person.properties")));
-               Long auxMin = Long.parseLong(properties.getProperty("min_write_event_start_time"));
-               min = auxMin < min ? auxMin : min;
-               Long auxMax = Long.parseLong(properties.getProperty("max_write_event_start_time"));
-               max = auxMax > max ? auxMax : max;
-               numEvents += Long.parseLong(properties.getProperty("num_events"));
-
-               properties.load(fs.open(new Path(conf.get("outputDir") + "/social_network/updateStream_" + i + "_" + j + "_forum.properties")));
-
-               auxMin = Long.parseLong(properties.getProperty("min_write_event_start_time"));
-               min = auxMin < min ? auxMin : min;
-               auxMax = Long.parseLong(properties.getProperty("max_write_event_start_time"));
-               max = auxMax > max ? auxMax : max;
-               numEvents += Long.parseLong(properties.getProperty("num_events"));
-
+               FSDataInputStream file = fs.open(new Path(conf.get("outputDir") + "/social_network/updateStream_" + i + "_" + j + "_person.properties"));
+               properties.load(file);
+               if (properties.getProperty("min_write_event_start_time") != null) {
+                   Long auxMin = Long.parseLong(properties.getProperty("min_write_event_start_time"));
+                   min = auxMin < min ? auxMin : min;
+                   Long auxMax = Long.parseLong(properties.getProperty("max_write_event_start_time"));
+                   max = auxMax > max ? auxMax : max;
+                   numEvents += Long.parseLong(properties.getProperty("num_events"));
+               }
+               file.close();
+               file = fs.open(new Path(conf.get("outputDir") + "/social_network/updateStream_" + i + "_" + j + "_forum.properties"));
+               properties.load(file);
+               if (properties.getProperty("min_write_event_start_time") != null) {
+                   Long auxMin = Long.parseLong(properties.getProperty("min_write_event_start_time"));
+                   min = auxMin < min ? auxMin : min;
+                   Long auxMax = Long.parseLong(properties.getProperty("max_write_event_start_time"));
+                   max = auxMax > max ? auxMax : max;
+                   numEvents += Long.parseLong(properties.getProperty("num_events"));
+               }
+               file.close();
                fs.delete(new Path(conf.get("outputDir") + "/social_network/updateStream_" + i + "_" + j + "_person.properties"), true);
                fs.delete(new Path(conf.get("outputDir") + "/social_network/updateStream_" + i + "_" + j + "_forum.properties"), true);
            }
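
The guarded merge in this last hunk (skip a properties file that carries no timing keys, and close each FSDataInputStream explicitly) can be read as the following small helper; a hypothetical sketch only, with Stats and mergeStream invented here for illustration:

// Hypothetical helper mirroring the guarded min/max/num_events merge above;
// Stats and mergeStream are illustrative names, not part of the patch.
import java.io.IOException;
import java.util.Properties;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

class Stats {
    long min = Long.MAX_VALUE;
    long max = Long.MIN_VALUE;
    long numEvents = 0;
}

class UpdateStreamStats {
    static void mergeStream(FileSystem fs, Path propsFile, Stats stats) throws IOException {
        Properties properties = new Properties();
        FSDataInputStream file = fs.open(propsFile);
        try {
            properties.load(file);
        } finally {
            file.close(); // always release the stream, even if loading fails
        }
        // A stream that produced no events writes no timing properties;
        // skip it instead of failing on Long.parseLong(null).
        if (properties.getProperty("min_write_event_start_time") == null) {
            return;
        }
        stats.min = Math.min(stats.min, Long.parseLong(properties.getProperty("min_write_event_start_time")));
        stats.max = Math.max(stats.max, Long.parseLong(properties.getProperty("max_write_event_start_time")));
        stats.numEvents += Long.parseLong(properties.getProperty("num_events"));
    }
}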