Skip to content

Commit a72f614

Browse files
committed
Use new GenericDatumReader for each topic
1 parent 6787de3 commit a72f614

File tree

1 file changed

+4
-4
lines changed

1 file changed

+4
-4
lines changed

src/main/java/org/radarcns/RestructureAvroRecords.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -49,13 +49,13 @@ public class RestructureAvroRecords {
4949

5050
private final RecordConverterFactory converterFactory;
5151

52-
private File outputPath = new File(".");
53-
private File offsetsPath = new File(outputPath, OFFSETS_FILE_NAME);
52+
private File outputPath;
53+
private File offsetsPath;
5454
private OffsetRangeSet seenFiles;
5555
private final Frequency bins = new Frequency();
5656

5757
private final Configuration conf = new Configuration();
58-
private final DatumReader<GenericRecord> datumReader;
58+
private DatumReader<GenericRecord> datumReader;
5959

6060
private int processedFileCount;
6161
private int processedRecordsCount;
@@ -88,7 +88,6 @@ public static void main(String [] args) throws Exception {
8888
public RestructureAvroRecords(String inputPath, String outputPath) {
8989
this.setInputWebHdfsURL(inputPath);
9090
this.setOutputPath(outputPath);
91-
datumReader = new GenericDatumReader<>();
9291

9392
if (System.getProperty("org.radarcns.format", "csv").equalsIgnoreCase("json")) {
9493
converterFactory = JsonAvroConverter.getFactory();
@@ -155,6 +154,7 @@ private void processTopic(Path topicPath, RecordConverterFactory converterFactor
155154
RemoteIterator<LocatedFileStatus> files = fs.listFiles(topicPath, true);
156155

157156
String topicName = topicPath.getName();
157+
datumReader = new GenericDatumReader<>();
158158

159159
try (FileCache cache = new FileCache(converterFactory, 100)) {
160160
while (files.hasNext()) {

0 commit comments

Comments
 (0)