import ldbc.snb.datagen.objects.Person;
import ldbc.snb.datagen.serializer.PersonSerializer;
import ldbc.snb.datagen.serializer.UpdateEventSerializer;
+import ldbc.snb.datagen.vocabulary.SN;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -25,7 +26,7 @@
 */
public class HadoopPersonSerializer {

-    public static class HadoopPersonSerializerReducer extends Reducer<TupleKey, Person, LongWritable, Person> {
+    public static class HadoopPersonSerializerReducer extends Reducer<BlockKey, Person, LongWritable, Person> {

        private int reducerId; /** The id of the reducer. **/
        private PersonSerializer personSerializer_; /** The person serializer. **/
@@ -47,8 +48,9 @@ protected void setup(Context context) {
        }

        @Override
-        public void reduce(TupleKey key, Iterable<Person> valueSet, Context context)
+        public void reduce(BlockKey key, Iterable<Person> valueSet, Context context)
                throws IOException, InterruptedException {
+            SN.machineId = key.block;
            personSerializer_.reset();
            for (Person p : valueSet) {
                if (p.creationDate() < Dictionaries.dates.getUpdateThreshold() || !DatagenParams.updateStreams) {
@@ -85,42 +87,40 @@ public void run(String inputFileName) throws Exception {

        FileSystem fs = FileSystem.get(conf);

-        /* String rankedFileName = conf.get("ldbc.snb.datagen.serializer.hadoopDir") + "/ranked";
+        String rankedFileName = conf.get("ldbc.snb.datagen.serializer.hadoopDir") + "/ranked";
        HadoopFileRanker hadoopFileRanker = new HadoopFileRanker(conf, TupleKey.class, Person.class, null);
        hadoopFileRanker.run(inputFileName, rankedFileName);
-        */

        int numThreads = Integer.parseInt(conf.get("ldbc.snb.datagen.generator.numThreads"));
        Job job = Job.getInstance(conf, "Person Serializer");
-        // job.setMapOutputKeyClass(BlockKey.class);
-        job.setMapOutputKeyClass(TupleKey.class);
+        job.setMapOutputKeyClass(BlockKey.class);
+        // job.setMapOutputKeyClass(TupleKey.class);
        job.setMapOutputValueClass(Person.class);
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(Person.class);
        job.setJarByClass(HadoopBlockMapper.class);
-        // job.setMapperClass(HadoopBlockMapper.class);
+        job.setMapperClass(HadoopBlockMapper.class);
        job.setReducerClass(HadoopPersonSerializerReducer.class);
        job.setNumReduceTasks(numThreads);
        job.setInputFormatClass(SequenceFileInputFormat.class);
        job.setOutputFormatClass(SequenceFileOutputFormat.class);

        job.setPartitionerClass(HadoopTuplePartitioner.class);

-        /* job.setSortComparatorClass(BlockKeyComparator.class);
+        job.setSortComparatorClass(BlockKeyComparator.class);
        job.setGroupingComparatorClass(BlockKeyGroupComparator.class);
        job.setPartitionerClass(HadoopBlockPartitioner.class);
-        */
-
-        //FileInputFormat.setInputPaths(job, new Path(rankedFileName));
-        FileInputFormat.setInputPaths(job, new Path(inputFileName));
+
+        FileInputFormat.setInputPaths(job, new Path(rankedFileName));
+        //FileInputFormat.setInputPaths(job, new Path(inputFileName));
        FileOutputFormat.setOutputPath(job, new Path(conf.get("ldbc.snb.datagen.serializer.hadoopDir") + "/aux"));
        if (!job.waitForCompletion(true)) {
            throw new Exception();
        }

        try {
-            // fs.delete(new Path(rankedFileName), true);
+            fs.delete(new Path(rankedFileName), true);
            fs.delete(new Path(conf.get("ldbc.snb.datagen.serializer.hadoopDir") + "/aux"), true);
        } catch (IOException e) {
            System.err.println(e.getMessage());
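
For readers unfamiliar with the Hadoop pattern the new comparators enable: the map output key becomes BlockKey, the sort comparator orders by the full key, and the grouping comparator compares only the block id, so every Person of a block reaches a single reduce() call and the reducer can set per-block state once (as done above with SN.machineId = key.block). The sketch below is a minimal, hypothetical illustration of that composite-key / grouping-comparator idea; MyBlockKey and its fields are assumed names, not the datagen's actual BlockKey, BlockKeyComparator, or BlockKeyGroupComparator.

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

/** Illustrative composite key (block, id); not the datagen's BlockKey. */
public class MyBlockKey implements WritableComparable<MyBlockKey> {
    public long block;  // groups records; the reducer derives per-block state from it
    public long id;     // orders records within a block

    public void write(DataOutput out) throws IOException {
        out.writeLong(block);
        out.writeLong(id);
    }

    public void readFields(DataInput in) throws IOException {
        block = in.readLong();
        id = in.readLong();
    }

    /** Full ordering: by block first, then by id (what a sort comparator enforces). */
    public int compareTo(MyBlockKey o) {
        int c = Long.compare(block, o.block);
        return c != 0 ? c : Long.compare(id, o.id);
    }

    /** Grouping comparator: keys with the same block id go to the same reduce() call. */
    public static class GroupByBlock extends WritableComparator {
        public GroupByBlock() {
            super(MyBlockKey.class, true);
        }

        @SuppressWarnings("rawtypes")
        public int compare(WritableComparable a, WritableComparable b) {
            return Long.compare(((MyBlockKey) a).block, ((MyBlockKey) b).block);
        }
    }
}

Wired into a job the same way as in the commit, job.setMapOutputKeyClass(MyBlockKey.class) together with job.setGroupingComparatorClass(MyBlockKey.GroupByBlock.class) hands the reducer one Iterable of values per block while the values still arrive sorted by the full key.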