Skip to content

Commit 47ca1c8

Browse files
author
Hadoop user
committed
2 parents ef9ecd6 + ea8d50f commit 47ca1c8

File tree

3 files changed

+8
-4
lines changed

3 files changed

+8
-4
lines changed

src/main/java/ldbc/snb/datagen/hadoop/HadoopPersonSerializer.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,13 +26,13 @@
2626
*/
2727
public class HadoopPersonSerializer {
2828

29-
// public static class HadoopPersonSerializerReducer extends Reducer<BlockKey, Person, LongWritable, Person> {
3029
public static class HadoopPersonSerializerReducer extends Reducer<TupleKey, Person, LongWritable, Person> {
3130

3231
private int reducerId; /** The id of the reducer.**/
3332
private PersonSerializer personSerializer_; /** The person serializer **/
34-
private UpdateEventSerializer updateSerializer_;
35-
33+
private UpdateEventSerializer updateSerializer_;
34+
35+
@Override
3636
protected void setup(Context context) {
3737
Configuration conf = context.getConfiguration();
3838
reducerId = context.getTaskAttemptID().getTaskID().getId();
@@ -49,7 +49,6 @@ protected void setup(Context context) {
4949
}
5050

5151
@Override
52-
// public void reduce(BlockKey key, Iterable<Person> valueSet,Context context)
5352
public void reduce(TupleKey key, Iterable<Person> valueSet,Context context)
5453
throws IOException, InterruptedException {
5554
// SN.machineId = key.block;
@@ -70,6 +69,8 @@ public void reduce(TupleKey key, Iterable<Person> valueSet,Context context)
7069
}
7170

7271
}
72+
73+
@Override
7374
protected void cleanup(Context context){
7475
personSerializer_.close();
7576
if (DatagenParams.updateStreams) {

src/main/java/ldbc/snb/datagen/hadoop/HadoopPersonSortAndSerializer.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ public static class HadoopPersonSerializerReducer extends Reducer<BlockKey, Per
3232
private PersonSerializer personSerializer_; /** The person serializer **/
3333
private UpdateEventSerializer updateSerializer_;
3434

35+
@Override
3536
protected void setup(Context context) {
3637
Configuration conf = context.getConfiguration();
3738
reducerId = context.getTaskAttemptID().getTaskID().getId();
@@ -68,6 +69,7 @@ public void reduce(BlockKey key, Iterable<Person> valueSet,Context context)
6869
}
6970

7071
}
72+
@Override
7173
protected void cleanup(Context context){
7274
personSerializer_.close();
7375
if (DatagenParams.updateStreams) {

src/main/java/ldbc/snb/datagen/serializer/HDFSWriter.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@ public void writeAllPartitions( String entry ) {
7575
public void close() {
7676
try {
7777
for (int i = 0; i < numPartitions; ++i) {
78+
fileOutputStream[i].flush();
7879
fileOutputStream[i].close();
7980
}
8081
} catch (IOException e) {

0 commit comments

Comments
 (0)