Skip to content

Commit 1b1b9a2

Browse files
committed
Enabled GZIP compression for output files
1 parent 6d6f69a commit 1b1b9a2

File tree

7 files changed

+404
-161
lines changed

7 files changed

+404
-161
lines changed

README.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,3 +25,8 @@ By default, this will output the data in CSV format. If JSON format is preferred
2525
```
2626
java -Dorg.radarcns.format=json -jar restructurehdfs-all-0.1.1.jar <webhdfs_url> <hdfs_topic_path> <output_folder>
2727
```
28+
29+
Another option is to output the data in compressed form. All files will get the `gz` suffix, and can be decompressed with a GZIP decoder. Note that for a very small number of records, this may actually increase the file size.
30+
```
31+
java -Dorg.radarcns.compression=gzip -jar restructurehdfs-all-0.1.1.jar <webhdfs_url> <hdfs_topic_path> <output_folder>
32+
```

src/main/java/org/radarcns/RestructureAvroRecords.java

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@
3333
import org.apache.hadoop.fs.Path;
3434
import org.apache.hadoop.fs.RemoteIterator;
3535
import org.radarcns.util.CsvAvroConverter;
36-
import org.radarcns.util.FileCache;
36+
import org.radarcns.util.FileCacheStore;
3737
import org.radarcns.util.JsonAvroConverter;
3838
import org.radarcns.util.RecordConverterFactory;
3939
import org.slf4j.Logger;
@@ -62,6 +62,7 @@ public class RestructureAvroRecords {
6262

6363
private int processedFileCount;
6464
private int processedRecordsCount;
65+
private static final boolean USE_GZIP = "gzip".equalsIgnoreCase(System.getProperty("org.radarcns.compression"));
6566

6667
public static void main(String [] args) throws Exception {
6768
if (args.length != 3) {
@@ -91,13 +92,21 @@ public RestructureAvroRecords(String inputPath, String outputPath) {
9192
this.setInputWebHdfsURL(inputPath);
9293
this.setOutputPath(outputPath);
9394

95+
String extension;
9496
if (System.getProperty("org.radarcns.format", "csv").equalsIgnoreCase("json")) {
97+
logger.info("Writing output files in JSON format");
9598
converterFactory = JsonAvroConverter.getFactory();
96-
outputFileExtension = "json";
99+
extension = "json";
97100
} else {
101+
logger.info("Writing output files in CSV format");
98102
converterFactory = CsvAvroConverter.getFactory();
99-
outputFileExtension = "csv";
103+
extension = "csv";
100104
}
105+
if (USE_GZIP) {
106+
logger.info("Compressing output files in GZIP format");
107+
extension += ".gz";
108+
}
109+
outputFileExtension = extension;
101110
}
102111

103112
public void setInputWebHdfsURL(String fileSystemURL) {
@@ -157,7 +166,7 @@ private void processTopic(Path topicPath, RecordConverterFactory converterFactor
157166

158167
String topicName = topicPath.getName();
159168

160-
try (FileCache cache = new FileCache(converterFactory, 100)) {
169+
try (FileCacheStore cache = new FileCacheStore(converterFactory, 100, USE_GZIP)) {
161170
while (files.hasNext()) {
162171
LocatedFileStatus locatedFileStatus = files.next();
163172

@@ -168,7 +177,7 @@ private void processTopic(Path topicPath, RecordConverterFactory converterFactor
168177
}
169178
}
170179

171-
private void processFile(Path filePath, String topicName, FileCache cache,
180+
private void processFile(Path filePath, String topicName, FileCacheStore cache,
172181
OffsetRangeFile offsets) throws IOException {
173182
String fileName = filePath.getName();
174183

@@ -209,7 +218,7 @@ record = dataFileReader.next(record);
209218
processedFileCount++;
210219
}
211220

212-
private void writeRecord(GenericRecord record, String topicName, FileCache cache)
221+
private void writeRecord(GenericRecord record, String topicName, FileCacheStore cache)
213222
throws IOException {
214223
GenericRecord keyField = (GenericRecord) record.get("key");
215224
GenericRecord valueField = (GenericRecord) record.get("value");

src/main/java/org/radarcns/util/FileCache.java

Lines changed: 56 additions & 113 deletions
Original file line numberDiff line numberDiff line change
@@ -16,144 +16,87 @@
1616

1717
package org.radarcns.util;
1818

19-
import java.io.BufferedWriter;
19+
import java.io.BufferedOutputStream;
2020
import java.io.Closeable;
2121
import java.io.File;
22-
import java.io.FileWriter;
22+
import java.io.FileOutputStream;
2323
import java.io.Flushable;
2424
import java.io.IOException;
25+
import java.io.OutputStream;
26+
import java.io.OutputStreamWriter;
2527
import java.io.Writer;
26-
import java.util.ArrayList;
27-
import java.util.Collections;
28-
import java.util.HashMap;
29-
import java.util.Map;
28+
import java.util.zip.GZIPOutputStream;
3029
import javax.annotation.Nonnull;
3130
import org.apache.avro.generic.GenericRecord;
32-
import org.slf4j.Logger;
33-
import org.slf4j.LoggerFactory;
3431

35-
/**
36-
* Caches open file handles. If more than the limit is cached, the half of the files that were used
37-
* the longest ago cache are evicted from cache.
38-
*/
39-
public class FileCache implements Flushable, Closeable {
40-
private static final Logger logger = LoggerFactory.getLogger(FileCache.class);
41-
42-
private RecordConverterFactory converterFactory;
43-
private final int maxFiles;
44-
private final Map<File, SingleFileCache> caches;
45-
46-
public FileCache(RecordConverterFactory converterFactory, int maxFiles) {
47-
this.converterFactory = converterFactory;
48-
this.maxFiles = maxFiles;
49-
this.caches = new HashMap<>(maxFiles * 4 / 3 + 1);
50-
}
32+
/** Keeps file handles of a file. */
33+
public class FileCache implements Closeable, Flushable, Comparable<FileCache> {
34+
private final OutputStream[] streams;
35+
private final Writer writer;
36+
private final RecordConverter recordConverter;
37+
private final File file;
38+
private long lastUse;
5139

5240
/**
53-
* Append a record to given file. If the file handle and writer are already open in this cache,
54-
* those will be used. Otherwise, the file will be opened and the file handle cached.
55-
*
56-
* @param file file to append data to
57-
* @param record data
58-
* @return true if the cache was used, false if a new file was opened.
59-
* @throws IOException when failing to open a file or writing to it.
41+
* File cache of given file, using given converter factory.
42+
* @param converterFactory converter factory to create a converter to write files with.
43+
* @param file file to cache.
44+
* @param record example record to create converter from, this is not written to file.
45+
* @param gzip whether to gzip the records
46+
* @throws IOException
6047
*/
61-
public boolean writeRecord(File file, GenericRecord record) throws IOException {
62-
SingleFileCache cache = caches.get(file);
63-
if (cache != null) {
64-
cache.writeRecord(record);
65-
return true;
66-
} else {
67-
ensureCapacity();
48+
public FileCache(RecordConverterFactory converterFactory, File file,
49+
GenericRecord record, boolean gzip) throws IOException {
50+
this.file = file;
51+
boolean fileIsNew = !file.exists() || file.length() == 0;
52+
53+
this.streams = new OutputStream[gzip ? 3 : 2];
54+
this.streams[0] = new FileOutputStream(file, true);
55+
this.streams[1] = new BufferedOutputStream(this.streams[0]);
56+
if (gzip) {
57+
this.streams[2] = new GZIPOutputStream(this.streams[1]);
58+
}
6859

69-
File dir = file.getParentFile();
70-
if (!dir.exists()){
71-
if (dir.mkdirs()) {
72-
logger.debug("Created directory: {}", dir.getAbsolutePath());
73-
} else {
74-
logger.warn("FAILED to create directory: {}", dir.getAbsolutePath());
75-
}
76-
}
60+
this.writer = new OutputStreamWriter(this.streams[this.streams.length - 1]);
61+
this.recordConverter = converterFactory.converterFor(writer, record, fileIsNew);
62+
}
7763

78-
cache = new SingleFileCache(file, record);
79-
caches.put(file, cache);
80-
cache.writeRecord(record);
81-
return false;
82-
}
64+
/** Write a record to the cache. */
65+
public void writeRecord(GenericRecord record) throws IOException {
66+
this.recordConverter.writeRecord(record);
67+
lastUse = System.nanoTime();
8368
}
8469

85-
/**
86-
* Ensure that a new filecache can be added. Evict files used longest ago from cache if needed.
87-
*/
88-
private void ensureCapacity() throws IOException {
89-
if (caches.size() == maxFiles) {
90-
ArrayList<SingleFileCache> cacheList = new ArrayList<>(caches.values());
91-
Collections.sort(cacheList);
92-
for (int i = 0; i < cacheList.size() / 2; i++) {
93-
SingleFileCache rmCache = cacheList.get(i);
94-
caches.remove(rmCache.getFile());
95-
rmCache.close();
96-
}
70+
@Override
71+
public void close() throws IOException {
72+
recordConverter.close();
73+
writer.close();
74+
for (int i = streams.length - 1; i >= 0; i--) {
75+
streams[i].close();
9776
}
9877
}
9978

10079
@Override
10180
public void flush() throws IOException {
102-
for (SingleFileCache cache : caches.values()) {
103-
cache.flush();
104-
}
81+
recordConverter.flush();
10582
}
10683

84+
/**
85+
* Compares time that the filecaches were last used. If equal, it lexicographically compares
86+
* the absolute path of the file.
87+
* @param other FileCache to compare with.
88+
*/
10789
@Override
108-
public void close() throws IOException {
109-
try {
110-
for (SingleFileCache cache : caches.values()) {
111-
cache.close();
112-
}
113-
} finally {
114-
caches.clear();
90+
public int compareTo(@Nonnull FileCache other) {
91+
int result = Long.compare(lastUse, other.lastUse);
92+
if (result != 0) {
93+
return result;
11594
}
95+
return file.compareTo(other.file);
11696
}
11797

118-
private class SingleFileCache implements Closeable, Flushable, Comparable<SingleFileCache> {
119-
private final BufferedWriter bufferedWriter;
120-
private final Writer fileWriter;
121-
private final RecordConverter recordConverter;
122-
private final File file;
123-
private long lastUse;
124-
125-
private SingleFileCache(File file, GenericRecord record) throws IOException {
126-
this.file = file;
127-
boolean fileIsNew = !file.exists() || file.length() == 0;
128-
this.fileWriter = new FileWriter(file, true);
129-
this.bufferedWriter = new BufferedWriter(fileWriter);
130-
this.recordConverter = converterFactory.converterFor(bufferedWriter, record, fileIsNew);
131-
}
132-
133-
private void writeRecord(GenericRecord record) throws IOException {
134-
this.recordConverter.writeRecord(record);
135-
lastUse = System.nanoTime();
136-
}
137-
138-
@Override
139-
public void close() throws IOException {
140-
recordConverter.close();
141-
bufferedWriter.close();
142-
fileWriter.close();
143-
}
144-
145-
@Override
146-
public void flush() throws IOException {
147-
recordConverter.flush();
148-
}
149-
150-
@Override
151-
public int compareTo(@Nonnull SingleFileCache other) {
152-
return Long.compare(lastUse, other.lastUse);
153-
}
154-
155-
private File getFile() {
156-
return file;
157-
}
98+
/** File that the cache is maintaining. */
99+
public File getFile() {
100+
return file;
158101
}
159102
}
Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
/*
2+
* Copyright 2017 The Hyve
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.radarcns.util;
18+
19+
import java.io.Closeable;
20+
import java.io.File;
21+
import java.io.Flushable;
22+
import java.io.IOException;
23+
import java.util.ArrayList;
24+
import java.util.Collections;
25+
import java.util.HashMap;
26+
import java.util.Map;
27+
import org.apache.avro.generic.GenericRecord;
28+
import org.slf4j.Logger;
29+
import org.slf4j.LoggerFactory;
30+
31+
/**
32+
* Caches open file handles. If more than the limit is cached, the half of the files that were used
33+
* the longest ago cache are evicted from cache.
34+
*/
35+
public class FileCacheStore implements Flushable, Closeable {
36+
private static final Logger logger = LoggerFactory.getLogger(FileCacheStore.class);
37+
private final boolean gzip;
38+
39+
private RecordConverterFactory converterFactory;
40+
private final int maxFiles;
41+
private final Map<File, FileCache> caches;
42+
43+
public FileCacheStore(RecordConverterFactory converterFactory, int maxFiles, boolean gzip) {
44+
this.converterFactory = converterFactory;
45+
this.maxFiles = maxFiles;
46+
this.caches = new HashMap<>(maxFiles * 4 / 3 + 1);
47+
this.gzip = gzip;
48+
}
49+
50+
/**
51+
* Append a record to given file. If the file handle and writer are already open in this cache,
52+
* those will be used. Otherwise, the file will be opened and the file handle cached.
53+
*
54+
* @param file file to append data to
55+
* @param record data
56+
* @return true if the cache was used, false if a new file was opened.
57+
* @throws IOException when failing to open a file or writing to it.
58+
*/
59+
public boolean writeRecord(File file, GenericRecord record) throws IOException {
60+
FileCache cache = caches.get(file);
61+
if (cache != null) {
62+
cache.writeRecord(record);
63+
return true;
64+
} else {
65+
ensureCapacity();
66+
67+
File dir = file.getParentFile();
68+
if (!dir.exists()){
69+
if (dir.mkdirs()) {
70+
logger.debug("Created directory: {}", dir.getAbsolutePath());
71+
} else {
72+
logger.warn("FAILED to create directory: {}", dir.getAbsolutePath());
73+
}
74+
}
75+
76+
cache = new FileCache(converterFactory, file, record, gzip);
77+
caches.put(file, cache);
78+
cache.writeRecord(record);
79+
return false;
80+
}
81+
}
82+
83+
/**
84+
* Ensure that a new filecache can be added. Evict files used longest ago from cache if needed.
85+
*/
86+
private void ensureCapacity() throws IOException {
87+
if (caches.size() == maxFiles) {
88+
ArrayList<FileCache> cacheList = new ArrayList<>(caches.values());
89+
Collections.sort(cacheList);
90+
for (int i = 0; i < cacheList.size() / 2; i++) {
91+
FileCache rmCache = cacheList.get(i);
92+
caches.remove(rmCache.getFile());
93+
rmCache.close();
94+
}
95+
}
96+
}
97+
98+
@Override
99+
public void flush() throws IOException {
100+
for (FileCache cache : caches.values()) {
101+
cache.flush();
102+
}
103+
}
104+
105+
@Override
106+
public void close() throws IOException {
107+
try {
108+
for (FileCache cache : caches.values()) {
109+
cache.close();
110+
}
111+
} finally {
112+
caches.clear();
113+
}
114+
}
115+
116+
}

0 commit comments

Comments
 (0)