
Commit 3eb4f48

Merge pull request #27 from RADAR-base/stage_files
Stage files
2 parents 5124412 + a313e77 commit 3eb4f48

22 files changed: +443 −235 lines

README.md

Lines changed: 3 additions & 1 deletion

````diff
@@ -41,4 +41,6 @@ Another option is to output the data in compressed form. All files will get the
 java -jar restructurehdfs-0.3.3-all.jar --compression gzip --hdfs-uri <webhdfs_url> --output-directory <output_folder> <input_path_1> [<input_path_2> ...]
 ```
 
-Finally, by default, files records are not deduplicated after writing. To enable this behaviour, specify the option `--deduplicate` or `-d`. This set to false by default because of an issue with Biovotion data. Please see - [issue #16](https://github.com/RADAR-base/Restructure-HDFS-topic/issues/16) before enabling it.
+By default, file records are not deduplicated after writing. To enable this behaviour, specify the option `--deduplicate` or `-d`. It is set to false by default because of an issue with Biovotion data. Please see [issue #16](https://github.com/RADAR-base/Restructure-HDFS-topic/issues/16) before enabling it.
+
+Finally, while processing, files are staged to a temporary directory and moved to the output directory afterwards. This lowers the chance of data corruption but may slow processing down. Disable staging with the `--no-stage` option.
````
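The staging option described in the README amounts to writing each output file to a temporary location and moving it over the target once the write completes, as the new `FileCache` class below does. A minimal standalone sketch of that pattern; all paths and names here are illustrative, not part of the project:

```java
import java.io.IOException;
import java.io.Writer;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

import static java.nio.file.StandardCopyOption.REPLACE_EXISTING;

public class StagedWriteSketch {
    public static void main(String[] args) throws IOException {
        Path target = Paths.get("output", "topic", "20170101_00.csv");
        Files.createDirectories(target.getParent());

        // Stage: write to a temporary file in a scratch directory first,
        // so a crash mid-write never leaves a half-written target file.
        Path tmpDir = Files.createTempDirectory("restructure-staging");
        Path staged = Files.createTempFile(tmpDir, target.getFileName().toString(), ".tmp");
        try (Writer writer = Files.newBufferedWriter(staged)) {
            writer.write("some,record,data\n");
        }

        // Publish: move the finished file over the target in one step.
        // On most local file systems this rename does not expose partial data.
        Files.move(staged, target, REPLACE_EXISTING);
    }
}
```

The trade-off the README mentions follows from this: every file is written twice (once staged, once moved, and gzip files are re-compressed), which is safer but slower than appending in place.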

build.gradle

Lines changed: 1 addition & 1 deletion

```diff
@@ -2,7 +2,7 @@ apply plugin: 'java'
 apply plugin: 'application'
 
 group 'org.radarcns.restructurehdfs'
-version '0.3.3-SNAPSHOT'
+version '0.4.0-SNAPSHOT'
 
 mainClassName = 'org.radarcns.RestructureAvroRecords'
 
 run {
```

src/main/java/org/radarcns/RestructureAvroRecords.java

Lines changed: 33 additions & 19 deletions

```diff
@@ -28,11 +28,11 @@
 import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RemoteIterator;
-import org.radarcns.util.CsvAvroConverter;
-import org.radarcns.util.FileCacheStore;
-import org.radarcns.util.JsonAvroConverter;
+import org.radarcns.data.CsvAvroConverter;
+import org.radarcns.data.FileCacheStore;
+import org.radarcns.data.JsonAvroConverter;
+import org.radarcns.data.RecordConverterFactory;
 import org.radarcns.util.ProgressBar;
-import org.radarcns.util.RecordConverterFactory;
 import org.radarcns.util.commandline.CommandLineArgs;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -48,6 +48,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.TimeZone;
+import java.util.regex.Pattern;
 
 public class RestructureAvroRecords {
     private static final Logger logger = LoggerFactory.getLogger(RestructureAvroRecords.class);
@@ -57,12 +58,14 @@ public class RestructureAvroRecords {
     private static final java.nio.file.Path BINS_FILE_NAME = Paths.get("bins.csv");
     private static final java.nio.file.Path SCHEMA_OUTPUT_FILE_NAME = Paths.get("schema.json");
     private static final SimpleDateFormat FILE_DATE_FORMAT = new SimpleDateFormat("yyyyMMdd_HH");
+    private static final Pattern ILLEGAL_CHARACTER_PATTERN = Pattern.compile("[^a-zA-Z0-9_-]+");
 
     static {
         FILE_DATE_FORMAT.setTimeZone(TimeZone.getTimeZone("UTC"));
     }
 
     private final RecordConverterFactory converterFactory;
+    private final boolean doStage;
 
     private java.nio.file.Path outputPath;
     private java.nio.file.Path offsetsPath;
@@ -97,6 +100,7 @@ public static void main(String [] args) {
                 commandLineArgs.outputDirectory)
                 .useGzip("gzip".equalsIgnoreCase(commandLineArgs.compression))
                 .doDeduplicate(commandLineArgs.deduplicate).format(commandLineArgs.format)
+                .doStage(!commandLineArgs.noStage)
                 .build();
 
         try {
@@ -119,6 +123,7 @@ private RestructureAvroRecords(RestructureAvroRecords.Builder builder) {
 
         this.useGzip = builder.useGzip;
         this.doDeduplicate = builder.doDeduplicate;
+        this.doStage = builder.doStage;
         logger.info("Deduplicate set to {}", doDeduplicate);
 
         String extension;
@@ -197,7 +202,7 @@ public void start(String directoryName) throws IOException {
 
         // Actually process the files
         for (Map.Entry<String, List<Path>> entry : topicPaths.entrySet()) {
-            try (FileCacheStore cache = new FileCacheStore(converterFactory, 100, useGzip, doDeduplicate)) {
+            try (FileCacheStore cache = new FileCacheStore(converterFactory, 100, useGzip, doDeduplicate, doStage)) {
                 for (Path filePath : entry.getValue()) {
                     // If JsonMappingException occurs, log the error and continue with other files
                     try {
@@ -284,27 +289,18 @@ private void writeRecord(GenericRecord record, String topicName, FileCacheStore
         Date time = getDate(keyField, valueField);
         java.nio.file.Path outputFileName = createFilename(time, suffix);
 
-        String projectId;
-
-        if(keyField.get("projectId") == null) {
-            projectId = "unknown-project";
-        } else {
-            // Clean Project id for use in final pathname
-            projectId = keyField.get("projectId").toString().replaceAll("[^a-zA-Z0-9_-]+", "");
-        }
-
-        // Clean user id and create final output pathname
-        String userId = keyField.get("userId").toString().replaceAll("[^a-zA-Z0-9_-]+", "");
+        String projectId = sanitizeId(keyField.get("projectId"), "unknown-project");
+        String userId = sanitizeId(keyField.get("userId"), "unknown-user");
 
         java.nio.file.Path projectDir = this.outputPath.resolve(projectId);
         java.nio.file.Path userDir = projectDir.resolve(userId);
         java.nio.file.Path userTopicDir = userDir.resolve(topicName);
         java.nio.file.Path outputPath = userTopicDir.resolve(outputFileName);
 
         // Write data
-        int response = cache.writeRecord(outputPath, record);
+        FileCacheStore.WriteResponse response = cache.writeRecord(outputPath, record);
 
-        if (response == FileCacheStore.CACHE_AND_NO_WRITE || response == FileCacheStore.NO_CACHE_AND_NO_WRITE) {
+        if (!response.isSuccessful()) {
             // Write was unsuccessful due to different number of columns,
             // try again with new file name
             writeRecord(record, topicName, cache, ++suffix);
@@ -317,8 +313,9 @@ private void writeRecord(GenericRecord record, String topicName, FileCacheStore
                 }
             }
 
+            String sourceId = sanitizeId(keyField.get("sourceId"), "unknown-source");
             // Count data (binned and total)
-            bins.add(topicName, keyField.get("sourceId").toString(), time);
+            bins.add(topicName, sourceId, time);
             processedRecordsCount++;
         }
     }
@@ -366,12 +363,25 @@ public static Date getDate(GenericRecord keyField, GenericRecord valueField) {
         return new Date(time);
     }
 
+    private static String sanitizeId(Object id, String defaultValue) {
+        if (id == null) {
+            return defaultValue;
+        }
+        String idString = ILLEGAL_CHARACTER_PATTERN.matcher(id.toString()).replaceAll("");
+        if (idString.isEmpty()) {
+            return defaultValue;
+        } else {
+            return idString;
+        }
+    }
+
     public static class Builder {
        private boolean useGzip;
        private boolean doDeduplicate;
        private String hdfsUri;
        private String outputPath;
        private String format;
+       private boolean doStage;
 
        public Builder(final String uri, final String outputPath) {
            this.hdfsUri = uri;
@@ -397,5 +407,9 @@ public RestructureAvroRecords build() {
            return new RestructureAvroRecords(this);
        }
 
+       public Builder doStage(boolean stage) {
+           this.doStage = stage;
+           return this;
+       }
     }
 }
```
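The new `sanitizeId` helper replaces the two inline `replaceAll` calls: it strips every character outside `[a-zA-Z0-9_-]` and falls back to a default when the ID is null or sanitizes to an empty string. A standalone sketch of the same logic with example inputs (the class name is hypothetical):

```java
import java.util.regex.Pattern;

public class SanitizeIdSketch {
    // Same pattern as the ILLEGAL_CHARACTER_PATTERN constant in the diff above.
    private static final Pattern ILLEGAL_CHARACTER_PATTERN = Pattern.compile("[^a-zA-Z0-9_-]+");

    static String sanitizeId(Object id, String defaultValue) {
        if (id == null) {
            return defaultValue;
        }
        // Remove every run of characters outside the allowed set.
        String idString = ILLEGAL_CHARACTER_PATTERN.matcher(id.toString()).replaceAll("");
        return idString.isEmpty() ? defaultValue : idString;
    }

    public static void main(String[] args) {
        System.out.println(sanitizeId("user@1", "unknown-user"));    // user1
        System.out.println(sanitizeId("../../etc", "unknown-user")); // etc
        System.out.println(sanitizeId("!!!", "unknown-user"));       // unknown-user
        System.out.println(sanitizeId(null, "unknown-project"));     // unknown-project
    }
}
```

Since these IDs become directory names under the output path, the sanitization also keeps path separators and dot sequences out of the output tree, and the empty-string fallback avoids creating a nameless directory.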

src/main/java/org/radarcns/util/CsvAvroConverter.java renamed to src/main/java/org/radarcns/data/CsvAvroConverter.java

Lines changed: 5 additions & 3 deletions

```diff
@@ -14,9 +14,8 @@
  * limitations under the License.
  */
 
-package org.radarcns.util;
+package org.radarcns.data;
 
-import com.fasterxml.jackson.databind.JsonMappingException;
 import com.fasterxml.jackson.databind.MappingIterator;
 import com.fasterxml.jackson.databind.ObjectReader;
 import com.fasterxml.jackson.databind.ObjectWriter;
@@ -34,7 +33,10 @@
 import java.io.Reader;
 import java.io.Writer;
 import java.nio.ByteBuffer;
-import java.util.*;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
 
 /**
  * Converts deep hierarchical Avro records into flat CSV format. It uses a simple dot syntax in the
```
src/main/java/org/radarcns/data/FileCache.java (new file)

Lines changed: 200 additions & 0 deletions

```diff
@@ -0,0 +1,200 @@
+/*
+ * Copyright 2017 The Hyve
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.radarcns.data;
+
+import org.apache.avro.generic.GenericRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.ByteArrayInputStream;
+import java.io.Closeable;
+import java.io.Flushable;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.io.Reader;
+import java.io.Writer;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardOpenOption;
+import java.util.zip.GZIPInputStream;
+import java.util.zip.GZIPOutputStream;
+import java.util.zip.ZipException;
+
+import static java.nio.file.StandardCopyOption.REPLACE_EXISTING;
+
+/** Keeps path handles of a path. */
+public class FileCache implements Closeable, Flushable, Comparable<FileCache> {
+    private static final Logger logger = LoggerFactory.getLogger(FileCache.class);
+    private static final int BUFFER_SIZE = 8192;
+
+    private final Writer writer;
+    private final RecordConverter recordConverter;
+    private final Path path;
+    private final Path tmpPath;
+    private long lastUse;
+
+    /**
+     * File cache of given path, using given converter factory.
+     * @param converterFactory converter factory to create a converter to write files with.
+     * @param path path to cache.
+     * @param record example record to create converter from, this is not written to path.
+     * @param gzip whether to gzip the records
+     * @throws IOException if the file and/or temporary files cannot be correctly read or written to.
+     */
+    public FileCache(RecordConverterFactory converterFactory, Path path,
+            GenericRecord record, boolean gzip, Path tmpDir) throws IOException {
+        this.path = path;
+        boolean fileIsNew = !Files.exists(path) || Files.size(path) == 0;
+        OutputStream outFile;
+        if (tmpDir == null) {
+            this.tmpPath = null;
+            outFile = Files.newOutputStream(path, StandardOpenOption.APPEND, StandardOpenOption.CREATE);
+        } else {
+            this.tmpPath = Files.createTempFile(tmpDir, path.getFileName().toString(),
+                    gzip ? ".tmp.gz" : ".tmp");
+            outFile = Files.newOutputStream(tmpPath);
+        }
+
+        OutputStream bufOut = new BufferedOutputStream(outFile);
+        if (gzip) {
+            bufOut = new GZIPOutputStream(bufOut);
+        }
+
+        InputStream inputStream;
+        if (fileIsNew) {
+            inputStream = new ByteArrayInputStream(new byte[0]);
+        } else {
+            inputStream = inputStream(new BufferedInputStream(Files.newInputStream(path)), gzip);
+
+            if (tmpPath != null) {
+                try {
+                    copy(path, bufOut, gzip);
+                } catch (ZipException ex) {
+                    // restart output buffer
+                    bufOut.close();
+                    // clear output file
+                    outFile = Files.newOutputStream(tmpPath);
+                    bufOut = new GZIPOutputStream(new BufferedOutputStream(outFile));
+                }
+            }
+        }
+
+        this.writer = new OutputStreamWriter(bufOut);
+
+        try (Reader reader = new InputStreamReader(inputStream)) {
+            this.recordConverter = converterFactory.converterFor(writer, record, fileIsNew, reader);
+        } catch (IOException ex) {
+            try {
+                writer.close();
+            } catch (IOException exClose) {
+                logger.error("Failed to close writer for {}", path, ex);
+            }
+            throw ex;
+        }
+    }
+
+    /**
+     * Write a record to the cache.
+     * @param record AVRO record
+     * @return true or false based on {@link RecordConverter} write result
+     * @throws IOException if the record cannot be used.
+     */
+    public boolean writeRecord(GenericRecord record) throws IOException {
+        boolean result = this.recordConverter.writeRecord(record);
+        lastUse = System.nanoTime();
+        return result;
+    }
+
+    @Override
+    public void close() throws IOException {
+        recordConverter.close();
+        writer.close();
+        if (tmpPath != null) {
+            Files.move(tmpPath, path, REPLACE_EXISTING);
+        }
+    }
+
+    @Override
+    public void flush() throws IOException {
+        recordConverter.flush();
+    }
+
+    /**
+     * Compares time that the filecaches were last used. If equal, it lexicographically compares
+     * the absolute path of the path.
+     * @param other FileCache to compare with.
+     */
+    @Override
+    public int compareTo(@Nonnull FileCache other) {
+        int result = Long.compare(lastUse, other.lastUse);
+        if (result != 0) {
+            return result;
+        }
+        return path.compareTo(other.path);
+    }
+
+    /** File that the cache is maintaining. */
+    public Path getPath() {
+        return path;
+    }
+
+    private static void copy(Path source, OutputStream sink, boolean gzip) throws IOException {
+        try (InputStream copyStream = inputStream(Files.newInputStream(source), gzip)) {
+            copy(copyStream, sink);
+        } catch (ZipException ex) {
+            Path corruptPath = null;
+            String suffix = "";
+            for (int i = 0; corruptPath == null && i < 100; i++) {
+                Path path = source.resolveSibling(source.getFileName() + ".corrupted" + suffix);
+                if (!Files.exists(path)) {
+                    corruptPath = path;
+                }
+                suffix = "-" + i;
+            }
+            if (corruptPath != null) {
+                logger.error("Original file {} was corrupted: {}."
+                        + " Moved to {}.", source, ex, corruptPath);
+                Files.move(source, corruptPath);
+            } else {
+                logger.error("Original file {} was corrupted: {}."
+                        + " Too many corrupt backups stored, removing file.", source, ex);
+            }
+            throw ex;
+        }
+    }
+
+    private static InputStream inputStream(InputStream in, boolean gzip) throws IOException {
+        return gzip ? new GZIPInputStream(in) : in;
+    }
+
+    /**
+     * Reads all bytes from an input stream and writes them to an output stream.
+     */
+    private static void copy(InputStream source, OutputStream sink) throws IOException {
+        byte[] buf = new byte[BUFFER_SIZE];
+        int n;
+        while ((n = source.read(buf)) > 0) {
+            sink.write(buf, 0, n);
+        }
+    }
+}
```
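`FileCache` cannot append directly to an existing gzip file, so when staging it decompresses the old contents and re-compresses them into the staged temporary file, treating a `ZipException` as corruption of the original. A self-contained sketch of that append-by-rewrite step; the file names are illustrative and the corruption handling is simplified relative to `FileCache`:

```java
import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;
import java.util.zip.ZipException;

import static java.nio.file.StandardCopyOption.REPLACE_EXISTING;

public class GzipAppendSketch {
    public static void main(String[] args) throws IOException {
        Path target = Paths.get("topic.csv.gz");
        Path staged = Files.createTempFile("topic.csv", ".tmp.gz");

        OutputStream out = new GZIPOutputStream(
                new BufferedOutputStream(Files.newOutputStream(staged)));
        if (Files.exists(target)) {
            try (InputStream in = new GZIPInputStream(Files.newInputStream(target))) {
                // Appending raw bytes to a gzip file is not valid, so the old
                // contents are decompressed and re-compressed into the staged file.
                byte[] buf = new byte[8192];
                int n;
                while ((n = in.read(buf)) > 0) {
                    out.write(buf, 0, n);
                }
            } catch (ZipException ex) {
                // The old file is corrupt gzip: drop the partial copy and restart
                // the staged output from scratch, as FileCache does (it also moves
                // the corrupt original aside as *.corrupted).
                out.close();
                out = new GZIPOutputStream(
                        new BufferedOutputStream(Files.newOutputStream(staged)));
            }
        }
        try (OutputStream o = out) {
            o.write("new,record\n".getBytes());
        }

        // Replace the target with the staged file once writing is complete.
        Files.move(staged, target, REPLACE_EXISTING);
    }
}
```

This is the same close-then-move sequence as `FileCache.close()`: the target file is only ever replaced by a fully written staged file, which is where the corruption-safety claim in the README comes from.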
