18
18
19
19
public class restructureAvroRecords {
20
20
21
- String outputPath = "." ;
22
- String OUTPUT_FILE_EXTENSION = "json" ;
23
- String OFFSETS_FILE_NAME = "offsets.csv" ;
24
- Configuration conf = new Configuration ();
25
- SimpleDateFormat dateFormatFileName = new SimpleDateFormat ("yyyyMMdd_HH" );
21
+ private final String OUTPUT_FILE_EXTENSION = "json" ;
22
+ private final String OFFSETS_FILE_NAME = "offsets.csv" ;
23
+
24
+ private String outputPath = "." ;
25
+ private String offsetsPath = outputPath + "/" + OFFSETS_FILE_NAME ;
26
+ private Set <String > seenFiles = new HashSet <>();
27
+
28
+ private Configuration conf = new Configuration ();
29
+ private final SimpleDateFormat dateFormatFileName = new SimpleDateFormat ("yyyyMMdd_HH" );
30
+
31
+ private int processedFileCount ;
26
32
27
33
public static void main (String [] args ) throws Exception {
28
34
29
- restructureAvroRecords restr = new restructureAvroRecords ();
30
- restr .setInputWebHdfsURL (args [0 ]); //"webhdfs://radar-test.thehyve.net:50070");
31
- restr .setOutputPath (args [2 ]); //"output3/");
35
+ // restructureAvroRecords restr = new restructureAvroRecords(args[0], args[2]);
36
+ // restr.start(args[1]);
37
+
38
+ restructureAvroRecords restr = new restructureAvroRecords ("webhdfs://radar-test.thehyve.net:50070" , "output3/" );
39
+ restr .start ("/topicAndroidPhoneNew/" );
32
40
33
- restr .start (args [1 ]); //"/topicAndroidPhoneNew/");
34
41
// restructureAvroRecords.processTopic("/topicE4/android_empatica_e4_inter_beat_interval/partition=0/");
35
42
// restructureAvroRecords.processAvroFile(new Path("/topicE4/android_empatica_e4_inter_beat_interval/partition=0/android_empatica_e4_inter_beat_interval+0+0000031485+0000031488.avro") );
36
43
// restructureAvroRecords.processAvroFile(new Path("/testE4Time/android_phone_acceleration/partition=0/android_phone_acceleration+0+0000590000+0000599999.avro"),"wazaa" );
37
44
}
38
45
46
+ public restructureAvroRecords (String inputPath , String outputPath ) {
47
+ this .setInputWebHdfsURL (inputPath );
48
+ this .setOutputPath (outputPath );
49
+ }
50
+
39
51
public void setInputWebHdfsURL (String fileSystemURL ) {
40
- conf .set ("fs.defaultFS" ,fileSystemURL );
52
+ conf .set ("fs.defaultFS" , fileSystemURL );
41
53
}
42
54
43
55
public void setOutputPath (String path ) {
44
56
// Remove trailing backslash
45
57
outputPath = path .replaceAll ("/$" ,"" );
58
+ offsetsPath = outputPath + "/" + OFFSETS_FILE_NAME ;
46
59
}
47
60
48
61
public void start (String directoryName ) throws IOException {
@@ -51,7 +64,11 @@ public void start(String directoryName) throws IOException {
51
64
FileSystem fs = FileSystem .get (conf );
52
65
RemoteIterator <LocatedFileStatus > files = fs .listLocatedStatus (path );
53
66
67
+ // Load seen offsets from file
68
+ readSeenOffsets ();
69
+
54
70
// Process the directories topics
71
+ processedFileCount = 0 ;
55
72
while (files .hasNext ()) {
56
73
LocatedFileStatus locatedFileStatus = files .next ();
57
74
Path filePath = locatedFileStatus .getPath ();
@@ -64,6 +81,7 @@ public void start(String directoryName) throws IOException {
64
81
processTopic (filePath );
65
82
}
66
83
}
84
+ System .out .printf ("%d files processed\n " , processedFileCount );
67
85
}
68
86
69
87
public void processTopic (Path topicPath ) throws IOException {
@@ -80,6 +98,7 @@ public void processTopic(Path topicPath) throws IOException {
80
98
81
99
if (locatedFileStatus .isFile ())
82
100
this .processAvroFile ( locatedFileStatus .getPath (), topicName );
101
+
83
102
}
84
103
}
85
104
@@ -88,7 +107,12 @@ public void processAvroFile(Path filePath, String topicName) throws IOException
88
107
89
108
// Skip if extension is not .avro
90
109
if (! FilenameUtils .getExtension (fileName ).equals ("avro" )) {
91
- System .out .printf ("Skipped non avro file: %s\n " , filePath .getName ());
110
+ System .out .printf ("Skipped non avro file: %s\n " , fileName );
111
+ return ;
112
+ }
113
+
114
+ // Skip already processed avro files
115
+ if (seenFiles .contains (fileName )) {
92
116
return ;
93
117
}
94
118
@@ -106,6 +130,7 @@ record = dataFileReader.next(record);
106
130
}
107
131
108
132
this .writeSeenOffsets (fileName );
133
+ processedFileCount ++;
109
134
}
110
135
111
136
public void writeRecord (GenericRecord record , String topicName ) throws IOException {
@@ -139,8 +164,10 @@ public String createFilePathFromTimestamp(Double time) {
139
164
public void appendToFile (String directoryName , String fileName , String data ) {
140
165
File directory = new File (directoryName );
141
166
if (! directory .exists ()){
142
- directory .mkdirs ();
143
- System .out .printf ("Created directory: %s\n " , directory .getAbsolutePath ());
167
+ if (directory .mkdirs ())
168
+ System .out .printf ("Created directory: %s\n " , directory .getAbsolutePath ());
169
+ else
170
+ System .out .printf ("FAILED to create directory: %s\n " , directory .getAbsolutePath ());
144
171
}
145
172
146
173
String filePath = directoryName + "/" + fileName ;
@@ -175,9 +202,9 @@ private void writeSeenOffsets(String fileName) {
175
202
return ;
176
203
}
177
204
178
- String data = String .join ("," , topicName , partition , fromOffset .toString (), toOffset .toString ());
205
+ String data = String .join ("," , fileName , topicName , partition , fromOffset .toString (), toOffset .toString ());
179
206
180
- try (FileWriter fw = new FileWriter (outputPath + "/" + OFFSETS_FILE_NAME , true );
207
+ try (FileWriter fw = new FileWriter (offsetsPath , true );
181
208
BufferedWriter bw = new BufferedWriter (fw );
182
209
PrintWriter out = new PrintWriter (bw ))
183
210
{
@@ -187,4 +214,23 @@ private void writeSeenOffsets(String fileName) {
187
214
e .printStackTrace ();
188
215
}
189
216
}
217
+
218
+ private void readSeenOffsets () {
219
+
220
+ try (FileReader fr = new FileReader (offsetsPath );
221
+ BufferedReader br = new BufferedReader (fr ))
222
+ {
223
+ // Read in all file names from csv
224
+ String line ;
225
+ while ( (line = br .readLine ()) != null ) {
226
+ String [] columns = line .split ("," );
227
+ seenFiles .add (columns [0 ]);
228
+ }
229
+
230
+ } catch (IOException e ) {
231
+ // TODO
232
+ e .printStackTrace ();
233
+ }
234
+
235
+ }
190
236
}
0 commit comments