
Commit 6ba48f7

Disabled update stream sorting when update stream generation is not enabled.
Improved ranking
1 parent 5f00d48 commit 6ba48f7

File tree

9 files changed (+560, −312 lines)


src/main/java/ldbc/socialnet/dbgen/generator/MRGenerateUsers.java

Lines changed: 221 additions & 302 deletions
Large diffs are not rendered by default.

src/main/java/ldbc/socialnet/dbgen/generator/MRWriter.java

Lines changed: 4 additions & 2 deletions
@@ -43,6 +43,7 @@
 import ldbc.socialnet.dbgen.util.MapReduceKey;

 import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.mapreduce.Reducer;


@@ -65,11 +66,12 @@ public void writeReducedUserProfiles(int from, int to, int pass,
         try {
             to = to % windowSize;
             for (int i = from; i != to; i = (i+1)%windowSize) {
-                int key = userProfiles[i].getDicElementId(pass);
+                /* int key = userProfiles[i].getDicElementId(pass);
                 int block = 0;
                 long id = userProfiles[i].getAccountId();
                 MapReduceKey mpk = new MapReduceKey(block,key,id);
-                context.write(mpk, userProfiles[i]);
+                context.write(mpk, userProfiles[i]);*/
+                context.write(new LongWritable(userProfiles[i].getDicElementId(pass)), userProfiles[i]);
                 numberSerializedObject++;
             }
         }

src/main/java/ldbc/socialnet/dbgen/generator/ScalableGenerator.java

Lines changed: 6 additions & 8 deletions
@@ -58,6 +58,7 @@
 import ldbc.socialnet.dbgen.objects.*;
 import ldbc.socialnet.dbgen.serializer.*;
 import ldbc.socialnet.dbgen.storage.StorageManager;
+import ldbc.socialnet.dbgen.util.ComposedKey;
 import ldbc.socialnet.dbgen.util.ScaleFactor;
 import ldbc.socialnet.dbgen.vocabulary.SN;
 import ldbc.socialnet.dbgen.util.MapReduceKey;
@@ -66,6 +67,7 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.mapreduce.Mapper.Context;
 import org.apache.hadoop.mapreduce.Reducer;

@@ -781,7 +783,7 @@ public void closeSerializer() {
         System.out.println("Writing the data for test driver ");
     }

-    public void generateUserActivity( ReducedUserProfile userProfile, Reducer<MapReduceKey, ReducedUserProfile,MapReduceKey, ReducedUserProfile>.Context context) {
+    public void generateUserActivity( ReducedUserProfile userProfile, Reducer<ComposedKey, ReducedUserProfile,LongWritable, ReducedUserProfile>.Context context) {
         int index = numUserProfilesRead%windowSize;
         numUserProfilesRead++;
         reducedUserProfiles[index] = userProfile;
@@ -829,7 +831,7 @@ public void resetWindow() {
         mrCurCellPost = 0;
     }

-    public void pushUserProfile(ReducedUserProfile reduceUser, int pass, int outputDimension, Reducer<MapReduceKey, ReducedUserProfile,MapReduceKey, ReducedUserProfile>.Context context){
+    public void pushUserProfile(ReducedUserProfile reduceUser, int pass, int outputDimension, Reducer<ComposedKey, ReducedUserProfile,LongWritable, ReducedUserProfile>.Context context){
         ReducedUserProfile userObject = new ReducedUserProfile();
         userObject.copyFields(reduceUser);
         totalNumUserProfilesRead++;
@@ -851,7 +853,7 @@ public void pushUserProfile(ReducedUserProfile reduceUser, int pass, int outputD
         }
     }

-    public void pushAllRemainingUser(int pass, int outputDimension, Reducer<MapReduceKey, ReducedUserProfile,MapReduceKey, ReducedUserProfile>.Context context){
+    public void pushAllRemainingUser(int pass, int outputDimension, Reducer<ComposedKey, ReducedUserProfile,LongWritable, ReducedUserProfile>.Context context){

         // For each remianing cell in the window, we create the edges.
         for (int numLeftCell = Math.min(numberOfCellPerWindow, numUserProfilesRead/cellSize); numLeftCell > 0; --numLeftCell, ++mrCurCellPost) {
@@ -910,11 +912,7 @@ public void mrGenerateUserInfo(int pass, Context context){
             ReducedUserProfile reduceUserProf = generateGeneralInformation(j);
             ++numUsersToGenerate;
             try {
-                int block = 0; // The mapreduce group this university will be assigned.
-                int key = reduceUserProf.getDicElementId(pass); // The key used to sort within the block.
-                long id = reduceUserProf.getAccountId(); // The id used to sort within the key, to guarantee determinism.
-                MapReduceKey mpk = new MapReduceKey( block, key, id );
-                context.write(mpk, reduceUserProf);
+                context.write(new LongWritable(reduceUserProf.getDicElementId(pass)), reduceUserProf);
             } catch (IOException e) {
                 e.printStackTrace();
             } catch (InterruptedException e) {
src/main/java/ldbc/socialnet/dbgen/util/ComposedKey.java

Lines changed: 49 additions & 0 deletions
@@ -0,0 +1,49 @@
+
+package ldbc.socialnet.dbgen.util;
+
+import org.apache.hadoop.io.WritableComparable;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * Created by aprat on 11/9/14.
+ */
+
+public class ComposedKey implements WritableComparable<ComposedKey> {
+    public long block;
+    public long key;
+
+    public ComposedKey( ) {
+    }
+
+    public ComposedKey(ComposedKey cK) {
+        this.block = cK.block;
+        this.key = cK.key;
+    }
+
+    public ComposedKey( long block, long key) {
+        this.block = block;
+        this.key = key;
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+        out.writeLong(block);
+        out.writeLong(key);
+    }
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+        block = in.readLong();
+        key = in.readLong();
+    }
+
+    @Override
+    public int compareTo( ComposedKey mpk) {
+        if (block < mpk.block) return -1;
+        if (block > mpk.block) return 1;
+        return 0;
+    }
+}
src/main/java/ldbc/socialnet/dbgen/util/ComposedKeyComparator.java

Lines changed: 26 additions & 0 deletions
@@ -0,0 +1,26 @@
+package ldbc.socialnet.dbgen.util;
+
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableComparator;
+
+/**
+ * Created by aprat on 11/9/14.
+ */
+public class ComposedKeyComparator extends WritableComparator {
+
+    protected ComposedKeyComparator() {
+        super(ComposedKey.class,true);
+    }
+
+    @Override
+    public int compare(WritableComparable a, WritableComparable b) {
+        //return a.compareTo(b);
+        ComposedKey keyA = (ComposedKey)a;
+        ComposedKey keyB = (ComposedKey)b;
+        if (keyA.block < keyB.block) return -1;
+        if (keyA.block > keyB.block) return 1;
+        if (keyA.key < keyB.key) return -1;
+        if (keyA.key > keyB.key) return 1;
+        return 0;
+    }
+}
src/main/java/ldbc/socialnet/dbgen/util/ComposedKeyGroupComparator.java

Lines changed: 25 additions & 0 deletions
@@ -0,0 +1,25 @@
+package ldbc.socialnet.dbgen.util;
+
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableComparator;
+
+/**
+ * Created by aprat on 11/17/14.
+ */
+
+public class ComposedKeyGroupComparator extends WritableComparator {
+
+    protected ComposedKeyGroupComparator() {
+        super(ComposedKey.class,true);
+    }
+
+    @Override
+    public int compare(WritableComparable a, WritableComparable b) {
+        //return a.compareTo(b);
+        ComposedKey keyA = (ComposedKey)a;
+        ComposedKey keyB = (ComposedKey)b;
+        if (keyA.block < keyB.block) return -1;
+        if (keyA.block > keyB.block) return 1;
+        return 0;
+    }
+}
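Together, the two comparators implement the usual Hadoop secondary-sort split: ComposedKeyComparator orders map output by (block, key), while ComposedKeyGroupComparator compares only the block, so all keys of a block are delivered to a single reduce() call. Below is a minimal, hypothetical snippet (not part of this commit; the demo class name is invented, and it sits in the ldbc.socialnet.dbgen.util package only so the protected constructors are visible) illustrating the difference:

package ldbc.socialnet.dbgen.util;

// Hypothetical demo class, not part of the commit.
public class ComposedKeyOrderingDemo {
    public static void main(String[] args) {
        ComposedKey a = new ComposedKey(0, 5);
        ComposedKey b = new ComposedKey(0, 7);

        // Sort comparator: the blocks are equal, so ordering falls through to the key.
        System.out.println(new ComposedKeyComparator().compare(a, b));      // -1

        // Grouping comparator: only the block is compared, so both records
        // end up in the same reduce() call.
        System.out.println(new ComposedKeyGroupComparator().compare(a, b)); // 0
    }
}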
src/main/java/ldbc/socialnet/dbgen/util/HadoopBlockMapper.java

Lines changed: 27 additions & 0 deletions
@@ -0,0 +1,27 @@
+package ldbc.socialnet.dbgen.util;
+
+import ldbc.socialnet.dbgen.objects.ReducedUserProfile;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapreduce.Mapper;
+
+import java.io.IOException;
+
+/**
+ * Created by aprat on 11/17/14.
+ */
+public class HadoopBlockMapper extends Mapper<LongWritable, ReducedUserProfile, ComposedKey, ReducedUserProfile> {
+    int mapId;
+
+    @Override
+    public void setup(Mapper.Context context) {
+        Configuration conf = context.getConfiguration();
+        mapId = context.getTaskAttemptID().getId();
+    }
+
+    @Override
+    public void map(LongWritable key, ReducedUserProfile value, Mapper.Context context)
+            throws IOException, InterruptedException {
+        context.write(new ComposedKey(key.get() / 10000, key.get()), value);
+    }
+}
src/main/java/ldbc/socialnet/dbgen/util/HadoopBlockPartitioner.java

Lines changed: 19 additions & 0 deletions
@@ -0,0 +1,19 @@
+package ldbc.socialnet.dbgen.util;
+
+import ldbc.socialnet.dbgen.objects.ReducedUserProfile;
+import org.apache.hadoop.mapreduce.Partitioner;
+
+/**
+ * Created by aprat on 11/17/14.
+ */
+public class HadoopBlockPartitioner extends Partitioner<ComposedKey, ReducedUserProfile> {
+
+    public HadoopBlockPartitioner() {
+        super();
+    }
+
+    @Override
+    public int getPartition(ComposedKey key, ReducedUserProfile person, int numReduceTasks) {
+        return (int)(key.block % numReduceTasks);
+    }
+}
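Taken together, the new classes form a block-based secondary sort: HadoopBlockMapper derives a ComposedKey from a LongWritable id (10000 ids per block), HadoopBlockPartitioner routes each block to one reducer, ComposedKeyComparator sorts by (block, key), and ComposedKeyGroupComparator groups a whole block into one reduce call. The actual job wiring lives in MRGenerateUsers.java, whose diff is not rendered above; the sketch below is only an assumed illustration of how such a job is typically configured (the class BlockSortJobSketch and its placeholder reducer are invented, not taken from the commit):

package ldbc.socialnet.dbgen.util;

import ldbc.socialnet.dbgen.objects.ReducedUserProfile;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;

// Hypothetical wiring sketch, not part of this commit.
public class BlockSortJobSketch {

    // Placeholder reducer; the real one is defined elsewhere in the generator.
    public static class BlockReducer
            extends Reducer<ComposedKey, ReducedUserProfile, LongWritable, ReducedUserProfile> {
    }

    public static Job configure(Configuration conf) throws Exception {
        Job job = new Job(conf, "block-sorted user generation");

        job.setMapperClass(HadoopBlockMapper.class);           // LongWritable id -> ComposedKey(block, key)
        job.setMapOutputKeyClass(ComposedKey.class);
        job.setMapOutputValueClass(ReducedUserProfile.class);

        job.setPartitionerClass(HadoopBlockPartitioner.class);            // route by block % numReduceTasks
        job.setSortComparatorClass(ComposedKeyComparator.class);          // order records by (block, key)
        job.setGroupingComparatorClass(ComposedKeyGroupComparator.class); // one reduce() call per block

        job.setReducerClass(BlockReducer.class);
        job.setOutputKeyClass(LongWritable.class);              // reducers emit LongWritable keys
        job.setOutputValueClass(ReducedUserProfile.class);
        return job;
    }
}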

0 commit comments