Skip to content

Commit 6787de3

Browse files
committed
Fixed processing for latest HDFS connector
1 parent 9a46f4c commit 6787de3

File tree

2 files changed

+34
-12
lines changed

2 files changed

+34
-12
lines changed

src/main/java/org/radarcns/Frequency.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616

1717
package org.radarcns;
1818

19+
import org.apache.avro.Schema.Field;
20+
import org.apache.avro.generic.GenericRecord;
1921
import org.apache.commons.collections.MapIterator;
2022
import org.apache.commons.collections.keyvalue.MultiKey;
2123
import org.apache.commons.collections.map.MultiKeyMap;
@@ -48,15 +50,15 @@ public void addToBin(String topicName, String id, String timestamp, int countToA
4850
}
4951
}
5052

51-
public void addToBin(String topicName, String id, Double time, int countToAdd) {
53+
public void addToBin(String topicName, String id, GenericRecord valueField, Field timeField, int countToAdd) {
5254
// Hour resolution
53-
String hourlyTimestamp = RestructureAvroRecords.createHourTimestamp(time);
55+
String hourlyTimestamp = RestructureAvroRecords.createHourTimestamp(valueField, timeField);
5456

5557
addToBin(topicName, id, hourlyTimestamp, countToAdd);
5658
}
5759

58-
public void addToBin(String topicName, String id, Double time) {
59-
addToBin(topicName, id, time, 1);
60+
public void addToBin(String topicName, String id, GenericRecord valueField, Field timeField) {
61+
addToBin(topicName, id, valueField, timeField, 1);
6062
}
6163

6264
public void printBins() {

src/main/java/org/radarcns/RestructureAvroRecords.java

Lines changed: 28 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import java.io.IOException;
2121
import java.text.SimpleDateFormat;
2222
import java.util.Date;
23+
import org.apache.avro.Schema.Field;
2324
import org.apache.avro.file.DataFileReader;
2425
import org.apache.avro.generic.GenericDatumReader;
2526
import org.apache.avro.generic.GenericRecord;
@@ -208,12 +209,16 @@ record = dataFileReader.next(record);
208209

209210
private void writeRecord(GenericRecord record, String topicName, FileCache cache)
210211
throws IOException {
211-
GenericRecord keyField = (GenericRecord) record.get("keyField");
212-
GenericRecord valueField = (GenericRecord) record.get("valueField");
212+
GenericRecord keyField = (GenericRecord) record.get("key");
213+
GenericRecord valueField = (GenericRecord) record.get("value");
213214

214-
// Make a timestamped filename YYYYMMDD_HH00.json
215-
String hourlyTimestamp = createHourTimestamp( (Double) valueField.get("time"));
216-
String outputFileName = hourlyTimestamp + "00." + outputFileExtension;
215+
if (keyField == null || valueField == null) {
216+
logger.error("Failed to process {}", record);
217+
throw new IOException("Failed to process " + record + "; no key or value");
218+
}
219+
220+
Field timeField = valueField.getSchema().getField("time");
221+
String outputFileName = createFilename(valueField, timeField);
217222

218223
// Clean user id and create final output pathname
219224
String userId = keyField.get("userId").toString().replaceAll("\\W+", "");
@@ -225,13 +230,28 @@ private void writeRecord(GenericRecord record, String topicName, FileCache cache
225230
cache.writeRecord(outputFile, record);
226231

227232
// Count data (binned and total)
228-
bins.addToBin(topicName, keyField.get("sourceId").toString(), (Double) valueField.get("time"));
233+
bins.addToBin(topicName, keyField.get("sourceId").toString(), valueField, timeField);
229234
processedRecordsCount++;
230235
}
231236

232-
public static String createHourTimestamp(Double time) {
237+
private String createFilename(GenericRecord valueField, Field timeField) {
238+
if (timeField == null) {
239+
logger.warn("Time field of record valueField " + valueField + " is not set");
240+
return "unknown." + outputFileExtension;
241+
}
242+
// Make a timestamped filename YYYYMMDD_HH00.json
243+
String hourlyTimestamp = createHourTimestamp(valueField, timeField);
244+
return hourlyTimestamp + "00." + outputFileExtension;
245+
}
246+
247+
public static String createHourTimestamp(GenericRecord valueField, Field timeField) {
248+
if (timeField == null) {
249+
return "unknown";
250+
}
251+
252+
double time = (Double) valueField.get(timeField.pos());
233253
// Convert from millis to date and apply dateFormat
234-
Date date = new Date( time.longValue() * 1000 );
254+
Date date = new Date((long) (time * 1000d));
235255
return FILE_DATE_FORMAT.format(date);
236256
}
237257
}

0 commit comments

Comments
 (0)