11
11
import org .apache .hadoop .fs .LocatedFileStatus ;
12
12
import org .apache .hadoop .fs .Path ;
13
13
import org .apache .hadoop .fs .RemoteIterator ;
14
+ import org .apache .logging .log4j .LogManager ;
15
+ import org .apache .logging .log4j .Logger ;
14
16
15
17
import java .io .*;
16
18
import java .text .SimpleDateFormat ;
17
19
import java .util .*;
18
20
19
- public class restructureAvroRecords {
21
+ public class RestructureAvroRecords {
22
+ private static final Logger logger = LogManager .getLogger (RestructureAvroRecords .class );
20
23
21
24
private final String OUTPUT_FILE_EXTENSION = "json" ;
22
25
private final String OFFSETS_FILE_NAME = "offsets.csv" ;
@@ -34,23 +37,28 @@ public class restructureAvroRecords {
34
37
private int processedRecordsCount ;
35
38
36
39
public static void main (String [] args ) throws Exception {
37
- restructureAvroRecords restr = new restructureAvroRecords (args [0 ], args [2 ]);
40
+ SimpleDateFormat dateFormat = new SimpleDateFormat ("yyyy-MM-dd HH:mm:ss" );
41
+
42
+ logger .info (dateFormat .format (new Date ()));
43
+ logger .info ("Starting..." );
44
+ logger .info ("In: " + args [0 ] + args [1 ]);
45
+ logger .info ("Out: " + args [2 ]);
46
+
38
47
long time1 = System .currentTimeMillis ();
48
+
49
+ RestructureAvroRecords restr = new RestructureAvroRecords (args [0 ], args [2 ]);
39
50
restr .start (args [1 ]);
40
- System .out .printf ("Processed %d files and %,d records\n " , restr .getProcessedFileCount (), restr .getProcessedRecordsCount ());
41
- System .out .printf ("Time taken: %.2f seconds\n " ,(System .currentTimeMillis () - time1 )/1000d );
42
51
43
- // restructureAvroRecords restr = new restructureAvroRecords("webhdfs://radar-test.thehyve.net:50070", "output4/" );
44
- // restr.start("/topicE4/" );
52
+ logger . info ( String . format ( "Processed %d files and %,d records" , restr . getProcessedFileCount (), restr . getProcessedRecordsCount ()) );
53
+ logger . info ( String . format ( "Time taken: %.2f seconds" ,( System . currentTimeMillis () - time1 )/ 1000d ) );
45
54
46
55
// restr.processTopic(new Path("/topicE4/android_empatica_e4_temperature/"));
47
- // restr.processAvroFile (new Path("/testE4Time/android_phone_acceleration/partition=0/android_phone_acceleration+0+0000590000+0000599999.avro"),"wazaa" );
56
+ // restr.processFile (new Path("/testE4Time/android_phone_acceleration/partition=0/android_phone_acceleration+0+0000590000+0000599999.avro"),"wazaa" );
48
57
}
49
58
50
- public restructureAvroRecords (String inputPath , String outputPath ) {
59
+ public RestructureAvroRecords (String inputPath , String outputPath ) {
51
60
this .setInputWebHdfsURL (inputPath );
52
61
this .setOutputPath (outputPath );
53
- bins .setBinFilePath (outputPath + "/" + BINS_FILE_NAME );
54
62
}
55
63
56
64
public void setInputWebHdfsURL (String fileSystemURL ) {
@@ -61,6 +69,7 @@ public void setOutputPath(String path) {
61
69
// Remove trailing slash
62
70
outputPath = path .replaceAll ("/$" ,"" );
63
71
offsetsPath = outputPath + "/" + OFFSETS_FILE_NAME ;
72
+ bins .setBinFilePath (outputPath + "/" + BINS_FILE_NAME );
64
73
}
65
74
66
75
public int getProcessedFileCount () {
@@ -107,16 +116,16 @@ private void processTopic(Path topicPath) throws IOException {
107
116
LocatedFileStatus locatedFileStatus = files .next ();
108
117
109
118
if (locatedFileStatus .isFile ())
110
- this .processAvroFile ( locatedFileStatus .getPath (), topicName );
119
+ this .processFile ( locatedFileStatus .getPath (), topicName );
111
120
}
112
121
}
113
122
114
- private void processAvroFile (Path filePath , String topicName ) throws IOException {
123
+ private void processFile (Path filePath , String topicName ) throws IOException {
115
124
String fileName = filePath .getName ();
116
125
117
126
// Skip if extension is not .avro
118
127
if (! FilenameUtils .getExtension (fileName ).equals ("avro" )) {
119
- System . out . printf ("Skipped non avro file: %s \n " , fileName );
128
+ logger . info ("Skipped non- avro file: " + fileName );
120
129
return ;
121
130
}
122
131
@@ -125,11 +134,12 @@ private void processAvroFile(Path filePath, String topicName) throws IOException
125
134
return ;
126
135
}
127
136
128
- System .out .println (filePath );
137
+ logger .info (filePath );
138
+
129
139
// Read and parse avro file
130
140
FsInput input = new FsInput (filePath , conf );
131
- DatumReader <GenericRecord > datumReader = new GenericDatumReader <GenericRecord >();
132
- DataFileReader <GenericRecord > dataFileReader = new DataFileReader <GenericRecord >(input , datumReader );
141
+ DatumReader <GenericRecord > datumReader = new GenericDatumReader <>();
142
+ DataFileReader <GenericRecord > dataFileReader = new DataFileReader <>(input , datumReader );
133
143
134
144
GenericRecord record = null ;
135
145
while (dataFileReader .hasNext ()) {
@@ -176,9 +186,9 @@ private void appendToFile(String directoryName, String fileName, String data) {
176
186
File directory = new File (directoryName );
177
187
if (! directory .exists ()){
178
188
if (directory .mkdirs ())
179
- System . out . printf ("Created directory: %s \n " , directory .getAbsolutePath ());
189
+ logger . info ("Created directory: " + directory .getAbsolutePath ());
180
190
else
181
- System . out . printf ("FAILED to create directory: %s \n " , directory .getAbsolutePath ());
191
+ logger . warn ("FAILED to create directory: " + directory .getAbsolutePath ());
182
192
}
183
193
184
194
String filePath = directoryName + "/" + fileName ;
@@ -206,10 +216,10 @@ private void writeSeenOffsets(String fileName) {
206
216
fromOffset = Integer .valueOf ( fileNameParts [2 ] );
207
217
toOffset = Integer .valueOf ( fileNameParts [3 ] );
208
218
} catch (IndexOutOfBoundsException e ) {
209
- System . out . println ("Could not split filename to the commit offsets." );
219
+ logger . warn ("Could not split filename to the commit offsets." );
210
220
return ;
211
221
} catch (NumberFormatException e ) {
212
- System . out . println ("Could not convert offsets to integers." );
222
+ logger . warn ("Could not convert offsets to integers." );
213
223
return ;
214
224
}
215
225
@@ -236,9 +246,8 @@ private void readSeenOffsets() {
236
246
String [] columns = line .split ("," );
237
247
seenFiles .add (columns [0 ]);
238
248
}
239
-
240
249
} catch (IOException e ) {
241
- System . out . println ( "No files processed yet." );
250
+ logger . info ( "Offsets file does not exist yet, will be created ." );
242
251
}
243
252
}
244
253
}
0 commit comments