@@ -147,7 +147,7 @@ protected void cleanup(Context context){
147
147
}
148
148
}
149
149
150
- public static class GenerateUsersMapper extends Mapper <LongWritable , Text , LongWritable , ReducedUserProfile > {
150
+ public static class GenerateUsersMapper extends Mapper <LongWritable , Text , TupleKey , ReducedUserProfile > {
151
151
152
152
private int fileIdx ;
153
153
@@ -168,33 +168,32 @@ public void map(LongWritable key, Text value, Context context)
168
168
169
169
}
170
170
171
- public static class DimensionReducer extends Reducer <ComposedKey , ReducedUserProfile , LongWritable , ReducedUserProfile >{
171
+ public static class DimensionReducer extends Reducer <ComposedKey , ReducedUserProfile , TupleKey , ReducedUserProfile >{
172
172
173
173
public static ScalableGenerator friendGenerator ;
174
174
private int attempTaskId ;
175
175
private int dimension ;
176
176
private int pass ;
177
+ private int numCalls = 0 ;
177
178
178
179
@ Override
179
180
protected void setup (Context context ){
180
181
Configuration conf = context .getConfiguration ();
181
182
dimension = Integer .parseInt (conf .get ("dimension" ));
182
183
pass = Integer .parseInt (conf .get ("pass" ));
183
-
184
- String strTaskId = context .getTaskAttemptID ().getTaskID ().toString ();
185
- attempTaskId = Integer .parseInt (strTaskId .substring (strTaskId .length () - 3 ));
184
+ attempTaskId = context .getTaskAttemptID ().getTaskID ().getId ();
186
185
friendGenerator = new ScalableGenerator (attempTaskId , conf );
187
186
friendGenerator .init ();
188
187
}
189
188
190
189
@ Override
191
190
public void reduce (ComposedKey key , Iterable <ReducedUserProfile > valueSet ,
192
191
Context context ) throws IOException , InterruptedException {
192
+ numCalls ++;
193
193
friendGenerator .resetState ((int )key .block );
194
194
int counter = 0 ;
195
195
System .out .println ("Start University group: " +key .block );
196
196
for (ReducedUserProfile user :valueSet ){
197
- // System.out.println(user.getAccountId());
198
197
friendGenerator .pushUserProfile (user , pass , dimension , context );
199
198
counter ++;
200
199
}
@@ -207,6 +206,7 @@ protected void cleanup(Context context){
207
206
System .out .println ("Number of user profile read " + friendGenerator .totalNumUserProfilesRead );
208
207
System .out .println ("Number of exact user profile out " + friendGenerator .exactOutput );
209
208
System .out .println ("Number of exact friend added " + friendGenerator .friendshipNum );
209
+ System .out .println ("Number of reducer calls " +numCalls );
210
210
}
211
211
}
212
212
@@ -220,7 +220,7 @@ public static class UserActivityReducer extends Reducer <ComposedKey, ReducedUse
220
220
protected void setup (Context context ){
221
221
Configuration conf = context .getConfiguration ();
222
222
String strTaskId = context .getTaskAttemptID ().getTaskID ().toString ();
223
- attempTaskId = Integer . parseInt ( strTaskId . substring ( strTaskId . length () - 3 ) );
223
+ attempTaskId = context . getTaskAttemptID (). getTaskID (). getId ( );
224
224
friendGenerator = new ScalableGenerator (attempTaskId , conf );
225
225
friendGenerator .init ();
226
226
friendGenerator .openSerializer ();
@@ -313,9 +313,9 @@ public int runGenerateJob(Configuration conf) throws Exception {
313
313
printProgress ("Starting: Person generation" );
314
314
conf .set ("pass" ,Integer .toString (0 ));
315
315
Job job = new Job (conf ,"SIB Generate Users & 1st Dimension" );
316
- job .setMapOutputKeyClass (LongWritable .class );
316
+ job .setMapOutputKeyClass (TupleKey .class );
317
317
job .setMapOutputValueClass (ReducedUserProfile .class );
318
- job .setOutputKeyClass (LongWritable .class );
318
+ job .setOutputKeyClass (TupleKey .class );
319
319
job .setOutputValueClass (ReducedUserProfile .class );
320
320
job .setJarByClass (GenerateUsersMapper .class );
321
321
job .setMapperClass (GenerateUsersMapper .class );
@@ -329,7 +329,7 @@ public int runGenerateJob(Configuration conf) throws Exception {
329
329
330
330
/// --------------- Sorting by first dimension ----------------
331
331
printProgress ("Starting: Sorting by first dimension" );
332
- HadoopFileRanker fileRanker = new HadoopFileRanker (conf ,LongWritable .class ,ReducedUserProfile .class );
332
+ HadoopFileRanker fileRanker = new HadoopFileRanker (conf ,TupleKey .class ,ReducedUserProfile .class );
333
333
fileRanker .run (hadoopDir +"/sib" ,hadoopDir +"/sibSorting" );
334
334
fs .delete (new Path (hadoopDir + "/sib" ),true );
335
335
@@ -340,7 +340,7 @@ public int runGenerateJob(Configuration conf) throws Exception {
340
340
job = new Job (conf ,"SIB Generate Friendship - Interest" );
341
341
job .setMapOutputKeyClass (ComposedKey .class );
342
342
job .setMapOutputValueClass (ReducedUserProfile .class );
343
- job .setOutputKeyClass (LongWritable .class );
343
+ job .setOutputKeyClass (TupleKey .class );
344
344
job .setOutputValueClass (ReducedUserProfile .class );
345
345
job .setJarByClass (HadoopBlockMapper .class );
346
346
job .setMapperClass (HadoopBlockMapper .class );
@@ -359,7 +359,7 @@ public int runGenerateJob(Configuration conf) throws Exception {
359
359
360
360
/// --------------- Sorting phase 2 ----------------
361
361
printProgress ("Starting: Sorting by second dimension" );
362
- fileRanker = new HadoopFileRanker (conf ,LongWritable .class ,ReducedUserProfile .class );
362
+ fileRanker = new HadoopFileRanker (conf ,TupleKey .class ,ReducedUserProfile .class );
363
363
fileRanker .run (hadoopDir +"/sib2" ,hadoopDir +"/sibSorting2" );
364
364
fs .delete (new Path (hadoopDir + "/sib2" ),true );
365
365
@@ -370,7 +370,7 @@ public int runGenerateJob(Configuration conf) throws Exception {
370
370
job = new Job (conf ,"SIB Generate Friendship - Interest" );
371
371
job .setMapOutputKeyClass (ComposedKey .class );
372
372
job .setMapOutputValueClass (ReducedUserProfile .class );
373
- job .setOutputKeyClass (LongWritable .class );
373
+ job .setOutputKeyClass (TupleKey .class );
374
374
job .setOutputValueClass (ReducedUserProfile .class );
375
375
job .setJarByClass (HadoopBlockMapper .class );
376
376
job .setMapperClass (HadoopBlockMapper .class );
@@ -388,7 +388,7 @@ public int runGenerateJob(Configuration conf) throws Exception {
388
388
389
389
/// --------------- Sorting phase 3--------------
390
390
printProgress ("Starting: Sorting by third dimension" );
391
- fileRanker = new HadoopFileRanker (conf ,LongWritable .class ,ReducedUserProfile .class );
391
+ fileRanker = new HadoopFileRanker (conf ,TupleKey .class ,ReducedUserProfile .class );
392
392
fileRanker .run (hadoopDir +"/sib3" ,hadoopDir +"/sibSorting3" );
393
393
fs .delete (new Path (hadoopDir + "/sib3" ),true );
394
394
@@ -399,7 +399,7 @@ public int runGenerateJob(Configuration conf) throws Exception {
399
399
job = new Job (conf ,"SIB Generate Friendship - Random" );
400
400
job .setMapOutputKeyClass (ComposedKey .class );
401
401
job .setMapOutputValueClass (ReducedUserProfile .class );
402
- job .setOutputKeyClass (LongWritable .class );
402
+ job .setOutputKeyClass (TupleKey .class );
403
403
job .setOutputValueClass (ReducedUserProfile .class );
404
404
job .setJarByClass (HadoopBlockMapper .class );
405
405
job .setMapperClass (HadoopBlockMapper .class );
@@ -418,7 +418,7 @@ public int runGenerateJob(Configuration conf) throws Exception {
418
418
/// --------------- Sorting phase 3--------------
419
419
420
420
printProgress ("Starting: Sorting by third dimension (for activity generation)" );
421
- fileRanker = new HadoopFileRanker (conf ,LongWritable .class ,ReducedUserProfile .class );
421
+ fileRanker = new HadoopFileRanker (conf ,TupleKey .class ,ReducedUserProfile .class );
422
422
fileRanker .run (hadoopDir +"/sib4" ,hadoopDir +"/sibSorting4" );
423
423
fs .delete (new Path (hadoopDir + "/sib4" ),true );
424
424
@@ -429,7 +429,7 @@ public int runGenerateJob(Configuration conf) throws Exception {
429
429
job = new Job (conf ,"Generate user activity" );
430
430
job .setMapOutputKeyClass (ComposedKey .class );
431
431
job .setMapOutputValueClass (ReducedUserProfile .class );
432
- job .setOutputKeyClass (LongWritable .class );
432
+ job .setOutputKeyClass (TupleKey .class );
433
433
job .setOutputValueClass (ReducedUserProfile .class );
434
434
job .setJarByClass (HadoopBlockMapper .class );
435
435
job .setMapperClass (HadoopBlockMapper .class );
@@ -543,7 +543,7 @@ public int runGenerateJob(Configuration conf) throws Exception {
543
543
Job job6 = new Job (conf ,"Dump the friends lists" );
544
544
job6 .setMapOutputKeyClass (ComposedKey .class );
545
545
job6 .setMapOutputValueClass (ReducedUserProfile .class );
546
- job6 .setOutputKeyClass (LongWritable .class );
546
+ job6 .setOutputKeyClass (ComposedKey .class );
547
547
job6 .setOutputValueClass (ReducedUserProfile .class );
548
548
job6 .setJarByClass (HadoopBlockMapper .class );
549
549
job6 .setMapperClass (HadoopBlockMapper .class );
0 commit comments