Commit d936879

Merge pull request #19 from RADAR-base/release-0.3.2

Release 0.3.2

2 parents d225f6f + 1b7b554, commit d936879

12 files changed: +207 −90 lines changed

README.md

Lines changed: 2 additions & 2 deletions
@@ -1,6 +1,6 @@
 # Restructure HDFS files
 
-[![Build Status](https://travis-ci.org/RADAR-CNS/Restructure-HDFS-topic.svg?branch=master)](https://travis-ci.org/RADAR-CNS/Restructure-HDFS-topic)
+[![Build Status](https://travis-ci.org/RADAR-base/Restructure-HDFS-topic.svg?branch=master)](https://travis-ci.org/RADAR-base/Restructure-HDFS-topic)
 
 Data streamed to HDFS using the [RADAR HDFS sink connector](https://github.com/RADAR-CNS/RADAR-HDFS-Sink-Connector) is streamed to files based on sensor only. This package can transform that output to a local directory structure as follows: `userId/topic/date_hour.csv`. The date and hour is extracted from the `time` field of each record, and is formatted in UTC time.
 
@@ -31,4 +31,4 @@ Another option is to output the data in compressed form. All files will get the
 java -Dorg.radarcns.compress=gzip -jar restructurehdfs-0.3.1-all.jar <webhdfs_url> <hdfs_topic_path> <output_folder>
 ```
 
-Finally, files records are deduplicated after writing. To disable this behaviour, specify the option `-Dorg.radarcns.deduplicate=false`.
+Finally, by default, file records are not deduplicated after writing. To enable this behaviour, specify the option `-Dorg.radarcns.deduplicate=true`. This is set to false by default because of an issue with Biovotion data; please see [issue #16](https://github.com/RADAR-base/Restructure-HDFS-topic/issues/16) before enabling it.
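
For clarity, enabling deduplication would then look like the README's earlier invocation (a sketch only; the jar name assumes the 0.3.2 version set in build.gradle below):

```
java -Dorg.radarcns.deduplicate=true -jar restructurehdfs-0.3.2-all.jar <webhdfs_url> <hdfs_topic_path> <output_folder>
```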

build.gradle

Lines changed: 1 addition & 1 deletion
@@ -2,7 +2,7 @@ apply plugin: 'java'
 apply plugin: 'application'
 
 group 'org.radarcns.restructurehdfs'
-version '0.3.1'
+version '0.3.2'
 mainClassName = 'org.radarcns.RestructureAvroRecords'
 
 run {

src/main/java/org/radarcns/RestructureAvroRecords.java

Lines changed: 51 additions & 17 deletions
@@ -16,6 +16,7 @@
 
 package org.radarcns;
 
+import com.fasterxml.jackson.databind.JsonMappingException;
 import org.apache.avro.Schema.Field;
 import org.apache.avro.file.DataFileReader;
 import org.apache.avro.generic.GenericDatumReader;
@@ -70,7 +71,9 @@ public class RestructureAvroRecords {
     private long processedFileCount;
     private long processedRecordsCount;
     private static final boolean USE_GZIP = "gzip".equalsIgnoreCase(System.getProperty("org.radarcns.compression"));
-    private static final boolean DO_DEDUPLICATE = "true".equalsIgnoreCase(System.getProperty("org.radarcns.deduplicate", "true"));
+
+    // Default is false because deduplication causes loss of records from Biovotion data. https://github.com/RADAR-base/Restructure-HDFS-topic/issues/16
+    private static final boolean DO_DEDUPLICATE = "true".equalsIgnoreCase(System.getProperty("org.radarcns.deduplicate", "false"));
 
     public static void main(String [] args) throws Exception {
         if (args.length != 3) {
@@ -178,7 +181,12 @@ public void start(String directoryName) throws IOException {
         for (Map.Entry<String, List<Path>> entry : topicPaths.entrySet()) {
             try (FileCacheStore cache = new FileCacheStore(converterFactory, 100, USE_GZIP, DO_DEDUPLICATE)) {
                 for (Path filePath : entry.getValue()) {
-                    this.processFile(filePath, entry.getKey(), cache, offsets);
+                    // If a JsonMappingException occurs, log the error and continue with the other files
+                    try {
+                        this.processFile(filePath, entry.getKey(), cache, offsets);
+                    } catch (JsonMappingException exc) {
+                        logger.error("Cannot map values", exc);
+                    }
                     progressBar.update(++processedFileCount);
                 }
             }
@@ -232,7 +240,7 @@ private void processFile(Path filePath, String topicName, FileCacheStore cache,
             record = dataFileReader.next(record);
 
             // Get the fields
-            this.writeRecord(record, topicName, cache);
+            this.writeRecord(record, topicName, cache, 0);
         }
 
         // Write which file has been processed and update bins
@@ -245,7 +253,7 @@ record = dataFileReader.next(record);
         }
     }
 
-    private void writeRecord(GenericRecord record, String topicName, FileCacheStore cache)
+    private void writeRecord(GenericRecord record, String topicName, FileCacheStore cache, int suffix)
             throws IOException {
         GenericRecord keyField = (GenericRecord) record.get("key");
         GenericRecord valueField = (GenericRecord) record.get("value");
@@ -256,37 +264,63 @@ private void writeRecord(GenericRecord record, String topicName, FileCacheStore
         }
 
         Date time = getDate(keyField, valueField);
-        java.nio.file.Path outputFileName = createFilename(time);
+        java.nio.file.Path outputFileName = createFilename(time, suffix);
+
+        String projectId;
+
+        if (keyField.get("projectId") == null) {
+            projectId = "unknown-project";
+        } else {
+            // Clean the project id for use in the final pathname
+            projectId = keyField.get("projectId").toString().replaceAll("[^a-zA-Z0-9_-]+", "");
+        }
 
         // Clean user id and create final output pathname
         String userId = keyField.get("userId").toString().replaceAll("[^a-zA-Z0-9_-]+", "");
-        java.nio.file.Path userDir = this.outputPath.resolve(userId);
+
+        java.nio.file.Path projectDir = this.outputPath.resolve(projectId);
+        java.nio.file.Path userDir = projectDir.resolve(userId);
         java.nio.file.Path userTopicDir = userDir.resolve(topicName);
         java.nio.file.Path outputPath = userTopicDir.resolve(outputFileName);
 
         // Write data
-        cache.writeRecord(outputPath, record);
+        int response = cache.writeRecord(outputPath, record);
 
-        java.nio.file.Path schemaPath = userTopicDir.resolve(SCHEMA_OUTPUT_FILE_NAME);
-        if (!Files.exists(schemaPath)) {
-            try (Writer writer = Files.newBufferedWriter(schemaPath)) {
-                writer.write(record.getSchema().toString(true));
+        if (response == FileCacheStore.CACHE_AND_NO_WRITE || response == FileCacheStore.NO_CACHE_AND_NO_WRITE) {
+            // The write was unsuccessful due to a different number of columns,
+            // so try again with a new file name
+            writeRecord(record, topicName, cache, ++suffix);
+        } else {
+            // The write was successful, so finalize it
+            java.nio.file.Path schemaPath = userTopicDir.resolve(SCHEMA_OUTPUT_FILE_NAME);
+            if (!Files.exists(schemaPath)) {
+                try (Writer writer = Files.newBufferedWriter(schemaPath)) {
+                    writer.write(record.getSchema().toString(true));
+                }
             }
-        }
 
-        // Count data (binned and total)
-        bins.add(topicName, keyField.get("sourceId").toString(), time);
-        processedRecordsCount++;
+            // Count data (binned and total)
+            bins.add(topicName, keyField.get("sourceId").toString(), time);
+            processedRecordsCount++;
+        }
     }
 
-    private java.nio.file.Path createFilename(Date date) {
+    private java.nio.file.Path createFilename(Date date, int suffix) {
         if (date == null) {
             logger.warn("Time field of record valueField is not set");
             return Paths.get("unknown_date." + outputFileExtension);
         }
+
+        String finalSuffix;
+        if (suffix == 0) {
+            finalSuffix = "";
+        } else {
+            finalSuffix = "_" + suffix;
+        }
+
         // Make a timestamped filename YYYYMMDD_HH00.json
         String hourlyTimestamp = createHourTimestamp(date);
-        return Paths.get(hourlyTimestamp + "00." + outputFileExtension);
+        return Paths.get(hourlyTimestamp + "00" + finalSuffix + "." + outputFileExtension);
     }
 
     public static String createHourTimestamp(Date date) {
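
To make the retry naming concrete: below is a minimal, runnable sketch of the suffix scheme that `createFilename(Date, int)` implements. The class name and the pre-formatted `hourlyTimestamp` argument are stand-ins for illustration; the real method derives the timestamp via `createHourTimestamp(date)`.

```java
import java.nio.file.Path;
import java.nio.file.Paths;

public class FilenameSuffixSketch {
    // Mirrors the suffix logic of createFilename: suffix 0 yields the plain
    // hourly name, any other suffix is appended as "_<suffix>".
    static Path createFilename(String hourlyTimestamp, int suffix, String extension) {
        String finalSuffix = suffix == 0 ? "" : "_" + suffix;
        return Paths.get(hourlyTimestamp + "00" + finalSuffix + "." + extension);
    }

    public static void main(String[] args) {
        System.out.println(createFilename("20180101_14", 0, "csv")); // 20180101_1400.csv
        System.out.println(createFilename("20180101_14", 2, "csv")); // 20180101_1400_2.csv
    }
}
```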

src/main/java/org/radarcns/util/CsvAvroConverter.java

Lines changed: 52 additions & 10 deletions
@@ -16,6 +16,9 @@
 
 package org.radarcns.util;
 
+import com.fasterxml.jackson.databind.JsonMappingException;
+import com.fasterxml.jackson.databind.MappingIterator;
+import com.fasterxml.jackson.databind.ObjectReader;
 import com.fasterxml.jackson.databind.ObjectWriter;
 import com.fasterxml.jackson.dataformat.csv.CsvFactory;
 import com.fasterxml.jackson.dataformat.csv.CsvGenerator;
@@ -28,11 +31,10 @@
 import org.apache.avro.generic.GenericRecord;
 
 import java.io.IOException;
+import java.io.Reader;
 import java.io.Writer;
 import java.nio.ByteBuffer;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 
 /**
  * Converts deep hierarchical Avro records into flat CSV format. It uses a simple dot syntax in the
@@ -45,8 +47,8 @@ public static RecordConverterFactory getFactory() {
         CsvFactory factory = new CsvFactory();
         return new RecordConverterFactory() {
             @Override
-            public RecordConverter converterFor(Writer writer, GenericRecord record, boolean writeHeader) throws IOException {
-                return new CsvAvroConverter(factory, writer, record, writeHeader);
+            public RecordConverter converterFor(Writer writer, GenericRecord record, boolean writeHeader, Reader reader) throws IOException {
+                return new CsvAvroConverter(factory, writer, record, writeHeader, reader);
             }
 
             @Override
@@ -59,28 +61,68 @@ public boolean hasHeader() {
     private final ObjectWriter csvWriter;
     private final Map<String, Object> map;
    private final CsvGenerator generator;
+    private CsvSchema schema;
 
-    public CsvAvroConverter(CsvFactory factory, Writer writer, GenericRecord record, boolean writeHeader)
+    public CsvAvroConverter(CsvFactory factory, Writer writer, GenericRecord record, boolean writeHeader, Reader reader)
             throws IOException {
         map = new LinkedHashMap<>();
-        Map<String, Object> value = convertRecord(record);
+
+        CsvMapper mapper = new CsvMapper(factory);
+        Map<String, Object> value;
+
+        schema = CsvSchema.emptySchema().withHeader();
+        if (!writeHeader) {
+            // If the file already exists, read the schema from the CSV file
+            ObjectReader objectReader = mapper.readerFor(Map.class).with(schema);
+            MappingIterator<Map<String, Object>> iterator = objectReader.readValues(reader);
+            value = iterator.next();
+        } else {
+            value = convertRecord(record);
+        }
+
         CsvSchema.Builder builder = new CsvSchema.Builder();
         for (String key : value.keySet()) {
             builder.addColumn(key);
         }
-        CsvSchema schema = builder.build();
+        schema = builder.build();
+
         if (writeHeader) {
             schema = schema.withHeader();
         }
+
         generator = factory.createGenerator(writer);
-        csvWriter = new CsvMapper(factory).writer(schema);
+        csvWriter = mapper.writer(schema);
     }
 
+    /**
+     * Write an Avro record to the CSV file.
+     * @param record the Avro record to be written
+     * @return true if the write was successful, false if the record cannot be written to the current CSV file
+     * @throws IOException for other IO and mapping errors
+     */
     @Override
-    public void writeRecord(GenericRecord record) throws IOException {
+    public boolean writeRecord(GenericRecord record) throws IOException {
         Map<String, Object> localMap = convertRecord(record);
+
+        if (localMap.size() > schema.size()) {
+            // Cannot write to the same file, so return false
+            return false;
+        } else {
+            Iterator<String> localColumnIterator = localMap.keySet().iterator();
+            for (int i = 0; i < schema.size(); i++) {
+                if (!schema.columnName(i).equals(localColumnIterator.next())) {
+                    // The order or name of the columns is different, so this record
+                    // cannot be written to this CSV file; return false.
+                    return false;
+                }
+            }
+        }
+
         csvWriter.writeValue(generator, localMap);
         localMap.clear();
+        return true;
     }
 
     public Map<String, Object> convertRecord(GenericRecord record) {
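
The key idea in the new constructor is that when appending to an existing CSV file, the column schema is recovered from that file's header rather than from the incoming record. Below is a minimal, self-contained sketch of that step; the class name and sample data are hypothetical, and it assumes only the jackson-dataformat-csv dependency the converter already uses.

```java
import com.fasterxml.jackson.databind.MappingIterator;
import com.fasterxml.jackson.dataformat.csv.CsvMapper;
import com.fasterxml.jackson.dataformat.csv.CsvSchema;

import java.io.Reader;
import java.io.StringReader;
import java.util.Map;

public class CsvHeaderSketch {
    public static void main(String[] args) throws Exception {
        // Stand-in for the Reader over an existing CSV file
        Reader reader = new StringReader("key.userId,value.time\nuser1,1514764800\n");

        // Parse the first row with a header-aware schema, as the constructor does
        CsvMapper mapper = new CsvMapper();
        MappingIterator<Map<String, Object>> iterator = mapper
                .readerFor(Map.class)
                .with(CsvSchema.emptySchema().withHeader())
                .readValues(reader);
        Map<String, Object> firstRow = iterator.next();

        // Rebuild the CsvSchema from the recovered column names; Jackson binds
        // to LinkedHashMap by default, so key order matches column order
        CsvSchema.Builder builder = new CsvSchema.Builder();
        for (String column : firstRow.keySet()) {
            builder.addColumn(column);
        }
        CsvSchema schema = builder.build();

        System.out.println(schema.columnName(0)); // key.userId
        System.out.println(schema.columnName(1)); // value.time
    }
}
```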

src/main/java/org/radarcns/util/FileCache.java

Lines changed: 17 additions & 13 deletions
@@ -16,17 +16,11 @@
 
 package org.radarcns.util;
 
-import java.io.BufferedOutputStream;
-import java.io.Closeable;
-import java.io.FileOutputStream;
-import java.io.Flushable;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.io.OutputStreamWriter;
-import java.io.Writer;
+import java.io.*;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.StandardOpenOption;
+import java.util.zip.GZIPInputStream;
 import java.util.zip.GZIPOutputStream;
 import javax.annotation.Nonnull;
 import org.apache.avro.generic.GenericRecord;
@@ -57,15 +51,19 @@ public FileCache(RecordConverterFactory converterFactory, Path path,
 
         OutputStream outFile = Files.newOutputStream(path,
                 StandardOpenOption.APPEND, StandardOpenOption.CREATE);
+        InputStream inputStream = new BufferedInputStream(Files.newInputStream(path));
         OutputStream bufOut = new BufferedOutputStream(outFile);
         if (gzip) {
             bufOut = new GZIPOutputStream(bufOut);
+            if (!fileIsNew) {
+                inputStream = new GZIPInputStream(inputStream);
+            }
         }
 
         this.writer = new OutputStreamWriter(bufOut);
 
-        try {
-            this.recordConverter = converterFactory.converterFor(writer, record, fileIsNew);
+        try (Reader reader = new InputStreamReader(inputStream)) {
+            this.recordConverter = converterFactory.converterFor(writer, record, fileIsNew, reader);
         } catch (IOException ex) {
             try {
                 writer.close();
@@ -76,10 +74,16 @@ public FileCache(RecordConverterFactory converterFactory, Path path,
         }
     }
 
-    /** Write a record to the cache. */
-    public void writeRecord(GenericRecord record) throws IOException {
-        this.recordConverter.writeRecord(record);
+    /**
+     * Write a record to the cache.
+     * @param record Avro record
+     * @return true or false based on the {@link RecordConverter} write result
+     * @throws IOException if the record cannot be written to the file
+     */
+    public boolean writeRecord(GenericRecord record) throws IOException {
+        boolean result = this.recordConverter.writeRecord(record);
         lastUse = System.nanoTime();
+        return result;
    }
 
     @Override
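
The constructor change opens the existing file for reading as well as appending, so the CSV header can be recovered even when the file is gzipped. Below is a standalone sketch of that conditional stream wrapping; the class and method names are hypothetical.

```java
import java.io.*;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

public class GzipReadbackSketch {
    // Mirrors the constructor's stream setup: gunzip the existing file only when
    // compression is on and the file already has content to read a header from.
    static Reader openForHeader(Path path, boolean gzip, boolean fileIsNew) throws IOException {
        InputStream in = new BufferedInputStream(Files.newInputStream(path));
        if (gzip && !fileIsNew) {
            in = new GZIPInputStream(in);
        }
        return new InputStreamReader(in);
    }

    public static void main(String[] args) throws IOException {
        // Write a small gzipped CSV file, then read its header line back
        Path tmp = Files.createTempFile("cache", ".csv.gz");
        try (Writer w = new OutputStreamWriter(new GZIPOutputStream(Files.newOutputStream(tmp)))) {
            w.write("a,b\n1,2\n");
        }
        try (BufferedReader r = new BufferedReader(openForHeader(tmp, true, false))) {
            System.out.println(r.readLine()); // a,b
        }
    }
}
```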

src/main/java/org/radarcns/util/FileCacheStore.java

Lines changed: 24 additions & 6 deletions
@@ -40,6 +40,13 @@ public class FileCacheStore implements Flushable, Closeable {
     private final int maxFiles;
     private final Map<Path, FileCache> caches;
 
+    // Response codes for each write record case
+    public static final int CACHE_AND_WRITE = 1;       // used the cache and the write was successful
+    public static final int NO_CACHE_AND_WRITE = 2;    // opened a new file and the write was successful
+    public static final int CACHE_AND_NO_WRITE = 3;    // used the cache but the write failed
+    public static final int NO_CACHE_AND_NO_WRITE = 4; // opened a new file but the write failed
+
+
     public FileCacheStore(RecordConverterFactory converterFactory, int maxFiles, boolean gzip, boolean deduplicate) {
         this.converterFactory = converterFactory;
         this.maxFiles = maxFiles;
@@ -54,14 +61,19 @@ public FileCacheStore(RecordConverterFactory converterFactory, int maxFiles, boo
      *
      * @param path file to append data to
      * @param record data
-     * @return true if the cache was used, false if a new file was opened.
+     * @return an integer value matching one of the response codes above.
      * @throws IOException when failing to open a file or writing to it.
      */
-    public boolean writeRecord(Path path, GenericRecord record) throws IOException {
+    public int writeRecord(Path path, GenericRecord record) throws IOException {
         FileCache cache = caches.get(path);
         if (cache != null) {
-            cache.writeRecord(record);
-            return true;
+            if (cache.writeRecord(record)) {
+                return CACHE_AND_WRITE;
+            } else {
+                // The cache was used but the write was unsuccessful
+                // because of a different number of columns in the same topic
+                return CACHE_AND_NO_WRITE;
+            }
         } else {
             ensureCapacity();
 
@@ -70,8 +82,14 @@ public boolean writeRecord(Path path, GenericRecord record) throws IOException {
 
             cache = new FileCache(converterFactory, path, record, gzip);
             caches.put(path, cache);
-            cache.writeRecord(record);
-            return false;
+            if (cache.writeRecord(record)) {
+                return NO_CACHE_AND_WRITE;
+            } else {
+                // The file path was not in the cache but the file exists, and this write
+                // was unsuccessful because of a different number of columns
+                return NO_CACHE_AND_NO_WRITE;
+            }
+
         }
     }
 
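
RestructureAvroRecords reacts to these codes recursively; an equivalent iterative caller can make the contract easier to see. Here is a minimal runnable sketch where all names are stand-ins and a stub simulates two incompatible files before a write succeeds. Keeping the cache/no-cache distinction in the return value preserves the information the old boolean carried while adding the write-failed cases.

```java
public class ResponseCodeSketch {
    // The four response codes, copied from FileCacheStore
    static final int CACHE_AND_WRITE = 1;
    static final int NO_CACHE_AND_WRITE = 2;
    static final int CACHE_AND_NO_WRITE = 3;
    static final int NO_CACHE_AND_NO_WRITE = 4;

    // Stub standing in for FileCacheStore.writeRecord: pretend the plain hourly
    // file and its first variant both have an incompatible column set.
    static int writeRecord(String path) {
        return path.endsWith("_2.csv") ? NO_CACHE_AND_WRITE : NO_CACHE_AND_NO_WRITE;
    }

    public static void main(String[] args) {
        int suffix = 0;
        int response;
        String path;
        do {
            // Same suffix scheme as createFilename: retry with the next file name
            path = "20180101_1400" + (suffix == 0 ? "" : "_" + suffix) + ".csv";
            response = writeRecord(path);
            suffix++;
        } while (response == CACHE_AND_NO_WRITE || response == NO_CACHE_AND_NO_WRITE);
        System.out.println("Record landed in " + path); // 20180101_1400_2.csv
    }
}
```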

src/main/java/org/radarcns/util/JsonAvroConverter.java

Lines changed: 3 additions & 2 deletions
@@ -42,7 +42,7 @@ public final class JsonAvroConverter implements RecordConverter {
 
     public static RecordConverterFactory getFactory() {
         JsonFactory factory = new JsonFactory();
-        return (writer, record, writeHeader) -> new JsonAvroConverter(factory, writer);
+        return (writer, record, writeHeader, reader) -> new JsonAvroConverter(factory, writer);
     }
 
     private final ObjectWriter jsonWriter;
@@ -54,8 +54,9 @@ public JsonAvroConverter(JsonFactory factory, Writer writer) throws IOException
     }
 
     @Override
-    public void writeRecord(GenericRecord record) throws IOException {
+    public boolean writeRecord(GenericRecord record) throws IOException {
         jsonWriter.writeValue(generator, convertRecord(record));
+        return true;
     }
 
     public Map<String, Object> convertRecord(GenericRecord record) {
