Skip to content

Commit 13f8a8c

Browse files
committed
Periodically cleaning part of factor table after block processing
Updated run.sh script Now, factors are output into two separate files: activityFactors and personFactors
1 parent 9eebded commit 13f8a8c

File tree

6 files changed

+74
-19
lines changed

6 files changed

+74
-19
lines changed

run.sh

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,10 @@ then
2727
mkdir -p substitution_parameters
2828
python paramgenerator/generateparams.py $LDBC_SNB_DATAGEN_HOME substitution_parameters/
2929
python paramgenerator/generateparamsbi.py $LDBC_SNB_DATAGEN_HOME substitution_parameters/
30-
rm -f m*factors*
31-
rm -f .m*factors*
30+
rm -f m*personFactors*
31+
rm -f .m*personFactors*
32+
rm -f m*activityFactors*
33+
rm -f .m*activityFactors*
3234
rm -f m0friendList*
3335
rm -f .m0friendList*
3436
fi

src/main/java/ldbc/snb/datagen/generator/DatagenParams.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,8 @@ public class DatagenParams {
5757
public static final String IPZONE_DIRECTORY = "/ipaddrByCountries";
5858
public static final String STATS_FILE = "testdata.json";
5959
public static final String RDF_OUTPUT_FILE = "ldbc_socialnet_dbg";
60-
public static final String PARAM_COUNT_FILE = "factors.txt";
60+
public static final String PERSON_COUNTS_FILE = "personFactors.txt";
61+
public static final String ACTIVITY_FILE = "activityFactors.txt";
6162

6263
// Dictionaries dataset files
6364
public static final String browserDictonryFile = DICTIONARY_DIRECTORY + "browsersDic.txt";

src/main/java/ldbc/snb/datagen/generator/LDBCDatagen.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,8 @@ public int runGenerateJob(Configuration conf) throws Exception {
140140

141141
for( int i = 0; i < numThreads; ++i ) {
142142
if( i < numBlocks ) {
143-
fs.copyToLocalFile(false, new Path(DatagenParams.hadoopDir + "/m" + i + "factors.txt"), new Path("./"));
143+
fs.copyToLocalFile(false, new Path(DatagenParams.hadoopDir + "/m" + i + "personFactors.txt"), new Path("./"));
144+
fs.copyToLocalFile(false, new Path(DatagenParams.hadoopDir + "/m" + i + "activityFactors.txt"), new Path("./"));
144145
fs.copyToLocalFile(false, new Path(DatagenParams.hadoopDir + "/m0friendList" + i + ".csv"), new Path("./"));
145146
}
146147
}

src/main/java/ldbc/snb/datagen/generator/PersonActivityGenerator.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -49,11 +49,8 @@ public PersonActivityGenerator( PersonActivitySerializer serializer, UpdateEvent
4949
}
5050

5151
private void generateActivity( Person person, ArrayList<Person> block ) {
52-
System.out.println("Generating wall");
5352
generateWall(person, block);
54-
System.out.println("Generating groups");
5553
generateGroups(person, block);
56-
System.out.println("Generating albums");
5754
generateAlbums(person, block);
5855
if(person.creationDate() < Dictionaries.dates.getUpdateThreshold() || !DatagenParams.updateStreams ) {
5956
factorTable_.extractFactors(person);
@@ -148,7 +145,7 @@ public void generateActivityForBlock( int seed, ArrayList<Person> block, Context
148145
personActivitySerializer_.reset();
149146
int counter = 0;
150147
float personGenerationTime = 0.0f;
151-
float accumTime = 0;
148+
long initTime = System.currentTimeMillis();
152149
for( Person p : block ) {
153150
System.out.println("Generating activity for person "+counter+" with degree "+p.knows().size());
154151
long start = System.currentTimeMillis();
@@ -162,15 +159,18 @@ public void generateActivityForBlock( int seed, ArrayList<Person> block, Context
162159
}
163160
float time = (System.currentTimeMillis() - start)/1000.0f;
164161
personGenerationTime+=time;
165-
accumTime += time;
166-
System.out.println("Time to generate activity for person "+counter+": "+time+". Throughput "+counter/accumTime);
162+
System.out.println("Time to generate activity for person "+counter+": "+time+". Throughput "+counter/((System.currentTimeMillis() - initTime)*0.001));
167163
counter++;
168164
}
169165
System.out.println("Average person activity generation time "+personGenerationTime / (float)block.size());
170166
}
171167

172168

173-
public void writeFactors( OutputStream writer) {
174-
factorTable_.write(writer);
169+
public void writeActivityFactors( OutputStream writer) {
170+
factorTable_.writeActivityFactors(writer);
175171
}
172+
173+
public void writePersonFactors( OutputStream writer) {
174+
factorTable_.writePersonFactors(writer);
175+
}
176176
}

src/main/java/ldbc/snb/datagen/hadoop/HadoopPersonActivityGenerator.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,8 @@ public static class HadoopPersonActivityGeneratorReducer extends Reducer<BlockK
4040
private PersonActivitySerializer personActivitySerializer_;
4141
private PersonActivityGenerator personActivityGenerator_;
4242
private UpdateEventSerializer updateSerializer_;
43-
private OutputStream factors_;
43+
private OutputStream personFactors_;
44+
private OutputStream activityFactors_;
4445
private OutputStream friends_;
4546
private FileSystem fs_;
4647

@@ -58,7 +59,8 @@ protected void setup(Context context) {
5859
personActivityGenerator_ = new PersonActivityGenerator(personActivitySerializer_, updateSerializer_);
5960

6061
fs_ = FileSystem.get(context.getConfiguration());
61-
factors_ = fs_.create(new Path(DatagenParams.hadoopDir+"/"+ "m" + reducerId + DatagenParams.PARAM_COUNT_FILE));
62+
personFactors_ = fs_.create(new Path(DatagenParams.hadoopDir+"/"+ "m" + reducerId + DatagenParams.PERSON_COUNTS_FILE));
63+
activityFactors_ = fs_.create(new Path(DatagenParams.hadoopDir+"/"+ "m" + reducerId + DatagenParams.ACTIVITY_FILE));
6264
friends_ = fs_.create(new Path(DatagenParams.hadoopDir+"/"+ "m0friendList" + reducerId +".csv"));
6365

6466
} catch( Exception e ) {
@@ -91,12 +93,14 @@ public void reduce(BlockKey key, Iterable<Person> valueSet,Context context)
9193
}
9294
System.out.println("Starting generation of block: "+key.block);
9395
personActivityGenerator_.generateActivityForBlock((int)key.block, persons, context );
96+
personActivityGenerator_.writePersonFactors(personFactors_);
9497
}
9598
protected void cleanup(Context context){
9699
try {
97100
System.out.println("Cleaning up");
98-
personActivityGenerator_.writeFactors(factors_);
99-
factors_.close();
101+
personActivityGenerator_.writeActivityFactors(activityFactors_);
102+
activityFactors_.close();
103+
personFactors_.close();
100104
friends_.close();
101105
} catch (IOException e) {
102106
e.printStackTrace();

src/main/java/ldbc/snb/datagen/util/FactorTable.java

Lines changed: 50 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -254,15 +254,62 @@ public void extractFactors( Like like ) {
254254
personCounts(like.user).incrNumLikes();
255255
}
256256

257-
public void write(OutputStream writer ) {
257+
public void writePersonFactors(OutputStream writer ) {
258258
try {
259-
Iterator<Map.Entry<Long,PersonCounts>> iter = personCounts_.entrySet().iterator();
259+
for (Map.Entry<Long, PersonCounts> c: personCounts_.entrySet()){
260+
PersonCounts count = c.getValue();
261+
// correct the group counts
262+
//count.numberOfGroups += count.numberOfFriends;
263+
String name = medianFirstName_.get(c.getKey());
264+
if( name != null ) {
265+
StringBuffer strbuf = new StringBuffer();
266+
strbuf.append(c.getKey()); strbuf.append(",");
267+
strbuf.append(name);
268+
strbuf.append(",");
269+
strbuf.append(count.numFriends());
270+
strbuf.append(",");
271+
strbuf.append(count.numPosts());
272+
strbuf.append(",");
273+
strbuf.append(count.numLikes());
274+
strbuf.append(",");
275+
strbuf.append(count.numTagsOfMessages());
276+
strbuf.append(",");
277+
strbuf.append(count.numForums());
278+
strbuf.append(",");
279+
strbuf.append(count.numWorkPlaces());
280+
strbuf.append(",");
281+
strbuf.append(count.numComments());
282+
strbuf.append(",");
283+
284+
for (Long bucket : count.numMessagesPerMonth()) {
285+
strbuf.append(bucket);
286+
strbuf.append(",");
287+
}
288+
for (Long bucket : count.numForumsPerMonth()) {
289+
strbuf.append(bucket);
290+
strbuf.append(",");
291+
}
292+
strbuf.setCharAt(strbuf.length() - 1, '\n');
293+
writer.write(strbuf.toString().getBytes("UTF8"));
294+
}
295+
}
296+
personCounts_.clear();
297+
} catch (IOException e) {
298+
System.err.println("Unable to write parameter counts");
299+
System.err.println(e.getMessage());
300+
e.printStackTrace();
301+
}
302+
}
303+
304+
public void writeActivityFactors(OutputStream writer ) {
305+
try {
306+
/*Iterator<Map.Entry<Long,PersonCounts>> iter = personCounts_.entrySet().iterator();
260307
while(iter.hasNext()) {
261308
Map.Entry<Long,PersonCounts> entry = iter.next();
262309
if (medianFirstName_.get(entry.getKey()) == null) {
263310
iter.remove();
264311
}
265-
}
312+
}*/
266313
writer.write(Integer.toString(personCounts_.size()).getBytes("UTF8"));
267314
writer.write("\n".getBytes("UTF8"));
268315
for (Map.Entry<Long, PersonCounts> c: personCounts_.entrySet()){

0 commit comments

Comments
 (0)