Skip to content

Commit 9b93ba9

Browse files
committed
Added ZIP compression
1 parent 44b2c33 commit 9b93ba9

File tree

7 files changed

+51
-11
lines changed

7 files changed

+51
-11
lines changed

src/main/java/org/radarcns/hdfs/data/Compression.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
import java.io.OutputStream;
2222

2323
public interface Compression extends Format {
24-
OutputStream compress(OutputStream out) throws IOException;
24+
OutputStream compress(String fileName, OutputStream out) throws IOException;
2525
InputStream decompress(InputStream in) throws IOException;
2626

2727
String getExtension();

src/main/java/org/radarcns/hdfs/data/CompressionFactory.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ public class CompressionFactory implements FormatProvider<Compression> {
2323
public final List<Compression> getAll() {
2424
return Arrays.asList(
2525
new GzipCompression(),
26-
new IdentityCompression());
26+
new IdentityCompression(),
27+
new ZipCompression());
2728
}
2829
}

src/main/java/org/radarcns/hdfs/data/FileCache.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ public class FileCache implements Closeable, Flushable, Comparable<FileCache> {
5252
private final boolean deduplicate;
5353
private final Accountant.Ledger ledger;
5454
private final Accountant accountant;
55+
private final String fileName;
5556
private long lastUse;
5657
private final AtomicBoolean hasError;
5758

@@ -70,12 +71,12 @@ public FileCache(FileStoreFactory factory, Path path, GenericRecord record,
7071
this.ledger = new Accountant.Ledger();
7172
this.compression = factory.getCompression();
7273
boolean fileIsNew = !storageDriver.exists(path) || storageDriver.size(path) == 0;
73-
this.tmpPath = Files.createTempFile(tmpDir, path.getFileName().toString(),
74-
".tmp" + compression.getExtension());
74+
this.fileName = path.getFileName().toString();
75+
this.tmpPath = Files.createTempFile(tmpDir, fileName,".tmp" + compression.getExtension());
7576
this.converterFactory = factory.getRecordConverter();
7677
this.accountant = accountant;
7778

78-
OutputStream outStream = compression.compress(
79+
OutputStream outStream = compression.compress(fileName,
7980
new BufferedOutputStream(Files.newOutputStream(tmpPath)));
8081

8182
InputStream inputStream;
@@ -90,7 +91,7 @@ public FileCache(FileStoreFactory factory, Path path, GenericRecord record,
9091
outStream.close();
9192
// clear output file
9293
outStream = compression.compress(
93-
new BufferedOutputStream(Files.newOutputStream(tmpPath)));
94+
fileName, new BufferedOutputStream(Files.newOutputStream(tmpPath)));
9495
}
9596
Timer.getInstance().add("write.copyOriginal", timeCopy);
9697
}
@@ -141,7 +142,7 @@ public void close() throws IOException {
141142
if (!hasError.get()) {
142143
if (deduplicate) {
143144
long timeDedup = System.nanoTime();
144-
converterFactory.sortUnique(tmpPath, tmpPath, compression);
145+
converterFactory.sortUnique(fileName, tmpPath, tmpPath, compression);
145146
Timer.getInstance().add("close.deduplicate", timeDedup);
146147
}
147148

src/main/java/org/radarcns/hdfs/data/GzipCompression.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626

2727
public class GzipCompression implements Compression {
2828
@Override
29-
public OutputStream compress(OutputStream out) throws IOException {
29+
public OutputStream compress(String fileName, OutputStream out) throws IOException {
3030
return new GZIPOutputStream(out);
3131
}
3232

src/main/java/org/radarcns/hdfs/data/IdentityCompression.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323

2424
public class IdentityCompression implements Compression {
2525
@Override
26-
public OutputStream compress(OutputStream out) {
26+
public OutputStream compress(String fileName, OutputStream out) {
2727
return out;
2828
}
2929

src/main/java/org/radarcns/hdfs/data/RecordConverterFactory.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,8 @@ default boolean hasHeader() {
4848
return false;
4949
}
5050

51-
default void sortUnique(Path source, Path target, Compression compression)
51+
default void sortUnique(String fileName, Path source, Path target,
52+
Compression compression)
5253
throws IOException {
5354
// read all lines into memory; assume a 100-byte line length
5455
LinkedHashSet<String> sortedLines = new LinkedHashSet<>((int)(Files.size(source) / 100));
@@ -63,7 +64,7 @@ default void sortUnique(Path source, Path target, Compression compression)
6364
}
6465
try (OutputStream fileOut = Files.newOutputStream(target);
6566
OutputStream bufOut = new BufferedOutputStream(fileOut);
66-
OutputStream zipOut = compression.compress(bufOut);
67+
OutputStream zipOut = compression.compress(fileName, bufOut);
6768
Writer writer = new OutputStreamWriter(zipOut)) {
6869
writeFile(writer, header, sortedLines);
6970
}
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
package org.radarcns.hdfs.data;
2+
3+
import java.io.IOException;
4+
import java.io.InputStream;
5+
import java.io.OutputStream;
6+
import java.util.Collection;
7+
import java.util.Collections;
8+
import java.util.zip.ZipEntry;
9+
import java.util.zip.ZipInputStream;
10+
import java.util.zip.ZipOutputStream;
11+
12+
public class ZipCompression implements Compression {
13+
@Override
14+
public OutputStream compress(String name, OutputStream out) throws IOException {
15+
ZipOutputStream zipOut = new ZipOutputStream(out);
16+
zipOut.setLevel(7);
17+
zipOut.putNextEntry(new ZipEntry(name));
18+
return zipOut;
19+
}
20+
21+
@Override
22+
public InputStream decompress(InputStream in) throws IOException {
23+
ZipInputStream zipIn = new ZipInputStream(in);
24+
zipIn.getNextEntry();
25+
return zipIn;
26+
}
27+
28+
@Override
29+
public Collection<String> getFormats() {
30+
return Collections.singleton("zip");
31+
}
32+
33+
@Override
34+
public String getExtension() {
35+
return ".zip";
36+
}
37+
}

0 commit comments

Comments
 (0)