Commit 64731ab

Added Turtle serializer

1 parent: 96651f6

26 files changed: +911 additions, −57 deletions

src/main/java/ldbc/snb/datagen/generator/PersonActivityGenerator.java

Lines changed: 5 additions & 0 deletions

@@ -52,6 +52,10 @@ private void generateActivity( Person person, ArrayList<Person> block ) {
         }
     }
 
+    public void reset() {
+        personActivitySerializer_.reset();
+    }
+
     private void generateWall( Person person, ArrayList<Person> block ) {
         // generate wall
         Forum wall = forumGenerator_.createWall(randomFarm_, forumId++, person);
@@ -201,6 +205,7 @@ public void generateActivityForBlock( int seed, ArrayList<Person> block, Context
         forumId = 0;
         messageId = 0;
         SN.machineId = seed;
+        personActivitySerializer_.reset();
         int counter = 0;
         for( Person p : block ) {
             generateActivity(p, block);

src/main/java/ldbc/snb/datagen/generator/PersonGenerator.java

Lines changed: 2 additions & 0 deletions

@@ -5,6 +5,7 @@
 import ldbc.snb.datagen.generator.distribution.utils.BucketedDistribution;
 import ldbc.snb.datagen.objects.Person;
 import ldbc.snb.datagen.util.RandomGeneratorFarm;
+import ldbc.snb.datagen.vocabulary.SN;
 import org.apache.hadoop.conf.Configuration;
 
 import java.text.Normalizer;
@@ -164,6 +165,7 @@ private void resetState(int blockId){
     public Person[] generateUserBlock( int seed, int blockSize ) {
         resetState(seed);
         nextId=seed*blockSize;
+        SN.machineId = seed;
         Person[] block;
         block = new Person[blockSize];
         for (int j =0; j < blockSize; ++j) {

src/main/java/ldbc/snb/datagen/hadoop/HadoopPersonSerializer.java

Lines changed: 1 addition & 0 deletions

@@ -47,6 +47,7 @@ protected void setup(Context context) {
     @Override
     public void reduce(BlockKey key, Iterable<Person> valueSet,Context context)
             throws IOException, InterruptedException {
+        personSerializer_.reset();
         for( Person p : valueSet ) {
             if(p.creationDate()< Dictionaries.dates.getUpdateThreshold() || !DatagenParams.updateStreams ) {
                 personSerializer_.export(p);

src/main/java/ldbc/snb/datagen/serializer/HDFSCSVWriter.java

Lines changed: 5 additions & 49 deletions

@@ -9,43 +9,17 @@
 import java.util.ArrayList;
 import java.util.zip.GZIPOutputStream;
 
-public class HDFSCSVWriter {
+public class HDFSCSVWriter extends HDFSWriter {
 
-    private String outputDir;
-    private String prefix;
-    private int numPartitions;
-    private int currentPartition = 0;
-    private boolean compressed;
     private String separator = "|";
-    private boolean firstEntry = true;
     private StringBuffer buffer;
 
     private OutputStream[] fileOutputStream;
 
-    public HDFSCSVWriter( String outputDir, String prefix, int numPartitions, boolean compressed, String separator ) {
-        this.outputDir = outputDir;
-        this.prefix = prefix;
-        this.numPartitions = numPartitions;
-        this.compressed = compressed;
+    public HDFSCSVWriter( String outputDir, String prefix, int numPartitions, boolean compressed, String separator ) {
+        super(outputDir, prefix, numPartitions, compressed, "csv" );
         this.separator = separator;
-        try {
-            Configuration conf = new Configuration();
-            FileSystem fs = FileSystem.get(conf);
-            fileOutputStream = new OutputStream[numPartitions];
-            if (compressed) {
-                for (int i = 0; i < numPartitions; i++) {
-                    this.fileOutputStream[i] = new GZIPOutputStream(fs.create(new Path(outputDir + "/" + prefix + "_" + i + ".csv.gz")));
-                }
-            } else {
-                for (int i = 0; i < numPartitions; i++) {
-                    this.fileOutputStream[i] = fs.create(new Path(outputDir + "/" + prefix + "_" + i + ".csv"));
-                }
-            }
-            buffer = new StringBuffer(1024);
-        } catch (IOException e) {
-            System.err.println(e.getMessage());
-            System.exit(-1);
-        }
+
     }
 
     public void writeEntry( ArrayList<String> entry ) {
@@ -55,24 +29,6 @@ public void writeEntry( ArrayList<String> entry ) {
             buffer.append(separator);
         }
         buffer.append("\n");
-        try {
-            fileOutputStream[currentPartition].write(buffer.toString().getBytes("UTF8"));
-            currentPartition = ++currentPartition % numPartitions;
-        }
-        catch (IOException e){
-            System.out.println("Cannot write to output file ");
-            e.printStackTrace();
-        }
-    }
-
-    public void close() {
-        try {
-            for (int i = 0; i < numPartitions; ++i) {
-                fileOutputStream[i].close();
-            }
-        } catch (IOException e) {
-            System.err.println("Exception when closing a file");
-            System.err.println(e.getMessage());
-        }
+        this.write(buffer.toString());
     }
 }
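
After this refactor, HDFSCSVWriter only formats rows; partition selection, optional gzip compression, and close() come from the new HDFSWriter base class introduced below. A minimal usage sketch, assuming a reachable Hadoop FileSystem and hypothetical output directory, prefix, and row values:

import java.util.ArrayList;

import ldbc.snb.datagen.serializer.HDFSCSVWriter;

public class CsvWriterUsageSketch {
    public static void main(String[] args) {
        // Hypothetical arguments: output dir, file prefix, 4 partitions, no compression, '|' separator.
        HDFSCSVWriter writer = new HDFSCSVWriter("/tmp/social_network", "person", 4, false, "|");

        ArrayList<String> row = new ArrayList<String>();
        row.add("933");
        row.add("Mahinda");
        row.add("Perera");

        writer.writeEntry(row); // HDFSCSVWriter joins the fields; HDFSWriter.write() picks the partition file
        writer.close();         // inherited from HDFSWriter
    }
}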

src/main/java/ldbc/snb/datagen/serializer/HDFSWriter.java (new file)

Lines changed: 84 additions & 0 deletions

@@ -0,0 +1,84 @@
+package ldbc.snb.datagen.serializer;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.zip.GZIPOutputStream;
+
+public class HDFSWriter {
+
+    private String outputDir;
+    private String prefix;
+    private int numPartitions;
+    private int currentPartition = 0;
+    private boolean compressed;
+    private boolean firstEntry = true;
+    private StringBuffer buffer;
+
+    private OutputStream[] fileOutputStream;
+
+    public HDFSWriter(String outputDir, String prefix, int numPartitions, boolean compressed, String extension) {
+        this.outputDir = outputDir;
+        this.prefix = prefix;
+        this.numPartitions = numPartitions;
+        this.compressed = compressed;
+        try {
+            Configuration conf = new Configuration();
+            FileSystem fs = FileSystem.get(conf);
+            fileOutputStream = new OutputStream[numPartitions];
+            if (compressed) {
+                for (int i = 0; i < numPartitions; i++) {
+                    this.fileOutputStream[i] = new GZIPOutputStream(fs.create(new Path(outputDir + "/" + prefix + "_" + i + "."+extension+".gz")));
+                }
+            } else {
+                for (int i = 0; i < numPartitions; i++) {
+                    this.fileOutputStream[i] = fs.create(new Path(outputDir + "/" + prefix + "_" + i + "."+extension));
+                }
+            }
+            buffer = new StringBuffer(1024);
+        } catch (IOException e) {
+            System.err.println(e.getMessage());
+            System.exit(-1);
+        }
+    }
+
+    public void write( String entry ) {
+        buffer.setLength(0);
+        buffer.append(entry);
+        try {
+            fileOutputStream[currentPartition].write(buffer.toString().getBytes("UTF8"));
+            currentPartition = ++currentPartition % numPartitions;
+        }
+        catch (IOException e){
+            System.out.println("Cannot write to output file ");
+            e.printStackTrace();
+        }
+    }
+
+    public void writeAllPartitions( String entry ) {
+        try {
+            for(int i = 0; i < numPartitions;++i ) {
+                fileOutputStream[i].write(entry.getBytes("UTF8"));
+            }
+        }
+        catch (IOException e){
+            System.out.println("Cannot write to output file ");
+            e.printStackTrace();
+        }
+    }
+
+    public void close() {
+        try {
+            for (int i = 0; i < numPartitions; ++i) {
+                fileOutputStream[i].close();
+            }
+        } catch (IOException e) {
+            System.err.println("Exception when closing a file");
+            System.err.println(e.getMessage());
+        }
+    }
+}
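
The Turtle serializer named in the commit message is not among the hunks shown on this page. As a sketch only, a .ttl writer could reuse the same base class; the class and method names below are hypothetical and not part of this commit:

package ldbc.snb.datagen.serializer;

// Hypothetical sketch: a Turtle writer built on HDFSWriter's partitioned,
// optionally gzipped output streams.
public class HDFSTurtleWriterSketch extends HDFSWriter {

    public HDFSTurtleWriterSketch(String outputDir, String prefix, int numPartitions, boolean compressed) {
        super(outputDir, prefix, numPartitions, compressed, "ttl");
    }

    // writeAllPartitions() suits content that every partition file needs,
    // such as @prefix declarations at the top of each .ttl file.
    public void writePrefix(String prefix, String iri) {
        this.writeAllPartitions("@prefix " + prefix + ": <" + iri + "> .\n");
    }

    // write() round-robins each triple over the partition files.
    public void writeTriple(String subject, String predicate, String object) {
        this.write(subject + " " + predicate + " " + object + " .\n");
    }
}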

src/main/java/ldbc/snb/datagen/serializer/InvariantSerializer.java

Lines changed: 1 addition & 0 deletions

@@ -20,6 +20,7 @@ public InvariantSerializer() {
     }
 
 
+    abstract public void reset();
 
     public void export(TagClass tagclass) {
        serialize(tagclass);

src/main/java/ldbc/snb/datagen/serializer/PersonActivitySerializer.java

Lines changed: 2 additions & 0 deletions

@@ -46,6 +46,8 @@ public void export( Like like ) {
     }
 
 
+    abstract public void reset();
+
     abstract public void initialize(Configuration conf, int reducerId);
 
     abstract public void close();

src/main/java/ldbc/snb/datagen/serializer/PersonSerializer.java

Lines changed: 2 additions & 0 deletions

@@ -50,6 +50,8 @@ public void export(Person p, Knows k ) {
         serialize(p, k);
     }
 
+    abstract public void reset();
+
     abstract public void initialize(Configuration conf, int reducerId);
 
     abstract public void close();
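
Together with the reset() calls added in HadoopPersonSerializer.reduce and PersonActivityGenerator.generateActivityForBlock above, this abstract method gives every concrete serializer a per-block hook. A sketch of the kind of state an implementation might clear, with field names that are purely illustrative:

// Illustrative only: per-block state a concrete serializer's reset() could clear
// so output from one reducer block does not leak into the next.
public class SerializerResetSketch {

    private long exportedEntities = 0;                  // hypothetical per-block counter
    private StringBuffer pending = new StringBuffer();  // hypothetical buffered output

    public void reset() {
        exportedEntities = 0;
        pending.setLength(0);
    }
}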
