 import java.io.File;
 import java.io.IOException;
 import java.text.SimpleDateFormat;
+import java.util.ArrayList;
 import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import java.util.TimeZone;
 import org.apache.avro.Schema.Field;
 import org.apache.avro.file.DataFileReader;
 import org.apache.avro.generic.GenericDatumReader;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.mapred.FsInput;
-import org.apache.commons.io.FilenameUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RemoteIterator;
 import org.radarcns.util.CsvAvroConverter;
-import org.radarcns.util.FileCache;
+import org.radarcns.util.FileCacheStore;
 import org.radarcns.util.JsonAvroConverter;
+import org.radarcns.util.ProgressBar;
 import org.radarcns.util.RecordConverterFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -55,17 +59,17 @@ public class RestructureAvroRecords {
 
     private File outputPath;
     private File offsetsPath;
-    private OffsetRangeSet seenFiles;
     private Frequency bins;
 
     private final Configuration conf = new Configuration();
 
-    private int processedFileCount;
-    private int processedRecordsCount;
+    private long processedFileCount;
+    private long processedRecordsCount;
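+    // Gzip the output files only when the JVM is started with -Dorg.radarcns.compression=gzip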
+    private static final boolean USE_GZIP = "gzip".equalsIgnoreCase(System.getProperty("org.radarcns.compression"));
 
     public static void main(String[] args) throws Exception {
         if (args.length != 3) {
-            System.out.println("Usage: hadoop jar restructurehdfs-all-0.1.0.jar <webhdfs_url> <hdfs_topic> <output_folder>");
+            System.out.println("Usage: hadoop jar restructurehdfs-all-0.2.jar <webhdfs_url> <hdfs_root_directory> <output_folder>");
             System.exit(1);
         }
 
@@ -91,13 +95,21 @@ public RestructureAvroRecords(String inputPath, String outputPath) {
         this.setInputWebHdfsURL(inputPath);
         this.setOutputPath(outputPath);
 
+        String extension;
         if (System.getProperty("org.radarcns.format", "csv").equalsIgnoreCase("json")) {
+            logger.info("Writing output files in JSON format");
             converterFactory = JsonAvroConverter.getFactory();
-            outputFileExtension = "json";
+            extension = "json";
         } else {
+            logger.info("Writing output files in CSV format");
             converterFactory = CsvAvroConverter.getFactory();
-            outputFileExtension = "csv";
+            extension = "csv";
         }
+        if (USE_GZIP) {
+            logger.info("Compressing output files in GZIP format");
+            extension += ".gz";
+        }
+        outputFileExtension = extension;
     }
 
     public void setInputWebHdfsURL(String fileSystemURL) {
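The output format and compression are selected with JVM system properties rather than extra command-line arguments. A minimal invocation sketch, assuming the properties are passed to the client JVM through HADOOP_OPTS (the exact variable may differ per Hadoop installation); leaving both properties unset keeps the defaults of uncompressed CSV:

    export HADOOP_OPTS="-Dorg.radarcns.format=json -Dorg.radarcns.compression=gzip"
    hadoop jar restructurehdfs-all-0.2.jar <webhdfs_url> <hdfs_root_directory> <output_folder>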
@@ -111,83 +123,100 @@ public void setOutputPath(String path) {
         bins = Frequency.read(new File(outputPath, BINS_FILE_NAME));
     }
 
-    public int getProcessedFileCount() {
+    public long getProcessedFileCount() {
         return processedFileCount;
     }
 
-    public int getProcessedRecordsCount() {
+    public long getProcessedRecordsCount() {
         return processedRecordsCount;
     }
 
     public void start(String directoryName) throws IOException {
         // Get files and directories
         Path path = new Path(directoryName);
         FileSystem fs = FileSystem.get(conf);
-        RemoteIterator<LocatedFileStatus> files = fs.listLocatedStatus(path);
+
 
         try (OffsetRangeFile offsets = new OffsetRangeFile(offsetsPath)) {
+            OffsetRangeSet seenFiles;
             try {
                 seenFiles = offsets.read();
             } catch (IOException ex) {
                 logger.error("Error reading offsets file. Processing all offsets.");
                 seenFiles = new OffsetRangeSet();
             }
-            // Process the directories topics
-            processedFileCount = 0;
+            logger.info("Retrieving file list from {}", path);
+            // Get filenames to process
+            Map<String, List<Path>> topicPaths = new HashMap<>();
+            long toProcessFileCount = 0L;
+            processedFileCount = 0L;
+            RemoteIterator<LocatedFileStatus> files = fs.listFiles(path, true);
             while (files.hasNext()) {
                 LocatedFileStatus locatedFileStatus = files.next();
-                Path filePath = locatedFileStatus.getPath();
-
-                if (filePath.toString().contains("+tmp")) {
+                if (locatedFileStatus.isDirectory()) {
                     continue;
                 }
+                Path filePath = locatedFileStatus.getPath();
 
-                if (locatedFileStatus.isDirectory()) {
-                    processTopic(filePath, converterFactory, offsets);
+                String topic = getTopic(filePath, seenFiles);
+                if (topic != null) {
+                    topicPaths.computeIfAbsent(topic, k -> new ArrayList<>()).add(filePath);
+                    toProcessFileCount++;
                 }
             }
-        }
-    }
 
-    private void processTopic(Path topicPath, RecordConverterFactory converterFactory,
-                              OffsetRangeFile offsets) throws IOException {
-        // Get files in this topic directory
-        FileSystem fs = FileSystem.get(conf);
-        RemoteIterator<LocatedFileStatus> files = fs.listFiles(topicPath, true);
+            logger.info("Converting {} files", toProcessFileCount);
 
-        String topicName = topicPath.getName();
-
-        try (FileCache cache = new FileCache(converterFactory, 100)) {
-            while (files.hasNext()) {
-                LocatedFileStatus locatedFileStatus = files.next();
+            ProgressBar progressBar = new ProgressBar(toProcessFileCount, 10);
+            progressBar.update(0);
 
-                if (locatedFileStatus.isFile()) {
-                    this.processFile(locatedFileStatus.getPath(), topicName, cache, offsets);
+            // Actually process the files
+            for (Map.Entry<String, List<Path>> entry : topicPaths.entrySet()) {
+                try (FileCacheStore cache = new FileCacheStore(converterFactory, 100, USE_GZIP)) {
+                    for (Path filePath : entry.getValue()) {
+                        this.processFile(filePath, entry.getKey(), cache, offsets);
+                        progressBar.update(++processedFileCount);
+                    }
                 }
             }
         }
     }
 
-    private void processFile(Path filePath, String topicName, FileCache cache,
-                             OffsetRangeFile offsets) throws IOException {
-        String fileName = filePath.getName();
+    private static String getTopic(Path filePath, OffsetRangeSet seenFiles) {
+        if (filePath.toString().contains("+tmp")) {
+            return null;
+        }
 
+        String fileName = filePath.getName();
         // Skip if extension is not .avro
-        if (!FilenameUtils.getExtension(fileName).equals("avro")) {
-            logger.info("Skipped non-avro file: {}", fileName);
-            return;
+        if (!fileName.endsWith(".avro")) {
+            logger.info("Skipping non-avro file: {}", fileName);
+            return null;
         }
 
         OffsetRange range = OffsetRange.parse(fileName);
         // Skip already processed avro files
         if (seenFiles.contains(range)) {
-            return;
+            return null;
         }
 
-        logger.info("{}", filePath);
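+        // The topic name is taken from the file's grandparent directory (layout: <topic>/<partition>/<file>.avro)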
+        return filePath.getParent().getParent().getName();
+    }
+
+    private void processFile(Path filePath, String topicName, FileCacheStore cache,
+            OffsetRangeFile offsets) throws IOException {
+        logger.debug("Reading {}", filePath);
 
         // Read and parse avro file
         FsInput input = new FsInput(filePath, conf);
+
+        // processing zero-length files may trigger a stall. See:
+        // https://github.com/RADAR-CNS/Restructure-HDFS-topic/issues/3
+        if (input.length() == 0) {
+            logger.warn("File {} has zero length, skipping.", filePath);
+            return;
+        }
+
         DataFileReader<GenericRecord> dataFileReader = new DataFileReader<>(input,
                 new GenericDatumReader<>());
 
@@ -201,15 +230,15 @@ record = dataFileReader.next(record);
 
         // Write which file has been processed and update bins
         try {
+            OffsetRange range = OffsetRange.parse(filePath.getName());
             offsets.write(range);
             bins.write();
         } catch (IOException ex) {
             logger.warn("Failed to update status. Continuing processing.", ex);
         }
-        processedFileCount++;
     }
 
-    private void writeRecord(GenericRecord record, String topicName, FileCache cache)
+    private void writeRecord(GenericRecord record, String topicName, FileCacheStore cache)
             throws IOException {
         GenericRecord keyField = (GenericRecord) record.get("key");
         GenericRecord valueField = (GenericRecord) record.get("value");
@@ -256,4 +285,5 @@ public static String createHourTimestamp(GenericRecord valueField, Field timeFie
         Date date = new Date((long) (time * 1000d));
         return FILE_DATE_FORMAT.format(date);
     }
+
 }