42
42
import ldbc .socialnet .dbgen .generator .ScalableGenerator ;
43
43
import ldbc .socialnet .dbgen .objects .*;
44
44
import ldbc .socialnet .dbgen .serializer .CSVSerializer .CSVSerializer ;
45
+ import org .apache .hadoop .conf .Configuration ;
46
+ import org .apache .hadoop .fs .FSDataOutputStream ;
47
+ import org .apache .hadoop .fs .FileSystem ;
48
+ import org .apache .hadoop .fs .Path ;
45
49
50
+ import java .io .IOException ;
46
51
import java .util .*;
47
52
48
53
/**
@@ -71,6 +76,7 @@ public enum DataFormat {
71
76
private int startMonth , startYear ;
72
77
private int reducerId = 0 ;
73
78
private long deltaTime = 0 ;
79
+ FSDataOutputStream out ;
74
80
GregorianCalendar c ;
75
81
76
82
public DataExporter ( DataFormat format ,
@@ -117,6 +123,14 @@ public DataExporter( DataFormat format,
117
123
staticSerializer = new EmptySerializer ();
118
124
}
119
125
updateStreamSerializer = new UpdateEventSerializer (directory ,"temp_updateStream_" +reducerId ,exportText , numPartitions ,tagDic ,browsers ,languageDic ,ipDic , statistics );
126
+
127
+ try {
128
+ FileSystem fs = FileSystem .get (new Configuration ());
129
+ Path outFile = new Path (directory + "/m0friendList" + reducerId + ".csv" );
130
+ out = fs .create (outFile );
131
+ } catch (IOException e ) {
132
+ System .out .println (e .getMessage ());
133
+ }
120
134
exportCommonEntities ();
121
135
}
122
136
@@ -125,6 +139,12 @@ public void changePartition(){
125
139
}
126
140
127
141
public void close () {
142
+ try {
143
+ out .flush ();
144
+ out .close ();
145
+ } catch ( IOException e ) {
146
+ System .out .println (e .getMessage ());
147
+ }
128
148
staticSerializer .close ();
129
149
updateStreamSerializer .close ();
130
150
}
@@ -224,6 +244,9 @@ public void exportTags() {
224
244
225
245
public void export ( UserInfo userInfo ) {
226
246
long creationDate = userInfo .user .getCreationDate ();
247
+ if (userInfo .user .getAccountId () == 18691698848287L ) {
248
+ System .out .println ("creation date " +creationDate +" " +dateThreshold );
249
+ }
227
250
if ( creationDate <= dateThreshold ) {
228
251
staticSerializer .serialize (userInfo );
229
252
} else {
@@ -263,21 +286,38 @@ public void export( UserInfo userInfo ) {
263
286
}
264
287
}
265
288
289
+ StringBuffer strbuf = new StringBuffer ();
290
+ strbuf .append (userInfo .user .getAccountId ());
266
291
Friend friends [] = userInfo .user .getFriendList ();
267
292
int numFriends = friends .length ;
268
293
for ( int i = 0 ; i < numFriends ; ++i ) {
269
294
if (friends [i ] != null && friends [i ].getCreatedTime () != -1 ) {
270
295
if ( friends [i ].getCreatedTime () <= dateThreshold ) {
296
+
297
+ if (userInfo .user .getAccountId () == 18691698848287L ) {
298
+ System .out .println ("ENTRA friend creation date " +friends [i ].getCreatedTime ());
299
+ }
271
300
staticSerializer .serialize (friends [i ]);
272
301
if (!factorTable .containsKey (userInfo .user .getAccountId ()))
273
302
factorTable .put (userInfo .user .getAccountId (), new ReducedUserProfile .Counts ());
274
303
factorTable .get (userInfo .user .getAccountId ()).numberOfFriends ++;
304
+ strbuf .append ("," );
305
+ strbuf .append (friends [i ].getFriendAcc ());
275
306
} else {
276
307
updateStreamSerializer .setCurrentDependantDate (friends [i ].dependantDate );
277
308
updateStreamSerializer .serialize (friends [i ]);
278
309
}
279
310
}
280
311
}
312
+ strbuf .append ("\n " );
313
+
314
+ try {
315
+ if (creationDate <= dateThreshold ) {
316
+ out .write (strbuf .toString ().getBytes ());
317
+ }
318
+ }catch (IOException e ) {
319
+ System .out .println (e .getMessage ());
320
+ }
281
321
}
282
322
283
323
public void export (Post post , long dependantDate ) {
0 commit comments