@@ -98,17 +98,18 @@ protected void setup(Context context){
98
98
FileSystem fs = FileSystem .get (conf );
99
99
String strTaskId = context .getTaskAttemptID ().getTaskID ().toString ();
100
100
int attempTaskId = Integer .parseInt (strTaskId .substring (strTaskId .length () - 3 ));
101
+ int reducerId = conf .getInt ("reducerId" ,0 );
101
102
int partitionId = conf .getInt ("partitionId" ,0 );
102
103
String streamType = conf .get ("streamType" );
103
104
if ( Boolean .parseBoolean (conf .get ("compressed" )) == true ) {
104
- Path outFile = new Path (context .getConfiguration ().get ("outputDir" )+"/social_network/updateStream_" +attempTaskId +"_" +partitionId +"_" +streamType +".csv.gz" );
105
+ Path outFile = new Path (context .getConfiguration ().get ("outputDir" )+"/social_network/updateStream_" +reducerId +"_" +partitionId +"_" +streamType +".csv.gz" );
105
106
out = new GZIPOutputStream ( fs .create (outFile ));
106
107
} else {
107
- Path outFile = new Path (context .getConfiguration ().get ("outputDir" )+"/social_network/updateStream_" +attempTaskId +"_" +partitionId +"_" +streamType +".csv" );
108
+ Path outFile = new Path (context .getConfiguration ().get ("outputDir" )+"/social_network/updateStream_" +reducerId +"_" +partitionId +"_" +streamType +".csv" );
108
109
out = fs .create (outFile );
109
110
}
110
111
if (conf .getBoolean ("updateStreams" ,false )) {
111
- properties = fs .create (new Path (context .getConfiguration ().get ("outputDir" )+"/social_network/updateStream_" +attempTaskId +"_" +partitionId +"_" +streamType +".properties" ));
112
+ properties = fs .create (new Path (context .getConfiguration ().get ("outputDir" )+"/social_network/updateStream_" +reducerId +"_" +partitionId +"_" +streamType +".properties" ));
112
113
}
113
114
} catch (IOException e ) {
114
115
e .printStackTrace ();
@@ -552,6 +553,7 @@ public int runGenerateJob(Configuration conf) throws Exception {
552
553
for ( int j = 0 ; j < numPartitions ; ++j ) {
553
554
/// --------------- Fifth job: Sort update streams ----------------
554
555
conf .setInt ("mapred.line.input.format.linespermap" , 1000000 );
556
+ conf .setInt ("reducerId" ,i );
555
557
conf .setInt ("partitionId" ,j );
556
558
conf .set ("streamType" ,"forum" );
557
559
Job jobForum = new Job (conf , "Soring update streams " +j +" of reducer " +i );
@@ -574,6 +576,7 @@ public int runGenerateJob(Configuration conf) throws Exception {
574
576
fs .delete (new Path (hadoopDir + "/sibEnd" ), true );
575
577
576
578
conf .setInt ("mapred.line.input.format.linespermap" , 1000000 );
579
+ conf .setInt ("reducerId" ,i );
577
580
conf .setInt ("partitionId" ,j );
578
581
conf .set ("streamType" ,"person" );
579
582
Job jobPerson = new Job (conf , "Soring update streams " +j +" of reducer " +i );
0 commit comments