Commit 7fa1cd1

Some code simplification
1 parent a72f614 commit 7fa1cd1

2 files changed: +46 -64 lines


src/main/java/org/radarcns/Frequency.java

Lines changed: 40 additions & 55 deletions
@@ -16,6 +16,9 @@
 
 package org.radarcns;
 
+import java.nio.file.Files;
+import java.util.Objects;
+import javax.annotation.Nonnull;
 import org.apache.avro.Schema.Field;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.commons.collections.MapIterator;
@@ -30,38 +33,46 @@
 public class Frequency {
     private static final Logger logger = LoggerFactory.getLogger(Frequency.class);
 
-    private MultiKeyMap bins = new MultiKeyMap();
-    private File binFilePath;
+    private final MultiKeyMap bins;
+    private final File file;
 
-    public void setBinFilePath(File binFilePath) {
-        this.binFilePath = binFilePath;
+    public Frequency(@Nonnull File file, @Nonnull MultiKeyMap initialData) {
+        Objects.requireNonNull(file);
+        Objects.requireNonNull(initialData);
+        this.file = file;
+        this.bins = initialData;
     }
 
-    public MultiKeyMap getBins() {
-        return bins;
+    public static Frequency read(File file) {
+        MultiKeyMap map = new MultiKeyMap();
+        try {
+            // Read in all lines as multikeymap (key, key, key, value)
+            Files.readAllLines(file.toPath()).forEach(line -> {
+                String[] columns = line.split(",");
+                try {
+                    map.put(columns[0], columns[1], columns[2], Integer.valueOf(columns[3]));
+                } catch (ArrayIndexOutOfBoundsException | NumberFormatException ex) {
+                    // Also skips the header row, whose count column is not a number
+                    logger.warn("Unable to parse row of the bins file. Skipping.");
+                }
+            });
+        } catch (IOException e) {
+            logger.warn("Could not read the file with bins. Creating new file when writing.");
+        }
+        return new Frequency(file, map);
     }
 
-    public void addToBin(String topicName, String id, String timestamp, int countToAdd) {
+    public void add(String topicName, String id, GenericRecord valueField, Field timeField) {
+        String timestamp = RestructureAvroRecords.createHourTimestamp(valueField, timeField);
+
         Integer count = (Integer) bins.get(topicName, id, timestamp);
         if (count == null) {
-            bins.put(topicName, id, timestamp, countToAdd);
+            bins.put(topicName, id, timestamp, 1);
         } else {
-            bins.put(topicName, id, timestamp, count + countToAdd);
+            bins.put(topicName, id, timestamp, count + 1);
         }
     }
 
-    public void addToBin(String topicName, String id, GenericRecord valueField, Field timeField, int countToAdd) {
-        // Hour resolution
-        String hourlyTimestamp = RestructureAvroRecords.createHourTimestamp(valueField, timeField);
-
-        addToBin(topicName, id, hourlyTimestamp, countToAdd);
-    }
-
-    public void addToBin(String topicName, String id, GenericRecord valueField, Field timeField) {
-        addToBin(topicName, id, valueField, timeField, 1);
-    }
-
-    public void printBins() {
+    public void print() {
         MapIterator mapIterator = bins.mapIterator();
 
         while (mapIterator.hasNext()) {
@@ -71,50 +82,24 @@ public void printBins() {
         }
     }
 
-    public void writeBins() {
-        // Read bins from file and add to current bins
-        // Creates new bins if not existing yet
-        addBinsFromFile();
-
+    public void write() {
         // Write all bins to csv
         MapIterator mapIterator = bins.mapIterator();
-        try(FileWriter fw = new FileWriter(binFilePath, false);
-            BufferedWriter bw = new BufferedWriter(fw);
-            PrintWriter out = new PrintWriter(bw))
-        {
+        try (FileWriter fw = new FileWriter(file, false);
+             BufferedWriter bw = new BufferedWriter(fw)) {
             String header = String.join(",","topic","device","timestamp","count");
-            out.println(header);
+            bw.write(header);
+            bw.write('\n');
 
             while (mapIterator.hasNext()) {
                 MultiKey key = (MultiKey) mapIterator.next();
                 Integer value = (Integer) mapIterator.getValue();
                 String data = String.join(",", key.getKey(0).toString(), key.getKey(1).toString(), key.getKey(2).toString(), value.toString());
-                out.println(data);
-            }
-        } catch (IOException e) {
-            // TODO: exception handling
-            e.printStackTrace();
-        }
-
-        // Reset the map
-        bins = new MultiKeyMap();
-    }
-
-    private void addBinsFromFile() {
-        try (FileReader fr = new FileReader(binFilePath);
-             BufferedReader br = new BufferedReader(fr))
-        {
-            // Read in all lines as multikeymap (key, key, key, value)
-            String line;
-            br.readLine(); // Skip header
-            while ( (line = br.readLine()) != null ) {
-                String[] columns = line.split(",");
-                this.addToBin(columns[0], columns[1], columns[2], Integer.valueOf(columns[3]));
+                bw.write(data);
+                bw.write('\n');
             }
         } catch (IOException e) {
-            logger.warn("Could not read the file with bins. Creating new file when writing.");
-        } catch (ArrayIndexOutOfBoundsException e) {
-            logger.warn("Unable to parse the contents of the bins file. Skipping reading.");
+            logger.error("Failed to write bins", e);
         }
     }
 }
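
The net effect in Frequency is that setter-based wiring (setBinFilePath/getBins) becomes an immutable class: a @Nonnull constructor plus a static read() factory that loads any existing bins file once, so write() no longer re-reads and merges the file on every flush. A minimal usage sketch of the new API follows; the output directory, seed values, and hour-timestamp format are illustrative assumptions, not taken from the commit:

import java.io.File;
import org.apache.commons.collections.map.MultiKeyMap;

public class FrequencyUsageSketch {
    public static void main(String[] args) {
        // read() tolerates a missing file and starts with empty bins.
        // "bins.csv" is a hypothetical name for the bins file.
        File binFile = new File("output", "bins.csv");
        Frequency fromDisk = Frequency.read(binFile);
        fromDisk.print();

        // The constructor allows seeding bins directly, e.g. in tests.
        // The hour-timestamp format here is made up for illustration.
        MultiKeyMap seed = new MultiKeyMap();
        seed.put("topicA", "deviceA", "2017-01-01_10", 5); // (topic, device, hour) -> count
        Frequency seeded = new Frequency(binFile, seed);
        seeded.write(); // overwrites the file with topic,device,timestamp,count rows
    }
}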

src/main/java/org/radarcns/RestructureAvroRecords.java

Lines changed: 6 additions & 9 deletions
@@ -24,7 +24,6 @@
 import org.apache.avro.file.DataFileReader;
 import org.apache.avro.generic.GenericDatumReader;
 import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.io.DatumReader;
 import org.apache.avro.mapred.FsInput;
 import org.apache.commons.io.FilenameUtils;
 import org.apache.hadoop.conf.Configuration;
@@ -52,16 +51,14 @@ public class RestructureAvroRecords {
     private File outputPath;
     private File offsetsPath;
     private OffsetRangeSet seenFiles;
-    private final Frequency bins = new Frequency();
+    private Frequency bins;
 
     private final Configuration conf = new Configuration();
-    private DatumReader<GenericRecord> datumReader;
 
     private int processedFileCount;
     private int processedRecordsCount;
 
     public static void main(String [] args) throws Exception {
-        // Thread.sleep(120_000);
         if (args.length != 3) {
             System.out.println("Usage: hadoop jar restructurehdfs-all-0.1.0.jar <webhdfs_url> <hdfs_topic> <output_folder>");
             System.exit(1);
@@ -106,7 +103,7 @@ public void setOutputPath(String path) {
         // Remove trailing backslash
         outputPath = new File(path.replaceAll("/$",""));
         offsetsPath = new File(outputPath, OFFSETS_FILE_NAME);
-        bins.setBinFilePath(new File(outputPath, BINS_FILE_NAME));
+        bins = Frequency.read(new File(outputPath, BINS_FILE_NAME));
     }
 
     public int getProcessedFileCount() {
@@ -154,7 +151,6 @@ private void processTopic(Path topicPath, RecordConverterFactory converterFactor
         RemoteIterator<LocatedFileStatus> files = fs.listFiles(topicPath, true);
 
         String topicName = topicPath.getName();
-        datumReader = new GenericDatumReader<>();
 
         try (FileCache cache = new FileCache(converterFactory, 100)) {
             while (files.hasNext()) {
@@ -187,7 +183,8 @@ private void processFile(Path filePath, String topicName, FileCache cache,
 
         // Read and parse avro file
         FsInput input = new FsInput(filePath, conf);
-        DataFileReader<GenericRecord> dataFileReader = new DataFileReader<>(input, datumReader);
+        DataFileReader<GenericRecord> dataFileReader = new DataFileReader<>(input,
+                new GenericDatumReader<>());
 
         GenericRecord record = null;
         while (dataFileReader.hasNext()) {
@@ -200,7 +197,7 @@ record = dataFileReader.next(record);
             // Write which file has been processed and update bins
             try {
                 offsets.write(range);
-                bins.writeBins();
+                bins.write();
             } catch (IOException ex) {
                 logger.warn("Failed to update status. Continuing processing.", ex);
             }
@@ -230,7 +227,7 @@ private void writeRecord(GenericRecord record, String topicName, FileCache cache
         cache.writeRecord(outputFile, record);
 
         // Count data (binned and total)
-        bins.addToBin(topicName, keyField.get("sourceId").toString(), valueField, timeField);
+        bins.add(topicName, keyField.get("sourceId").toString(), valueField, timeField);
         processedRecordsCount++;
     }
 
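
The RestructureAvroRecords side of the commit drops the shared datumReader field and constructs a GenericDatumReader per file, keeping the reader's state local to each DataFileReader. In isolation the pattern looks like the sketch below; it assumes a local .avro file passed as the only argument, in place of the FsInput/HDFS path the project actually uses:

import java.io.File;
import java.io.IOException;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;

public class AvroReadSketch {
    public static void main(String[] args) throws IOException {
        File avroFile = new File(args[0]);
        // A fresh GenericDatumReader per file: no mutable state shared
        // between files or between invocations.
        try (DataFileReader<GenericRecord> reader =
                new DataFileReader<>(avroFile, new GenericDatumReader<>())) {
            GenericRecord record = null;
            while (reader.hasNext()) {
                record = reader.next(record); // reuse the record to cut allocations
                System.out.println(record);
            }
        }
    }
}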