@@ -31,15 +31,15 @@ public class restructureAvroRecords {
31
31
private final static SimpleDateFormat dateFormatFileName = new SimpleDateFormat ("yyyyMMdd_HH" );
32
32
33
33
private int processedFileCount ;
34
+ private int processedRecordsCount ;
34
35
35
36
public static void main (String [] args ) throws Exception {
36
-
37
37
restructureAvroRecords restr = new restructureAvroRecords (args [0 ], args [2 ]);
38
38
long time1 = System .currentTimeMillis ();
39
39
restr .start (args [1 ]);
40
+ System .out .printf ("Processed %d files and %,d records\n " , restr .getProcessedFileCount (), restr .getProcessedRecordsCount ());
40
41
System .out .printf ("Time taken: %.2f seconds\n " ,(System .currentTimeMillis () - time1 )/1000d );
41
42
42
-
43
43
// restructureAvroRecords restr = new restructureAvroRecords("webhdfs://radar-test.thehyve.net:50070", "output4/");
44
44
// restr.start("/topicE4/");
45
45
@@ -63,6 +63,14 @@ public void setOutputPath(String path) {
63
63
offsetsPath = outputPath + "/" + OFFSETS_FILE_NAME ;
64
64
}
65
65
66
+ public int getProcessedFileCount () {
67
+ return processedFileCount ;
68
+ }
69
+
70
+ public int getProcessedRecordsCount () {
71
+ return processedRecordsCount ;
72
+ }
73
+
66
74
public void start (String directoryName ) throws IOException {
67
75
// Get files and directories
68
76
Path path = new Path (directoryName );
@@ -86,8 +94,6 @@ public void start(String directoryName) throws IOException {
86
94
processTopic (filePath );
87
95
}
88
96
}
89
-
90
- System .out .printf ("%d files processed\n " , processedFileCount );
91
97
}
92
98
93
99
private void processTopic (Path topicPath ) throws IOException {
@@ -133,6 +139,7 @@ record = dataFileReader.next(record);
133
139
this .writeRecord (record , topicName );
134
140
}
135
141
142
+ // Write which file has been processed and update bins
136
143
this .writeSeenOffsets (fileName );
137
144
bins .writeBins ();
138
145
processedFileCount ++;
@@ -154,8 +161,9 @@ private void writeRecord(GenericRecord record, String topicName) throws IOExcept
154
161
String data = record .toString (); // TODO: check whether this indeed always creates valid JSON
155
162
this .appendToFile (dirName , outputFileName , data );
156
163
157
- // Count data
164
+ // Count data (binned and total)
158
165
bins .addToBin (topicName , keyField .get ("sourceId" ).toString (), (Double ) valueField .get ("time" ));
166
+ processedRecordsCount ++;
159
167
}
160
168
161
169
public static String createHourTimestamp (Double time ) {
@@ -230,8 +238,7 @@ private void readSeenOffsets() {
230
238
}
231
239
232
240
} catch (IOException e ) {
233
- // TODO
234
- e .printStackTrace ();
241
+ System .out .println ("No files processed yet." );
235
242
}
236
243
}
237
244
}
0 commit comments