1919import java .io .File ;
2020import java .io .IOException ;
2121import java .text .SimpleDateFormat ;
22+ import java .util .ArrayList ;
2223import java .util .Date ;
24+ import java .util .HashMap ;
25+ import java .util .List ;
26+ import java .util .Map ;
2327import java .util .TimeZone ;
2428import org .apache .avro .Schema .Field ;
2529import org .apache .avro .file .DataFileReader ;
2630import org .apache .avro .generic .GenericDatumReader ;
2731import org .apache .avro .generic .GenericRecord ;
2832import org .apache .avro .mapred .FsInput ;
29- import org .apache .commons .io .FilenameUtils ;
3033import org .apache .hadoop .conf .Configuration ;
3134import org .apache .hadoop .fs .FileSystem ;
3235import org .apache .hadoop .fs .LocatedFileStatus ;
3538import org .radarcns .util .CsvAvroConverter ;
3639import org .radarcns .util .FileCacheStore ;
3740import org .radarcns .util .JsonAvroConverter ;
41+ import org .radarcns .util .ProgressBar ;
3842import org .radarcns .util .RecordConverterFactory ;
3943import org .slf4j .Logger ;
4044import org .slf4j .LoggerFactory ;
@@ -55,18 +59,17 @@ public class RestructureAvroRecords {
5559
5660 private File outputPath ;
5761 private File offsetsPath ;
58- private OffsetRangeSet seenFiles ;
5962 private Frequency bins ;
6063
6164 private final Configuration conf = new Configuration ();
6265
63- private int processedFileCount ;
64- private int processedRecordsCount ;
66+ private long processedFileCount ;
67+ private long processedRecordsCount ;
6568 private static final boolean USE_GZIP = "gzip" .equalsIgnoreCase (System .getProperty ("org.radarcns.compression" ));
6669
6770 public static void main (String [] args ) throws Exception {
6871 if (args .length != 3 ) {
69- System .out .println ("Usage: hadoop jar restructurehdfs-all-0.1.0. jar <webhdfs_url> <hdfs_topic > <output_folder>" );
72+ System .out .println ("Usage: hadoop jar restructurehdfs-all-0.2. jar <webhdfs_url> <hdfs_root_directory > <output_folder>" );
7073 System .exit (1 );
7174 }
7275
@@ -120,80 +123,89 @@ public void setOutputPath(String path) {
120123 bins = Frequency .read (new File (outputPath , BINS_FILE_NAME ));
121124 }
122125
123- public int getProcessedFileCount () {
126+ public long getProcessedFileCount () {
124127 return processedFileCount ;
125128 }
126129
127- public int getProcessedRecordsCount () {
130+ public long getProcessedRecordsCount () {
128131 return processedRecordsCount ;
129132 }
130133
131134 public void start (String directoryName ) throws IOException {
132135 // Get files and directories
133136 Path path = new Path (directoryName );
134137 FileSystem fs = FileSystem .get (conf );
135- RemoteIterator < LocatedFileStatus > files = fs . listLocatedStatus ( path );
138+
136139
137140 try (OffsetRangeFile offsets = new OffsetRangeFile (offsetsPath )) {
141+ OffsetRangeSet seenFiles ;
138142 try {
139143 seenFiles = offsets .read ();
140144 } catch (IOException ex ) {
141145 logger .error ("Error reading offsets file. Processing all offsets." );
142146 seenFiles = new OffsetRangeSet ();
143147 }
144- // Process the directories topics
145- processedFileCount = 0 ;
148+ logger .info ("Retrieving file list from {}" , path );
149+ // Get filenames to process
150+ Map <String , List <Path >> topicPaths = new HashMap <>();
151+ long toProcessFileCount = 0L ;
152+ processedFileCount = 0L ;
153+ RemoteIterator <LocatedFileStatus > files = fs .listFiles (path , true );
146154 while (files .hasNext ()) {
147155 LocatedFileStatus locatedFileStatus = files .next ();
148- Path filePath = locatedFileStatus .getPath ();
149-
150- if (filePath .toString ().contains ("+tmp" )) {
156+ if (locatedFileStatus .isDirectory ()) {
151157 continue ;
152158 }
159+ Path filePath = locatedFileStatus .getPath ();
153160
154- if (locatedFileStatus .isDirectory ()) {
155- processTopic (filePath , converterFactory , offsets );
161+ String topic = getTopic (filePath , seenFiles );
162+ if (topic != null ) {
163+ topicPaths .computeIfAbsent (topic , k -> new ArrayList <>()).add (filePath );
164+ toProcessFileCount ++;
156165 }
157166 }
158- }
159- }
160167
161- private void processTopic (Path topicPath , RecordConverterFactory converterFactory ,
162- OffsetRangeFile offsets ) throws IOException {
163- // Get files in this topic directory
164- FileSystem fs = FileSystem .get (conf );
165- RemoteIterator <LocatedFileStatus > files = fs .listFiles (topicPath , true );
168+ logger .info ("Converting {} files" , toProcessFileCount );
166169
167- String topicName = topicPath .getName ();
170+ ProgressBar progressBar = new ProgressBar (toProcessFileCount , 10 );
171+ progressBar .update (0 );
168172
169- try (FileCacheStore cache = new FileCacheStore (converterFactory , 100 , USE_GZIP )) {
170- while (files .hasNext ()) {
171- LocatedFileStatus locatedFileStatus = files .next ();
172-
173- if (locatedFileStatus .isFile ()) {
174- this .processFile (locatedFileStatus .getPath (), topicName , cache , offsets );
173+ // Actually process the files
174+ for (Map .Entry <String , List <Path >> entry : topicPaths .entrySet ()) {
175+ try (FileCacheStore cache = new FileCacheStore (converterFactory , 100 , USE_GZIP )) {
176+ for (Path filePath : entry .getValue ()) {
177+ this .processFile (filePath , entry .getKey (), cache , offsets );
178+ progressBar .update (++processedFileCount );
179+ }
175180 }
176181 }
177182 }
178183 }
179184
180- private void processFile (Path filePath , String topicName , FileCacheStore cache ,
181- OffsetRangeFile offsets ) throws IOException {
182- String fileName = filePath .getName ();
185+ private static String getTopic (Path filePath , OffsetRangeSet seenFiles ) {
186+ if (filePath .toString ().contains ("+tmp" )) {
187+ return null ;
188+ }
183189
190+ String fileName = filePath .getName ();
184191 // Skip if extension is not .avro
185- if (!FilenameUtils . getExtension ( fileName ). equals ( " avro" )) {
186- logger .info ("Skipped non-avro file: {}" , fileName );
187- return ;
192+ if (!fileName . endsWith ( ". avro" )) {
193+ logger .info ("Skipping non-avro file: {}" , fileName );
194+ return null ;
188195 }
189196
190197 OffsetRange range = OffsetRange .parse (fileName );
191198 // Skip already processed avro files
192199 if (seenFiles .contains (range )) {
193- return ;
200+ return null ;
194201 }
195202
196- logger .info ("{}" , filePath );
203+ return filePath .getParent ().getParent ().getName ();
204+ }
205+
206+ private void processFile (Path filePath , String topicName , FileCacheStore cache ,
207+ OffsetRangeFile offsets ) throws IOException {
208+ logger .debug ("Reading {}" , filePath );
197209
198210 // Read and parse avro file
199211 FsInput input = new FsInput (filePath , conf );
@@ -210,12 +222,12 @@ record = dataFileReader.next(record);
210222
211223 // Write which file has been processed and update bins
212224 try {
225+ OffsetRange range = OffsetRange .parse (filePath .getName ());
213226 offsets .write (range );
214227 bins .write ();
215228 } catch (IOException ex ) {
216229 logger .warn ("Failed to update status. Continuing processing." , ex );
217230 }
218- processedFileCount ++;
219231 }
220232
221233 private void writeRecord (GenericRecord record , String topicName , FileCacheStore cache )
@@ -265,4 +277,5 @@ public static String createHourTimestamp(GenericRecord valueField, Field timeFie
265277 Date date = new Date ((long ) (time * 1000d ));
266278 return FILE_DATE_FORMAT .format (date );
267279 }
280+
268281}
0 commit comments