@@ -20,32 +20,37 @@ public class restructureAvroRecords {
20
20
21
21
private final String OUTPUT_FILE_EXTENSION = "json" ;
22
22
private final String OFFSETS_FILE_NAME = "offsets.csv" ;
23
+ private final String BINS_FILE_NAME = "bins.csv" ;
23
24
24
25
private String outputPath = "." ;
25
26
private String offsetsPath = outputPath + "/" + OFFSETS_FILE_NAME ;
26
27
private Set <String > seenFiles = new HashSet <>();
28
+ private Frequency bins = new Frequency ();
27
29
28
30
private Configuration conf = new Configuration ();
29
- private final SimpleDateFormat dateFormatFileName = new SimpleDateFormat ("yyyyMMdd_HH" );
31
+ private final static SimpleDateFormat dateFormatFileName = new SimpleDateFormat ("yyyyMMdd_HH" );
30
32
31
33
private int processedFileCount ;
32
34
33
35
public static void main (String [] args ) throws Exception {
34
36
35
- // restructureAvroRecords restr = new restructureAvroRecords(args[0], args[2]);
36
- // restr.start(args[1]);
37
+ restructureAvroRecords restr = new restructureAvroRecords (args [0 ], args [2 ]);
38
+ long time1 = System .currentTimeMillis ();
39
+ restr .start (args [1 ]);
40
+ System .out .printf ("Time taken: %.2f seconds\n " ,(System .currentTimeMillis () - time1 )/1000d );
37
41
38
- restructureAvroRecords restr = new restructureAvroRecords ("webhdfs://radar-test.thehyve.net:50070" , "output3/" );
39
- restr .start ("/topicAndroidPhoneNew/" );
40
42
41
- // restructureAvroRecords.processTopic("/topicE4/android_empatica_e4_inter_beat_interval/partition=0/");
42
- // restructureAvroRecords.processAvroFile(new Path("/topicE4/android_empatica_e4_inter_beat_interval/partition=0/android_empatica_e4_inter_beat_interval+0+0000031485+0000031488.avro") );
43
- // restructureAvroRecords.processAvroFile(new Path("/testE4Time/android_phone_acceleration/partition=0/android_phone_acceleration+0+0000590000+0000599999.avro"),"wazaa" );
43
+ // restructureAvroRecords restr = new restructureAvroRecords("webhdfs://radar-test.thehyve.net:50070", "output4/");
44
+ // restr.start("/topicE4/");
45
+
46
+ // restr.processTopic(new Path("/topicE4/android_empatica_e4_temperature/"));
47
+ // restr.processAvroFile(new Path("/testE4Time/android_phone_acceleration/partition=0/android_phone_acceleration+0+0000590000+0000599999.avro"),"wazaa" );
44
48
}
45
49
46
50
public restructureAvroRecords (String inputPath , String outputPath ) {
47
51
this .setInputWebHdfsURL (inputPath );
48
52
this .setOutputPath (outputPath );
53
+ bins .setBinFilePath (outputPath + "/" + BINS_FILE_NAME );
49
54
}
50
55
51
56
public void setInputWebHdfsURL (String fileSystemURL ) {
@@ -81,10 +86,11 @@ public void start(String directoryName) throws IOException {
81
86
processTopic (filePath );
82
87
}
83
88
}
89
+
84
90
System .out .printf ("%d files processed\n " , processedFileCount );
85
91
}
86
92
87
- public void processTopic (Path topicPath ) throws IOException {
93
+ private void processTopic (Path topicPath ) throws IOException {
88
94
// Get files in this topic directory
89
95
FileSystem fs = FileSystem .get (conf );
90
96
RemoteIterator <LocatedFileStatus > files = fs .listFiles (topicPath , true ); // TODO: all partitions or just 'partition=0'?
@@ -94,15 +100,12 @@ public void processTopic(Path topicPath) throws IOException {
94
100
while (files .hasNext ()) {
95
101
LocatedFileStatus locatedFileStatus = files .next ();
96
102
97
- System .out .println (locatedFileStatus .getPath ());
98
-
99
103
if (locatedFileStatus .isFile ())
100
104
this .processAvroFile ( locatedFileStatus .getPath (), topicName );
101
-
102
105
}
103
106
}
104
107
105
- public void processAvroFile (Path filePath , String topicName ) throws IOException {
108
+ private void processAvroFile (Path filePath , String topicName ) throws IOException {
106
109
String fileName = filePath .getName ();
107
110
108
111
// Skip if extension is not .avro
@@ -116,9 +119,10 @@ public void processAvroFile(Path filePath, String topicName) throws IOException
116
119
return ;
117
120
}
118
121
122
+ System .out .println (filePath );
123
+ // Read and parse avro file
119
124
FsInput input = new FsInput (filePath , conf );
120
125
DatumReader <GenericRecord > datumReader = new GenericDatumReader <GenericRecord >();
121
-
122
126
DataFileReader <GenericRecord > dataFileReader = new DataFileReader <GenericRecord >(input , datumReader );
123
127
124
128
GenericRecord record = null ;
@@ -130,15 +134,17 @@ record = dataFileReader.next(record);
130
134
}
131
135
132
136
this .writeSeenOffsets (fileName );
137
+ bins .writeBins ();
133
138
processedFileCount ++;
134
139
}
135
140
136
- public void writeRecord (GenericRecord record , String topicName ) throws IOException {
141
+ private void writeRecord (GenericRecord record , String topicName ) throws IOException {
137
142
GenericRecord keyField = (GenericRecord ) record .get ("keyField" );
138
143
GenericRecord valueField = (GenericRecord ) record .get ("valueField" );
139
144
140
145
// Make a timestamped filename YYYYMMDD_HH00.json
141
- String outputFileName = this .createFilePathFromTimestamp ( (Double ) valueField .get ("time" ));
146
+ String hourlyTimestamp = createHourTimestamp ( (Double ) valueField .get ("time" ));
147
+ String outputFileName = hourlyTimestamp + "00." + OUTPUT_FILE_EXTENSION ;
142
148
143
149
// Clean user id and create final output pathname
144
150
String userId = keyField .get ("userId" ).toString ().replaceAll ("\\ W+" , "" );
@@ -147,21 +153,18 @@ public void writeRecord(GenericRecord record, String topicName) throws IOExcepti
147
153
// Write data
148
154
String data = record .toString (); // TODO: check whether this indeed always creates valid JSON
149
155
this .appendToFile (dirName , outputFileName , data );
150
- }
151
156
152
- public String createFilePathFromTimestamp (Double time ) {
153
- // Send all output to the Appendable object sb
154
- StringBuilder sb = new StringBuilder ();
155
- Formatter formatter = new Formatter (sb , Locale .US );
157
+ // Count data
158
+ bins .addToBin (topicName , keyField .get ("sourceId" ).toString (), (Double ) valueField .get ("time" ));
159
+ }
156
160
157
- // In millis
161
+ public static String createHourTimestamp (Double time ) {
162
+ // Convert from millis to date and apply dateFormat
158
163
Date date = new Date ( time .longValue () * 1000 );
159
-
160
- formatter .format ("%s00.%s" , dateFormatFileName .format (date ), OUTPUT_FILE_EXTENSION );
161
- return sb .toString ();
164
+ return dateFormatFileName .format (date );
162
165
}
163
166
164
- public void appendToFile (String directoryName , String fileName , String data ) {
167
+ private void appendToFile (String directoryName , String fileName , String data ) {
165
168
File directory = new File (directoryName );
166
169
if (! directory .exists ()){
167
170
if (directory .mkdirs ())
@@ -216,7 +219,6 @@ private void writeSeenOffsets(String fileName) {
216
219
}
217
220
218
221
private void readSeenOffsets () {
219
-
220
222
try (FileReader fr = new FileReader (offsetsPath );
221
223
BufferedReader br = new BufferedReader (fr ))
222
224
{
@@ -231,6 +233,5 @@ private void readSeenOffsets() {
231
233
// TODO
232
234
e .printStackTrace ();
233
235
}
234
-
235
236
}
236
237
}
0 commit comments