@@ -547,101 +547,101 @@ public int runGenerateJob(Configuration conf) throws Exception {
long min = Long.MAX_VALUE;
long max = Long.MIN_VALUE;

- // for( int i =0; i < numThreads; ++i ) {
- // int numPartitions = conf.getInt("numUpdatePartitions", 1);
- // for( int j = 0; j < numPartitions; ++j ) {
- // /// --------------- Fifth job: Sort update streams ----------------
- // conf.setInt("mapred.line.input.format.linespermap", 1000000);
- // conf.setInt("partitionId",j);
- // conf.set("streamType","forum");
- // Job jobForum = new Job(conf, "Soring update streams "+j+" of reducer "+i);
- // jobForum.setMapOutputKeyClass(LongWritable.class);
- // jobForum.setMapOutputValueClass(Text.class);
- // jobForum.setOutputKeyClass(LongWritable.class);
- // jobForum.setOutputValueClass(Text.class);
- // jobForum.setJarByClass(UpdateEventMapper.class);
- // jobForum.setMapperClass(UpdateEventMapper.class);
- // jobForum.setReducerClass(UpdateEventReducer.class);
- // jobForum.setNumReduceTasks(1);
- // jobForum.setInputFormatClass(SequenceFileInputFormat.class);
- // jobForum.setOutputFormatClass(SequenceFileOutputFormat.class);
- // jobForum.setPartitionerClass(UpdateEventPartitioner.class);
- // FileInputFormat.addInputPath(jobForum, new Path(socialNetDir + "/temp_updateStream_" + i+"_"+j+"_forum"));
- // FileOutputFormat.setOutputPath(jobForum, new Path(hadoopDir + "/sibEnd"));
- // printProgress("Starting: Sorting update streams");
- // jobForum.waitForCompletion(true);
- // fs.delete(new Path(socialNetDir + "/temp_updateStream_" + i+"_"+j+"_forum"), false);
- // fs.delete(new Path(hadoopDir + "/sibEnd"), true);
- //
- // conf.setInt("mapred.line.input.format.linespermap", 1000000);
- // conf.setInt("partitionId",j);
- // conf.set("streamType","person");
- // Job jobPerson = new Job(conf, "Soring update streams "+j+" of reducer "+i);
- // jobPerson.setMapOutputKeyClass(LongWritable.class);
- // jobPerson.setMapOutputValueClass(Text.class);
- // jobPerson.setOutputKeyClass(LongWritable.class);
- // jobPerson.setOutputValueClass(Text.class);
- // jobPerson.setJarByClass(UpdateEventMapper.class);
- // jobPerson.setMapperClass(UpdateEventMapper.class);
- // jobPerson.setReducerClass(UpdateEventReducer.class);
- // jobPerson.setNumReduceTasks(1);
- // jobPerson.setInputFormatClass(SequenceFileInputFormat.class);
- // jobPerson.setOutputFormatClass(SequenceFileOutputFormat.class);
- // jobPerson.setPartitionerClass(UpdateEventPartitioner.class);
- // FileInputFormat.addInputPath(jobPerson, new Path(socialNetDir + "/temp_updateStream_" + i+"_"+j+"_person"));
- // FileOutputFormat.setOutputPath(jobPerson, new Path(hadoopDir + "/sibEnd"));
- // printProgress("Starting: Sorting update streams");
- // jobPerson.waitForCompletion(true);
- // fs.delete(new Path(socialNetDir + "/temp_updateStream_" + i+"_"+j+"_person"), false);
- // fs.delete(new Path(hadoopDir + "/sibEnd"), true);
- //
- // if(conf.getBoolean("updateStreams",false)) {
- // Properties properties = new Properties();
- // properties.load(fs.open(new Path(conf.get("outputDir") + "/social_network/updateStream_" + i + "_" + j + "_person.properties")));
- // Long auxMin = Long.parseLong(properties.getProperty("min_write_event_start_time"));
- // min = auxMin < min ? auxMin : min;
- // Long auxMax = Long.parseLong(properties.getProperty("max_write_event_start_time"));
- // max = auxMax > max ? auxMax : max;
- // numEvents += Long.parseLong(properties.getProperty("num_events"));
- //
- // properties.load(fs.open(new Path(conf.get("outputDir") + "/social_network/updateStream_" + i + "_" + j + "_forum.properties")));
- //
- // auxMin = Long.parseLong(properties.getProperty("min_write_event_start_time"));
- // min = auxMin < min ? auxMin : min;
- // auxMax = Long.parseLong(properties.getProperty("max_write_event_start_time"));
- // max = auxMax > max ? auxMax : max;
- // numEvents += Long.parseLong(properties.getProperty("num_events"));
- //
- // fs.delete(new Path(conf.get("outputDir") + "/social_network/updateStream_" + i + "_" + j + "_person.properties"),true);
- // fs.delete(new Path(conf.get("outputDir") + "/social_network/updateStream_" + i + "_" + j + "_forum.properties"),true);
- // }
- // }
- // }
- //
- // if(conf.getBoolean("updateStreams",false)) {
- // OutputStream output = fs.create(new Path(conf.get("outputDir") + "/social_network/updateStream.properties"));
- // output.write(new String("ldbc.snb.interactive.gct_delta_duration:"+conf.get("deltaTime")+"\n").getBytes());
- // output.write(new String("ldbc.snb.interactive.min_write_event_start_time:"+min+"\n").getBytes());
- // output.write(new String("ldbc.snb.interactive.max_write_event_start_time:"+max+"\n").getBytes());
- // output.write(new String("ldbc.snb.interactive.update_interleave:"+(max-min)/numEvents+"\n").getBytes());
- // output.write(new String("ldbc.snb.interactive.num_events:"+numEvents).getBytes());
- // output.close();
- // }
- //
- //
- //
- // printProgress("Starting: Materialize friends for substitution parameters");
- // int resMaterializeFriends = job6.waitForCompletion(true) ? 0 : 1;
- // fs.delete(new Path(hadoopDir + "/sibSorting3"),true);
+ for( int i =0; i < numThreads; ++i ) {
+ int numPartitions = conf.getInt("numUpdatePartitions", 1);
+ for( int j = 0; j < numPartitions; ++j ) {
+ /// --------------- Fifth job: Sort update streams ----------------
+ conf.setInt("mapred.line.input.format.linespermap", 1000000);
+ conf.setInt("partitionId",j);
+ conf.set("streamType","forum");
+ Job jobForum = new Job(conf, "Soring update streams "+j+" of reducer "+i);
+ jobForum.setMapOutputKeyClass(LongWritable.class);
+ jobForum.setMapOutputValueClass(Text.class);
+ jobForum.setOutputKeyClass(LongWritable.class);
+ jobForum.setOutputValueClass(Text.class);
+ jobForum.setJarByClass(UpdateEventMapper.class);
+ jobForum.setMapperClass(UpdateEventMapper.class);
+ jobForum.setReducerClass(UpdateEventReducer.class);
+ jobForum.setNumReduceTasks(1);
+ jobForum.setInputFormatClass(SequenceFileInputFormat.class);
+ jobForum.setOutputFormatClass(SequenceFileOutputFormat.class);
+ jobForum.setPartitionerClass(UpdateEventPartitioner.class);
+ FileInputFormat.addInputPath(jobForum, new Path(socialNetDir + "/temp_updateStream_" + i+"_"+j+"_forum"));
+ FileOutputFormat.setOutputPath(jobForum, new Path(hadoopDir + "/sibEnd"));
+ printProgress("Starting: Sorting update streams");
+ jobForum.waitForCompletion(true);
+ fs.delete(new Path(socialNetDir + "/temp_updateStream_" + i+"_"+j+"_forum"), false);
+ fs.delete(new Path(hadoopDir + "/sibEnd"), true);
+
+ conf.setInt("mapred.line.input.format.linespermap", 1000000);
+ conf.setInt("partitionId",j);
+ conf.set("streamType","person");
+ Job jobPerson = new Job(conf, "Soring update streams "+j+" of reducer "+i);
+ jobPerson.setMapOutputKeyClass(LongWritable.class);
+ jobPerson.setMapOutputValueClass(Text.class);
+ jobPerson.setOutputKeyClass(LongWritable.class);
+ jobPerson.setOutputValueClass(Text.class);
+ jobPerson.setJarByClass(UpdateEventMapper.class);
+ jobPerson.setMapperClass(UpdateEventMapper.class);
+ jobPerson.setReducerClass(UpdateEventReducer.class);
+ jobPerson.setNumReduceTasks(1);
+ jobPerson.setInputFormatClass(SequenceFileInputFormat.class);
+ jobPerson.setOutputFormatClass(SequenceFileOutputFormat.class);
+ jobPerson.setPartitionerClass(UpdateEventPartitioner.class);
+ FileInputFormat.addInputPath(jobPerson, new Path(socialNetDir + "/temp_updateStream_" + i+"_"+j+"_person"));
+ FileOutputFormat.setOutputPath(jobPerson, new Path(hadoopDir + "/sibEnd"));
+ printProgress("Starting: Sorting update streams");
+ jobPerson.waitForCompletion(true);
+ fs.delete(new Path(socialNetDir + "/temp_updateStream_" + i+"_"+j+"_person"), false);
+ fs.delete(new Path(hadoopDir + "/sibEnd"), true);
+
+ if(conf.getBoolean("updateStreams",false)) {
+ Properties properties = new Properties();
+ properties.load(fs.open(new Path(conf.get("outputDir") + "/social_network/updateStream_" + i + "_" + j + "_person.properties")));
+ Long auxMin = Long.parseLong(properties.getProperty("min_write_event_start_time"));
+ min = auxMin < min ? auxMin : min;
+ Long auxMax = Long.parseLong(properties.getProperty("max_write_event_start_time"));
+ max = auxMax > max ? auxMax : max;
+ numEvents += Long.parseLong(properties.getProperty("num_events"));
+
+ properties.load(fs.open(new Path(conf.get("outputDir") + "/social_network/updateStream_" + i + "_" + j + "_forum.properties")));
+
+ auxMin = Long.parseLong(properties.getProperty("min_write_event_start_time"));
+ min = auxMin < min ? auxMin : min;
+ auxMax = Long.parseLong(properties.getProperty("max_write_event_start_time"));
+ max = auxMax > max ? auxMax : max;
+ numEvents += Long.parseLong(properties.getProperty("num_events"));
+
+ fs.delete(new Path(conf.get("outputDir") + "/social_network/updateStream_" + i + "_" + j + "_person.properties"),true);
+ fs.delete(new Path(conf.get("outputDir") + "/social_network/updateStream_" + i + "_" + j + "_forum.properties"),true);
+ }
+ }
+ }
+
+ if(conf.getBoolean("updateStreams",false)) {
+ OutputStream output = fs.create(new Path(conf.get("outputDir") + "/social_network/updateStream.properties"));
+ output.write(new String("ldbc.snb.interactive.gct_delta_duration:"+conf.get("deltaTime")+"\n").getBytes());
+ output.write(new String("ldbc.snb.interactive.min_write_event_start_time:"+min+"\n").getBytes());
+ output.write(new String("ldbc.snb.interactive.max_write_event_start_time:"+max+"\n").getBytes());
+ output.write(new String("ldbc.snb.interactive.update_interleave:"+(max-min)/numEvents+"\n").getBytes());
+ output.write(new String("ldbc.snb.interactive.num_events:"+numEvents).getBytes());
+ output.close();
+ }
+
+
+
+ printProgress("Starting: Materialize friends for substitution parameters");
+ int resMaterializeFriends = job6.waitForCompletion(true) ? 0 : 1;
+ fs.delete(new Path(hadoopDir + "/sibSorting3"),true);

long end = System.currentTimeMillis();
System.out.println(((end - start) / 1000)
    + " total seconds");
641
- // for( int i = 0; i < numThreads; ++i ) {
642
- // fs.copyToLocalFile(new Path(socialNetDir + "/m"+i+"factors.txt"), new Path("./"));
643
- // fs.copyToLocalFile(new Path(socialNetDir + "/m0friendList"+i+".csv"), new Path("./"));
644
- // }
641
+ for ( int i = 0 ; i < numThreads ; ++i ) {
642
+ fs .copyToLocalFile (new Path (socialNetDir + "/m" +i +"factors.txt" ), new Path ("./" ));
643
+ fs .copyToLocalFile (new Path (socialNetDir + "/m0friendList" +i +".csv" ), new Path ("./" ));
644
+ }
return res;
}
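
The block uncommented above folds the per-partition updateStream_<i>_<j>_{person,forum}.properties files into a global minimum and maximum write-event start time plus a total event count, and then writes updateStream.properties with update_interleave computed as (max - min) / numEvents. The following is a minimal standalone sketch of that aggregation step only; it reads local files via plain java.io instead of the HDFS FileSystem API, and the class name, command-line input list, and output path are hypothetical illustrations rather than part of the generator.

import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Properties;

// Hypothetical standalone sketch (not part of the generator): aggregates the
// per-partition update-stream .properties files passed on the command line
// into a single updateStream.properties, mirroring the logic uncommented above.
public class UpdateStreamSummary {
    public static void main(String[] args) throws IOException {
        long min = Long.MAX_VALUE;
        long max = Long.MIN_VALUE;
        long numEvents = 0;

        // Each input file is expected to define min_write_event_start_time,
        // max_write_event_start_time and num_events, the same keys read above.
        for (String file : args) {
            Properties p = new Properties();
            try (FileInputStream in = new FileInputStream(file)) {
                p.load(in);
            }
            min = Math.min(min, Long.parseLong(p.getProperty("min_write_event_start_time")));
            max = Math.max(max, Long.parseLong(p.getProperty("max_write_event_start_time")));
            numEvents += Long.parseLong(p.getProperty("num_events"));
        }

        // Same output keys as the job above; gct_delta_duration is omitted here
        // because it comes from the Hadoop Configuration, not from the stream files.
        // Assumes numEvents > 0 so the interleave division is well defined.
        try (OutputStream output = new FileOutputStream("updateStream.properties")) {
            output.write(("ldbc.snb.interactive.min_write_event_start_time:" + min + "\n").getBytes());
            output.write(("ldbc.snb.interactive.max_write_event_start_time:" + max + "\n").getBytes());
            output.write(("ldbc.snb.interactive.update_interleave:" + (max - min) / numEvents + "\n").getBytes());
            output.write(("ldbc.snb.interactive.num_events:" + numEvents).getBytes());
        }
    }
}

Invoked as, for example, java UpdateStreamSummary updateStream_0_0_person.properties updateStream_0_0_forum.properties, it emits the same ldbc.snb.interactive.* keys as the uncommented block, minus gct_delta_duration.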