Skip to content

Commit 6ca74f3

Browse files
committed
Fixed critial bug when more than one updatePartitions were set
1 parent 2cfd261 commit 6ca74f3

File tree

3 files changed

+31
-2
lines changed

3 files changed

+31
-2
lines changed

src/main/java/ldbc/snb/datagen/hadoop/HadoopUpdateStreamSorterAndSerializer.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,9 @@ public void reduce(UpdateEventKey key, Iterable<Text> valueSet,Context context)
5555
Path outFile = new Path(context.getConfiguration().get("ldbc.snb.datagen.serializer.socialNetworkDir")+"/updateStream_"+key.reducerId+"_"+key.partition+"_"+streamType+".csv");
5656
out = fs.create(outFile);
5757
}
58+
int counter = 0;
5859
for( Text t : valueSet ) {
60+
counter++;
5961
out.write(t.toString().getBytes("UTF8"));
6062
}
6163
out.close();
@@ -95,6 +97,7 @@ public void run(List<String> inputFileNames, String type ) throws Exception {
9597
job.setOutputFormatClass(SequenceFileOutputFormat.class);
9698
job.setPartitionerClass(HadoopUpdateEventKeyPartitioner.class);
9799
job.setGroupingComparatorClass(UpdateEventKeyGroupComparator.class);
100+
//job.setSortComparatorClass(UpdateEventKeySortComparator.class);
98101

99102
for(String s : inputFileNames) {
100103
FileInputFormat.addInputPath(job, new Path(s));

src/main/java/ldbc/snb/datagen/hadoop/UpdateEventKey.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,10 +46,10 @@ public void readFields(DataInput in) throws IOException {
4646
}
4747

4848
public int compareTo( UpdateEventKey key) {
49-
if( date < key.date) return -1;
50-
if( date > key.date) return 1;
5149
if (reducerId != key.reducerId) return reducerId - key.reducerId;
5250
if (partition != key.partition) return partition - key.partition;
51+
if( date < key.date) return -1;
52+
if( date > key.date) return 1;
5353
return 0;
5454
}
5555
}
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
package ldbc.snb.datagen.hadoop;
2+
3+
import org.apache.hadoop.io.WritableComparable;
4+
import org.apache.hadoop.io.WritableComparator;
5+
6+
/**
7+
* Created by aprat on 11/17/14.
8+
*/
9+
10+
public class UpdateEventKeySortComparator extends WritableComparator {
11+
12+
protected UpdateEventKeySortComparator() {
13+
super(UpdateEventKey.class,true);
14+
}
15+
16+
@Override
17+
public int compare(WritableComparable a, WritableComparable b) {
18+
UpdateEventKey keyA = (UpdateEventKey)a;
19+
UpdateEventKey keyB = (UpdateEventKey)b;
20+
if (keyA.reducerId != keyB.reducerId) return keyA.reducerId - keyB.reducerId;
21+
if (keyA.partition != keyB.partition) return keyA.partition - keyB.partition;
22+
if( keyA.date < keyB.date) return -1;
23+
if( keyA.date > keyB.date) return 1;
24+
return 0;
25+
}
26+
}

0 commit comments

Comments
 (0)