Commit c9cd842

Merge pull request #34 from RADAR-base/release-0.5.1

Release 0.5.1

2 parents 04583dc + 9ee1864

17 files changed: +172 additions, -105 deletions
README.md

Lines changed: 6 additions & 7 deletions
````diff
@@ -8,11 +8,10 @@ Data streamed to HDFS using the [RADAR HDFS sink connector](https://github.com/R
 
 This package is available as docker image [`radarbase/radar-hdfs-restructure`](https://hub.docker.com/r/radarbase/radar-hdfs-restructure). The entrypoint of the image is the current application. So in all of the commands listed in usage, replace `radar-hdfs-restructure` with, for example:
 ```shell
-docker run --rm -t --network hadoop -v "$PWD/output:/output" radarbase/radar-hdfs-restructure:0.5.0 -u hdfs://hdfs -o /output /myTopic
+docker run --rm -t --network hadoop -v "$PWD/output:/output" radarbase/radar-hdfs-restructure:0.5.1 -n hdfs-namenode -o /output /myTopic
 ```
 if your docker cluster is running in the `hadoop` network and your output directory should be `./output`.
 
-
 ## Local build
 
 This package requires at least Java JDK 8. Build the distribution with
@@ -24,7 +23,7 @@ This package requires at least Java JDK 8. Build the distribution with
 and install the package into `/usr/local` with, for example,
 ```shell
 sudo mkdir -p /usr/local
-sudo tar -xzf build/distributions/radar-hdfs-restructure-0.5.0.tar.gz -C /usr/local --strip-components=1
+sudo tar -xzf build/distributions/radar-hdfs-restructure-0.5.1.tar.gz -C /usr/local --strip-components=1
 ```
 
 Now the `radar-hdfs-restructure` command should be available.
@@ -34,11 +33,11 @@ Now the `radar-hdfs-restructure` command should be available.
 When the application is installed, it can be used as follows:
 
 ```shell
-radar-hdfs-restructure --hdfs-uri <webhdfs_url> --output-directory <output_folder> <input_path_1> [<input_path_2> ...]
+radar-hdfs-restructure --nameservice <hdfs_node> --output-directory <output_folder> <input_path_1> [<input_path_2> ...]
 ```
 or you can use the short form as well, like
 ```shell
-radar-hdfs-restructure -u <webhdfs_url> -o <output_folder> <input_path_1> [<input_path_2> ...]
+radar-hdfs-restructure -n <hdfs_node> -o <output_folder> <input_path_1> [<input_path_2> ...]
 ```
 
 To display the usage and all available options, use the help option as follows:
@@ -49,12 +48,12 @@ Note that the options preceded by the `*` in the above output are required to ru
 
 By default, this will output the data in CSV format. If JSON format is preferred, use the following instead:
 ```shell
-radar-hdfs-restructure --format json --hdfs-uri <webhdfs_url> --output-directory <output_folder> <input_path_1> [<input_path_2> ...]
+radar-hdfs-restructure --format json --nameservice <hdfs_node> --output-directory <output_folder> <input_path_1> [<input_path_2> ...]
 ```
 
 Another option is to output the data in compressed form. All files will get the `gz` suffix, and can be decompressed with a GZIP decoder. Note that for a very small number of records, this may actually increase the file size.
 ```
-radar-hdfs-restructure --compression gzip --hdfs-uri <webhdfs_url> --output-directory <output_folder> <input_path_1> [<input_path_2> ...]
+radar-hdfs-restructure --compression gzip --nameservice <hdfs_node> --output-directory <output_folder> <input_path_1> [<input_path_2> ...]
 ```
 
 By default, file records are not deduplicated after writing. To enable this behaviour, specify the option `--deduplicate` or `-d`. It is set to false by default because of an issue with Biovotion data; please see [issue #16](https://github.com/RADAR-base/Restructure-HDFS-topic/issues/16) before enabling it.
````
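A deduplicating run would then look something like the following. This is a minimal sketch combining the options documented above; `<hdfs_node>`, `<output_folder>` and the input path are placeholders:

```shell
# Restructure a topic and deduplicate records after writing (see issue #16 first)
radar-hdfs-restructure --deduplicate --nameservice <hdfs_node> --output-directory <output_folder> <input_path_1>
```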

build.gradle

Lines changed: 1 addition & 1 deletion
```diff
@@ -7,7 +7,7 @@ plugins {
 }
 
 group 'org.radarcns'
-version '0.5.0'
+version '0.5.1'
 mainClassName = 'org.radarcns.hdfs.Application'
 
 sourceCompatibility = '1.8'
```

src/main/java/org/radarcns/hdfs/Application.java

Lines changed: 14 additions & 9 deletions
```diff
@@ -18,6 +18,7 @@
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.io.UncheckedIOException;
 import java.nio.file.Files;
 import java.text.SimpleDateFormat;
 import java.time.Duration;
@@ -88,18 +89,18 @@ public static void main(String [] args) {
                     () -> System.out.println(Timer.getInstance()), "Timer"));
         }
 
-        RestructureSettings settings = new RestructureSettings.Builder(commandLineArgs.outputDirectory)
-                .compression(commandLineArgs.compression)
-                .cacheSize(commandLineArgs.cacheSize)
-                .format(commandLineArgs.format)
-                .doDeduplicate(commandLineArgs.deduplicate)
-                .tempDir(commandLineArgs.tmpDir)
-                .numThreads(commandLineArgs.numThreads)
-                .build();
-
         Application application;
 
         try {
+            RestructureSettings settings = new RestructureSettings.Builder(commandLineArgs.outputDirectory)
+                    .compression(commandLineArgs.compression)
+                    .cacheSize(commandLineArgs.cacheSize)
+                    .format(commandLineArgs.format)
+                    .doDeduplicate(commandLineArgs.deduplicate)
+                    .tempDir(commandLineArgs.tmpDir)
+                    .numThreads(commandLineArgs.numThreads)
+                    .build();
+
             HdfsSettings hdfsSettings = new HdfsSettings.Builder(commandLineArgs.hdfsName)
                     .hdfsHighAvailability(commandLineArgs.hdfsHa,
                             commandLineArgs.hdfsUri1, commandLineArgs.hdfsUri2)
@@ -119,6 +120,10 @@ public static void main(String [] args) {
                     + " Configure --namenode-1, --namenode-2 and --namenode-ha");
             System.exit(1);
             return;
+        } catch (UncheckedIOException ex) {
+            logger.error("Failed to create temporary directory " + commandLineArgs.tmpDir);
+            System.exit(1);
+            return;
         } catch (IOException ex) {
             logger.error("Failed to initialize plugins", ex);
             System.exit(1);
```

src/main/java/org/radarcns/hdfs/RadarHdfsRestructure.java

Lines changed: 4 additions & 8 deletions
```diff
@@ -38,6 +38,7 @@
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.io.UncheckedIOException;
 import java.text.NumberFormat;
 import java.text.SimpleDateFormat;
 import java.time.Duration;
@@ -58,14 +59,9 @@
 public class RadarHdfsRestructure {
     private static final Logger logger = LoggerFactory.getLogger(RadarHdfsRestructure.class);
 
-    private static final SimpleDateFormat FILE_DATE_FORMAT = new SimpleDateFormat("yyyyMMdd_HH");
     /** Number of offsets to process in a single task. */
     private static final int BATCH_SIZE = 500_000;
 
-    static {
-        FILE_DATE_FORMAT.setTimeZone(TimeZone.getTimeZone("UTC"));
-    }
-
     private final int numThreads;
     private final Configuration conf;
     private final FileStoreFactory fileStoreFactory;
@@ -124,7 +120,7 @@ private Stream<Path> walk(FileSystem fs, Path path) {
         try {
             files = fs.listStatus(path);
         } catch (IOException e) {
-            throw new IllegalStateException(e);
+            throw new UncheckedIOException(e);
         }
         return Stream.of(files).parallel()
                 .flatMap(f -> {
@@ -151,7 +147,7 @@ private void processPaths(TopicFileList topicPaths, Accountant accountant) throw
 
        ExecutorService executor = Executors.newWorkStealingPool(pathFactory.isTopicPartitioned() ? this.numThreads : 1);
 
-        ProgressBar progressBar = new ProgressBar(topicPaths.size, 50, 100, TimeUnit.MILLISECONDS);
+        ProgressBar progressBar = new ProgressBar(topicPaths.size, 50, 500, TimeUnit.MILLISECONDS);
 
         // Actually process the files
         topicPaths.files.stream()
@@ -181,7 +177,7 @@ private void processPaths(TopicFileList topicPaths, Accountant accountant) throw
                     cache.flush();
                 }
             }
-        } catch (IOException ex) {
+        } catch (IOException | UncheckedIOException ex) {
             logger.error("Failed to process file", ex);
         } catch (IllegalStateException ex) {
             logger.warn("Shutting down");
```

src/main/java/org/radarcns/hdfs/accounting/Accountant.java

Lines changed: 1 addition & 1 deletion
```diff
@@ -33,7 +33,7 @@ public Accountant(FileStoreFactory factory) throws IOException {
         tempDir = new TemporaryDirectory(settings.getTempDir(), "accounting-");
         this.offsetFile = OffsetRangeFile.read(storage, settings.getOutputPath().resolve(OFFSETS_FILE_NAME));
         this.offsetFile.setTempDir(tempDir.getPath());
-        this.binFile = BinFile.read(storage, settings.getOutputPath().resolve(BINS_FILE_NAME));
+        this.binFile = new BinFile(storage, settings.getOutputPath().resolve(BINS_FILE_NAME));
         this.binFile.setTempDir(tempDir.getPath());
     }
 
```

src/main/java/org/radarcns/hdfs/accounting/Bin.java

Lines changed: 4 additions & 2 deletions
```diff
@@ -7,11 +7,13 @@ public class Bin {
     private final String topic;
     private final String category;
     private final String time;
+    private final int hash;
 
     public Bin(@Nonnull String topic, @Nonnull String category, @Nonnull String time) {
         this.topic = topic;
         this.category = category;
         this.time = time;
+        this.hash = Objects.hash(topic, category, time);
     }
 
     public String getTopic() {
@@ -28,7 +30,7 @@ public String getTime() {
 
     @Override
     public String toString() {
-        return topic + '|' + category + '|' + time;
+        return topic + ',' + category + ',' + time;
     }
 
     @Override
@@ -47,6 +49,6 @@ public boolean equals(Object o) {
 
     @Override
     public int hashCode() {
-        return Objects.hash(topic, category, time);
+        return hash;
     }
 }
```

src/main/java/org/radarcns/hdfs/accounting/BinFile.java

Lines changed: 87 additions & 53 deletions
```diff
@@ -25,63 +25,48 @@
 import java.io.BufferedReader;
 import java.io.BufferedWriter;
 import java.io.IOException;
+import java.io.UncheckedIOException;
 import java.nio.file.Files;
 import java.nio.file.Path;
+import java.util.HashSet;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.LongAdder;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+import java.util.function.Supplier;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.radarcns.hdfs.accounting.OffsetRangeFile.COMMA_PATTERN;
+import static org.radarcns.hdfs.accounting.OffsetRangeFile.read;
+import static org.radarcns.hdfs.util.ThrowingConsumer.tryCatch;
 
 /** Store overview of records written, divided into bins. */
 public class BinFile extends PostponedWriter {
     private static final Logger logger = LoggerFactory.getLogger(BinFile.class);
+    private static final String BINS_HEADER = String.join(
+            ",", "topic", "device", "timestamp", "count") + "\n";
 
-    private final ConcurrentMap<Bin, LongAdder> bins;
+    private final ConcurrentMap<Bin, Long> bins;
     private final Path path;
     private final StorageDriver storage;
 
-    public BinFile(@Nonnull StorageDriver storageDriver, @Nonnull Path path,
-            @Nonnull ConcurrentMap<Bin, LongAdder> initialData) {
+    public BinFile(@Nonnull StorageDriver storageDriver, @Nonnull Path path) {
         super("bins", 5, TimeUnit.SECONDS);
         Objects.requireNonNull(path);
-        Objects.requireNonNull(initialData);
         this.storage = storageDriver;
         this.path = path;
-        this.bins = initialData;
-    }
-
-    public static BinFile read(StorageDriver storage, Path path) {
-        ConcurrentMap<Bin, LongAdder> map = new ConcurrentHashMap<>();
-        try (BufferedReader input = storage.newBufferedReader(path)){
-            // Read in all lines as multikeymap (key, key, key, value)
-            String line = input.readLine();
-            if (line != null) {
-                line = input.readLine();
-                while (line != null) {
-                    String[] columns = line.split(",");
-                    try {
-                        Bin bin = new Bin(columns[0], columns[1], columns[2]);
-                        LongAdder adder = new LongAdder();
-                        adder.add(Long.valueOf(columns[3]));
-                        map.put(bin, adder);
-                    } catch (ArrayIndexOutOfBoundsException ex) {
-                        logger.warn("Unable to read row of the bins file. Skipping.");
-                    }
-                    line = input.readLine();
-                }
-            }
-        } catch (IOException e) {
-            logger.warn("Could not read the file with bins. Creating new file when writing.");
-        }
-        return new BinFile(storage, path, map);
+        this.bins = new ConcurrentHashMap<>();
     }
 
     /** Add number of instances to given bin. */
     public void add(Bin bin, long value) {
-        bins.computeIfAbsent(bin, b -> new LongAdder()).add(value);
+        bins.compute(bin, compute(() -> 0L, v -> v + value));
     }
 
     /** Put a map of bins. */
@@ -92,38 +77,87 @@ public void putAll(Map<? extends Bin, ? extends Number> binMap) {
     @Override
     public String toString() {
         return bins.entrySet().stream()
-                .map(e -> e.getKey() + " - " + e.getValue().sum())
+                .map(e -> e.getKey() + " - " + e.getValue())
                 .collect(Collectors.joining("\n"));
     }
 
     @Override
     protected void doWrite() {
+        Path tempPath;
         try {
-            Path tempPath = createTempFile("bins", ".csv");
-
-            // Write all bins to csv
-            try (BufferedWriter bw = Files.newBufferedWriter(tempPath)) {
-                String header = String.join(",", "topic", "device", "timestamp", "count");
-                bw.write(header);
-                bw.write('\n');
-
-                for (Map.Entry<Bin, LongAdder> entry : bins.entrySet()) {
-                    Bin bin = entry.getKey();
-                    bw.write(bin.getTopic());
-                    bw.write(',');
-                    bw.write(bin.getCategory());
-                    bw.write(',');
-                    bw.write(bin.getTime());
-                    bw.write(',');
-                    bw.write(String.valueOf(entry.getValue().sum()));
-                    bw.write('\n');
-                }
+            tempPath = createTempFile("bins", ".csv");
+        } catch (IOException e) {
+            logger.error("Cannot create temporary bins file: {}", e.toString());
+            return;
+        }
+
+        BufferedReader reader = null;
+        Stream<String[]> lines;
+
+        try {
+            reader = storage.newBufferedReader(path);
+
+            if (reader.readLine() == null) {
+                lines = Stream.empty();
+            } else {
+                lines = reader.lines()
+                        .map(COMMA_PATTERN::split);
             }
+        } catch (IOException ex){
+            logger.warn("Could not read the file with bins. Creating new file when writing.");
+            lines = Stream.empty();
+        }
+
+        Set<Bin> binKeys = new HashSet<>(bins.keySet());
+
+        try (BufferedWriter bw = Files.newBufferedWriter(tempPath)) {
+            bw.write(BINS_HEADER);
+
+            lines.forEach(s -> {
+                Bin bin = new Bin(s[0], s[1], s[2]);
+                long value = Long.parseLong(s[3]);
+                if (binKeys.remove(bin)) {
+                    value += bins.remove(bin);
+                }
+                writeLine(bw, bin, value);
+            });
+            binKeys.forEach(bin -> writeLine(bw, bin, bins.remove(bin)));
 
             storage.store(tempPath, path);
-        } catch (IOException e) {
+        } catch (UncheckedIOException | IOException e) {
             logger.error("Failed to write bins: {}", e);
+        } finally {
+            if (reader != null) {
+                try {
+                    reader.close();
+                } catch (IOException e) {
+                    logger.debug("Failed to close bin file reader", e);
+                }
+            }
         }
     }
 
+    private static void writeLine(BufferedWriter writer, Bin bin, long value) {
+        try {
+            writer.write(bin.getTopic());
+            writer.write(',');
+            writer.write(bin.getCategory());
+            writer.write(',');
+            writer.write(bin.getTime());
+            writer.write(',');
+            writer.write(Long.toString(value));
+            writer.write('\n');
+        } catch (IOException e) {
+            throw new UncheckedIOException(e);
+        }
+    }
+
+    private static <K, V1, V2> BiFunction<K, V1, V2> compute(Supplier<? extends V1> init, Function<? super V1, ? extends V2> update) {
+        return (k, v) -> {
+            if (v == null) {
+                v = init.get();
+            }
+            return update.apply(v);
+        };
+    }
 }
```
