Skip to content

Commit 1730cb2

Browse files
committed
Write data to staging files before adding them to the output directory
1 parent 5124412 commit 1730cb2

12 files changed

+158
-58
lines changed

src/main/java/org/radarcns/RestructureAvroRecords.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -28,11 +28,11 @@
2828
import org.apache.hadoop.fs.LocatedFileStatus;
2929
import org.apache.hadoop.fs.Path;
3030
import org.apache.hadoop.fs.RemoteIterator;
31-
import org.radarcns.util.CsvAvroConverter;
32-
import org.radarcns.util.FileCacheStore;
33-
import org.radarcns.util.JsonAvroConverter;
31+
import org.radarcns.data.CsvAvroConverter;
32+
import org.radarcns.data.FileCacheStore;
33+
import org.radarcns.data.JsonAvroConverter;
3434
import org.radarcns.util.ProgressBar;
35-
import org.radarcns.util.RecordConverterFactory;
35+
import org.radarcns.data.RecordConverterFactory;
3636
import org.radarcns.util.commandline.CommandLineArgs;
3737
import org.slf4j.Logger;
3838
import org.slf4j.LoggerFactory;
@@ -302,9 +302,9 @@ private void writeRecord(GenericRecord record, String topicName, FileCacheStore
302302
java.nio.file.Path outputPath = userTopicDir.resolve(outputFileName);
303303

304304
// Write data
305-
int response = cache.writeRecord(outputPath, record);
305+
FileCacheStore.WriteStatus response = cache.writeRecord(outputPath, record);
306306

307-
if (response == FileCacheStore.CACHE_AND_NO_WRITE || response == FileCacheStore.NO_CACHE_AND_NO_WRITE) {
307+
if (!response.isSuccessful()) {
308308
// Write was unsuccessful due to different number of columns,
309309
// try again with new file name
310310
writeRecord(record, topicName, cache, ++suffix);

src/main/java/org/radarcns/util/CsvAvroConverter.java renamed to src/main/java/org/radarcns/data/CsvAvroConverter.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,8 @@
1414
* limitations under the License.
1515
*/
1616

17-
package org.radarcns.util;
17+
package org.radarcns.data;
1818

19-
import com.fasterxml.jackson.databind.JsonMappingException;
2019
import com.fasterxml.jackson.databind.MappingIterator;
2120
import com.fasterxml.jackson.databind.ObjectReader;
2221
import com.fasterxml.jackson.databind.ObjectWriter;
@@ -34,7 +33,10 @@
3433
import java.io.Reader;
3534
import java.io.Writer;
3635
import java.nio.ByteBuffer;
37-
import java.util.*;
36+
import java.util.Iterator;
37+
import java.util.LinkedHashMap;
38+
import java.util.List;
39+
import java.util.Map;
3840

3941
/**
4042
* Converts deep hierarchical Avro records into flat CSV format. It uses a simple dot syntax in the

src/main/java/org/radarcns/util/FileCache.java renamed to src/main/java/org/radarcns/data/FileCache.java

Lines changed: 34 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
* limitations under the License.
1515
*/
1616

17-
package org.radarcns.util;
17+
package org.radarcns.data;
1818

1919
import java.io.*;
2020
import java.nio.file.Files;
@@ -27,13 +27,17 @@
2727
import org.slf4j.Logger;
2828
import org.slf4j.LoggerFactory;
2929

30+
import static java.nio.file.StandardCopyOption.REPLACE_EXISTING;
31+
3032
/** Keeps path handles of a path. */
3133
public class FileCache implements Closeable, Flushable, Comparable<FileCache> {
3234
private static final Logger logger = LoggerFactory.getLogger(FileCache.class);
35+
private static final int BUFFER_SIZE = 8192;
3336

3437
private final Writer writer;
3538
private final RecordConverter recordConverter;
3639
private final Path path;
40+
private final Path tmpPath;
3741
private long lastUse;
3842

3943
/**
@@ -45,19 +49,29 @@ public class FileCache implements Closeable, Flushable, Comparable<FileCache> {
4549
* @throws IOException
4650
*/
4751
public FileCache(RecordConverterFactory converterFactory, Path path,
48-
GenericRecord record, boolean gzip) throws IOException {
52+
GenericRecord record, boolean gzip, Path tmpDir) throws IOException {
4953
this.path = path;
5054
boolean fileIsNew = !Files.exists(path) || Files.size(path) == 0;
55+
this.tmpPath = Files.createTempFile(tmpDir, path.getFileName().toString(),
56+
gzip ? ".tmp.gz" : ".tmp");
5157

52-
OutputStream outFile = Files.newOutputStream(path,
53-
StandardOpenOption.APPEND, StandardOpenOption.CREATE);
54-
InputStream inputStream = new BufferedInputStream(Files.newInputStream(path));
58+
OutputStream outFile = Files.newOutputStream(tmpPath, StandardOpenOption.WRITE);
5559
OutputStream bufOut = new BufferedOutputStream(outFile);
5660
if (gzip) {
5761
bufOut = new GZIPOutputStream(bufOut);
58-
if (!fileIsNew) {
62+
}
63+
64+
InputStream inputStream;
65+
if (fileIsNew) {
66+
inputStream = new ByteArrayInputStream(new byte[0]);
67+
} else {
68+
inputStream = new BufferedInputStream(Files.newInputStream(path));
69+
InputStream copyStream = Files.newInputStream(path);
70+
if (gzip) {
71+
copyStream = new GZIPInputStream(copyStream);
5972
inputStream = new GZIPInputStream(inputStream);
6073
}
74+
copy(copyStream, bufOut);
6175
}
6276

6377
this.writer = new OutputStreamWriter(bufOut);
@@ -78,7 +92,7 @@ public FileCache(RecordConverterFactory converterFactory, Path path,
7892
* Write a record to the cache.
7993
* @param record AVRO record
8094
* @return true or false based on {@link RecordConverter} write result
81-
* @throws IOException
95+
* @throws IOException if the record cannot be used.
8296
*/
8397
public boolean writeRecord(GenericRecord record) throws IOException {
8498
boolean result = this.recordConverter.writeRecord(record);
@@ -90,6 +104,7 @@ public boolean writeRecord(GenericRecord record) throws IOException {
90104
public void close() throws IOException {
91105
recordConverter.close();
92106
writer.close();
107+
Files.move(tmpPath, path, REPLACE_EXISTING);
93108
}
94109

95110
@Override
@@ -115,4 +130,16 @@ public int compareTo(@Nonnull FileCache other) {
115130
public Path getPath() {
116131
return path;
117132
}
133+
134+
135+
/**
136+
* Reads all bytes from an input stream and writes them to an output stream.
137+
*/
138+
private static void copy(InputStream source, OutputStream sink) throws IOException {
139+
byte[] buf = new byte[BUFFER_SIZE];
140+
int n;
141+
while ((n = source.read(buf)) > 0) {
142+
sink.write(buf, 0, n);
143+
}
144+
}
118145
}

src/main/java/org/radarcns/util/FileCacheStore.java renamed to src/main/java/org/radarcns/data/FileCacheStore.java

Lines changed: 54 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,11 @@
1414
* limitations under the License.
1515
*/
1616

17-
package org.radarcns.util;
17+
package org.radarcns.data;
1818

1919
import org.apache.avro.generic.GenericRecord;
20+
import org.slf4j.Logger;
21+
import org.slf4j.LoggerFactory;
2022

2123
import java.io.Closeable;
2224
import java.io.Flushable;
@@ -25,34 +27,34 @@
2527
import java.nio.file.Path;
2628
import java.util.ArrayList;
2729
import java.util.Collections;
30+
import java.util.Comparator;
2831
import java.util.HashMap;
2932
import java.util.Map;
3033

34+
import static org.radarcns.util.ThrowingConsumer.tryCatch;
35+
3136
/**
3237
* Caches open file handles. If more than the limit is cached, the half of the files that were used
3338
* the longest ago are evicted from the cache.
3439
*/
3540
public class FileCacheStore implements Flushable, Closeable {
41+
private static final Logger logger = LoggerFactory.getLogger(FileCacheStore.class);
42+
3643
private final boolean gzip;
3744
private final boolean deduplicate;
45+
private final Path tmpDir;
3846

3947
private RecordConverterFactory converterFactory;
4048
private final int maxFiles;
4149
private final Map<Path, FileCache> caches;
4250

43-
// Response codes for each write record case
44-
public static final int CACHE_AND_WRITE = 1; //used cache and write successful
45-
public static final int NO_CACHE_AND_WRITE= 2;
46-
public static final int CACHE_AND_NO_WRITE =3;
47-
public static final int NO_CACHE_AND_NO_WRITE =4;
48-
49-
50-
public FileCacheStore(RecordConverterFactory converterFactory, int maxFiles, boolean gzip, boolean deduplicate) {
51+
public FileCacheStore(RecordConverterFactory converterFactory, int maxFiles, boolean gzip, boolean deduplicate) throws IOException {
5152
this.converterFactory = converterFactory;
5253
this.maxFiles = maxFiles;
5354
this.caches = new HashMap<>(maxFiles * 4 / 3 + 1);
5455
this.gzip = gzip;
5556
this.deduplicate = deduplicate;
57+
this.tmpDir = Files.createTempDirectory("restructurehdfs");
5658
}
5759

5860
/**
@@ -64,30 +66,30 @@ public FileCacheStore(RecordConverterFactory converterFactory, int maxFiles, boo
6466
* @return the {@link WriteStatus} telling whether the cache was hit and whether the write succeeded.
6567
* @throws IOException when failing to open a file or writing to it.
6668
*/
67-
public int writeRecord(Path path, GenericRecord record) throws IOException {
69+
public WriteStatus writeRecord(Path path, GenericRecord record) throws IOException {
6870
FileCache cache = caches.get(path);
6971
if (cache != null) {
7072
if(cache.writeRecord(record)){
71-
return CACHE_AND_WRITE;
73+
return WriteStatus.CACHE_AND_WRITE;
7274
} else {
7375
// This is the case when cache is used but write is unsuccessful
7476
// because of a different number of columns in the same topic
75-
return CACHE_AND_NO_WRITE;
77+
return WriteStatus.CACHE_AND_NO_WRITE;
7678
}
7779
} else {
7880
ensureCapacity();
7981

8082
Path dir = path.getParent();
8183
Files.createDirectories(dir);
8284

83-
cache = new FileCache(converterFactory, path, record, gzip);
85+
cache = new FileCache(converterFactory, path, record, gzip, tmpDir);
8486
caches.put(path, cache);
85-
if(cache.writeRecord(record)) {
86-
return NO_CACHE_AND_WRITE;
87+
if (cache.writeRecord(record)) {
88+
return WriteStatus.NO_CACHE_AND_WRITE;
8789
} else {
8890
// The file path was not in cache but the file exists and this write is
8991
// unsuccessful because of different number of columns
90-
return NO_CACHE_AND_NO_WRITE;
92+
return WriteStatus.NO_CACHE_AND_NO_WRITE;
9193
}
9294

9395
}
@@ -127,9 +129,45 @@ public void close() throws IOException {
127129
converterFactory.sortUnique(cache.getPath());
128130
}
129131
}
132+
Files.walk(tmpDir)
133+
.sorted(Comparator.reverseOrder())
134+
.forEach(tryCatch(Files::delete, (p, ex) -> logger.warn(
135+
"Failed to remove temporary file {}: {}", p, ex)));
130136
} finally {
131137
caches.clear();
132138
}
133139
}
134140

141+
/**
 * Outcome of a single record write: whether an already cached file was used and whether the
 * record was actually written.
 */
public enum WriteStatus {
    /** An already cached file was used and the record was written. */
    CACHE_AND_WRITE(true, true),
    /** An already cached file was used, but the record was rejected (column count mismatch). */
    CACHE_AND_NO_WRITE(true, false),
    /** The file was not cached yet and the record was written. */
    NO_CACHE_AND_WRITE(false, true),
    /** The file was not cached yet and the record was rejected (column count mismatch). */
    NO_CACHE_AND_NO_WRITE(false, false);

    private final boolean hit;
    private final boolean written;

    /**
     * Creates a status value.
     *
     * @param cacheHit whether the cache was used to write.
     * @param successful whether the write was successful.
     */
    WriteStatus(boolean cacheHit, boolean successful) {
        this.hit = cacheHit;
        this.written = successful;
    }

    /** Returns {@code true} if the record was written. */
    public boolean isSuccessful() {
        return this.written;
    }

    /** Returns {@code true} if the file was already present in the cache. */
    public boolean isCacheHit() {
        return this.hit;
    }
}
135173
}

src/main/java/org/radarcns/util/JsonAvroConverter.java renamed to src/main/java/org/radarcns/data/JsonAvroConverter.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
* limitations under the License.
1515
*/
1616

17-
package org.radarcns.util;
17+
package org.radarcns.data;
1818

1919
import com.fasterxml.jackson.core.JsonFactory;
2020
import com.fasterxml.jackson.core.JsonGenerator;

src/main/java/org/radarcns/util/RecordConverter.java renamed to src/main/java/org/radarcns/data/RecordConverter.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
* limitations under the License.
1515
*/
1616

17-
package org.radarcns.util;
17+
package org.radarcns.data;
1818

1919
import java.io.Closeable;
2020
import java.io.Flushable;

src/main/java/org/radarcns/util/RecordConverterFactory.java renamed to src/main/java/org/radarcns/data/RecordConverterFactory.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
* limitations under the License.
1515
*/
1616

17-
package org.radarcns.util;
17+
package org.radarcns.data;
1818

1919
import org.apache.avro.generic.GenericRecord;
2020

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
package org.radarcns.util;
2+
3+
import java.io.IOException;
4+
import java.util.function.BiConsumer;
5+
import java.util.function.Consumer;
6+
7+
/**
 * Counterpart of {@link Consumer} whose operation is allowed to throw an {@link IOException}.
 *
 * @param <T> type of the value that is consumed.
 */
@FunctionalInterface
public interface ThrowingConsumer<T> {
    /**
     * Performs this operation on the given value.
     *
     * @param t value to consume.
     * @throws IOException if the operation fails.
     */
    void accept(T t) throws IOException;

    /**
     * Adapts a throwing consumer to a plain {@link Consumer}. When the wrapped operation throws
     * an {@link IOException}, the exception is passed to {@code catchClause} together with the
     * value that was being consumed, instead of propagating to the caller.
     *
     * <p>The parameters use wildcards so that consumers and handlers declared for a supertype
     * of {@code T} (or a supertype of {@link IOException}) are accepted as well.
     *
     * @param consumer operation to run for every value.
     * @param catchClause handler receiving the value and the exception of a failed operation.
     * @param <T> type of the value that is consumed.
     * @return a consumer that never throws {@link IOException}.
     */
    static <T> Consumer<T> tryCatch(ThrowingConsumer<? super T> consumer,
            BiConsumer<? super T, ? super IOException> catchClause) {
        return value -> {
            try {
                consumer.accept(value);
            } catch (IOException ex) {
                catchClause.accept(value, ex);
            }
        };
    }
}

src/test/java/org/radarcns/util/CsvAvroConverterTest.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,6 @@
2222
import static org.junit.Assert.assertNull;
2323
import static org.junit.Assert.assertTrue;
2424

25-
import com.fasterxml.jackson.databind.JsonMappingException;
26-
2725
import java.io.*;
2826
import java.nio.file.Files;
2927
import java.nio.file.Path;
@@ -49,6 +47,9 @@
4947
import org.junit.Test;
5048
import org.junit.rules.ExpectedException;
5149
import org.junit.rules.TemporaryFolder;
50+
import org.radarcns.data.CsvAvroConverter;
51+
import org.radarcns.data.RecordConverter;
52+
import org.radarcns.data.RecordConverterFactory;
5253

5354
public class CsvAvroConverterTest {
5455
@Rule

0 commit comments

Comments
 (0)