3636import java .util .LinkedHashMap ;
3737import java .util .Map ;
3838import java .util .Optional ;
39+ import java .util .concurrent .CountDownLatch ;
3940import java .util .concurrent .TimeUnit ;
4041import java .util .function .Supplier ;
4142import java .util .stream .Stream ;
4445import org .apache .commons .exec .DefaultExecutor ;
4546import org .apache .commons .io .FileUtils ;
4647import org .apache .commons .lang3 .StringUtils ;
47- import org .apache .commons .lang3 .exception .ExceptionUtils ;
4848import org .apache .solr .common .SolrInputDocument ;
49+ import org .apache .zookeeper .CreateMode ;
50+ import org .apache .zookeeper .KeeperException ;
51+ import org .apache .zookeeper .Watcher ;
52+ import org .apache .zookeeper .ZooDefs ;
53+ import org .apache .zookeeper .ZooKeeper ;
54+ import org .apache .zookeeper .client .ZKClientConfig ;
55+ import org .apache .zookeeper .data .Stat ;
56+ import org .computate .i18n .I18n ;
57+ import org .computate .vertx .config .ComputateConfigKeys ;
4958import org .slf4j .Logger ;
5059
5160import com .hubspot .jinjava .Jinjava ;
6271import io .vertx .core .eventbus .Message ;
6372import io .vertx .core .json .JsonObject ;
6473
65- import org .computate .i18n .I18n ;
66- import org .computate .vertx .config .ComputateConfigKeys ;
67-
6874/**
6975 * NomCanonique.enUS: org.computate.enUS.java.WatchDirectory
7076 */
@@ -166,6 +172,7 @@ public class RegarderRepertoire extends AbstractVerticle {
166172 * Var.enUS: SITE_NAME
167173 */
168174 protected String SITE_NOM ;
175+ protected String ZOOKEEPER_ROOT_PATH ;
169176 /**
170177 * Var.enUS: SITE_PATH
171178 */
@@ -174,6 +181,7 @@ public class RegarderRepertoire extends AbstractVerticle {
174181 protected String COMPUTATE_VERTX_SRC ;
175182
176183 protected WorkerExecutor workerExecutor ;
184+ protected ZooKeeper zookeeper ;
177185
178186 /**
179187 * r: SITE_NOM
@@ -218,6 +226,7 @@ public static void main(String[] args) throws Exception {
218226
219227 String SITE_NOM = regarderRepertoire .configuration .getString (classeLangueConfig .getString ("var_SITE_NOM" ));
220228 String SITE_SRC = regarderRepertoire .configuration .getString (classeLangueConfig .getString ("var_SITE_SRC" ));
229+ String ZOOKEEPER_ROOT_PATH = regarderRepertoire .configuration .getString ("ZOOKEEPER_ROOT_PATH" );
221230 Boolean REGARDER = Boolean .parseBoolean (Optional .ofNullable (System .getenv (classeLangueConfig .getString ("var_REGARDER" ))).orElse ("true" ));
222231 Boolean REGARDER_MAINTENANT = Boolean .parseBoolean (Optional .ofNullable (System .getenv (classeLangueConfig .getString ("var_REGARDER_MAINTENANT" ))).orElse ("false" ));
223232 Boolean GENERER = Boolean .parseBoolean (Optional .ofNullable (System .getenv (classeLangueConfig .getString ("var_GENERER" ))).orElse ("true" ));
@@ -226,6 +235,7 @@ public static void main(String[] args) throws Exception {
226235 regarderRepertoire .langueNom = lang ;
227236 regarderRepertoire .SITE_NOM = SITE_NOM ;
228237 regarderRepertoire .SITE_SRC = SITE_SRC ;
238+ regarderRepertoire .ZOOKEEPER_ROOT_PATH = ZOOKEEPER_ROOT_PATH ;
229239 regarderRepertoire .COMPUTATE_SRC = appComputate ;
230240 regarderRepertoire .COMPUTATE_VERTX_SRC = appComputateVertx ;
231241 regarderRepertoire .classeCheminRepertoireAppli = SITE_SRC ;
@@ -585,6 +595,9 @@ private void regarderClasseEvenement(Message<Object> message) {
585595 String cheminCompletStr = body .getString ("cheminComplet" );
586596 LOG .debug (String .format ("Received request on the event bus: %s" , cheminCompletStr ));
587597 Path cheminComplet = Path .of (cheminCompletStr );
598+ Stat zookeeperStat = new Stat ();
599+ String zookeeperNodeName = String .format ("/%s%s" , ZOOKEEPER_ROOT_PATH , cheminCompletStr .replace ("/" , "-" ));
600+ zookeeper .create (zookeeperNodeName , "reserved" .getBytes (), ZooDefs .Ids .OPEN_ACL_UNSAFE , CreateMode .EPHEMERAL , zookeeperStat );
588601 String classeCheminAbsolu = cheminComplet .toAbsolutePath ().toString ();
589602 String cp = FileUtils .readFileToString (new File (COMPUTATE_SRC + "/config/cp.txt" ), "UTF-8" );
590603 String classpath = String .format ("%s:%s/target/classes" , cp , COMPUTATE_SRC );
@@ -598,12 +611,14 @@ private void regarderClasseEvenement(Message<Object> message) {
598611
599612 executeur .setWorkingDirectory (repertoireTravail );
600613 executeur .execute (ligneCommande );
614+ zookeeper .delete (zookeeperNodeName , zookeeperStat .getVersion ());
601615 String classeNomSimple = StringUtils .substringBeforeLast (cheminComplet .getFileName ().toString (), "." );
602616 String log = String .format (classeLangueConfig .getString (I18n .str_chemin_absolu ), classeNomSimple );
603617 LOG .info (log );
604618 promise .complete ();
605619 } catch (Exception ex ) {
606- LOG .error ("Une Problème d'exécution de RegarderRepertoire. " , ex );
620+ if (!(ex instanceof KeeperException ))
621+ LOG .error ("Une Problème d'exécution de RegarderRepertoire. " , ex );
607622 }
608623 return promise .future ();
609624 });
@@ -657,7 +672,7 @@ protected void traiterEvenements(JsonObject classeLangueConfig) {
657672 VertxOptions vertxOptions = new VertxOptions ();
658673 Long vertxWarningExceptionSeconds = configuration .getLong (ComputateConfigKeys .VERTX_WARNING_EXCEPTION_SECONDS );
659674 Long vertxMaxEventLoopExecuteTime = configuration .getLong (ComputateConfigKeys .VERTX_MAX_EVENT_LOOP_EXECUTE_TIME );
660- Long vertxMaxWorkerExecuteTime = configuration . getLong ( ComputateConfigKeys . VERTX_MAX_WORKER_EXECUTE_TIME ) ;
675+ Long vertxMaxWorkerExecuteTime = Long . MAX_VALUE ;
661676 vertxOptions .setWarningExceptionTime (vertxWarningExceptionSeconds );
662677 vertxOptions .setWarningExceptionTimeUnit (TimeUnit .SECONDS );
663678 vertxOptions .setMaxEventLoopExecuteTime (vertxMaxEventLoopExecuteTime );
@@ -782,6 +797,7 @@ public void start(Promise<Void> startPromise) throws Exception {
782797
783798 SITE_NOM = configuration .getString (classeLangueConfig .getString ("var_SITE_NOM" ));
784799 SITE_SRC = configuration .getString (classeLangueConfig .getString ("var_SITE_SRC" ));
800+ ZOOKEEPER_ROOT_PATH = configuration .getString ("ZOOKEEPER_ROOT_PATH" );
785801 Boolean REGARDER = Boolean .parseBoolean (Optional .ofNullable (System .getenv (classeLangueConfig .getString ("var_REGARDER" ))).orElse ("true" ));
786802 Boolean REGARDER_MAINTENANT = Boolean .parseBoolean (Optional .ofNullable (System .getenv (classeLangueConfig .getString ("var_REGARDER_MAINTENANT" ))).orElse ("false" ));
787803 Boolean GENERER = Boolean .parseBoolean (Optional .ofNullable (System .getenv (classeLangueConfig .getString ("var_GENERER" ))).orElse ("true" ));
@@ -796,6 +812,18 @@ public void start(Promise<Void> startPromise) throws Exception {
796812 cheminSrcGenJava = SITE_SRC + "/src/gen/java" ;
797813 cheminsBin .add (SITE_SRC + "/src/main/resources" );
798814
815+ String zkHostName = configuration .getString ("ZOOKEEPER_HOST_NAME" );
816+ Integer zkPort = Integer .parseInt (configuration .getString ("ZOOKEEPER_PORT" ));
817+ String zkConnectionString = String .format ("%s:%s" , zkHostName , zkPort );
818+ ZKClientConfig zkClientConfig = new ZKClientConfig ();
819+ Integer zkSessionTimeoutMillis = Integer .parseInt (configuration .getString ("ZOOKEEPER_CONNECTION_TIMEOUT_MILLIS" , "3000" ));
820+ CountDownLatch connectionLatch = new CountDownLatch (1 );
821+ zookeeper = new ZooKeeper (zkConnectionString , zkSessionTimeoutMillis , event -> {
822+ if (event .getState () == Watcher .Event .KeeperState .SyncConnected ) {
823+ connectionLatch .countDown ();
824+ }
825+ }, zkClientConfig );
826+ connectionLatch .await ();
799827
800828 trace = true ;
801829 initialiserRegarderRepertoire (classeLangueConfig );
@@ -805,7 +833,6 @@ public void start(Promise<Void> startPromise) throws Exception {
805833 regarderClasseEvenement (message );
806834 });
807835 Long vertxMaxWorkerExecuteTime = configuration .getLong (ComputateConfigKeys .VERTX_MAX_WORKER_EXECUTE_TIME );
808- // Integer workerPoolSize = Integer.parseInt(configuration.getString(ComputateConfigKeys.WORKER_POOL_SIZE));
809836 Integer workerPoolSize = 1 ;
810837 workerExecutor = vertx .createSharedWorkerExecutor (String .format ("%s-worker" , Thread .currentThread ().getName ()), workerPoolSize , vertxMaxWorkerExecuteTime , TimeUnit .SECONDS );
811838 startPromise .complete ();
0 commit comments