Skip to content

Commit 6733a73

Browse files
committed
Implemented keychanger into ranking
1 parent f46c7d7 commit 6733a73

File tree

6 files changed

+57
-11
lines changed

6 files changed

+57
-11
lines changed

src/main/java/ldbc/snb/datagen/generator/LDBCDatagen.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -278,6 +278,7 @@ public static void main(String[] args) /*throws Exception*/ {
278278
}catch(Exception e ) {
279279
System.err.println("Error during execution");
280280
System.err.println(e.getMessage());
281+
e.printStackTrace();
281282
System.exit(1);
282283
}
283284
}

src/main/java/ldbc/snb/datagen/hadoop/HadoopFileRanker.java

Lines changed: 44 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import org.apache.hadoop.fs.Path;
77
import org.apache.hadoop.io.LongWritable;
88
import org.apache.hadoop.mapreduce.Job;
9+
import org.apache.hadoop.mapreduce.Mapper;
910
import org.apache.hadoop.mapreduce.Partitioner;
1011
import org.apache.hadoop.mapreduce.Reducer;
1112
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
@@ -23,19 +24,53 @@ public class HadoopFileRanker {
2324
private Configuration conf;
2425
private Class<?> K;
2526
private Class<?> V;
27+
private String keySetterName;
2628

2729
/**
2830
*
2931
* @param conf The configuration object.
3032
* @param K The Key class of the hadoop sequence file.
3133
* @param V The Value class of the hadoop sequence file.
3234
*/
33-
public HadoopFileRanker( Configuration conf, Class<?> K, Class<?> V) {
35+
public HadoopFileRanker( Configuration conf, Class<?> K, Class<?> V, String keySetter ) {
3436
this.conf = new Configuration(conf);
37+
this.keySetterName = keySetter;
3538
this.K = K;
3639
this.V = V;
3740
}
3841

42+
public static class HadoopFileRankerSortMapper<K, V> extends Mapper<K, V, K, V> {
43+
44+
HadoopFileKeyChanger.KeySetter<TupleKey> keySetter;
45+
private long counter = 0; /** Counter of the number of elements received by this reducer.*/
46+
47+
@Override
48+
public void setup( Context context ) {
49+
50+
try {
51+
LDBCDatagen.init(context.getConfiguration());
52+
String className = context.getConfiguration().get("keySetterClassName");
53+
keySetter = (HadoopFileKeyChanger.KeySetter) Class.forName(className).newInstance();
54+
} catch(ClassNotFoundException e) {
55+
System.out.print(e.getMessage());
56+
} catch(IllegalAccessException e) {
57+
System.out.print(e.getMessage());
58+
} catch(InstantiationException e) {
59+
System.out.print(e.getMessage());
60+
}
61+
}
62+
63+
@Override
64+
public void map(K key, V value,
65+
Context context) throws IOException, InterruptedException {
66+
context.write((K)keySetter.getKey(value), value);
67+
}
68+
69+
@Override
70+
public void cleanup(Context context) {
71+
}
72+
}
73+
3974
public static class HadoopFileRankerSortReducer<K, V, T extends BlockKey> extends Reducer<K, V, BlockKey, V> {
4075

4176
private int reducerId; /** The id of the reducer.**/
@@ -136,12 +171,19 @@ public void reduce(BlockKey key, Iterable<V> valueSet,
136171
public void run( String inputFileName, String outputFileName ) throws Exception {
137172
int numThreads = conf.getInt("ldbc.snb.datagen.generator.numThreads",1);
138173

174+
if( keySetterName != null ) {
175+
conf.set("keySetterClassName", keySetterName);
176+
}
177+
139178
/** First Job to sort the key-value pairs and to count the number of elements processed by each reducer.**/
140179
Job jobSort = Job.getInstance(conf, "Sorting "+inputFileName);
141180

142181
FileInputFormat.setInputPaths(jobSort, new Path(inputFileName));
143-
FileOutputFormat.setOutputPath(jobSort, new Path(conf.get("ldbc.snb.datagen.serializer.hadoopDir")+"/rankIntermediate"));
182+
FileOutputFormat.setOutputPath(jobSort, new Path(conf.get("ldbc.snb.datagen.serializer.hadoopDir") + "/rankIntermediate"));
144183

184+
if( keySetterName != null ) {
185+
jobSort.setMapperClass(HadoopFileRankerSortMapper.class);
186+
}
145187
jobSort.setMapOutputKeyClass(K);
146188
jobSort.setMapOutputValueClass(V);
147189
jobSort.setOutputKeyClass(BlockKey.class);

src/main/java/ldbc/snb/datagen/hadoop/HadoopKnowsGenerator.java

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import org.apache.hadoop.fs.FileSystem;
1010
import org.apache.hadoop.fs.Path;
1111
import org.apache.hadoop.mapreduce.Job;
12+
import org.apache.hadoop.mapreduce.Mapper;
1213
import org.apache.hadoop.mapreduce.Reducer;
1314
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
1415
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
@@ -86,7 +87,7 @@ public void run( String inputFileName, String outputFileName ) throws Exception
8687

8788
FileSystem fs = FileSystem.get(conf);
8889

89-
String keyChangedFileName = inputFileName;
90+
/*String keyChangedFileName = inputFileName;
9091
if(preKeySetterName != null) {
9192
System.out.println("Changing key of persons");
9293
long start = System.currentTimeMillis();
@@ -95,18 +96,20 @@ public void run( String inputFileName, String outputFileName ) throws Exception
9596
keyChanger.run(inputFileName, keyChangedFileName);
9697
System.out.println("... Time to change keys: "+ (System.currentTimeMillis() - start)+" ms");
9798
}
99+
*/
98100

99101
System.out.println("Ranking persons");
100102
long start = System.currentTimeMillis();
101103
String rankedFileName = conf.get("ldbc.snb.datagen.serializer.hadoopDir") + "/ranked";
102-
HadoopFileRanker hadoopFileRanker = new HadoopFileRanker( conf, TupleKey.class, Person.class );
103-
hadoopFileRanker.run(keyChangedFileName,rankedFileName);
104-
if(preKeySetterName != null ) {
104+
HadoopFileRanker hadoopFileRanker = new HadoopFileRanker( conf, TupleKey.class, Person.class , preKeySetterName);
105+
hadoopFileRanker.run(inputFileName,rankedFileName);
106+
/* if(preKeySetterName != null ) {
105107
fs.delete(new Path(keyChangedFileName), true);
106108
}
107-
System.out.println("... Time to rank persons: "+ (System.currentTimeMillis() - start)+" ms");
109+
*/
110+
System.out.println("... Time to rank persons: " + (System.currentTimeMillis() - start) + " ms");
108111

109-
conf.setInt("stepIndex",step_index);
112+
conf.setInt("stepIndex", step_index);
110113
int index = 0;
111114
for( float p : percentages ) {
112115
conf.setFloat("percentage"+index, p);

src/main/java/ldbc/snb/datagen/hadoop/HadoopMergeFriendshipFiles.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ public void reduce(TupleKey key, Iterable<Person> valueSet,Context context)
6262

6363
public HadoopMergeFriendshipFiles( Configuration conf, String postKeySetterName ) {
6464

65-
this.conf = conf;
65+
this.conf = new Configuration(conf);
6666
this.postKeySetterName = postKeySetterName;
6767
}
6868

src/main/java/ldbc/snb/datagen/hadoop/HadoopPersonActivityGenerator.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,7 @@ public void run( String inputFileName ) throws Exception {
116116
FileSystem fs = FileSystem.get(conf);
117117

118118
String rankedFileName = conf.get("ldbc.snb.datagen.serializer.hadoopDir") + "/ranked";
119-
HadoopFileRanker hadoopFileRanker = new HadoopFileRanker( conf, TupleKey.class, Person.class );
119+
HadoopFileRanker hadoopFileRanker = new HadoopFileRanker( conf, TupleKey.class, Person.class, null );
120120
hadoopFileRanker.run(inputFileName,rankedFileName);
121121

122122
int numThreads = Integer.parseInt(conf.get("ldbc.snb.datagen.generator.numThreads"));

src/main/java/ldbc/snb/datagen/hadoop/HadoopPersonSerializer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ public void run( String inputFileName ) throws Exception {
8686
FileSystem fs = FileSystem.get(conf);
8787

8888
String rankedFileName = conf.get("ldbc.snb.datagen.serializer.hadoopDir") + "/ranked";
89-
HadoopFileRanker hadoopFileRanker = new HadoopFileRanker( conf, TupleKey.class, Person.class );
89+
HadoopFileRanker hadoopFileRanker = new HadoopFileRanker( conf, TupleKey.class, Person.class, null );
9090
hadoopFileRanker.run(inputFileName,rankedFileName);
9191

9292
int numThreads = Integer.parseInt(conf.get("ldbc.snb.datagen.generator.numThreads"));

0 commit comments

Comments
 (0)