Skip to content

Commit 5fa5b2a

Browse files
authored
Merge pull request #47 from RADAR-base/release-0.5.7
Release 0.5.7
2 parents 7b6f49e + b40f153 commit 5fa5b2a

File tree

9 files changed

+104
-9
lines changed

9 files changed

+104
-9
lines changed

.travis.yml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
11
language: java
2+
# Force use of dist where oraclejdk8 is supported
3+
dist: trusty
24
jdk:
35
- oraclejdk8
46
sudo: false

README.md

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ Data streamed to HDFS using the [RADAR HDFS sink connector](https://github.com/R
88

99
This package is available as docker image [`radarbase/radar-hdfs-restructure`](https://hub.docker.com/r/radarbase/radar-hdfs-restructure). The entrypoint of the image is the current application. So in all of the commands listed in usage, replace `radar-hdfs-restructure` with for example:
1010
```shell
11-
docker run --rm -t --network hadoop -v "$PWD/output:/output" radarbase/radar-hdfs-restructure:0.5.6 -n hdfs-namenode -o /output /myTopic
11+
docker run --rm -t --network hadoop -v "$PWD/output:/output" radarbase/radar-hdfs-restructure:0.5.7 -n hdfs-namenode -o /output /myTopic
1212
```
1313
if your docker cluster is running in the `hadoop` network and your output directory should be `./output`.
1414

@@ -23,7 +23,7 @@ This package requires at least Java JDK 8. Build the distribution with
2323
and install the package into `/usr/local` with for example
2424
```shell
2525
sudo mkdir -p /usr/local
26-
sudo tar -xzf build/distributions/radar-hdfs-restructure-0.5.6.tar.gz -C /usr/local --strip-components=1
26+
sudo tar -xzf build/distributions/radar-hdfs-restructure-0.5.7.tar.gz -C /usr/local --strip-components=1
2727
```
2828

2929
Now the `radar-hdfs-restructure` command should be available.
@@ -60,6 +60,8 @@ By default, files records are not deduplicated after writing. To enable this beh
6060

6161
To set the output user ID and group ID, specify the `-p local-uid=123` and `-p local-gid=12` properties.
6262

63+
To run the output generator as a service that will regularly poll the HDFS directory, add the `--service` flag and optionally the `--interval` flag to adjust the polling interval.
64+
6365
## Extending the connector
6466

6567
To implement alternative storage paths, storage drivers or storage formats, put your custom JAR in

build.gradle

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ plugins {
77
}
88

99
group 'org.radarcns'
10-
version '0.5.6'
10+
version '0.5.7'
1111
mainClassName = 'org.radarcns.hdfs.Application'
1212

1313
sourceCompatibility = '1.8'
@@ -190,5 +190,5 @@ bintray {
190190
}
191191

192192
wrapper {
193-
gradleVersion '5.2.1'
193+
gradleVersion '5.4.1'
194194
}

gradle/wrapper/gradle-wrapper.jar

426 Bytes
Binary file not shown.
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
distributionBase=GRADLE_USER_HOME
22
distributionPath=wrapper/dists
3-
distributionUrl=https\://services.gradle.org/distributions/gradle-5.2.1-bin.zip
3+
distributionUrl=https\://services.gradle.org/distributions/gradle-5.4.1-bin.zip
44
zipStoreBase=GRADLE_USER_HOME
55
zipStorePath=wrapper/dists

gradlew

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,21 @@
11
#!/usr/bin/env sh
22

3+
#
4+
# Copyright 2015 the original author or authors.
5+
#
6+
# Licensed under the Apache License, Version 2.0 (the "License");
7+
# you may not use this file except in compliance with the License.
8+
# You may obtain a copy of the License at
9+
#
10+
# http://www.apache.org/licenses/LICENSE-2.0
11+
#
12+
# Unless required by applicable law or agreed to in writing, software
13+
# distributed under the License is distributed on an "AS IS" BASIS,
14+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
# See the License for the specific language governing permissions and
16+
# limitations under the License.
17+
#
18+
319
##############################################################################
420
##
521
## Gradle start up script for UN*X
@@ -28,7 +44,7 @@ APP_NAME="Gradle"
2844
APP_BASE_NAME=`basename "$0"`
2945

3046
# Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script.
31-
DEFAULT_JVM_OPTS='"-Xmx64m"'
47+
DEFAULT_JVM_OPTS='"-Xmx64m" "-Xms64m"'
3248

3349
# Use the maximum available, or set MAX_FD != -1 to use that value.
3450
MAX_FD="maximum"

gradlew.bat

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,19 @@
1+
@rem
2+
@rem Copyright 2015 the original author or authors.
3+
@rem
4+
@rem Licensed under the Apache License, Version 2.0 (the "License");
5+
@rem you may not use this file except in compliance with the License.
6+
@rem You may obtain a copy of the License at
7+
@rem
8+
@rem http://www.apache.org/licenses/LICENSE-2.0
9+
@rem
10+
@rem Unless required by applicable law or agreed to in writing, software
11+
@rem distributed under the License is distributed on an "AS IS" BASIS,
12+
@rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
@rem See the License for the specific language governing permissions and
14+
@rem limitations under the License.
15+
@rem
16+
117
@if "%DEBUG%" == "" @echo off
218
@rem ##########################################################################
319
@rem
@@ -14,7 +30,7 @@ set APP_BASE_NAME=%~n0
1430
set APP_HOME=%DIRNAME%
1531

1632
@rem Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script.
17-
set DEFAULT_JVM_OPTS="-Xmx64m"
33+
set DEFAULT_JVM_OPTS="-Xmx64m" "-Xms64m"
1834

1935
@rem Find java.exe
2036
if defined JAVA_HOME goto findJavaFromJavaHome

src/main/java/org/radarcns/hdfs/Application.java

Lines changed: 55 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,9 @@
3131
import java.util.HashMap;
3232
import java.util.List;
3333
import java.util.Map;
34+
import java.util.concurrent.Executors;
35+
import java.util.concurrent.ScheduledExecutorService;
36+
import java.util.concurrent.TimeUnit;
3437
import org.radarcns.hdfs.accounting.Accountant;
3538
import org.radarcns.hdfs.config.HdfsSettings;
3639
import org.radarcns.hdfs.config.RestructureSettings;
@@ -58,10 +61,15 @@ public class Application implements FileStoreFactory {
5861
private final RecordPathFactory pathFactory;
5962
private final List<String> inputPaths;
6063
private final RestructureSettings settings;
64+
private final int pollInterval;
65+
private final boolean isService;
66+
private RadarHdfsRestructure hdfsReader;
6167

6268
private Application(Builder builder) {
6369
this.storageDriver = builder.storageDriver;
6470
this.settings = builder.settings;
71+
this.isService = builder.asService;
72+
this.pollInterval = builder.pollInterval;
6573

6674
converterFactory = builder.formatFactory.get(settings.getFormat());
6775
compression = builder.compressionFactory.get(settings.getCompression());
@@ -131,6 +139,8 @@ public static void main(String [] args) {
131139
.storageDriver(commandLineArgs.storageDriver)
132140
.properties(commandLineArgs.properties)
133141
.inputPaths(commandLineArgs.inputPaths)
142+
.asService(commandLineArgs.asService)
143+
.pollInterval(commandLineArgs.pollInterval)
134144
.build();
135145
} catch (IllegalArgumentException ex) {
136146
logger.error("HDFS High availability name node configuration is incomplete."
@@ -193,10 +203,41 @@ public void start() {
193203
System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism",
194204
String.valueOf(settings.getNumThreads() - 1));
195205

196-
Instant timeStart = Instant.now();
197-
RadarHdfsRestructure hdfsReader = new RadarHdfsRestructure(this);
198206
try {
199207
Files.createDirectories(settings.getTempDir());
208+
} catch (IOException ex) {
209+
logger.error("Failed to create temporary directory");
210+
return;
211+
}
212+
213+
ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
214+
executorService.execute(() -> hdfsReader = new RadarHdfsRestructure(this));
215+
216+
if (isService) {
217+
logger.info("Press Ctrl+C to exit...");
218+
executorService.scheduleAtFixedRate(this::runRestructure,
219+
pollInterval / 4, pollInterval, TimeUnit.SECONDS);
220+
} else {
221+
executorService.execute(this::runRestructure);
222+
}
223+
224+
try {
225+
Thread.sleep(Long.MAX_VALUE);
226+
} catch (InterruptedException e) {
227+
logger.info("Interrupted, shutting down...");
228+
executorService.shutdownNow();
229+
try {
230+
executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
231+
Thread.currentThread().interrupt();
232+
} catch (InterruptedException ex) {
233+
logger.info("Interrupted again...");
234+
}
235+
}
236+
}
237+
238+
private void runRestructure() {
239+
Instant timeStart = Instant.now();
240+
try {
200241
for (String input : inputPaths) {
201242
logger.info("In: {}", input);
202243
logger.info("Out: {}", pathFactory.getRoot());
@@ -220,6 +261,8 @@ public static class Builder {
220261
private FormatFactory formatFactory;
221262
private Map<String, String> properties = new HashMap<>();
222263
private List<String> inputPaths;
264+
private boolean asService;
265+
private int pollInterval;
223266

224267
public Builder(RestructureSettings settings) {
225268
this.settings = settings;
@@ -256,6 +299,16 @@ public Builder properties(Map<String, String> props) {
256299
return this;
257300
}
258301

302+
public Builder asService(boolean asService) {
303+
this.asService = asService;
304+
return this;
305+
}
306+
307+
public Builder pollInterval(int pollInterval) {
308+
this.pollInterval = pollInterval;
309+
return this;
310+
}
311+
259312
public Application build() throws IOException {
260313
pathFactory = nonNullOrDefault(pathFactory, ObservationKeyPathFactory::new);
261314
pathFactory.init(properties);

src/main/java/org/radarcns/hdfs/util/commandline/CommandLineArgs.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,12 @@ public class CommandLineArgs {
9696
@Parameter(names = {"--exclude-topic"}, description = "Topic to exclude when processing the records. Can be supplied more than once to exclude multiple topics.")
9797
public List<String> excludeTopics = new ArrayList<>();
9898

99+
@Parameter(names = {"-S", "--service"}, description = "Run the output generation as a service")
100+
public boolean asService = false;
101+
102+
@Parameter(names = {"-i", "--interval"}, description = "Polling interval when running as a service (seconds)")
103+
public int pollInterval = 3600;
104+
99105
public static <T> T nonNullOrDefault(T value, Supplier<T> defaultValue) {
100106
return value != null ? value : defaultValue.get();
101107
}

0 commit comments

Comments
 (0)