Skip to content

Commit 9f718bb

Browse files
authored
Merge pull request #46 from RADAR-base/asService
Run the HDFS restructuring as a service
2 parents 999fc65 + 1fde67a commit 9f718bb

File tree

4 files changed

+65
-2
lines changed

4 files changed

+65
-2
lines changed

.travis.yml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
11
language: java
2+
# Force use of dist where oraclejdk8 is supported
3+
dist: trusty
24
jdk:
35
- oraclejdk8
46
sudo: false

README.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,8 @@ By default, files records are not deduplicated after writing. To enable this beh
6060

6161
To set the output user ID and group ID, specify the `-p local-uid=123` and `-p local-gid=12` properties.
6262

63+
To run the output generator as a service that regularly polls the HDFS directory, add the `--service` flag. Optionally, use the `--interval` flag to set the polling interval in seconds (default: 3600).
64+
6365
## Extending the connector
6466

6567
To implement alternative storage paths, storage drivers or storage formats, put your custom JAR in

src/main/java/org/radarcns/hdfs/Application.java

Lines changed: 55 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,9 @@
3131
import java.util.HashMap;
3232
import java.util.List;
3333
import java.util.Map;
34+
import java.util.concurrent.Executors;
35+
import java.util.concurrent.ScheduledExecutorService;
36+
import java.util.concurrent.TimeUnit;
3437
import org.radarcns.hdfs.accounting.Accountant;
3538
import org.radarcns.hdfs.config.HdfsSettings;
3639
import org.radarcns.hdfs.config.RestructureSettings;
@@ -58,10 +61,15 @@ public class Application implements FileStoreFactory {
5861
private final RecordPathFactory pathFactory;
5962
private final List<String> inputPaths;
6063
private final RestructureSettings settings;
64+
private final int pollInterval;
65+
private final boolean isService;
66+
private RadarHdfsRestructure hdfsReader;
6167

6268
private Application(Builder builder) {
6369
this.storageDriver = builder.storageDriver;
6470
this.settings = builder.settings;
71+
this.isService = builder.asService;
72+
this.pollInterval = builder.pollInterval;
6573

6674
converterFactory = builder.formatFactory.get(settings.getFormat());
6775
compression = builder.compressionFactory.get(settings.getCompression());
@@ -131,6 +139,8 @@ public static void main(String [] args) {
131139
.storageDriver(commandLineArgs.storageDriver)
132140
.properties(commandLineArgs.properties)
133141
.inputPaths(commandLineArgs.inputPaths)
142+
.asService(commandLineArgs.asService)
143+
.pollInterval(commandLineArgs.pollInterval)
134144
.build();
135145
} catch (IllegalArgumentException ex) {
136146
logger.error("HDFS High availability name node configuration is incomplete."
@@ -193,10 +203,41 @@ public void start() {
193203
System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism",
194204
String.valueOf(settings.getNumThreads() - 1));
195205

196-
Instant timeStart = Instant.now();
197-
RadarHdfsRestructure hdfsReader = new RadarHdfsRestructure(this);
198206
try {
199207
Files.createDirectories(settings.getTempDir());
208+
} catch (IOException ex) {
209+
logger.error("Failed to create temporary directory");
210+
return;
211+
}
212+
213+
ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
214+
executorService.execute(() -> hdfsReader = new RadarHdfsRestructure(this));
215+
216+
if (isService) {
217+
logger.info("Press Ctrl+C to exit...");
218+
executorService.scheduleAtFixedRate(this::runRestructure,
219+
pollInterval / 4, pollInterval, TimeUnit.SECONDS);
220+
} else {
221+
executorService.execute(this::runRestructure);
222+
}
223+
224+
try {
225+
Thread.sleep(Long.MAX_VALUE);
226+
} catch (InterruptedException e) {
227+
logger.info("Interrupted, shutting down...");
228+
executorService.shutdownNow();
229+
try {
230+
executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
231+
Thread.currentThread().interrupt();
232+
} catch (InterruptedException ex) {
233+
logger.info("Interrupted again...");
234+
}
235+
}
236+
}
237+
238+
private void runRestructure() {
239+
Instant timeStart = Instant.now();
240+
try {
200241
for (String input : inputPaths) {
201242
logger.info("In: {}", input);
202243
logger.info("Out: {}", pathFactory.getRoot());
@@ -220,6 +261,8 @@ public static class Builder {
220261
private FormatFactory formatFactory;
221262
private Map<String, String> properties = new HashMap<>();
222263
private List<String> inputPaths;
264+
private boolean asService;
265+
private int pollInterval;
223266

224267
public Builder(RestructureSettings settings) {
225268
this.settings = settings;
@@ -256,6 +299,16 @@ public Builder properties(Map<String, String> props) {
256299
return this;
257300
}
258301

302+
/**
 * Sets whether the application should run as a service.
 *
 * @param asService value stored in this builder's {@code asService} field
 * @return this builder, to allow call chaining
 */
public Builder asService(boolean asService) {
303+
this.asService = asService;
304+
return this;
305+
}
306+
307+
/**
 * Sets the polling interval, in seconds, used when running as a service.
 *
 * @param pollInterval value stored in this builder's {@code pollInterval} field
 * @return this builder, to allow call chaining
 */
public Builder pollInterval(int pollInterval) {
308+
this.pollInterval = pollInterval;
309+
return this;
310+
}
311+
259312
public Application build() throws IOException {
260313
pathFactory = nonNullOrDefault(pathFactory, ObservationKeyPathFactory::new);
261314
pathFactory.init(properties);

src/main/java/org/radarcns/hdfs/util/commandline/CommandLineArgs.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,12 @@ public class CommandLineArgs {
9696
@Parameter(names = {"--exclude-topic"}, description = "Topic to exclude when processing the records. Can be supplied more than once to exclude multiple topics.")
9797
public List<String> excludeTopics = new ArrayList<>();
9898

99+
@Parameter(names = {"-S", "--service"}, description = "Run the output generation as a service")
100+
public boolean asService = false;
101+
102+
@Parameter(names = {"-i", "--interval"}, description = "Polling interval when running as a service (seconds)")
103+
public int pollInterval = 3600;
104+
99105
/**
 * Returns {@code value} when it is not {@code null}; otherwise returns the result of
 * calling {@code defaultValue.get()}. The supplier is only invoked in the {@code null} case.
 *
 * @param value candidate value, may be {@code null}
 * @param defaultValue supplier of the fallback value
 * @param <T> type of the value
 * @return {@code value} if non-null, else the supplied default
 */
public static <T> T nonNullOrDefault(T value, Supplier<T> defaultValue) {
100106
return value != null ? value : defaultValue.get();
101107
}

0 commit comments

Comments
 (0)