Skip to content

Commit c5c32fb

Browse files
committed
Merge branch 'master' into dev
2 parents 0d687fa + b252b09 commit c5c32fb

File tree

7 files changed

+148
-58
lines changed

7 files changed

+148
-58
lines changed

.travis.yml

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,16 @@
11
language: java
22
jdk:
33
- oraclejdk8
4-
env:
5-
TERM: dumb
6-
before_cache:
7-
- rm -f $HOME/.gradle/caches/modules-2/modules-2.lock
8-
- rm -fr $HOME/.gradle/caches/*/plugin-resolution
9-
- rm -fr $HOME/.gradle/caches/*/fileHashes
4+
sudo: false
5+
106
cache:
117
directories:
12-
- $HOME/.gradle/caches/
13-
- $HOME/.gradle/wrapper/
8+
- $HOME/.gradle/caches/jars-1
9+
- $HOME/.gradle/caches/jars-2
10+
- $HOME/.gradle/caches/jars-3
11+
- $HOME/.gradle/caches/modules-2/files-2.1/
12+
- $HOME/.gradle/native
13+
- $HOME/.gradle/wrapper
1414

1515
deploy:
1616
provider: releases

README.md

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,18 +15,18 @@ Build jar from source with
1515
```shell
1616
./gradlew build
1717
```
18-
and find the output JAR file as `build/libs/restructurehdfs-all-0.1.2-SNAPSHOT.jar`. Then run with:
18+
and find the output JAR file as `build/libs/restructurehdfs-all-0.2.jar`. Then run with:
1919

2020
```shell
21-
java -jar restructurehdfs-all-0.1.2-SNAPSHOT.jar <webhdfs_url> <hdfs_topic_path> <output_folder>
21+
java -jar restructurehdfs-all-0.2.jar <webhdfs_url> <hdfs_topic_path> <output_folder>
2222
```
2323

2424
By default, this will output the data in CSV format. If JSON format is preferred, use the following instead:
2525
```
26-
java -Dorg.radarcns.format=json -jar restructurehdfs-all-0.1.2-SNAPSHOT.jar <webhdfs_url> <hdfs_topic_path> <output_folder>
26+
java -Dorg.radarcns.format=json -jar restructurehdfs-all-0.2.jar <webhdfs_url> <hdfs_topic_path> <output_folder>
2727
```
2828

2929
Another option is to output the data in compressed form. All files will get the `gz` suffix, and can be decompressed with a GZIP decoder. Note that for a very small number of records, this may actually increase the file size.
3030
```
31-
java -Dorg.radarcns.compress=gzip -jar restructurehdfs-all-0.1.1.jar <webhdfs_url> <hdfs_topic_path> <output_folder>
31+
java -Dorg.radarcns.compression=gzip -jar restructurehdfs-all-0.2.jar <webhdfs_url> <hdfs_topic_path> <output_folder>
3232
```

build.gradle

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ apply plugin: 'java'
22
apply plugin: 'application'
33

44
group 'org.radarcns.restructurehdfs'
5-
version '0.1.2-SNAPSHOT'
5+
version '0.2'
66
mainClassName = 'org.radarcns.RestructureAvroRecords'
77

88
run {
@@ -12,10 +12,9 @@ run {
1212
sourceCompatibility = '1.8'
1313
targetCompatibility = '1.8'
1414

15-
ext.avroVersion = '1.8.1'
16-
ext.jacksonVersion = '2.8.5'
15+
ext.avroVersion = '1.8.2'
16+
ext.jacksonVersion = '2.8.9'
1717
ext.hadoopVersion = '2.7.3'
18-
ext.log4jVersion = '2.8.1'
1918

2019
repositories {
2120
jcenter()
@@ -57,6 +56,6 @@ artifacts {
5756
}
5857

5958
task wrapper(type: Wrapper) {
60-
gradleVersion = '3.4.1'
61-
distributionUrl distributionUrl.replace("bin", "all")
59+
gradleVersion = '3.5'
60+
distributionType 'all'
6261
}

gradle/wrapper/gradle-wrapper.jar

571 Bytes
Binary file not shown.
Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
1-
#Thu Apr 13 12:11:40 CEST 2017
1+
#Mon Jul 03 09:50:31 CEST 2017
22
distributionBase=GRADLE_USER_HOME
33
distributionPath=wrapper/dists
44
zipStoreBase=GRADLE_USER_HOME
55
zipStorePath=wrapper/dists
6-
distributionUrl=https\://services.gradle.org/distributions/gradle-3.4.1-all.zip
6+
distributionUrl=https\://services.gradle.org/distributions/gradle-3.5-all.zip

src/main/java/org/radarcns/RestructureAvroRecords.java

Lines changed: 51 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -19,14 +19,17 @@
1919
import java.io.File;
2020
import java.io.IOException;
2121
import java.text.SimpleDateFormat;
22+
import java.util.ArrayList;
2223
import java.util.Date;
24+
import java.util.HashMap;
25+
import java.util.List;
26+
import java.util.Map;
2327
import java.util.TimeZone;
2428
import org.apache.avro.Schema.Field;
2529
import org.apache.avro.file.DataFileReader;
2630
import org.apache.avro.generic.GenericDatumReader;
2731
import org.apache.avro.generic.GenericRecord;
2832
import org.apache.avro.mapred.FsInput;
29-
import org.apache.commons.io.FilenameUtils;
3033
import org.apache.hadoop.conf.Configuration;
3134
import org.apache.hadoop.fs.FileSystem;
3235
import org.apache.hadoop.fs.LocatedFileStatus;
@@ -35,6 +38,7 @@
3538
import org.radarcns.util.CsvAvroConverter;
3639
import org.radarcns.util.FileCacheStore;
3740
import org.radarcns.util.JsonAvroConverter;
41+
import org.radarcns.util.ProgressBar;
3842
import org.radarcns.util.RecordConverterFactory;
3943
import org.slf4j.Logger;
4044
import org.slf4j.LoggerFactory;
@@ -55,18 +59,17 @@ public class RestructureAvroRecords {
5559

5660
private File outputPath;
5761
private File offsetsPath;
58-
private OffsetRangeSet seenFiles;
5962
private Frequency bins;
6063

6164
private final Configuration conf = new Configuration();
6265

63-
private int processedFileCount;
64-
private int processedRecordsCount;
66+
private long processedFileCount;
67+
private long processedRecordsCount;
6568
private static final boolean USE_GZIP = "gzip".equalsIgnoreCase(System.getProperty("org.radarcns.compression"));
6669

6770
public static void main(String [] args) throws Exception {
6871
if (args.length != 3) {
69-
System.out.println("Usage: hadoop jar restructurehdfs-all-0.1.0.jar <webhdfs_url> <hdfs_topic> <output_folder>");
72+
System.out.println("Usage: hadoop jar restructurehdfs-all-0.2.jar <webhdfs_url> <hdfs_root_directory> <output_folder>");
7073
System.exit(1);
7174
}
7275

@@ -120,80 +123,89 @@ public void setOutputPath(String path) {
120123
bins = Frequency.read(new File(outputPath, BINS_FILE_NAME));
121124
}
122125

123-
public int getProcessedFileCount() {
126+
public long getProcessedFileCount() {
124127
return processedFileCount;
125128
}
126129

127-
public int getProcessedRecordsCount() {
130+
public long getProcessedRecordsCount() {
128131
return processedRecordsCount;
129132
}
130133

131134
public void start(String directoryName) throws IOException {
132135
// Get files and directories
133136
Path path = new Path(directoryName);
134137
FileSystem fs = FileSystem.get(conf);
135-
RemoteIterator<LocatedFileStatus> files = fs.listLocatedStatus(path);
138+
136139

137140
try (OffsetRangeFile offsets = new OffsetRangeFile(offsetsPath)) {
141+
OffsetRangeSet seenFiles;
138142
try {
139143
seenFiles = offsets.read();
140144
} catch (IOException ex) {
141145
logger.error("Error reading offsets file. Processing all offsets.");
142146
seenFiles = new OffsetRangeSet();
143147
}
144-
// Process the directories topics
145-
processedFileCount = 0;
148+
logger.info("Retrieving file list from {}", path);
149+
// Get filenames to process
150+
Map<String, List<Path>> topicPaths = new HashMap<>();
151+
long toProcessFileCount = 0L;
152+
processedFileCount = 0L;
153+
RemoteIterator<LocatedFileStatus> files = fs.listFiles(path, true);
146154
while (files.hasNext()) {
147155
LocatedFileStatus locatedFileStatus = files.next();
148-
Path filePath = locatedFileStatus.getPath();
149-
150-
if (filePath.toString().contains("+tmp")) {
156+
if (locatedFileStatus.isDirectory()) {
151157
continue;
152158
}
159+
Path filePath = locatedFileStatus.getPath();
153160

154-
if (locatedFileStatus.isDirectory()) {
155-
processTopic(filePath, converterFactory, offsets);
161+
String topic = getTopic(filePath, seenFiles);
162+
if (topic != null) {
163+
topicPaths.computeIfAbsent(topic, k -> new ArrayList<>()).add(filePath);
164+
toProcessFileCount++;
156165
}
157166
}
158-
}
159-
}
160-
161-
private void processTopic(Path topicPath, RecordConverterFactory converterFactory,
162-
OffsetRangeFile offsets) throws IOException {
163-
// Get files in this topic directory
164-
FileSystem fs = FileSystem.get(conf);
165-
RemoteIterator<LocatedFileStatus> files = fs.listFiles(topicPath, true);
166167

167-
String topicName = topicPath.getName();
168+
logger.info("Converting {} files", toProcessFileCount);
168169

169-
try (FileCacheStore cache = new FileCacheStore(converterFactory, 100, USE_GZIP)) {
170-
while (files.hasNext()) {
171-
LocatedFileStatus locatedFileStatus = files.next();
170+
ProgressBar progressBar = new ProgressBar(toProcessFileCount, 10);
171+
progressBar.update(0);
172172

173-
if (locatedFileStatus.isFile()) {
174-
this.processFile(locatedFileStatus.getPath(), topicName, cache, offsets);
173+
// Actually process the files
174+
for (Map.Entry<String, List<Path>> entry : topicPaths.entrySet()) {
175+
try (FileCacheStore cache = new FileCacheStore(converterFactory, 100, USE_GZIP)) {
176+
for (Path filePath : entry.getValue()) {
177+
this.processFile(filePath, entry.getKey(), cache, offsets);
178+
progressBar.update(++processedFileCount);
179+
}
175180
}
176181
}
177182
}
178183
}
179184

180-
private void processFile(Path filePath, String topicName, FileCacheStore cache,
181-
OffsetRangeFile offsets) throws IOException {
182-
String fileName = filePath.getName();
185+
private static String getTopic(Path filePath, OffsetRangeSet seenFiles) {
186+
if (filePath.toString().contains("+tmp")) {
187+
return null;
188+
}
183189

190+
String fileName = filePath.getName();
184191
// Skip if extension is not .avro
185-
if (!FilenameUtils.getExtension(fileName).equals("avro")) {
186-
logger.info("Skipped non-avro file: {}", fileName);
187-
return;
192+
if (!fileName.endsWith(".avro")) {
193+
logger.info("Skipping non-avro file: {}", fileName);
194+
return null;
188195
}
189196

190197
OffsetRange range = OffsetRange.parse(fileName);
191198
// Skip already processed avro files
192199
if (seenFiles.contains(range)) {
193-
return;
200+
return null;
194201
}
195202

196-
logger.info("{}", filePath);
203+
return filePath.getParent().getParent().getName();
204+
}
205+
206+
private void processFile(Path filePath, String topicName, FileCacheStore cache,
207+
OffsetRangeFile offsets) throws IOException {
208+
logger.debug("Reading {}", filePath);
197209

198210
// Read and parse avro file
199211
FsInput input = new FsInput(filePath, conf);
@@ -218,12 +230,12 @@ record = dataFileReader.next(record);
218230

219231
// Write which file has been processed and update bins
220232
try {
233+
OffsetRange range = OffsetRange.parse(filePath.getName());
221234
offsets.write(range);
222235
bins.write();
223236
} catch (IOException ex) {
224237
logger.warn("Failed to update status. Continuing processing.", ex);
225238
}
226-
processedFileCount++;
227239
}
228240

229241
private void writeRecord(GenericRecord record, String topicName, FileCacheStore cache)
@@ -273,4 +285,5 @@ public static String createHourTimestamp(GenericRecord valueField, Field timeFie
273285
Date date = new Date((long) (time * 1000d));
274286
return FILE_DATE_FORMAT.format(date);
275287
}
288+
276289
}
Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
/*
2+
* Copyright 2017 The Hyve
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.radarcns.util;
18+
19+
/**
20+
* Progress bar.
21+
* Based on https://stackoverflow.com/a/43381186/574082.
22+
*/
23+
public class ProgressBar {
    /** Maximum progress value; progress runs from 0 to {@code total} inclusive. */
    private final long total;
    /** Number of stripe characters the rendered bar is divided into. */
    private final int numStripes;
    /** Last percentage rendered, or -1 before the first update; used to skip redundant redraws. */
    private int previousPercentage;

    /**
     * Create a progress bar.
     *
     * @param total maximum progress value, non-negative. A total of zero is treated as
     *              immediately complete by {@link #update(long)}.
     * @param numStripes number of stripes to draw, strictly positive.
     * @throws IllegalArgumentException if total is negative or numStripes is not positive.
     */
    public ProgressBar(long total, int numStripes) {
        if (total < 0) {
            // message matches the check: zero is allowed (handled as 100% in update)
            throw new IllegalArgumentException("Total of progress bar must be non-negative");
        }
        if (numStripes <= 0) {
            throw new IllegalArgumentException("Number of stripes in progress bar must be positive");
        }
        this.total = total;
        this.numStripes = numStripes;
        this.previousPercentage = -1;
    }

    /**
     * Render the bar for the given progress value. Redraws in place (leading carriage
     * return) and only prints when the integer percentage has changed since the last call.
     *
     * @param remain current progress value in the range [0, total].
     * @throws IllegalArgumentException if remain is outside [0, total].
     */
    public void update(long remain) {
        if (remain > total || remain < 0) {
            throw new IllegalArgumentException(
                    "Update value " + remain + " out of range [0, " + total + "].");
        }
        int remainPercent;
        if (total > 0) {
            remainPercent = (int) ((100 * remain) / total);
        } else {
            // zero-length task: always fully complete
            remainPercent = 100;
        }
        if (remainPercent == previousPercentage) {
            return;
        }
        previousPercentage = remainPercent;
        // Scale percentage to the stripe count. The previous formula
        // (remainPercent / numStripes) was only correct for numStripes == 10.
        int stripesFilled = remainPercent * numStripes / 100;
        char notFilled = '-';
        char filled = '*';
        // capacity: 2 init + numStripes + 2 end + 4 percentage
        StringBuilder builder = new StringBuilder(numStripes + 8);
        builder.append("\r[");
        for (int i = 0; i < stripesFilled; i++) {
            builder.append(filled);
        }
        for (int i = stripesFilled; i < numStripes; i++) {
            builder.append(notFilled);
        }
        builder.append("] ").append(remainPercent).append('%');
        if (remain < total) {
            System.out.print(builder.toString());
        } else {
            // final update: terminate the line so subsequent output starts fresh
            System.out.println(builder.toString());
        }
    }

    /** @return whether the last rendered state was 100%. */
    public boolean isDone() {
        return previousPercentage == 100;
    }
}

0 commit comments

Comments
 (0)