Skip to content

Commit 746be11

Browse files
committed
Implemented update stream partitioning
1 parent fbb9f60 commit 746be11

File tree

5 files changed

+116
-58
lines changed

5 files changed

+116
-58
lines changed

src/main/java/ldbc/socialnet/dbgen/generator/MRGenerateUsers.java

Lines changed: 52 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -97,14 +97,16 @@ protected void setup(Context context){
9797
FileSystem fs = FileSystem.get(conf);
9898
String strTaskId = context.getTaskAttemptID().getTaskID().toString();
9999
int attempTaskId = Integer.parseInt(strTaskId.substring(strTaskId.length() - 3));
100+
int partitionId = conf.getInt("partitionId",0);
101+
String streamType = conf.get("streamType");
100102
if( Boolean.parseBoolean(conf.get("compressed")) == true ) {
101-
Path outFile = new Path(context.getConfiguration().get("outputDir")+"/social_network/updateStream_"+attempTaskId+".csv.gz");
103+
Path outFile = new Path(context.getConfiguration().get("outputDir")+"/social_network/updateStream_"+attempTaskId+"_"+partitionId+"_"+streamType+".csv.gz");
102104
out = new GZIPOutputStream( fs.create(outFile));
103105
} else {
104-
Path outFile = new Path(context.getConfiguration().get("outputDir")+"/social_network/updateStream_"+attempTaskId+".csv");
106+
Path outFile = new Path(context.getConfiguration().get("outputDir")+"/social_network/updateStream_"+attempTaskId+"_"+partitionId+"_"+streamType+".csv");
105107
out = fs.create(outFile);
106108
}
107-
properties = fs.create(new Path(context.getConfiguration().get("outputDir")+"/social_network/updateStream_"+attempTaskId+".properties"));
109+
properties = fs.create(new Path(context.getConfiguration().get("outputDir")+"/social_network/updateStream_"+attempTaskId+"_"+partitionId+"_"+streamType+".properties"));
108110
} catch (IOException e) {
109111
e.printStackTrace();
110112
}
@@ -481,25 +483,6 @@ public int runGenerateJob(Configuration conf) throws Exception {
481483

482484

483485

484-
/// --------------- Fifth job: Sort update streams ----------------
485-
conf.setInt("mapred.line.input.format.linespermap", 1000000);
486-
Job job5 = new Job(conf,"Soring update streams");
487-
job5.setMapOutputKeyClass(LongWritable.class);
488-
job5.setMapOutputValueClass(Text.class);
489-
job5.setOutputKeyClass(LongWritable.class);
490-
job5.setOutputValueClass(Text.class);
491-
job5.setJarByClass(UpdateEventMapper.class);
492-
job5.setMapperClass(UpdateEventMapper.class);
493-
job5.setReducerClass(UpdateEventReducer.class);
494-
job5.setNumReduceTasks(1);
495-
job5.setInputFormatClass(SequenceFileInputFormat.class);
496-
job5.setOutputFormatClass(SequenceFileOutputFormat.class);
497-
job5.setPartitionerClass(UpdateEventPartitioner.class);
498-
499-
for( int i =0; i < numThreads; ++i ) {
500-
FileInputFormat.addInputPath(job5, new Path(socialNetDir + "/temp_updateStream_"+i+".csv"));
501-
}
502-
FileOutputFormat.setOutputPath(job5, new Path(hadoopDir + "/sibEnd") );
503486

504487
/// --------------- Sixth job: Materialize the friends lists ----------------
505488
Job job6 = new Job(conf,"Dump the friends lists");
@@ -520,10 +503,7 @@ public int runGenerateJob(Configuration conf) throws Exception {
520503
FileOutputFormat.setOutputPath(job6, new Path(hadoopDir + "/job6") );
521504

522505

523-
524-
525506
/// --------- Execute Jobs ------
526-
527507
long start = System.currentTimeMillis();
528508

529509
printProgress("Starting: Person generation and friendship generation 1");
@@ -553,13 +533,55 @@ public int runGenerateJob(Configuration conf) throws Exception {
553533
int resUpdateStreams = job4.waitForCompletion(true) ? 0 : 1;
554534
fs.delete(new Path(hadoopDir + "/sib4"),true);
555535

556-
printProgress("Starting: Sorting update streams");
557-
int sortUpdateStreams= job5.waitForCompletion(true) ? 0 : 1;
558-
559536
for( int i =0; i < numThreads; ++i ) {
560-
fs.delete(new Path(socialNetDir + "/temp_updateStream_"+i+".csv"),false);
537+
int numPartitions = conf.getInt("numUpdatePartitions", 1);
538+
for( int j = 0; j < numPartitions; ++j ) {
539+
/// --------------- Fifth job: Sort update streams ----------------
540+
conf.setInt("mapred.line.input.format.linespermap", 1000000);
541+
conf.setInt("partitionId",j);
542+
conf.set("streamType","forum");
543+
Job jobForum = new Job(conf, "Soring update streams "+j+" of reducer "+i);
544+
jobForum.setMapOutputKeyClass(LongWritable.class);
545+
jobForum.setMapOutputValueClass(Text.class);
546+
jobForum.setOutputKeyClass(LongWritable.class);
547+
jobForum.setOutputValueClass(Text.class);
548+
jobForum.setJarByClass(UpdateEventMapper.class);
549+
jobForum.setMapperClass(UpdateEventMapper.class);
550+
jobForum.setReducerClass(UpdateEventReducer.class);
551+
jobForum.setNumReduceTasks(1);
552+
jobForum.setInputFormatClass(SequenceFileInputFormat.class);
553+
jobForum.setOutputFormatClass(SequenceFileOutputFormat.class);
554+
jobForum.setPartitionerClass(UpdateEventPartitioner.class);
555+
FileInputFormat.addInputPath(jobForum, new Path(socialNetDir + "/temp_updateStream_" + i+"_"+j+"_forum"));
556+
FileOutputFormat.setOutputPath(jobForum, new Path(hadoopDir + "/sibEnd"));
557+
printProgress("Starting: Sorting update streams");
558+
jobForum.waitForCompletion(true);
559+
fs.delete(new Path(socialNetDir + "/temp_updateStream_" + i+"_"+j+"_forum"), false);
560+
fs.delete(new Path(hadoopDir + "/sibEnd"), true);
561+
562+
conf.setInt("mapred.line.input.format.linespermap", 1000000);
563+
conf.setInt("partitionId",j);
564+
conf.set("streamType","person");
565+
Job jobPerson = new Job(conf, "Soring update streams "+j+" of reducer "+i);
566+
jobPerson.setMapOutputKeyClass(LongWritable.class);
567+
jobPerson.setMapOutputValueClass(Text.class);
568+
jobPerson.setOutputKeyClass(LongWritable.class);
569+
jobPerson.setOutputValueClass(Text.class);
570+
jobPerson.setJarByClass(UpdateEventMapper.class);
571+
jobPerson.setMapperClass(UpdateEventMapper.class);
572+
jobPerson.setReducerClass(UpdateEventReducer.class);
573+
jobPerson.setNumReduceTasks(1);
574+
jobPerson.setInputFormatClass(SequenceFileInputFormat.class);
575+
jobPerson.setOutputFormatClass(SequenceFileOutputFormat.class);
576+
jobPerson.setPartitionerClass(UpdateEventPartitioner.class);
577+
FileInputFormat.addInputPath(jobPerson, new Path(socialNetDir + "/temp_updateStream_" + i+"_"+j+"_person"));
578+
FileOutputFormat.setOutputPath(jobPerson, new Path(hadoopDir + "/sibEnd"));
579+
printProgress("Starting: Sorting update streams");
580+
jobPerson.waitForCompletion(true);
581+
fs.delete(new Path(socialNetDir + "/temp_updateStream_" + i+"_"+j+"_person"), false);
582+
fs.delete(new Path(hadoopDir + "/sibEnd"), true);
583+
}
561584
}
562-
fs.delete(new Path(hadoopDir + "/sibEnd"),true);
563585

564586
printProgress("Starting: Materialize friends for substitution parameters");
565587
int resMaterializeFriends = job6.waitForCompletion(true) ? 0 : 1;

src/main/java/ldbc/socialnet/dbgen/generator/ScalableGenerator.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -799,6 +799,7 @@ public void generateUserActivity( ReducedUserProfile userProfile, Reducer<MapRed
799799
generatePhotos(reducedUserProfiles[index], extraInfo);
800800
generateUserGroups(reducedUserProfiles[index], extraInfo);
801801
if( numUserProfilesRead % 100 == 0) context.setStatus("Generated post and photo for "+numUserProfilesRead+" users");
802+
dataExporter.changePartition();
802803
}
803804

804805
private void generateUserGroups(ReducedUserProfile userProfile, UserExtraInfo extraInfo ) {
@@ -1514,7 +1515,7 @@ private DataExporter getSerializer(String type, String outputFileName) {
15141515
return null;
15151516
}
15161517
return new DataExporter(format,sibOutputDir, threadId, dateThreshold,
1517-
exportText,enableCompression,tagDictionary,browserDictonry,companiesDictionary,
1518+
exportText,enableCompression,conf.getInt("numUpdatePartitions",1),tagDictionary,browserDictonry,companiesDictionary,
15181519
unversityDictionary,ipAddDictionary,locationDictionary,languageDictionary, configFile, factorTable, startMonth, startYear, stats);
15191520
}
15201521

src/main/java/ldbc/socialnet/dbgen/serializer/DataExporter.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@ public DataExporter( DataFormat format,
7878
long dateThreshold,
7979
boolean exportText,
8080
boolean compressed,
81+
int numPartitions,
8182
TagDictionary tagDic,
8283
BrowserDictionary browsers,
8384
CompanyDictionary companyDic,
@@ -111,10 +112,14 @@ public DataExporter( DataFormat format,
111112
} else if( format == DataFormat.NONE) {
112113
staticSerializer = new EmptySerializer();
113114
}
114-
updateStreamSerializer = new UpdateEventSerializer(directory,"temp_updateStream_"+reducerId+".csv",exportText, compressed,tagDic,browsers,languageDic,ipDic, statistics);
115+
updateStreamSerializer = new UpdateEventSerializer(directory,"temp_updateStream_"+reducerId,exportText, numPartitions,tagDic,browsers,languageDic,ipDic, statistics);
115116
exportCommonEntities();
116117
}
117118

119+
public void changePartition(){
120+
updateStreamSerializer.changePartition();
121+
}
122+
118123
public void close() {
119124
staticSerializer.close();
120125
updateStreamSerializer.close();

src/main/java/ldbc/socialnet/dbgen/serializer/UpdateEventSerializer.java

Lines changed: 53 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -67,8 +67,8 @@
6767
*/
6868
public class UpdateEventSerializer implements Serializer{
6969

70-
private OutputStream fileOutputStream;
71-
private SequenceFile.Writer hdfsWriter;
70+
private SequenceFile.Writer forumStreamWriter[];
71+
private SequenceFile.Writer personStreamWriter[];
7272
private ArrayList<Object> data;
7373
private ArrayList<Object> list;
7474
private UpdateEvent currentEvent;
@@ -83,8 +83,18 @@ public class UpdateEventSerializer implements Serializer{
8383
private long maxDate;
8484
private Gson gson;
8585
private long numEvents = 0;
86-
87-
public UpdateEventSerializer( String outputDir, String outputFileName,boolean exportText, boolean compress, TagDictionary tagDic, BrowserDictionary browserDic, LanguageDictionary languageDic, IPAddressDictionary ipDic, Statistics statistics) {
86+
private int numPartitions = 1;
87+
private int nextPartition = 0;
88+
89+
public UpdateEventSerializer( String outputDir,
90+
String outputFileName,
91+
boolean exportText,
92+
int numPartitions,
93+
TagDictionary tagDic,
94+
BrowserDictionary browserDic,
95+
LanguageDictionary languageDic,
96+
IPAddressDictionary ipDic,
97+
Statistics statistics) {
8898
gson = new GsonBuilder().disableHtmlEscaping().create();
8999
this.data = new ArrayList<Object>();
90100
this.currentEvent = new UpdateEvent(-1, UpdateEvent.UpdateEventType.NO_EVENT,new String(""));
@@ -93,22 +103,22 @@ public UpdateEventSerializer( String outputDir, String outputFileName,boolean ex
93103
this.languageDic = languageDic;
94104
this.ipDic = ipDic;
95105
this.exportText = exportText;
106+
this.numPartitions = numPartitions;
96107
this.tagDic = tagDic;
97108
this.statistics = statistics;
98109
this.minDate = Long.MAX_VALUE;
99110
this.maxDate = Long.MIN_VALUE;
100111
try{
112+
this.forumStreamWriter = new SequenceFile.Writer[this.numPartitions];
113+
this.personStreamWriter = new SequenceFile.Writer[this.numPartitions];
101114
Configuration conf = new Configuration();
102115
FileSystem fs = FileSystem.get(conf);
103-
/*if( compress ) {
104-
this.fileOutputStream = new GZIPOutputStream(new FileOutputStream(outputDir + "/" + outputFileName +".gz"));
105-
} else {
106-
this.fileOutputStream = new FileOutputStream(outputDir + "/" + outputFileName );
116+
for( int i = 0; i < numPartitions; ++i ) {
117+
Path outFile = new Path(outputDir + "/" + outputFileName+"_"+i+"_forum");
118+
forumStreamWriter[i] = new SequenceFile.Writer(fs, conf, outFile, LongWritable.class, Text.class);
119+
outFile = new Path(outputDir + "/" + outputFileName+"_"+i+"_person");
120+
personStreamWriter[i] = new SequenceFile.Writer(fs, conf, outFile, LongWritable.class, Text.class);
107121
}
108-
hdfsOutput = new FSDataOutputStream(this.fileOutputStream, new FileSystem.Statistics(null));
109-
*/
110-
Path outFile = new Path(outputDir + "/" + outputFileName);
111-
hdfsWriter = new SequenceFile.Writer(fs, conf,outFile, LongWritable.class,Text.class);
112122
} catch(IOException e){
113123
System.err.println(e.getMessage());
114124
System.exit(-1);
@@ -201,8 +211,11 @@ public UpdateEventSerializer( String outputDir, String outputFileName,boolean ex
201211
// statistics.eventParams.add(params);
202212
}
203213

214+
public void changePartition() {
215+
nextPartition = (++nextPartition) % numPartitions;
216+
}
204217

205-
public void writeKeyValue( UpdateEvent event ) {
218+
public void writeKeyValue( UpdateEvent event, Stream s ) {
206219
try{
207220
StringBuffer string = new StringBuffer();
208221
string.append(Long.toString(event.date));
@@ -212,8 +225,14 @@ public void writeKeyValue( UpdateEvent event ) {
212225
string.append(event.eventData);
213226
string.append("|");
214227
string.append("\n");
215-
//fileOutputStream.write(string.toString().getBytes("UTF8"));
216-
hdfsWriter.append(new LongWritable(event.date),new Text(string.toString()));
228+
switch (s) {
229+
case FORUM_STREAM:
230+
forumStreamWriter[nextPartition].append(new LongWritable(event.date),new Text(string.toString()));
231+
break;
232+
case PERSON_STREAM:
233+
personStreamWriter[nextPartition].append(new LongWritable(event.date),new Text(string.toString()));
234+
break;
235+
}
217236
} catch(IOException e){
218237
System.err.println(e.getMessage());
219238
System.exit(-1);
@@ -229,10 +248,15 @@ private void beginEvent( long date, UpdateEvent.UpdateEventType type ) {
229248
data.clear();
230249
}
231250

232-
private void endEvent() {
251+
enum Stream {
252+
FORUM_STREAM,
253+
PERSON_STREAM
254+
}
255+
256+
private void endEvent( Stream s ) {
233257
numEvents++;
234258
currentEvent.eventData = gson.toJson(data);
235-
writeKeyValue(currentEvent);
259+
writeKeyValue(currentEvent, s);
236260
}
237261

238262
private void beginList() {
@@ -256,7 +280,10 @@ public void close() {
256280
System.out.println("Number of update events serialized "+numEvents);
257281

258282
try {
259-
hdfsWriter.close();
283+
for( int i = 0; i < numPartitions; ++i ) {
284+
forumStreamWriter[i].close();
285+
personStreamWriter[i].close();
286+
}
260287
} catch(IOException e){
261288
System.err.println(e.getMessage());
262289
System.exit(-1);
@@ -377,7 +404,7 @@ public void serialize(UserInfo info) {
377404
list.add(workAtData);
378405
}
379406
endList();
380-
endEvent();
407+
endEvent(Stream.PERSON_STREAM);
381408
}
382409

383410
@Override
@@ -388,7 +415,7 @@ public void serialize(Friend friend) {
388415
data.add(friend.getFriendAcc());
389416
date.setTimeInMillis(friend.getCreatedTime());
390417
data.add(DateGenerator.formatDateDetail(date));
391-
endEvent();
418+
endEvent(Stream.PERSON_STREAM);
392419
}
393420
}
394421

@@ -433,7 +460,7 @@ public void serialize(Post post) {
433460
list.add(tagId);
434461
}
435462
endList();
436-
endEvent();
463+
endEvent(Stream.FORUM_STREAM);
437464
}
438465

439466
@Override
@@ -448,7 +475,7 @@ public void serialize(Like like) {
448475
data.add(like.user);
449476
data.add(Long.parseLong(SN.formId(like.messageId)));
450477
data.add(dateString);
451-
endEvent();
478+
endEvent(Stream.FORUM_STREAM);
452479
}
453480

454481
@Override
@@ -485,7 +512,7 @@ public void serialize(Photo photo) {
485512
list.add(tagId);
486513
}
487514
endList();
488-
endEvent();
515+
endEvent(Stream.FORUM_STREAM);
489516
}
490517

491518
@Override
@@ -531,7 +558,7 @@ public void serialize(Comment comment) {
531558
list.add(tagId);
532559
}
533560
endList();
534-
endEvent();
561+
endEvent(Stream.FORUM_STREAM);
535562
}
536563

537564
@Override
@@ -551,7 +578,7 @@ public void serialize(Group group) {
551578
list.add(groupTags[i]);
552579
}
553580
endList();
554-
endEvent();
581+
endEvent(Stream.FORUM_STREAM);
555582
}
556583

557584
@Override
@@ -562,7 +589,7 @@ public void serialize(GroupMemberShip membership) {
562589
data.add(Long.parseLong(SN.formId(membership.getGroupId())));
563590
data.add(membership.getUserId());
564591
data.add(dateString);
565-
endEvent();
592+
endEvent(Stream.FORUM_STREAM);
566593
}
567594

568595
@Override

src/main/java/ldbc/socialnet/dbgen/util/ConfigParser.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ public static Configuration GetConfig( String paramsFile ) {
2020
conf.set("updateStreams",Boolean.toString(false));
2121
conf.set("outputDir","./");
2222
conf.set("deltaTime","10000");
23+
conf.set("numUpdatePartitions","1");
2324

2425
try {
2526
//First read the internal params.ini
@@ -34,6 +35,7 @@ public static Configuration GetConfig( String paramsFile ) {
3435
CheckOption(conf, "numPersons", properties);
3536
CheckOption(conf, "numYears", properties);
3637
CheckOption(conf,"startYear",properties);
38+
CheckOption(conf,"numUpdatePartitions",properties);
3739
if(conf.get("fs.default.name").compareTo("file:///") == 0 ) {
3840
System.out.println("Running in standalone mode. Setting numThreads to 1");
3941
conf.set("numThreads","1");
@@ -67,6 +69,7 @@ public static void PringConfig( Configuration conf ) {
6769
System.out.println("compressed: "+conf.get("compressed"));
6870
System.out.println("updateStreams: "+conf.get("updateStreams"));
6971
System.out.println("outputDir: "+conf.get("outputDir"));
72+
System.out.println("numUpdatePartitions: "+conf.get("numUpdatePartitions"));
7073
System.out.println("*********************************");
7174
}
7275
}

0 commit comments

Comments (0)