Skip to content

Commit d08a759

Browse files
committed
Make the staging behaviour optional
1 parent 1730cb2 commit d08a759

File tree

6 files changed

+81
-20
lines changed

6 files changed

+81
-20
lines changed

src/main/java/org/radarcns/RestructureAvroRecords.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@ public class RestructureAvroRecords {
6363
}
6464

6565
private final RecordConverterFactory converterFactory;
66+
private final boolean doStage;
6667

6768
private java.nio.file.Path outputPath;
6869
private java.nio.file.Path offsetsPath;
@@ -97,6 +98,7 @@ public static void main(String [] args) {
9798
commandLineArgs.outputDirectory)
9899
.useGzip("gzip".equalsIgnoreCase(commandLineArgs.compression))
99100
.doDeduplicate(commandLineArgs.deduplicate).format(commandLineArgs.format)
101+
.doStage(!commandLineArgs.noStage)
100102
.build();
101103

102104
try {
@@ -119,6 +121,7 @@ private RestructureAvroRecords(RestructureAvroRecords.Builder builder) {
119121

120122
this.useGzip = builder.useGzip;
121123
this.doDeduplicate = builder.doDeduplicate;
124+
this.doStage = builder.doStage;
122125
logger.info("Deduplicate set to {}", doDeduplicate);
123126

124127
String extension;
@@ -197,7 +200,7 @@ public void start(String directoryName) throws IOException {
197200

198201
// Actually process the files
199202
for (Map.Entry<String, List<Path>> entry : topicPaths.entrySet()) {
200-
try (FileCacheStore cache = new FileCacheStore(converterFactory, 100, useGzip, doDeduplicate)) {
203+
try (FileCacheStore cache = new FileCacheStore(converterFactory, 100, useGzip, doDeduplicate, doStage)) {
201204
for (Path filePath : entry.getValue()) {
202205
// If JsonMappingException occurs, log the error and continue with other files
203206
try {
@@ -372,6 +375,7 @@ public static class Builder {
372375
private String hdfsUri;
373376
private String outputPath;
374377
private String format;
378+
private boolean doStage;
375379

376380
public Builder(final String uri, final String outputPath) {
377381
this.hdfsUri = uri;
@@ -397,5 +401,9 @@ public RestructureAvroRecords build() {
397401
return new RestructureAvroRecords(this);
398402
}
399403

404+
/**
 * Sets whether output files are staged before they are written to their destination.
 * The value is copied into the {@code RestructureAvroRecords} instance by
 * {@code build()} and handed on to each {@code FileCacheStore} it creates.
 * NOTE(review): the backing field has no initializer, so a Builder on which this
 * method is never called builds with staging disabled, whereas the command line
 * enables staging unless {@code --no-stage} is given. Confirm that default is intended.
 *
 * @param stage {@code true} to stage output files, {@code false} to skip staging.
 * @return this builder, to allow call chaining.
 */
public Builder doStage(boolean stage) {
405+
this.doStage = stage;
406+
return this;
407+
}
400408
}
401409
}

src/main/java/org/radarcns/data/FileCache.java

Lines changed: 30 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -46,16 +46,22 @@ public class FileCache implements Closeable, Flushable, Comparable<FileCache> {
4646
* @param path path to cache.
4747
* @param record example record to create converter from, this is not written to path.
4848
* @param gzip whether to gzip the records
49-
* @throws IOException
49+
* @throws IOException if the file and/or temporary files cannot be correctly read or written to.
5050
*/
5151
public FileCache(RecordConverterFactory converterFactory, Path path,
5252
GenericRecord record, boolean gzip, Path tmpDir) throws IOException {
5353
this.path = path;
5454
boolean fileIsNew = !Files.exists(path) || Files.size(path) == 0;
55-
this.tmpPath = Files.createTempFile(tmpDir, path.getFileName().toString(),
56-
gzip ? ".tmp.gz" : ".tmp");
55+
OutputStream outFile;
56+
if (tmpDir == null) {
57+
this.tmpPath = null;
58+
outFile = Files.newOutputStream(path, StandardOpenOption.APPEND, StandardOpenOption.CREATE);
59+
} else {
60+
this.tmpPath = Files.createTempFile(tmpDir, path.getFileName().toString(),
61+
gzip ? ".tmp.gz" : ".tmp");
62+
outFile = Files.newOutputStream(tmpPath, StandardOpenOption.WRITE);
63+
}
5764

58-
OutputStream outFile = Files.newOutputStream(tmpPath, StandardOpenOption.WRITE);
5965
OutputStream bufOut = new BufferedOutputStream(outFile);
6066
if (gzip) {
6167
bufOut = new GZIPOutputStream(bufOut);
@@ -65,13 +71,11 @@ public FileCache(RecordConverterFactory converterFactory, Path path,
6571
if (fileIsNew) {
6672
inputStream = new ByteArrayInputStream(new byte[0]);
6773
} else {
68-
inputStream = new BufferedInputStream(Files.newInputStream(path));
69-
InputStream copyStream = Files.newInputStream(path);
70-
if (gzip) {
71-
copyStream = new GZIPInputStream(copyStream);
72-
inputStream = new GZIPInputStream(inputStream);
74+
inputStream = inputStream(new BufferedInputStream(Files.newInputStream(path)), gzip);
75+
76+
if (tmpPath != null) {
77+
copy(path, bufOut, gzip);
7378
}
74-
copy(copyStream, bufOut);
7579
}
7680

7781
this.writer = new OutputStreamWriter(bufOut);
@@ -104,7 +108,9 @@ public boolean writeRecord(GenericRecord record) throws IOException {
104108
public void close() throws IOException {
105109
recordConverter.close();
106110
writer.close();
107-
Files.move(tmpPath, path, REPLACE_EXISTING);
111+
if (tmpPath != null) {
112+
Files.move(tmpPath, path, REPLACE_EXISTING);
113+
}
108114
}
109115

110116
@Override
@@ -131,6 +137,19 @@ public Path getPath() {
131137
return path;
132138
}
133139

140+
private static void copy(Path source, OutputStream sink, boolean gzip) throws IOException {
141+
try (InputStream copyStream = inputStream(Files.newInputStream(source), gzip)) {
142+
copy(copyStream, sink);
143+
}
144+
}
145+
146+
private static InputStream inputStream(InputStream in, boolean gzip) throws IOException {
147+
if (gzip) {
148+
return new GZIPInputStream(in);
149+
} else {
150+
return in;
151+
}
152+
}
134153

135154
/**
136155
* Reads all bytes from an input stream and writes them to an output stream.

src/main/java/org/radarcns/data/FileCacheStore.java

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -48,13 +48,13 @@ public class FileCacheStore implements Flushable, Closeable {
4848
private final int maxFiles;
4949
private final Map<Path, FileCache> caches;
5050

51-
public FileCacheStore(RecordConverterFactory converterFactory, int maxFiles, boolean gzip, boolean deduplicate) throws IOException {
51+
public FileCacheStore(RecordConverterFactory converterFactory, int maxFiles, boolean gzip, boolean deduplicate, boolean stage) throws IOException {
5252
this.converterFactory = converterFactory;
5353
this.maxFiles = maxFiles;
5454
this.caches = new HashMap<>(maxFiles * 4 / 3 + 1);
5555
this.gzip = gzip;
5656
this.deduplicate = deduplicate;
57-
this.tmpDir = Files.createTempDirectory("restructurehdfs");
57+
this.tmpDir = stage ? Files.createTempDirectory("restructurehdfs") : null;
5858
}
5959

6060
/**
@@ -129,10 +129,12 @@ public void close() throws IOException {
129129
converterFactory.sortUnique(cache.getPath());
130130
}
131131
}
132-
Files.walk(tmpDir)
133-
.sorted(Comparator.reverseOrder())
134-
.forEach(tryCatch(Files::delete, (p, ex) -> logger.warn(
135-
"Failed to remove temporary file {}: {}", p, ex)));
132+
if (tmpDir != null) {
133+
Files.walk(tmpDir)
134+
.sorted(Comparator.reverseOrder())
135+
.forEach(tryCatch(Files::delete, (p, ex) -> logger.warn(
136+
"Failed to remove temporary file {}: {}", p, ex)));
137+
}
136138
} finally {
137139
caches.clear();
138140
}

src/main/java/org/radarcns/util/commandline/CommandLineArgs.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,4 +28,7 @@ public class CommandLineArgs {
2828

2929
@Parameter(names = { "-h", "--help"}, help = true, description = "Display the usage of the program with available options.")
3030
public boolean help;
31+
32+
@Parameter(names = { "--no-stage"}, description = "Do not stage output files into a temporary directory before moving them to the data directory. This increases performance but may leave corrupted data files.")
33+
public boolean noStage = false;
3134
}

src/test/java/org/radarcns/util/FileCacheStoreTest.java

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@
2121
import java.io.IOException;
2222
import java.nio.file.Files;
2323
import java.nio.file.Path;
24+
import java.util.Arrays;
25+
import java.util.Collection;
2426

2527
import org.apache.avro.Schema;
2628
import org.apache.avro.SchemaBuilder;
@@ -29,11 +31,23 @@
2931
import org.junit.Rule;
3032
import org.junit.Test;
3133
import org.junit.rules.TemporaryFolder;
34+
import org.junit.runner.RunWith;
35+
import org.junit.runners.Parameterized;
3236
import org.radarcns.data.CsvAvroConverter;
3337
import org.radarcns.data.FileCacheStore;
3438
import org.radarcns.data.RecordConverterFactory;
3539

40+
@RunWith(Parameterized.class)
3641
public class FileCacheStoreTest {
42+
43+
/**
 * Supplies the parameter values for this parameterized test class, so that the
 * tests run once with staging enabled and once with staging disabled.
 *
 * @return the values that are injected into the {@code doStage} field.
 */
@Parameterized.Parameters
44+
public static Collection<Boolean> doStage() {
45+
return Arrays.asList(Boolean.TRUE, Boolean.FALSE);
46+
}
47+
48+
@Parameterized.Parameter
49+
public Boolean doStage;
50+
3751
@Rule
3852
public TemporaryFolder folder = new TemporaryFolder();
3953

@@ -61,7 +75,7 @@ public void appendLine() throws IOException {
6175

6276
GenericRecord record;
6377

64-
try (FileCacheStore cache = new FileCacheStore(csvFactory, 2, false, false)) {
78+
try (FileCacheStore cache = new FileCacheStore(csvFactory, 2, false, false, doStage)) {
6579
record = new GenericRecordBuilder(simpleSchema).set("a", "something").build();
6680
assertEquals(cache.writeRecord(f1, record), FileCacheStore.WriteStatus.NO_CACHE_AND_WRITE);
6781
record = new GenericRecordBuilder(simpleSchema).set("a", "somethingElse").build();

src/test/java/org/radarcns/util/FileCacheTest.java

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@
2424
import org.junit.Rule;
2525
import org.junit.Test;
2626
import org.junit.rules.TemporaryFolder;
27+
import org.junit.runner.RunWith;
28+
import org.junit.runners.Parameterized;
2729
import org.radarcns.data.CsvAvroConverter;
2830
import org.radarcns.data.FileCache;
2931
import org.radarcns.data.RecordConverterFactory;
@@ -35,6 +37,10 @@
3537
import java.io.Reader;
3638
import java.nio.file.Files;
3739
import java.nio.file.Path;
40+
import java.util.Arrays;
41+
import java.util.Collection;
42+
import java.util.Collections;
43+
import java.util.List;
3844
import java.util.zip.GZIPInputStream;
3945

4046
import static org.junit.Assert.assertEquals;
@@ -43,18 +49,27 @@
4349
/**
4450
* Created by joris on 03/07/2017.
4551
*/
52+
@RunWith(Parameterized.class)
4653
public class FileCacheTest {
54+
/**
 * Supplies the parameter values for this parameterized test class, so that the
 * tests run once with a temporary directory and once without one.
 *
 * @return the values that are injected into the {@code useTmpDir} field.
 */
@Parameterized.Parameters
55+
public static Collection<Boolean> useTmpDir() {
56+
return Arrays.asList(Boolean.TRUE, Boolean.FALSE);
57+
}
58+
4759
@Rule
4860
public TemporaryFolder folder = new TemporaryFolder();
4961
private Path path;
5062
private RecordConverterFactory csvFactory;
5163
private Record exampleRecord;
5264
private Path tmpDir;
5365

66+
@Parameterized.Parameter
67+
public Boolean useTmpDir;
68+
5469
@Before
5570
public void setUp() throws IOException {
5671
this.path = folder.newFile("f").toPath();
57-
this.tmpDir = folder.newFolder().toPath();
72+
this.tmpDir = useTmpDir ? folder.newFolder().toPath() : null;
5873

5974
this.csvFactory = CsvAvroConverter.getFactory();
6075
Schema schema = SchemaBuilder.record("simple").fields()

0 commit comments

Comments
 (0)