3434import org .slf4j .Logger ;
3535import org .slf4j .LoggerFactory ;
3636
37- import java .io .File ;
38- import java .io .FileWriter ;
3937import java .io .IOException ;
38+ import java .io .Writer ;
39+ import java .nio .file .Files ;
40+ import java .nio .file .Paths ;
4041import java .text .SimpleDateFormat ;
4142import java .util .ArrayList ;
4243import java .util .Date ;
@@ -49,9 +50,9 @@ public class RestructureAvroRecords {
4950 private static final Logger logger = LoggerFactory .getLogger (RestructureAvroRecords .class );
5051
5152 private final String outputFileExtension ;
52- private static final String OFFSETS_FILE_NAME = "offsets.csv" ;
53- private static final String BINS_FILE_NAME = "bins.csv" ;
54- private static final String SCHEMA_OUTPUT_FILE_NAME = "schema.json" ;
53+ private static final java . nio . file . Path OFFSETS_FILE_NAME = Paths . get ( "offsets.csv" ) ;
54+ private static final java . nio . file . Path BINS_FILE_NAME = Paths . get ( "bins.csv" ) ;
55+ private static final java . nio . file . Path SCHEMA_OUTPUT_FILE_NAME = Paths . get ( "schema.json" ) ;
5556 private static final SimpleDateFormat FILE_DATE_FORMAT = new SimpleDateFormat ("yyyyMMdd_HH" );
5657
5758 static {
@@ -60,15 +61,16 @@ public class RestructureAvroRecords {
6061
6162 private final RecordConverterFactory converterFactory ;
6263
63- private File outputPath ;
64- private File offsetsPath ;
64+ private java . nio . file . Path outputPath ;
65+ private java . nio . file . Path offsetsPath ;
6566 private Frequency bins ;
6667
6768 private final Configuration conf = new Configuration ();
6869
6970 private long processedFileCount ;
7071 private long processedRecordsCount ;
7172 private static final boolean USE_GZIP = "gzip" .equalsIgnoreCase (System .getProperty ("org.radarcns.compression" ));
73+ private static final boolean DO_DEDUPLICATE = "true" .equalsIgnoreCase (System .getProperty ("org.radarcns.deduplicate" , "true" ));
7274
7375 public static void main (String [] args ) throws Exception {
7476 if (args .length != 3 ) {
@@ -121,9 +123,9 @@ public void setInputWebHdfsURL(String fileSystemURL) {
121123
122124 public void setOutputPath (String path ) {
123125 // Remove trailing slash
124- outputPath = new File (path .replaceAll ("/$" ,"" ));
125- offsetsPath = new File ( outputPath , OFFSETS_FILE_NAME );
126- bins = Frequency .read (new File ( outputPath , BINS_FILE_NAME ));
126+ outputPath = Paths . get (path .replaceAll ("/$" , "" ));
127+ offsetsPath = outputPath . resolve ( OFFSETS_FILE_NAME );
128+ bins = Frequency .read (outputPath . resolve ( BINS_FILE_NAME ));
127129 }
128130
129131 public long getProcessedFileCount () {
@@ -174,7 +176,7 @@ public void start(String directoryName) throws IOException {
174176
175177 // Actually process the files
176178 for (Map .Entry <String , List <Path >> entry : topicPaths .entrySet ()) {
177- try (FileCacheStore cache = new FileCacheStore (converterFactory , 100 , USE_GZIP )) {
179+ try (FileCacheStore cache = new FileCacheStore (converterFactory , 100 , USE_GZIP , DO_DEDUPLICATE )) {
178180 for (Path filePath : entry .getValue ()) {
179181 this .processFile (filePath , entry .getKey (), cache , offsets );
180182 progressBar .update (++processedFileCount );
@@ -258,16 +260,16 @@ private void writeRecord(GenericRecord record, String topicName, FileCacheStore
258260
259261 // Clean user id and create final output pathname
260262 String userId = keyField .get ("userId" ).toString ().replaceAll ("[^a-zA-Z0-9_-]+" , "" );
261- File userDir = new File ( this .outputPath , userId );
262- File userTopicDir = new File ( userDir , topicName );
263- File outputFile = new File ( userTopicDir , outputFileName );
263+ java . nio . file . Path userDir = this .outputPath . resolve ( userId );
264+ java . nio . file . Path userTopicDir = userDir . resolve ( topicName );
265+ java . nio . file . Path outputPath = userTopicDir . resolve ( outputFileName );
264266
265267 // Write data
266- cache .writeRecord (outputFile , record );
268+ cache .writeRecord (outputPath , record );
267269
268- File schemaFile = new File ( userTopicDir , SCHEMA_OUTPUT_FILE_NAME );
269- if (!schemaFile .exists ()) {
270- try (FileWriter writer = new FileWriter ( schemaFile , false )) {
270+ java . nio . file . Path schemaPath = userTopicDir . resolve ( SCHEMA_OUTPUT_FILE_NAME );
271+ if (!Files .exists (schemaPath )) {
272+ try (Writer writer = Files . newBufferedWriter ( schemaPath )) {
271273 writer .write (record .getSchema ().toString (true ));
272274 }
273275 }
0 commit comments