Skip to content

Commit 5c8e26a

Browse files
committed
Added some testing.
Fixed bug flashmob post generation
1 parent 2645cc0 commit 5c8e26a

File tree

9 files changed

+264
-20
lines changed

9 files changed

+264
-20
lines changed

src/main/java/ldbc/snb/datagen/generator/FlashmobPostGenerator.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ private int selectRandomTag( Random randomFlashmobTag, FlashmobTag[] tags, int i
5656
/** @brief Selects the earliest flashmob tag index from a given date.
5757
* @return The index to the earliest flashmob tag.*/
5858
private int searchEarliest( FlashmobTag[] tags, ForumMembership membership ) {
59-
long fromDate = membership.creationDate();
59+
long fromDate = membership.creationDate() + flashmobSpan_/2 + DatagenParams.deltaTime;
6060
int lowerBound = 0;
6161
int upperBound = tags.length - 1;
6262
int midPoint = (upperBound + lowerBound) / 2;

src/main/java/ldbc/snb/datagen/generator/LDBCDatagen.java

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,8 @@
4949
import org.apache.hadoop.io.LongWritable;
5050
import org.apache.hadoop.io.Text;
5151

52+
53+
import java.io.File;
5254
import java.io.OutputStream;
5355
import java.util.ArrayList;
5456
import java.util.List;
@@ -139,6 +141,7 @@ public int runGenerateJob(Configuration conf) throws Exception {
139141

140142

141143

144+
142145
fs.delete(new Path(DatagenParams.hadoopDir + "/persons"), true);
143146
printProgress("Merging the different edge files");
144147
ArrayList<String> edgeFileNames = new ArrayList<String>();
@@ -311,6 +314,30 @@ public int runGenerateJob(Configuration conf) throws Exception {
311314
System.out.println("Sorting update streams time: "+((endSortingUpdateStreams - startSortingUpdateStreams) / 1000));
312315
System.out.println("Invariant schema serialization time: "+((endInvariantSerializing - startInvariantSerializing) / 1000));
313316
System.out.println("Total Execution time: "+((end - start) / 1000));
317+
318+
System.out.println("Running Parameter Generation");
319+
if(conf.getBoolean("ldbc.snb.datagen.parametergenerator.parameters",false) && conf.getBoolean("ldbc.snb.datagen.generator.activity",false)) {
320+
ProcessBuilder pb = new ProcessBuilder("mkdir", "-p",conf.get("ldbc.snb.datagen.serializer.outputDir")+"/substitution_parameters");
321+
pb.directory(new File("./"));
322+
Process p = pb.start();
323+
p.waitFor();
324+
325+
pb = new ProcessBuilder(conf.get("ldbc.snb.datagen.parametergenerator.python"), "paramgenerator/generateparams.py", "./",conf.get("ldbc.snb.datagen.serializer.outputDir")+"/substitution_parameters");
326+
pb.directory(new File("./"));
327+
File logInteractive = new File("parameters_interactive.log");
328+
pb.redirectErrorStream(true);
329+
pb.redirectOutput(ProcessBuilder.Redirect.appendTo(logInteractive));
330+
p = pb.start();
331+
p.waitFor();
332+
333+
pb = new ProcessBuilder(conf.get("ldbc.snb.datagen.parametergenerator.python"), "paramgenerator/generateparamsbi.py", "./",conf.get("ldbc.snb.datagen.serializer.outputDir")+"/substitution_parameters");
334+
pb.directory(new File("./"));
335+
File logBi = new File("parameters_bi.log");
336+
pb.redirectErrorStream(true);
337+
pb.redirectOutput(ProcessBuilder.Redirect.appendTo(logBi));
338+
p = pb.start();
339+
p.waitFor();
340+
}
314341
return 0;
315342
}
316343

src/main/java/ldbc/snb/datagen/generator/PersonActivityGenerator.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,6 @@ public void generateActivityForBlock( int seed, ArrayList<Person> block, Context
146146
float personGenerationTime = 0.0f;
147147
long initTime = System.currentTimeMillis();
148148
for( Person p : block ) {
149-
System.out.println("Generating activity for person "+counter+" with degree "+p.knows().size());
150149
long start = System.currentTimeMillis();
151150
generateActivity(p, block);
152151
if( DatagenParams.updateStreams ) {
@@ -158,7 +157,7 @@ public void generateActivityForBlock( int seed, ArrayList<Person> block, Context
158157
}
159158
float time = (System.currentTimeMillis() - start)/1000.0f;
160159
personGenerationTime+=time;
161-
System.out.println("Time to generate activity for person "+counter+": "+time+". Throughput "+counter/((System.currentTimeMillis() - initTime)*0.001));
160+
//System.out.println("Time to generate activity for person "+counter+": "+time+". Throughput "+counter/((System.currentTimeMillis() - initTime)*0.001));
162161
counter++;
163162
}
164163
System.out.println("Average person activity generation time "+personGenerationTime / (float)block.size());

src/main/java/ldbc/snb/datagen/hadoop/HadoopPersonSerializer.java

Lines changed: 16 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,9 @@
2626
*/
2727
public class HadoopPersonSerializer {
2828

29-
public static class HadoopPersonSerializerReducer extends Reducer<BlockKey, Person, LongWritable, Person> {
30-
29+
// public static class HadoopPersonSerializerReducer extends Reducer<BlockKey, Person, LongWritable, Person> {
30+
public static class HadoopPersonSerializerReducer extends Reducer<TupleKey, Person, LongWritable, Person> {
31+
3132
private int reducerId; /** The id of the reducer.**/
3233
private PersonSerializer personSerializer_; /** The person serializer **/
3334
private UpdateEventSerializer updateSerializer_;
@@ -48,9 +49,10 @@ protected void setup(Context context) {
4849
}
4950

5051
@Override
51-
public void reduce(BlockKey key, Iterable<Person> valueSet,Context context)
52+
// public void reduce(BlockKey key, Iterable<Person> valueSet,Context context)
53+
public void reduce(TupleKey key, Iterable<Person> valueSet,Context context)
5254
throws IOException, InterruptedException {
53-
SN.machineId = key.block;
55+
// SN.machineId = key.block;
5456
personSerializer_.reset();
5557
for( Person p : valueSet ) {
5658
if(p.creationDate()< Dictionaries.dates.getUpdateThreshold() || !DatagenParams.updateStreams ) {
@@ -87,40 +89,40 @@ public void run( String inputFileName ) throws Exception {
8789

8890
FileSystem fs = FileSystem.get(conf);
8991

90-
String rankedFileName = conf.get("ldbc.snb.datagen.serializer.hadoopDir") + "/ranked";
92+
/*String rankedFileName = conf.get("ldbc.snb.datagen.serializer.hadoopDir") + "/ranked";
9193
HadoopFileRanker hadoopFileRanker = new HadoopFileRanker( conf, TupleKey.class, Person.class, null );
92-
hadoopFileRanker.run(inputFileName,rankedFileName);
94+
hadoopFileRanker.run(inputFileName,rankedFileName);*/
9395

9496
int numThreads = Integer.parseInt(conf.get("ldbc.snb.datagen.generator.numThreads"));
9597
Job job = Job.getInstance(conf, "Person Serializer");
96-
job.setMapOutputKeyClass(BlockKey.class);
97-
//job.setMapOutputKeyClass(TupleKey.class);
98+
//job.setMapOutputKeyClass(BlockKey.class);
99+
job.setMapOutputKeyClass(TupleKey.class);
98100
job.setMapOutputValueClass(Person.class);
99101
job.setOutputKeyClass(LongWritable.class);
100102
job.setOutputValueClass(Person.class);
101103
job.setJarByClass(HadoopBlockMapper.class);
102-
job.setMapperClass(HadoopBlockMapper.class);
104+
//job.setMapperClass(HadoopBlockMapper.class);
103105
job.setReducerClass(HadoopPersonSerializerReducer.class);
104106
job.setNumReduceTasks(numThreads);
105107
job.setInputFormatClass(SequenceFileInputFormat.class);
106108
job.setOutputFormatClass(SequenceFileOutputFormat.class);
107109

108110
job.setPartitionerClass(HadoopTuplePartitioner.class);
109111

110-
job.setSortComparatorClass(BlockKeyComparator.class);
112+
/*job.setSortComparatorClass(BlockKeyComparator.class);
111113
job.setGroupingComparatorClass(BlockKeyGroupComparator.class);
112-
job.setPartitionerClass(HadoopBlockPartitioner.class);
114+
job.setPartitionerClass(HadoopBlockPartitioner.class);*/
113115

114-
FileInputFormat.setInputPaths(job, new Path(rankedFileName));
115-
//FileInputFormat.setInputPaths(job, new Path(inputFileName));
116+
//FileInputFormat.setInputPaths(job, new Path(rankedFileName));
117+
FileInputFormat.setInputPaths(job, new Path(inputFileName));
116118
FileOutputFormat.setOutputPath(job, new Path(conf.get("ldbc.snb.datagen.serializer.hadoopDir")+"/aux"));
117119
if(!job.waitForCompletion(true)) {
118120
throw new Exception();
119121
}
120122

121123

122124
try{
123-
fs.delete(new Path(rankedFileName), true);
125+
//fs.delete(new Path(rankedFileName), true);
124126
fs.delete(new Path(conf.get("ldbc.snb.datagen.serializer.hadoopDir")+"/aux"),true);
125127
} catch(IOException e) {
126128
System.err.println(e.getMessage());

src/main/java/ldbc/snb/datagen/util/ConfigParser.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,8 @@ public static Configuration initialize() {
5252
conf.set("ldbc.snb.datagen.serializer.formatter.StringDateFormatter.dateTimeFormat", "yyyy-MM-dd'T'HH:mm:ss.SSSZ");
5353
conf.set("ldbc.snb.datagen.serializer.formatter.StringDateFormatter.dateFormat", "yyyy-MM-dd");
5454
conf.set("ldbc.snb.datagen.generator.person.similarity", "ldbc.snb.datagen.objects.similarity.GeoDistanceSimilarity");
55+
conf.set("ldbc.snb.datagen.parametergenerator.python", "python");
56+
conf.set("ldbc.snb.datagen.parametergenerator.parameters", "true");
5557

5658
/** Loading predefined Scale Factors **/
5759

src/test/java/ldbc/snb/datagen/test/LDBCDatagenTest.java

Lines changed: 108 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,15 +18,18 @@
1818
public class LDBCDatagenTest {
1919

2020
final String dir = "./test_data/social_network";
21+
final String sdir = "./test_data/substitution_parameters";
2122

2223
@BeforeClass
2324
public static void generateData() {
24-
ProcessBuilder pb = new ProcessBuilder("java", "-cp", "ldbc_snb_datagen.jar","org.apache.hadoop.util.RunJar","./ldbc_snb_datagen.jar","./test_params.ini");
25+
ProcessBuilder pb = new ProcessBuilder("java", "-cp", "target/ldbc_snb_datagen.jar","org.apache.hadoop.util.RunJar","target/ldbc_snb_datagen.jar","./test_params.ini");
2526
pb.directory(new File("./"));
27+
File log = new File("test_log");
28+
pb.redirectErrorStream(true);
29+
pb.redirectOutput(ProcessBuilder.Redirect.appendTo(log));
2630
try {
2731
Process p = pb.start();
2832
p.waitFor();
29-
3033
}catch(Exception e) {
3134
System.err.println(e.getMessage());
3235
}
@@ -182,9 +185,111 @@ public void personLikesPostCheck() {
182185
testPairUniquenessPlusExistance(dir+"/person_likes_post_0_0.csv",0,1,dir+"/person_0_0.csv",0,dir+"/post_0_0.csv",0);
183186
}
184187

188+
// test update stream time consistency
189+
@Test
190+
public void updateStreamForumsConsistencyCheck() {
191+
testLongPair(dir+"/updateStream_0_0_forum.csv",0,1,NumericPairCheck.NumericCheckType.G);
192+
}
193+
194+
@Test
195+
public void queryParamsTest() {
196+
LongParser parser = new LongParser();
197+
ColumnSet<Long> persons = new ColumnSet<Long>(parser,new File(dir+"/person_0_0.csv"),0,1);
198+
persons.initialize();
199+
List<ColumnSet<Long>> personsRef = new ArrayList<ColumnSet<Long>>();
200+
personsRef.add(persons);
201+
List<Integer> personIndex = new ArrayList<Integer>();
202+
personIndex.add(0);
203+
ExistsCheck<Long> existsPersonCheck = new ExistsCheck<Long>(parser,personIndex, personsRef);
204+
205+
FileChecker fileChecker = new FileChecker(sdir+"/query_1_param.txt");
206+
fileChecker.addCheck(existsPersonCheck);
207+
if(!fileChecker.run(1)) assertEquals("ERROR PASSING TEST QUERY 1 PERSON EXISTS ",true, false);
208+
209+
fileChecker = new FileChecker(sdir+"/query_2_param.txt");
210+
fileChecker.addCheck(existsPersonCheck);
211+
if(!fileChecker.run(1)) assertEquals("ERROR PASSING TEST QUERY 2 PERSON EXISTS ",true, false);
212+
213+
fileChecker = new FileChecker(sdir+"/query_3_param.txt");
214+
fileChecker.addCheck(existsPersonCheck);
215+
if(!fileChecker.run(1)) assertEquals("ERROR PASSING TEST QUERY 3 PERSON EXISTS ",true, false);
216+
217+
fileChecker = new FileChecker(sdir+"/query_4_param.txt");
218+
fileChecker.addCheck(existsPersonCheck);
219+
if(!fileChecker.run(1)) assertEquals("ERROR PASSING TEST QUERY 4 PERSON EXISTS ",true, false);
220+
221+
fileChecker = new FileChecker(sdir+"/query_5_param.txt");
222+
fileChecker.addCheck(existsPersonCheck);
223+
if(!fileChecker.run(1)) assertEquals("ERROR PASSING TEST QUERY 5 PERSON EXISTS ",true, false);
224+
225+
fileChecker = new FileChecker(sdir+"/query_6_param.txt");
226+
fileChecker.addCheck(existsPersonCheck);
227+
if(!fileChecker.run(1)) assertEquals("ERROR PASSING TEST QUERY 6 PERSON EXISTS ",true, false);
228+
229+
fileChecker = new FileChecker(sdir+"/query_7_param.txt");
230+
fileChecker.addCheck(existsPersonCheck);
231+
if(!fileChecker.run(1)) assertEquals("ERROR PASSING TEST QUERY 7 PERSON EXISTS ",true, false);
232+
233+
fileChecker = new FileChecker(sdir+"/query_8_param.txt");
234+
fileChecker.addCheck(existsPersonCheck);
235+
if(!fileChecker.run(1)) assertEquals("ERROR PASSING TEST QUERY 8 PERSON EXISTS ",true, false);
236+
237+
fileChecker = new FileChecker(sdir+"/query_9_param.txt");
238+
fileChecker.addCheck(existsPersonCheck);
239+
if(!fileChecker.run(1)) assertEquals("ERROR PASSING TEST QUERY 9 PERSON EXISTS ",true, false);
240+
241+
fileChecker = new FileChecker(sdir+"/query_10_param.txt");
242+
fileChecker.addCheck(existsPersonCheck);
243+
if(!fileChecker.run(1)) assertEquals("ERROR PASSING TEST QUERY 10 PERSON EXISTS ",true, false);
244+
245+
fileChecker = new FileChecker(sdir+"/query_11_param.txt");
246+
fileChecker.addCheck(existsPersonCheck);
247+
if(!fileChecker.run(1)) assertEquals("ERROR PASSING TEST QUERY 11 PERSON EXISTS ",true, false);
248+
249+
fileChecker = new FileChecker(sdir+"/query_12_param.txt");
250+
fileChecker.addCheck(existsPersonCheck);
251+
if(!fileChecker.run(1)) assertEquals("ERROR PASSING TEST QUERY 12 PERSON EXISTS ",true, false);
252+
253+
personIndex.add(1);
254+
ExistsCheck<Long> exists2PersonCheck = new ExistsCheck<Long>(parser,personIndex, personsRef);
255+
256+
fileChecker = new FileChecker(sdir+"/query_13_param.txt");
257+
fileChecker.addCheck(exists2PersonCheck);
258+
if(!fileChecker.run(1)) assertEquals("ERROR PASSING TEST QUERY 13 PERSON EXISTS ",true, false);
259+
260+
fileChecker = new FileChecker(sdir+"/query_14_param.txt");
261+
fileChecker.addCheck(exists2PersonCheck);
262+
if(!fileChecker.run(1)) assertEquals("ERROR PASSING TEST QUERY 14 PERSON EXISTS ",true, false);
263+
264+
265+
}
266+
// test query parameters correctness
267+
// query 1, check person id existence and surname existence
268+
// query 2, check person id existence and date time within simulation interval
269+
// query 3, check person id existence, country X and Y existence, startDate + duration within simulation interval
270+
// query 4, check person id existence and startDate + duration within simulation interval
271+
// query 5, check person id and date within simulation interval
272+
// query 6, check person id and tag name existence
273+
// query 7, check person id existence
274+
// query 8, check person id existence
275+
// query 9, check person id and date within simulation interval
276+
// query 10, check person id existence and month between 1 and 12
277+
// query 11, check person id existence, country existence and a reasonable year
278+
// query 12, check person id existence and tagclass existence
279+
// query 13, check person ids existence
280+
// query 14, check person ids existence
281+
282+
public void testLongPair(String fileName, Integer columnA, Integer columnB, NumericPairCheck.NumericCheckType type) {
283+
FileChecker fileChecker = new FileChecker(fileName);
284+
LongParser parser = new LongParser();
285+
LongPairCheck check = new LongPairCheck(parser, " Long check less equal ", columnA, columnB, type);
286+
fileChecker.addCheck(check);
287+
if(!fileChecker.run(0)) assertEquals("ERROR PASSING TEST LONG PAIR FOR FILE "+fileName,true, false);
288+
}
289+
185290
public void testIdUniqueness(String fileName, int column) {
186291
FileChecker fileChecker = new FileChecker(fileName);
187-
UniquenessCheck check = new UniquenessCheck(0);
292+
UniquenessCheck check = new UniquenessCheck(column);
188293
fileChecker.addCheck(check);
189294
if(!fileChecker.run(1)) assertEquals("ERROR PASSING TEST ID UNIQUENESS FOR FILE "+fileName,true, false);
190295
}

src/test/java/ldbc/snb/datagen/test/csv/FileChecker.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,10 @@ public boolean run(int startLine) {
4040
for(Integer index : c.getColumns()) {
4141
System.err.print(index+" ");
4242
}
43+
System.err.print(" with values ");
44+
for(String index : row) {
45+
System.err.print(index+" ");
46+
}
4347
System.err.println();
4448
return false;
4549
}
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
package ldbc.snb.datagen.test.csv;
2+
3+
/**
4+
* Created by aprat on 30/03/16.
5+
*/
6+
7+
public class LongPairCheck extends NumericPairCheck<Long> {
8+
9+
public LongPairCheck(Parser<Long> parser, String name, Integer columnA, Integer columnB, NumericCheckType type) {
10+
super(parser, name, columnA, columnB, type);
11+
}
12+
13+
@Override
14+
public boolean greater(Long val1, Long val2) {
15+
return val1 > val2;
16+
}
17+
18+
@Override
19+
public boolean greaterEqual(Long val1, Long val2) {
20+
return val1 >= val2;
21+
}
22+
23+
@Override
24+
public boolean less(Long val1, Long val2) {
25+
return val1 < val2;
26+
}
27+
28+
@Override
29+
public boolean lessEqual(Long val1, Long val2) {
30+
return val1 <= val2;
31+
}
32+
33+
@Override
34+
public boolean equals(Long val1, Long val2) {
35+
return val1 == val2;
36+
}
37+
38+
@Override
39+
public boolean nonEquals(Long val1, Long val2) {
40+
return val1 != val2;
41+
}
42+
}

0 commit comments

Comments
 (0)