
Commit 1bbb04e

Merge branch 'master' into dev
2 parents: fb5fd15 + 1b1b9a2

9 files changed (+409, -163 lines)

.travis.yml

Lines changed: 2 additions & 1 deletion

@@ -5,7 +5,8 @@ env:
     TERM: dumb
 before_cache:
   - rm -f $HOME/.gradle/caches/modules-2/modules-2.lock
-  - rm -fr $HOME/.gradle/caches/*/plugin-resolution/
+  - rm -fr $HOME/.gradle/caches/*/plugin-resolution
+  - rm -fr $HOME/.gradle/caches/*/fileHashes
 cache:
   directories:
     - $HOME/.gradle/caches/

README.md

Lines changed: 5 additions & 0 deletions

@@ -25,3 +25,8 @@ By default, this will output the data in CSV format. If JSON format is preferred
 ```
 java -Dorg.radarcns.format=json -jar restructurehdfs-all-0.1.2-SNAPSHOT.jar <webhdfs_url> <hdfs_topic_path> <output_folder>
 ```
+
+Another option is to output the data in compressed form. All files will get the `gz` suffix, and can be decompressed with a GZIP decoder. Note that for a very small number of records, this may actually increase the file size.
+```
+java -Dorg.radarcns.compress=gzip -jar restructurehdfs-all-0.1.1.jar <webhdfs_url> <hdfs_topic_path> <output_folder>
+```
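Note that the README example passes `-Dorg.radarcns.compress=gzip`, while the Java code in this commit reads the `org.radarcns.compression` property; only the latter spelling enables compression as implemented here.

The gzipped output needs no special tooling to read back: any GZIP decoder works. A minimal sketch using only `java.util.zip` (the file path argument is hypothetical):

```
import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.zip.GZIPInputStream;

public class ReadGzippedOutput {
    public static void main(String[] args) throws IOException {
        // args[0]: path to a restructured output file, e.g. some_topic.csv.gz
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(
                new GZIPInputStream(new FileInputStream(args[0]))))) {
            String line;
            while ((line = reader.readLine()) != null) {
                System.out.println(line);
            }
        }
    }
}
```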

src/main/java/org/radarcns/RestructureAvroRecords.java

Lines changed: 15 additions & 6 deletions

@@ -33,7 +33,7 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RemoteIterator;
 import org.radarcns.util.CsvAvroConverter;
-import org.radarcns.util.FileCache;
+import org.radarcns.util.FileCacheStore;
 import org.radarcns.util.JsonAvroConverter;
 import org.radarcns.util.RecordConverterFactory;
 import org.slf4j.Logger;
@@ -62,6 +62,7 @@ public class RestructureAvroRecords {

     private int processedFileCount;
     private int processedRecordsCount;
+    private static final boolean USE_GZIP = "gzip".equalsIgnoreCase(System.getProperty("org.radarcns.compression"));

     public static void main(String [] args) throws Exception {
         if (args.length != 3) {
@@ -91,13 +92,21 @@ public RestructureAvroRecords(String inputPath, String outputPath) {
         this.setInputWebHdfsURL(inputPath);
         this.setOutputPath(outputPath);

+        String extension;
         if (System.getProperty("org.radarcns.format", "csv").equalsIgnoreCase("json")) {
+            logger.info("Writing output files in JSON format");
             converterFactory = JsonAvroConverter.getFactory();
-            outputFileExtension = "json";
+            extension = "json";
         } else {
+            logger.info("Writing output files in CSV format");
             converterFactory = CsvAvroConverter.getFactory();
-            outputFileExtension = "csv";
+            extension = "csv";
         }
+        if (USE_GZIP) {
+            logger.info("Compressing output files in GZIP format");
+            extension += ".gz";
+        }
+        outputFileExtension = extension;
     }

     public void setInputWebHdfsURL(String fileSystemURL) {
@@ -157,7 +166,7 @@ private void processTopic(Path topicPath, RecordConverterFactory converterFactor

         String topicName = topicPath.getName();

-        try (FileCache cache = new FileCache(converterFactory, 100)) {
+        try (FileCacheStore cache = new FileCacheStore(converterFactory, 100, USE_GZIP)) {
             while (files.hasNext()) {
                 LocatedFileStatus locatedFileStatus = files.next();

@@ -168,7 +177,7 @@ private void processTopic(Path topicPath, RecordConverterFactory converterFactor
         }
     }

-    private void processFile(Path filePath, String topicName, FileCache cache,
+    private void processFile(Path filePath, String topicName, FileCacheStore cache,
             OffsetRangeFile offsets) throws IOException {
         String fileName = filePath.getName();

@@ -209,7 +218,7 @@ record = dataFileReader.next(record);
         processedFileCount++;
     }

-    private void writeRecord(GenericRecord record, String topicName, FileCache cache)
+    private void writeRecord(GenericRecord record, String topicName, FileCacheStore cache)
             throws IOException {
         GenericRecord keyField = (GenericRecord) record.get("key");
         GenericRecord valueField = (GenericRecord) record.get("value");
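Taken together, the new `USE_GZIP` flag and the constructor changes derive the output file extension from two system properties. A standalone sketch of that selection logic, assuming only the property names visible in this diff (`org.radarcns.format`, `org.radarcns.compression`):

```
public class ExtensionSelection {
    // false when the property is unset, since "gzip".equalsIgnoreCase(null) is false
    static final boolean USE_GZIP =
            "gzip".equalsIgnoreCase(System.getProperty("org.radarcns.compression"));

    public static void main(String[] args) {
        String extension;
        if (System.getProperty("org.radarcns.format", "csv").equalsIgnoreCase("json")) {
            extension = "json";
        } else {
            extension = "csv";  // CSV is the default format
        }
        if (USE_GZIP) {
            extension += ".gz"; // e.g. "csv.gz" with -Dorg.radarcns.compression=gzip
        }
        System.out.println("output extension: " + extension);
    }
}
```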

src/main/java/org/radarcns/util/CsvAvroConverter.java

Lines changed: 3 additions & 1 deletion

@@ -67,7 +67,9 @@ public CsvAvroConverter(CsvFactory factory, Writer writer, GenericRecord record,

     @Override
     public void writeRecord(GenericRecord record) throws IOException {
-        csvWriter.writeValue(generator, convertRecord(record));
+        Map<String, Object> localMap = convertRecord(record);
+        csvWriter.writeValue(generator, localMap);
+        localMap.clear();
     }

     public Map<String, Object> convertRecord(GenericRecord record) {
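The converter now materializes the converted map in a local variable and clears it after the write, so a long-lived converter does not keep references to the previous record's values. A minimal sketch of the same write-then-clear pattern against the `jackson-dataformat-csv` API that `CsvAvroConverter` builds on (the column names and values are hypothetical):

```
import com.fasterxml.jackson.dataformat.csv.CsvMapper;
import com.fasterxml.jackson.dataformat.csv.CsvSchema;
import java.io.IOException;
import java.io.StringWriter;
import java.util.LinkedHashMap;
import java.util.Map;

public class WriteThenClear {
    public static void main(String[] args) throws IOException {
        CsvMapper mapper = new CsvMapper();
        CsvSchema schema = CsvSchema.builder()
                .addColumn("userId")
                .addColumn("time")
                .setUseHeader(true)
                .build();
        StringWriter out = new StringWriter();

        Map<String, Object> row = new LinkedHashMap<>();
        row.put("userId", "u1");
        row.put("time", 1.5e9);
        mapper.writer(schema).writeValue(out, row);
        // Clearing after the write releases the record's values so they
        // are not pinned in memory between records.
        row.clear();

        System.out.print(out);
    }
}
```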

src/main/java/org/radarcns/util/FileCache.java

Lines changed: 56 additions & 113 deletions

@@ -16,144 +16,87 @@

 package org.radarcns.util;

-import java.io.BufferedWriter;
+import java.io.BufferedOutputStream;
 import java.io.Closeable;
 import java.io.File;
-import java.io.FileWriter;
+import java.io.FileOutputStream;
 import java.io.Flushable;
 import java.io.IOException;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
 import java.io.Writer;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
+import java.util.zip.GZIPOutputStream;
 import javax.annotation.Nonnull;
 import org.apache.avro.generic.GenericRecord;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;

-/**
- * Caches open file handles. If more than the limit is cached, the half of the files that were used
- * the longest ago cache are evicted from cache.
- */
-public class FileCache implements Flushable, Closeable {
-    private static final Logger logger = LoggerFactory.getLogger(FileCache.class);
-
-    private RecordConverterFactory converterFactory;
-    private final int maxFiles;
-    private final Map<File, SingleFileCache> caches;
-
-    public FileCache(RecordConverterFactory converterFactory, int maxFiles) {
-        this.converterFactory = converterFactory;
-        this.maxFiles = maxFiles;
-        this.caches = new HashMap<>(maxFiles * 4 / 3 + 1);
-    }
+/** Keeps file handles of a file. */
+public class FileCache implements Closeable, Flushable, Comparable<FileCache> {
+    private final OutputStream[] streams;
+    private final Writer writer;
+    private final RecordConverter recordConverter;
+    private final File file;
+    private long lastUse;

     /**
-     * Append a record to given file. If the file handle and writer are already open in this cache,
-     * those will be used. Otherwise, the file will be opened and the file handle cached.
-     *
-     * @param file file to append data to
-     * @param record data
-     * @return true if the cache was used, false if a new file was opened.
-     * @throws IOException when failing to open a file or writing to it.
+     * File cache of given file, using given converter factory.
+     * @param converterFactory converter factory to create a converter to write files with.
+     * @param file file to cache.
+     * @param record example record to create converter from, this is not written to file.
+     * @param gzip whether to gzip the records
+     * @throws IOException
      */
-    public boolean writeRecord(File file, GenericRecord record) throws IOException {
-        SingleFileCache cache = caches.get(file);
-        if (cache != null) {
-            cache.writeRecord(record);
-            return true;
-        } else {
-            ensureCapacity();
+    public FileCache(RecordConverterFactory converterFactory, File file,
+            GenericRecord record, boolean gzip) throws IOException {
+        this.file = file;
+        boolean fileIsNew = !file.exists() || file.length() == 0;
+
+        this.streams = new OutputStream[gzip ? 3 : 2];
+        this.streams[0] = new FileOutputStream(file, true);
+        this.streams[1] = new BufferedOutputStream(this.streams[0]);
+        if (gzip) {
+            this.streams[2] = new GZIPOutputStream(this.streams[1]);
+        }

-            File dir = file.getParentFile();
-            if (!dir.exists()){
-                if (dir.mkdirs()) {
-                    logger.debug("Created directory: {}", dir.getAbsolutePath());
-                } else {
-                    logger.warn("FAILED to create directory: {}", dir.getAbsolutePath());
-                }
-            }
+        this.writer = new OutputStreamWriter(this.streams[this.streams.length - 1]);
+        this.recordConverter = converterFactory.converterFor(writer, record, fileIsNew);
+    }

-            cache = new SingleFileCache(file, record);
-            caches.put(file, cache);
-            cache.writeRecord(record);
-            return false;
-        }
+    /** Write a record to the cache. */
+    public void writeRecord(GenericRecord record) throws IOException {
+        this.recordConverter.writeRecord(record);
+        lastUse = System.nanoTime();
     }

-    /**
-     * Ensure that a new filecache can be added. Evict files used longest ago from cache if needed.
-     */
-    private void ensureCapacity() throws IOException {
-        if (caches.size() == maxFiles) {
-            ArrayList<SingleFileCache> cacheList = new ArrayList<>(caches.values());
-            Collections.sort(cacheList);
-            for (int i = 0; i < cacheList.size() / 2; i++) {
-                SingleFileCache rmCache = cacheList.get(i);
-                caches.remove(rmCache.getFile());
-                rmCache.close();
-            }
+    @Override
+    public void close() throws IOException {
+        recordConverter.close();
+        writer.close();
+        for (int i = streams.length - 1; i >= 0; i--) {
+            streams[i].close();
         }
     }

     @Override
     public void flush() throws IOException {
-        for (SingleFileCache cache : caches.values()) {
-            cache.flush();
-        }
+        recordConverter.flush();
     }

+    /**
+     * Compares time that the filecaches were last used. If equal, it lexicographically compares
+     * the absolute path of the file.
+     * @param other FileCache to compare with.
+     */
     @Override
-    public void close() throws IOException {
-        try {
-            for (SingleFileCache cache : caches.values()) {
-                cache.close();
-            }
-        } finally {
-            caches.clear();
+    public int compareTo(@Nonnull FileCache other) {
+        int result = Long.compare(lastUse, other.lastUse);
+        if (result != 0) {
+            return result;
         }
+        return file.compareTo(other.file);
     }

-    private class SingleFileCache implements Closeable, Flushable, Comparable<SingleFileCache> {
-        private final BufferedWriter bufferedWriter;
-        private final Writer fileWriter;
-        private final RecordConverter recordConverter;
-        private final File file;
-        private long lastUse;
-
-        private SingleFileCache(File file, GenericRecord record) throws IOException {
-            this.file = file;
-            boolean fileIsNew = !file.exists() || file.length() == 0;
-            this.fileWriter = new FileWriter(file, true);
-            this.bufferedWriter = new BufferedWriter(fileWriter);
-            this.recordConverter = converterFactory.converterFor(bufferedWriter, record, fileIsNew);
-        }
-
-        private void writeRecord(GenericRecord record) throws IOException {
-            this.recordConverter.writeRecord(record);
-            lastUse = System.nanoTime();
-        }
-
-        @Override
-        public void close() throws IOException {
-            recordConverter.close();
-            bufferedWriter.close();
-            fileWriter.close();
-        }
-
-        @Override
-        public void flush() throws IOException {
-            recordConverter.flush();
-        }
-
-        @Override
-        public int compareTo(@Nonnull SingleFileCache other) {
-            return Long.compare(lastUse, other.lastUse);
-        }
-
-        private File getFile() {
-            return file;
-        }
+    /** File that the cache is maintaining. */
+    public File getFile() {
+        return file;
     }
 }
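The rewritten `FileCache` layers an append-mode `FileOutputStream` inside a `BufferedOutputStream` and, when compression is on, a `GZIPOutputStream`, then closes the layers from the outermost in so the gzip trailer and buffered bytes are flushed before the file handle is released. A minimal standalone sketch of that layering (the file name is hypothetical):

```
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.util.zip.GZIPOutputStream;

public class LayeredAppendWriter {
    public static void main(String[] args) throws IOException {
        File file = new File("example.csv.gz");
        boolean gzip = true;

        OutputStream[] streams = new OutputStream[gzip ? 3 : 2];
        streams[0] = new FileOutputStream(file, true);    // append mode
        streams[1] = new BufferedOutputStream(streams[0]);
        if (gzip) {
            streams[2] = new GZIPOutputStream(streams[1]);
        }
        // The writer always wraps the outermost stream.
        Writer writer = new OutputStreamWriter(streams[streams.length - 1]);

        writer.write("a,b\n1,2\n");

        writer.close();
        for (int i = streams.length - 1; i >= 0; i--) {
            streams[i].close();                           // outermost first
        }
    }
}
```

One consequence of appending with compression is that each reopen of an existing `.gz` file starts a new gzip member; command-line gzip tooling decodes concatenated members transparently, but some stream readers stop after the first.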
