16
16
import java .text .SimpleDateFormat ;
17
17
import java .util .*;
18
18
19
- public class restructureAvroSensorTopics {
19
+ public class restructureAvroRecords {
20
20
21
21
String outputPath = "." ;
22
22
String OUTPUT_FILE_EXTENSION = "json" ;
@@ -25,16 +25,15 @@ public class restructureAvroSensorTopics {
25
25
SimpleDateFormat dateFormatFileName = new SimpleDateFormat ("yyyyMMdd_HH" );
26
26
27
27
public static void main (String [] args ) throws Exception {
28
- System .out .println ("Started!" );
29
28
30
- restructureAvroSensorTopics res = new restructureAvroSensorTopics ();
31
- res .setInputWebHdfsURL ("webhdfs://radar-test.thehyve.net:50070" );
32
- res .setOutputPath ("output3/" );
29
+ restructureAvroRecords restr = new restructureAvroRecords ();
30
+ restr .setInputWebHdfsURL (args [ 0 ]); // "webhdfs://radar-test.thehyve.net:50070");
31
+ restr .setOutputPath (args [ 2 ]); // "output3/");
33
32
34
- res .start ("/topicAndroidPhoneNew/" );
35
- // restructureAvroSensorTopics .processTopic("/topicE4/android_empatica_e4_inter_beat_interval/partition=0/");
36
- // restructureAvroSensorTopics .processAvroFile(new Path("/topicE4/android_empatica_e4_inter_beat_interval/partition=0/android_empatica_e4_inter_beat_interval+0+0000031485+0000031488.avro") );
37
- // restructureAvroSensorTopics .processAvroFile(new Path("/testE4Time/android_phone_acceleration/partition=0/android_phone_acceleration+0+0000590000+0000599999.avro"),"wazaa" );
33
+ restr .start (args [ 1 ]); // "/topicAndroidPhoneNew/");
34
+ // restructureAvroRecords .processTopic("/topicE4/android_empatica_e4_inter_beat_interval/partition=0/");
35
+ // restructureAvroRecords .processAvroFile(new Path("/topicE4/android_empatica_e4_inter_beat_interval/partition=0/android_empatica_e4_inter_beat_interval+0+0000031485+0000031488.avro") );
36
+ // restructureAvroRecords .processAvroFile(new Path("/testE4Time/android_phone_acceleration/partition=0/android_phone_acceleration+0+0000590000+0000599999.avro"),"wazaa" );
38
37
}
39
38
40
39
public void setInputWebHdfsURL (String fileSystemURL ) {
@@ -74,18 +73,13 @@ public void processTopic(Path topicPath) throws IOException {
74
73
75
74
String topicName = topicPath .getName ();
76
75
77
- int fileCounter = 0 ;
78
76
while (files .hasNext ()) {
79
77
LocatedFileStatus locatedFileStatus = files .next ();
80
78
81
79
System .out .println (locatedFileStatus .getPath ());
82
80
83
81
if (locatedFileStatus .isFile ())
84
82
this .processAvroFile ( locatedFileStatus .getPath (), topicName );
85
- fileCounter += 1 ;
86
-
87
- if (fileCounter > 1 )
88
- continue ;
89
83
}
90
84
}
91
85
@@ -119,7 +113,7 @@ public void writeRecord(GenericRecord record, String topicName) throws IOExcepti
119
113
GenericRecord valueField = (GenericRecord ) record .get ("valueField" );
120
114
121
115
// Make a timestamped filename YYYYMMDD_HH00.json
122
- String outputFileName = this .createFilepathFromTimestamp ( (Double ) valueField .get ("time" ));
116
+ String outputFileName = this .createFilePathFromTimestamp ( (Double ) valueField .get ("time" ));
123
117
124
118
// Clean user id and create final output pathname
125
119
String userId = keyField .get ("userId" ).toString ().replaceAll ("\\ W+" , "" );
@@ -130,7 +124,7 @@ public void writeRecord(GenericRecord record, String topicName) throws IOExcepti
130
124
this .appendToFile (dirName , outputFileName , data );
131
125
}
132
126
133
- public String createFilepathFromTimestamp (Double time ) {
127
+ public String createFilePathFromTimestamp (Double time ) {
134
128
// Send all output to the Appendable object sb
135
129
StringBuilder sb = new StringBuilder ();
136
130
Formatter formatter = new Formatter (sb , Locale .US );
0 commit comments