Skip to content

Commit 2c65612

Browse files
committed
Fixed a bug with update stream serialization where different reducers where writting to the same files
1 parent 601c0ed commit 2c65612

File tree

1 file changed

+2
-2
lines changed

1 file changed

+2
-2
lines changed

src/main/java/ldbc/snb/datagen/hadoop/HadoopUpdateStreamSorterAndSerializer.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,10 +52,10 @@ public void reduce(UpdateEventKey key, Iterable<Text> valueSet,Context context)
5252
try {
5353
FileSystem fs = FileSystem.get(conf);
5454
if( compressed ) {
55-
Path outFile = new Path(context.getConfiguration().get("ldbc.snb.datagen.serializer.socialNetworkDir")+"/updateStream_"+reducerId+"_"+key.partition+"_"+streamType+".csv.gz");
55+
Path outFile = new Path(context.getConfiguration().get("ldbc.snb.datagen.serializer.socialNetworkDir")+"/updateStream_"+key.reducerId+"_"+key.partition+"_"+streamType+".csv.gz");
5656
out = new GZIPOutputStream( fs.create(outFile));
5757
} else {
58-
Path outFile = new Path(context.getConfiguration().get("ldbc.snb.datagen.serializer.socialNetworkDir")+"/updateStream_"+reducerId+"_"+key.partition+"_"+streamType+".csv");
58+
Path outFile = new Path(context.getConfiguration().get("ldbc.snb.datagen.serializer.socialNetworkDir")+"/updateStream_"+key.reducerId+"_"+key.partition+"_"+streamType+".csv");
5959
out = fs.create(outFile);
6060
}
6161
for( Text t : valueSet ) {

0 commit comments

Comments
 (0)