3131import java .util .HashMap ;
3232import java .util .List ;
3333import java .util .Map ;
34+ import java .util .concurrent .Executors ;
35+ import java .util .concurrent .ScheduledExecutorService ;
36+ import java .util .concurrent .TimeUnit ;
3437import org .radarcns .hdfs .accounting .Accountant ;
3538import org .radarcns .hdfs .config .HdfsSettings ;
3639import org .radarcns .hdfs .config .RestructureSettings ;
@@ -58,10 +61,15 @@ public class Application implements FileStoreFactory {
5861 private final RecordPathFactory pathFactory ;
5962 private final List <String > inputPaths ;
6063 private final RestructureSettings settings ;
64+ private final int pollInterval ;
65+ private final boolean isService ;
66+ private RadarHdfsRestructure hdfsReader ;
6167
6268 private Application (Builder builder ) {
6369 this .storageDriver = builder .storageDriver ;
6470 this .settings = builder .settings ;
71+ this .isService = builder .asService ;
72+ this .pollInterval = builder .pollInterval ;
6573
6674 converterFactory = builder .formatFactory .get (settings .getFormat ());
6775 compression = builder .compressionFactory .get (settings .getCompression ());
@@ -131,6 +139,8 @@ public static void main(String [] args) {
131139 .storageDriver (commandLineArgs .storageDriver )
132140 .properties (commandLineArgs .properties )
133141 .inputPaths (commandLineArgs .inputPaths )
142+ .asService (commandLineArgs .asService )
143+ .pollInterval (commandLineArgs .pollInterval )
134144 .build ();
135145 } catch (IllegalArgumentException ex ) {
136146 logger .error ("HDFS High availability name node configuration is incomplete."
@@ -193,10 +203,41 @@ public void start() {
193203 System .setProperty ("java.util.concurrent.ForkJoinPool.common.parallelism" ,
194204 String .valueOf (settings .getNumThreads () - 1 ));
195205
196- Instant timeStart = Instant .now ();
197- RadarHdfsRestructure hdfsReader = new RadarHdfsRestructure (this );
198206 try {
199207 Files .createDirectories (settings .getTempDir ());
208+ } catch (IOException ex ) {
209+ logger .error ("Failed to create temporary directory" );
210+ return ;
211+ }
212+
213+ ScheduledExecutorService executorService = Executors .newSingleThreadScheduledExecutor ();
214+ executorService .execute (() -> hdfsReader = new RadarHdfsRestructure (this ));
215+
216+ if (isService ) {
217+ logger .info ("Press Ctrl+C to exit..." );
218+ executorService .scheduleAtFixedRate (this ::runRestructure ,
219+ pollInterval / 4 , pollInterval , TimeUnit .SECONDS );
220+ } else {
221+ executorService .execute (this ::runRestructure );
222+ }
223+
224+ try {
225+ Thread .sleep (Long .MAX_VALUE );
226+ } catch (InterruptedException e ) {
227+ logger .info ("Interrupted, shutting down..." );
228+ executorService .shutdownNow ();
229+ try {
230+ executorService .awaitTermination (Long .MAX_VALUE , TimeUnit .SECONDS );
231+ Thread .currentThread ().interrupt ();
232+ } catch (InterruptedException ex ) {
233+ logger .info ("Interrupted again..." );
234+ }
235+ }
236+ }
237+
238+ private void runRestructure () {
239+ Instant timeStart = Instant .now ();
240+ try {
200241 for (String input : inputPaths ) {
201242 logger .info ("In: {}" , input );
202243 logger .info ("Out: {}" , pathFactory .getRoot ());
@@ -220,6 +261,8 @@ public static class Builder {
220261 private FormatFactory formatFactory ;
221262 private Map <String , String > properties = new HashMap <>();
222263 private List <String > inputPaths ;
264+ private boolean asService ;
265+ private int pollInterval ;
223266
224267 public Builder (RestructureSettings settings ) {
225268 this .settings = settings ;
@@ -256,6 +299,16 @@ public Builder properties(Map<String, String> props) {
256299 return this ;
257300 }
258301
302+ public Builder asService (boolean asService ) {
303+ this .asService = asService ;
304+ return this ;
305+ }
306+
307+ public Builder pollInterval (int pollInterval ) {
308+ this .pollInterval = pollInterval ;
309+ return this ;
310+ }
311+
259312 public Application build () throws IOException {
260313 pathFactory = nonNullOrDefault (pathFactory , ObservationKeyPathFactory ::new );
261314 pathFactory .init (properties );
0 commit comments