Skip to content

Commit 538d248

Browse files
committed
Process WindowedKey streams
1 parent f7a63d5 commit 538d248

File tree

3 files changed

+62
-33
lines changed

3 files changed

+62
-33
lines changed

src/main/java/org/radarcns/Frequency.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package org.radarcns;
1818

1919
import java.nio.file.Files;
20+
import java.util.Date;
2021
import java.util.List;
2122
import java.util.Objects;
2223
import javax.annotation.Nonnull;
@@ -63,8 +64,8 @@ public static Frequency read(File file) {
6364
return new Frequency(file, map);
6465
}
6566

66-
public void add(String topicName, String id, GenericRecord valueField, Field timeField) {
67-
String timestamp = RestructureAvroRecords.createHourTimestamp(valueField, timeField);
67+
public void add(String topicName, String id, Date date) {
68+
String timestamp = RestructureAvroRecords.createHourTimestamp(date);
6869

6970
Integer count = (Integer) bins.get(topicName, id, timestamp);
7071
if (count == null) {

src/main/java/org/radarcns/RestructureAvroRecords.java

Lines changed: 37 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -16,17 +16,6 @@
1616

1717
package org.radarcns;
1818

19-
import java.io.File;
20-
import java.io.FileWriter;
21-
import java.io.IOException;
22-
import java.nio.file.Files;
23-
import java.text.SimpleDateFormat;
24-
import java.util.ArrayList;
25-
import java.util.Date;
26-
import java.util.HashMap;
27-
import java.util.List;
28-
import java.util.Map;
29-
import java.util.TimeZone;
3019
import org.apache.avro.Schema.Field;
3120
import org.apache.avro.file.DataFileReader;
3221
import org.apache.avro.generic.GenericDatumReader;
@@ -45,7 +34,16 @@
4534
import org.slf4j.Logger;
4635
import org.slf4j.LoggerFactory;
4736

48-
import static java.nio.file.StandardCopyOption.REPLACE_EXISTING;
37+
import java.io.File;
38+
import java.io.FileWriter;
39+
import java.io.IOException;
40+
import java.text.SimpleDateFormat;
41+
import java.util.ArrayList;
42+
import java.util.Date;
43+
import java.util.HashMap;
44+
import java.util.List;
45+
import java.util.Map;
46+
import java.util.TimeZone;
4947

5048
public class RestructureAvroRecords {
5149
private static final Logger logger = LoggerFactory.getLogger(RestructureAvroRecords.class);
@@ -255,8 +253,8 @@ private void writeRecord(GenericRecord record, String topicName, FileCacheStore
255253
throw new IOException("Failed to process " + record + "; no key or value");
256254
}
257255

258-
Field timeField = valueField.getSchema().getField("time");
259-
String outputFileName = createFilename(valueField, timeField);
256+
Date time = getDate(keyField, valueField);
257+
String outputFileName = createFilename(time);
260258

261259
// Clean user id and create final output pathname
262260
String userId = keyField.get("userId").toString().replaceAll("[^a-zA-Z0-9_-]+", "");
@@ -275,29 +273,42 @@ private void writeRecord(GenericRecord record, String topicName, FileCacheStore
275273
}
276274

277275
// Count data (binned and total)
278-
bins.add(topicName, keyField.get("sourceId").toString(), valueField, timeField);
276+
bins.add(topicName, keyField.get("sourceId").toString(), time);
279277
processedRecordsCount++;
280278
}
281279

282-
private String createFilename(GenericRecord valueField, Field timeField) {
283-
if (timeField == null) {
284-
logger.warn("Time field of record valueField " + valueField + " is not set");
285-
return "unknown." + outputFileExtension;
280+
private String createFilename(Date date) {
281+
if (date == null) {
282+
logger.warn("Time field of record valueField is not set");
283+
return "unknown_date." + outputFileExtension;
286284
}
287285
// Make a timestamped filename YYYYMMDD_HH00.json
288-
String hourlyTimestamp = createHourTimestamp(valueField, timeField);
286+
String hourlyTimestamp = createHourTimestamp(date);
289287
return hourlyTimestamp + "00." + outputFileExtension;
290288
}
291289

292-
public static String createHourTimestamp(GenericRecord valueField, Field timeField) {
293-
if (timeField == null) {
294-
return "unknown";
290+
public static String createHourTimestamp(Date date) {
291+
if (date == null) {
292+
return "unknown_date";
295293
}
296294

297-
double time = (Double) valueField.get(timeField.pos());
298-
// Convert from millis to date and apply dateFormat
299-
Date date = new Date((long) (time * 1000d));
300295
return FILE_DATE_FORMAT.format(date);
301296
}
302297

298+
public static Date getDate(GenericRecord keyField, GenericRecord valueField) {
299+
Field timeField = valueField.getSchema().getField("time");
300+
if (timeField != null) {
301+
double time = (Double) valueField.get(timeField.pos());
302+
// Convert from millis to date and apply dateFormat
303+
return new Date((long) (time * 1000d));
304+
}
305+
306+
// WindowedKey
307+
timeField = keyField.getSchema().getField("start");
308+
if (timeField == null) {
309+
return null;
310+
}
311+
long time = (Long) keyField.get("start");
312+
return new Date(time);
313+
}
303314
}

src/test/java/org/radarcns/RestructureAvroRecordsTest.java

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -33,15 +33,32 @@ public class RestructureAvroRecordsTest {
3333
@Test
3434
public void createHourTimestamp() throws Exception {
3535
long currentTime = 1493711175; // Tue May 2 07:46:15 UTC 2017
36+
long startTime = (currentTime - 3600) * 1000L;
3637

37-
Schema schema = SchemaBuilder.record("value").fields()
38+
Schema keySchema = SchemaBuilder.record("value").fields()
39+
.name("start").type("long").noDefault()
40+
.endRecord();
41+
GenericRecord keyField = new GenericRecordBuilder(keySchema)
42+
.set("start", startTime).build();
43+
44+
Schema valueSchema1 = SchemaBuilder.record("value").fields()
3845
.name("time").type("double").noDefault()
3946
.endRecord();
40-
GenericRecord record = new GenericRecordBuilder(schema).set("time",
41-
(double)currentTime).build();
42-
Field timeField = schema.getField("time");
43-
String result = RestructureAvroRecords.createHourTimestamp(record, timeField);
47+
GenericRecord valueField1 = new GenericRecordBuilder(valueSchema1)
48+
.set("time", (double)currentTime).build();
49+
50+
Date date = RestructureAvroRecords.getDate(keyField, valueField1);
51+
String result = RestructureAvroRecords.createHourTimestamp(date);
4452

4553
assertEquals("20170502_07", result);
54+
55+
Schema valueSchema2 = SchemaBuilder.record("value").fields()
56+
.name("a").type("double").noDefault()
57+
.endRecord();
58+
GenericRecord valueField2 = new GenericRecordBuilder(valueSchema2)
59+
.set("a", 0.1).build();
60+
date = RestructureAvroRecords.getDate(keyField, valueField2);
61+
result = RestructureAvroRecords.createHourTimestamp(date);
62+
assertEquals("20170502_06", result);
4663
}
4764
}

0 commit comments

Comments
 (0)