@@ -547,101 +547,101 @@ public int runGenerateJob(Configuration conf) throws Exception {
long min = Long.MAX_VALUE;
long max = Long.MIN_VALUE;

- for( int i =0; i < numThreads; ++i ) {
- int numPartitions = conf.getInt("numUpdatePartitions", 1);
- for( int j = 0; j < numPartitions; ++j ) {
- /// --------------- Fifth job: Sort update streams ----------------
- conf.setInt("mapred.line.input.format.linespermap", 1000000);
- conf.setInt("partitionId",j);
- conf.set("streamType","forum");
- Job jobForum = new Job(conf, "Sorting update streams "+j+" of reducer "+i);
- jobForum.setMapOutputKeyClass(LongWritable.class);
- jobForum.setMapOutputValueClass(Text.class);
- jobForum.setOutputKeyClass(LongWritable.class);
- jobForum.setOutputValueClass(Text.class);
- jobForum.setJarByClass(UpdateEventMapper.class);
- jobForum.setMapperClass(UpdateEventMapper.class);
- jobForum.setReducerClass(UpdateEventReducer.class);
- jobForum.setNumReduceTasks(1);
- jobForum.setInputFormatClass(SequenceFileInputFormat.class);
- jobForum.setOutputFormatClass(SequenceFileOutputFormat.class);
- jobForum.setPartitionerClass(UpdateEventPartitioner.class);
- FileInputFormat.addInputPath(jobForum, new Path(socialNetDir + "/temp_updateStream_" + i+"_"+j+"_forum"));
- FileOutputFormat.setOutputPath(jobForum, new Path(hadoopDir + "/sibEnd"));
- printProgress("Starting: Sorting update streams");
- jobForum.waitForCompletion(true);
- fs.delete(new Path(socialNetDir + "/temp_updateStream_" + i+"_"+j+"_forum"), false);
- fs.delete(new Path(hadoopDir + "/sibEnd"), true);
-
- conf.setInt("mapred.line.input.format.linespermap", 1000000);
- conf.setInt("partitionId",j);
- conf.set("streamType","person");
- Job jobPerson = new Job(conf, "Sorting update streams "+j+" of reducer "+i);
- jobPerson.setMapOutputKeyClass(LongWritable.class);
- jobPerson.setMapOutputValueClass(Text.class);
- jobPerson.setOutputKeyClass(LongWritable.class);
- jobPerson.setOutputValueClass(Text.class);
- jobPerson.setJarByClass(UpdateEventMapper.class);
- jobPerson.setMapperClass(UpdateEventMapper.class);
- jobPerson.setReducerClass(UpdateEventReducer.class);
- jobPerson.setNumReduceTasks(1);
- jobPerson.setInputFormatClass(SequenceFileInputFormat.class);
- jobPerson.setOutputFormatClass(SequenceFileOutputFormat.class);
- jobPerson.setPartitionerClass(UpdateEventPartitioner.class);
- FileInputFormat.addInputPath(jobPerson, new Path(socialNetDir + "/temp_updateStream_" + i+"_"+j+"_person"));
- FileOutputFormat.setOutputPath(jobPerson, new Path(hadoopDir + "/sibEnd"));
- printProgress("Starting: Sorting update streams");
- jobPerson.waitForCompletion(true);
- fs.delete(new Path(socialNetDir + "/temp_updateStream_" + i+"_"+j+"_person"), false);
- fs.delete(new Path(hadoopDir + "/sibEnd"), true);
-
- if(conf.getBoolean("updateStreams",false)) {
- Properties properties = new Properties();
- properties.load(fs.open(new Path(conf.get("outputDir") + "/social_network/updateStream_" + i + "_" + j + "_person.properties")));
- Long auxMin = Long.parseLong(properties.getProperty("min_write_event_start_time"));
- min = auxMin < min ? auxMin : min;
- Long auxMax = Long.parseLong(properties.getProperty("max_write_event_start_time"));
- max = auxMax > max ? auxMax : max;
- numEvents += Long.parseLong(properties.getProperty("num_events"));
-
- properties.load(fs.open(new Path(conf.get("outputDir") + "/social_network/updateStream_" + i + "_" + j + "_forum.properties")));
-
- auxMin = Long.parseLong(properties.getProperty("min_write_event_start_time"));
- min = auxMin < min ? auxMin : min;
- auxMax = Long.parseLong(properties.getProperty("max_write_event_start_time"));
- max = auxMax > max ? auxMax : max;
- numEvents += Long.parseLong(properties.getProperty("num_events"));
-
- fs.delete(new Path(conf.get("outputDir") + "/social_network/updateStream_" + i + "_" + j + "_person.properties"),true);
- fs.delete(new Path(conf.get("outputDir") + "/social_network/updateStream_" + i + "_" + j + "_forum.properties"),true);
- }
- }
- }
-
- if(conf.getBoolean("updateStreams",false)) {
- OutputStream output = fs.create(new Path(conf.get("outputDir") + "/social_network/updateStream.properties"));
- output.write(new String("ldbc.snb.interactive.gct_delta_duration:"+conf.get("deltaTime")+"\n").getBytes());
- output.write(new String("ldbc.snb.interactive.min_write_event_start_time:"+min+"\n").getBytes());
- output.write(new String("ldbc.snb.interactive.max_write_event_start_time:"+max+"\n").getBytes());
- output.write(new String("ldbc.snb.interactive.update_interleave:"+(max-min)/numEvents+"\n").getBytes());
- output.write(new String("ldbc.snb.interactive.num_events:"+numEvents).getBytes());
- output.close();
- }
-
-
-
- printProgress("Starting: Materialize friends for substitution parameters");
- int resMaterializeFriends = job6.waitForCompletion(true) ? 0 : 1;
- fs.delete(new Path(hadoopDir + "/sibSorting3"),true);
+ // for( int i =0; i < numThreads; ++i ) {
+ // int numPartitions = conf.getInt("numUpdatePartitions", 1);
+ // for( int j = 0; j < numPartitions; ++j ) {
+ // /// --------------- Fifth job: Sort update streams ----------------
+ // conf.setInt("mapred.line.input.format.linespermap", 1000000);
+ // conf.setInt("partitionId",j);
+ // conf.set("streamType","forum");
+ // Job jobForum = new Job(conf, "Sorting update streams "+j+" of reducer "+i);
+ // jobForum.setMapOutputKeyClass(LongWritable.class);
+ // jobForum.setMapOutputValueClass(Text.class);
+ // jobForum.setOutputKeyClass(LongWritable.class);
+ // jobForum.setOutputValueClass(Text.class);
+ // jobForum.setJarByClass(UpdateEventMapper.class);
+ // jobForum.setMapperClass(UpdateEventMapper.class);
+ // jobForum.setReducerClass(UpdateEventReducer.class);
+ // jobForum.setNumReduceTasks(1);
+ // jobForum.setInputFormatClass(SequenceFileInputFormat.class);
+ // jobForum.setOutputFormatClass(SequenceFileOutputFormat.class);
+ // jobForum.setPartitionerClass(UpdateEventPartitioner.class);
+ // FileInputFormat.addInputPath(jobForum, new Path(socialNetDir + "/temp_updateStream_" + i+"_"+j+"_forum"));
+ // FileOutputFormat.setOutputPath(jobForum, new Path(hadoopDir + "/sibEnd"));
+ // printProgress("Starting: Sorting update streams");
+ // jobForum.waitForCompletion(true);
+ // fs.delete(new Path(socialNetDir + "/temp_updateStream_" + i+"_"+j+"_forum"), false);
+ // fs.delete(new Path(hadoopDir + "/sibEnd"), true);
+ //
+ // conf.setInt("mapred.line.input.format.linespermap", 1000000);
+ // conf.setInt("partitionId",j);
+ // conf.set("streamType","person");
+ // Job jobPerson = new Job(conf, "Sorting update streams "+j+" of reducer "+i);
+ // jobPerson.setMapOutputKeyClass(LongWritable.class);
+ // jobPerson.setMapOutputValueClass(Text.class);
+ // jobPerson.setOutputKeyClass(LongWritable.class);
+ // jobPerson.setOutputValueClass(Text.class);
+ // jobPerson.setJarByClass(UpdateEventMapper.class);
+ // jobPerson.setMapperClass(UpdateEventMapper.class);
+ // jobPerson.setReducerClass(UpdateEventReducer.class);
+ // jobPerson.setNumReduceTasks(1);
+ // jobPerson.setInputFormatClass(SequenceFileInputFormat.class);
+ // jobPerson.setOutputFormatClass(SequenceFileOutputFormat.class);
+ // jobPerson.setPartitionerClass(UpdateEventPartitioner.class);
+ // FileInputFormat.addInputPath(jobPerson, new Path(socialNetDir + "/temp_updateStream_" + i+"_"+j+"_person"));
+ // FileOutputFormat.setOutputPath(jobPerson, new Path(hadoopDir + "/sibEnd"));
+ // printProgress("Starting: Sorting update streams");
+ // jobPerson.waitForCompletion(true);
+ // fs.delete(new Path(socialNetDir + "/temp_updateStream_" + i+"_"+j+"_person"), false);
+ // fs.delete(new Path(hadoopDir + "/sibEnd"), true);
+ //
+ // if(conf.getBoolean("updateStreams",false)) {
+ // Properties properties = new Properties();
+ // properties.load(fs.open(new Path(conf.get("outputDir") + "/social_network/updateStream_" + i + "_" + j + "_person.properties")));
+ // Long auxMin = Long.parseLong(properties.getProperty("min_write_event_start_time"));
+ // min = auxMin < min ? auxMin : min;
+ // Long auxMax = Long.parseLong(properties.getProperty("max_write_event_start_time"));
+ // max = auxMax > max ? auxMax : max;
+ // numEvents += Long.parseLong(properties.getProperty("num_events"));
+ //
+ // properties.load(fs.open(new Path(conf.get("outputDir") + "/social_network/updateStream_" + i + "_" + j + "_forum.properties")));
+ //
+ // auxMin = Long.parseLong(properties.getProperty("min_write_event_start_time"));
+ // min = auxMin < min ? auxMin : min;
+ // auxMax = Long.parseLong(properties.getProperty("max_write_event_start_time"));
+ // max = auxMax > max ? auxMax : max;
+ // numEvents += Long.parseLong(properties.getProperty("num_events"));
+ //
+ // fs.delete(new Path(conf.get("outputDir") + "/social_network/updateStream_" + i + "_" + j + "_person.properties"),true);
+ // fs.delete(new Path(conf.get("outputDir") + "/social_network/updateStream_" + i + "_" + j + "_forum.properties"),true);
+ // }
+ // }
+ // }
+ //
+ // if(conf.getBoolean("updateStreams",false)) {
+ // OutputStream output = fs.create(new Path(conf.get("outputDir") + "/social_network/updateStream.properties"));
+ // output.write(new String("ldbc.snb.interactive.gct_delta_duration:"+conf.get("deltaTime")+"\n").getBytes());
+ // output.write(new String("ldbc.snb.interactive.min_write_event_start_time:"+min+"\n").getBytes());
+ // output.write(new String("ldbc.snb.interactive.max_write_event_start_time:"+max+"\n").getBytes());
+ // output.write(new String("ldbc.snb.interactive.update_interleave:"+(max-min)/numEvents+"\n").getBytes());
+ // output.write(new String("ldbc.snb.interactive.num_events:"+numEvents).getBytes());
+ // output.close();
+ // }
+ //
+ //
+ //
+ // printProgress("Starting: Materialize friends for substitution parameters");
+ // int resMaterializeFriends = job6.waitForCompletion(true) ? 0 : 1;
+ // fs.delete(new Path(hadoopDir + "/sibSorting3"),true);

long end = System.currentTimeMillis();
System.out.println(((end - start) / 1000)
+ " total seconds");
- for( int i = 0; i < numThreads; ++i ) {
- fs.copyToLocalFile(new Path(socialNetDir + "/m"+i+"factors.txt"), new Path("./"));
- fs.copyToLocalFile(new Path(socialNetDir + "/m0friendList"+i+".csv"), new Path("./"));
- }
+ // for( int i = 0; i < numThreads; ++i ) {
+ // fs.copyToLocalFile(new Path(socialNetDir + "/m"+i+"factors.txt"), new Path("./"));
+ // fs.copyToLocalFile(new Path(socialNetDir + "/m0friendList"+i+".csv"), new Path("./"));
+ // }
return res;
}
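Note on the block commented out above: the fifth job sorted each per-reducer, per-partition update stream and then merged the per-partition updateStream_i_j_{person,forum}.properties files into social_network/updateStream.properties, where ldbc.snb.interactive.update_interleave is derived as (max - min) / numEvents, i.e. the average gap between consecutive write events. A minimal stand-alone sketch of that merge step, using local java.io instead of HDFS and a hypothetical UpdateStreamStats class that is not part of this commit, could look like:

import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical illustration only: merges per-partition update-stream metadata
// the same way the commented-out block does, then derives the update interleave.
public class UpdateStreamStats {

    static Properties merge(List<File> partitionProps) throws IOException {
        long min = Long.MAX_VALUE;
        long max = Long.MIN_VALUE;
        long numEvents = 0;

        for (File f : partitionProps) {
            Properties p = new Properties();
            try (InputStream in = new FileInputStream(f)) {
                p.load(in);
            }
            min = Math.min(min, Long.parseLong(p.getProperty("min_write_event_start_time")));
            max = Math.max(max, Long.parseLong(p.getProperty("max_write_event_start_time")));
            numEvents += Long.parseLong(p.getProperty("num_events"));
        }

        Properties out = new Properties();
        out.setProperty("ldbc.snb.interactive.min_write_event_start_time", Long.toString(min));
        out.setProperty("ldbc.snb.interactive.max_write_event_start_time", Long.toString(max));
        out.setProperty("ldbc.snb.interactive.num_events", Long.toString(numEvents));
        // Same formula as the original code; assumes numEvents > 0.
        out.setProperty("ldbc.snb.interactive.update_interleave", Long.toString((max - min) / numEvents));
        return out;
    }

    public static void main(String[] args) throws IOException {
        // Usage: java UpdateStreamStats updateStream_0_0_person.properties updateStream_0_0_forum.properties ...
        List<File> files = new ArrayList<>();
        for (String arg : args) {
            files.add(new File(arg));
        }
        try (OutputStream os = new FileOutputStream("updateStream.properties")) {
            merge(files).store(os, "merged update stream metadata");
        }
    }
}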