|
46 | 46 | import org.apache.commons.io.FileUtils; |
47 | 47 | import org.apache.commons.lang3.StringUtils; |
48 | 48 | import org.apache.solr.common.SolrInputDocument; |
49 | | -import org.apache.zookeeper.CreateMode; |
50 | | -import org.apache.zookeeper.KeeperException; |
51 | | -import org.apache.zookeeper.Watcher; |
52 | | -import org.apache.zookeeper.ZooDefs; |
53 | | -import org.apache.zookeeper.ZooKeeper; |
54 | | -import org.apache.zookeeper.client.ZKClientConfig; |
55 | | -import org.apache.zookeeper.data.Stat; |
56 | 49 | import org.computate.i18n.I18n; |
57 | 50 | import org.computate.vertx.config.ComputateConfigKeys; |
58 | 51 | import org.slf4j.Logger; |
|
70 | 63 | import io.vertx.core.eventbus.DeliveryOptions; |
71 | 64 | import io.vertx.core.eventbus.Message; |
72 | 65 | import io.vertx.core.json.JsonObject; |
| 66 | +import io.vertx.core.shareddata.SharedData; |
73 | 67 |
|
74 | 68 | /** |
75 | 69 | * NomCanonique.enUS: org.computate.enUS.java.WatchDirectory |
@@ -181,7 +175,6 @@ public class RegarderRepertoire extends AbstractVerticle { |
181 | 175 | protected String COMPUTATE_VERTX_SRC; |
182 | 176 |
|
183 | 177 | protected WorkerExecutor workerExecutor; |
184 | | - protected ZooKeeper zookeeper; |
185 | 178 |
|
186 | 179 | /** |
187 | 180 | * r: SITE_NOM |
@@ -590,40 +583,45 @@ protected Path enregistrerTout(final Path demarrer) throws IOException { |
590 | 583 | private void regarderClasseEvenement(Message<Object> message) { |
591 | 584 | workerExecutor.executeBlocking(() -> { |
592 | 585 | Promise<Void> promise = Promise.promise(); |
593 | | - String zookeeperNodeName = null; |
594 | | - Stat zookeeperStat = null; |
595 | | - String cheminCompletStr = null; |
| 586 | + String orderLock = null; |
596 | 587 | try { |
597 | 588 | JsonObject body = ((JsonObject)message.body()).getJsonObject("context").getJsonObject("params").getJsonObject("body"); |
598 | | - cheminCompletStr = body.getString("cheminComplet"); |
| 589 | + String cheminCompletStr = body.getString("cheminComplet"); |
599 | 590 | LOG.debug(String.format("Received request on the event bus: %s", cheminCompletStr)); |
600 | 591 | Path cheminComplet = Path.of(cheminCompletStr); |
601 | | - zookeeperStat = new Stat(); |
602 | | - zookeeperNodeName = String.format("/%s/%s", ZOOKEEPER_ROOT_PATH, cheminCompletStr.replace("/", "-")); |
603 | | - zookeeper.create(zookeeperNodeName, "reserved".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, zookeeperStat); |
604 | | - String classeCheminAbsolu = cheminComplet.toAbsolutePath().toString(); |
605 | | - String cp = FileUtils.readFileToString(new File(COMPUTATE_SRC + "/config/cp.txt"), "UTF-8"); |
606 | | - String classpath = String.format("%s:%s/target/classes", cp, COMPUTATE_SRC); |
607 | | - CommandLine ligneCommande = new CommandLine("java"); |
608 | | - ligneCommande.addArgument("-cp"); |
609 | | - ligneCommande.addArgument(classpath); |
610 | | - ligneCommande.addArgument(RegarderClasse.class.getCanonicalName()); |
611 | | - ligneCommande.addArgument(classeCheminRepertoireAppli); |
612 | | - ligneCommande.addArgument(classeCheminAbsolu); |
613 | | - File repertoireTravail = new File(COMPUTATE_SRC); |
614 | | - |
615 | | - executeur.setWorkingDirectory(repertoireTravail); |
616 | | - executeur.execute(ligneCommande); |
617 | | - String classeNomSimple = StringUtils.substringBeforeLast(cheminComplet.getFileName().toString(), "."); |
618 | | - String log = String.format(classeLangueConfig.getString(I18n.str_chemin_absolu), classeNomSimple); |
619 | | - LOG.info(log); |
620 | | - promise.complete(); |
| 592 | + orderLock = String.format("/%s/%s", ZOOKEEPER_ROOT_PATH, cheminCompletStr.replace("/", "-")); |
| 593 | + SharedData sharedData = vertx.sharedData(); |
| 594 | + sharedData.getLocalLockWithTimeout(orderLock, config().getLong(ComputateConfigKeys.ZOOKEEPER_CONNECTION_TIMEOUT_MILLIS, 3000L)).onSuccess(lock -> { |
| 595 | + try { |
| 596 | + String classeCheminAbsolu = cheminComplet.toAbsolutePath().toString(); |
| 597 | + String cp = FileUtils.readFileToString(new File(COMPUTATE_SRC + "/config/cp.txt"), "UTF-8"); |
| 598 | + String classpath = String.format("%s:%s/target/classes", cp, COMPUTATE_SRC); |
| 599 | + CommandLine ligneCommande = new CommandLine("java"); |
| 600 | + ligneCommande.addArgument("-cp"); |
| 601 | + ligneCommande.addArgument(classpath); |
| 602 | + ligneCommande.addArgument(RegarderClasse.class.getCanonicalName()); |
| 603 | + ligneCommande.addArgument(classeCheminRepertoireAppli); |
| 604 | + ligneCommande.addArgument(classeCheminAbsolu); |
| 605 | + File repertoireTravail = new File(COMPUTATE_SRC); |
| 606 | + |
| 607 | + executeur.setWorkingDirectory(repertoireTravail); |
| 608 | + executeur.execute(ligneCommande); |
| 609 | + String classeNomSimple = StringUtils.substringBeforeLast(cheminComplet.getFileName().toString(), "."); |
| 610 | + String log = String.format(classeLangueConfig.getString(I18n.str_chemin_absolu), classeNomSimple); |
| 611 | + LOG.info(log); |
| 612 | + promise.complete(); |
| 613 | + lock.release(); |
| 614 | + } catch(Exception ex) { |
| 615 | + LOG.error(String.format(classeLangueConfig.getString(I18n.str_UneProblemeExecutionRegarderRepertoire), cheminCompletStr), ex); |
| 616 | + promise.fail(ex); |
| 617 | + lock.release(); |
| 618 | + } |
| 619 | + }).onFailure(ex -> { |
| 620 | + promise.complete(); |
| 621 | + }); |
621 | 622 | } catch(Exception ex) { |
622 | | - if(!(ex instanceof KeeperException.NodeExistsException)) |
623 | | - LOG.error(String.format(classeLangueConfig.getString(I18n.str_UneProblemeExecutionRegarderRepertoire), cheminCompletStr), ex); |
| 623 | + LOG.error(String.format(classeLangueConfig.getString(I18n.str_UneProblemeExecutionRegarderRepertoire), ((JsonObject)message.body()).getJsonObject("context").getJsonObject("params").getJsonObject("body").getString("cheminComplet")), ex); |
624 | 624 | promise.fail(ex); |
625 | | - } finally { |
626 | | - zookeeper.delete(zookeeperNodeName, zookeeperStat.getVersion()); |
627 | 625 | } |
628 | 626 | return promise.future(); |
629 | 627 | }); |
@@ -815,25 +813,6 @@ public void start(Promise<Void> startPromise) throws Exception { |
815 | 813 | cheminSrcGenJava = SITE_SRC + "/src/gen/java"; |
816 | 814 | cheminsBin.add(SITE_SRC + "/src/main/resources"); |
817 | 815 |
|
818 | | - String zkHostName = configuration.getString("ZOOKEEPER_HOST_NAME"); |
819 | | - Integer zkPort = Integer.parseInt(configuration.getString("ZOOKEEPER_PORT")); |
820 | | - String zkConnectionString = String.format("%s:%s", zkHostName, zkPort); |
821 | | - ZKClientConfig zkClientConfig = new ZKClientConfig(); |
822 | | - Integer zkSessionTimeoutMillis = Integer.parseInt(configuration.getString("ZOOKEEPER_CONNECTION_TIMEOUT_MILLIS", "3000")); |
823 | | - CountDownLatch connectionLatch = new CountDownLatch(1); |
824 | | - zookeeper = new ZooKeeper(zkConnectionString, zkSessionTimeoutMillis, event -> { |
825 | | - if (event.getState() == Watcher.Event.KeeperState.SyncConnected) { |
826 | | - connectionLatch.countDown(); |
827 | | - } |
828 | | - }, zkClientConfig); |
829 | | - connectionLatch.await(); |
830 | | - Stat zookeeperStat = new Stat(); |
831 | | - try { |
832 | | - zookeeper.create(String.format("/%s", ZOOKEEPER_ROOT_PATH), "reserved".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, zookeeperStat); |
833 | | - } catch(KeeperException.NodeExistsException ex) { |
834 | | - LOG.info(String.format("The zookeeper root node already exists: %s", String.format("/%s", ZOOKEEPER_ROOT_PATH))); |
835 | | - } |
836 | | - |
837 | 816 | trace = true; |
838 | 817 | initialiserRegarderRepertoire(classeLangueConfig); |
839 | 818 | ajouterCheminsARegarder(classeLangueConfig, REGARDER); |
|
0 commit comments