|
16 | 16 | import java.util.concurrent.ScheduledFuture;
|
17 | 17 | import java.util.concurrent.TimeUnit;
|
18 | 18 | import java.util.concurrent.atomic.AtomicBoolean;
|
| 19 | +import java.util.concurrent.atomic.AtomicInteger; |
19 | 20 | import java.util.concurrent.atomic.AtomicReference;
|
20 | 21 |
|
21 | 22 | import io.kubernetes.client.ApiException;
|
@@ -89,6 +90,14 @@ public class Main {
|
89 | 90 | private static final FiberGate domainUpdaters = new FiberGate(engine);
|
90 | 91 | private static final ConcurrentMap<String, DomainPresenceInfo> domains = new ConcurrentHashMap<String, DomainPresenceInfo>();
|
91 | 92 |
|
| 93 | + private static final ConfigMapConsumer config = new ConfigMapConsumer("/operator/config"); |
| 94 | + |
| 95 | + // tuning parameters |
| 96 | + private static final int statusUpdateTimeoutSeconds = (int) readTuningParameter("statusUpdateTimeoutSeconds", 10); |
| 97 | + private static final int unchangedCountToDelayStatusRecheck = (int) readTuningParameter("unchangedCountToDelayStatusRecheck", 10); |
| 98 | + private static final long initialShortDelay = readTuningParameter("initialShortDelay", 3); |
| 99 | + private static final long eventualLongDelay = readTuningParameter("eventualLongDelay", 30); |
| 100 | + |
92 | 101 | private static final ConcurrentMap<String, Boolean> initialized = new ConcurrentHashMap<>();
|
93 | 102 | private static final AtomicBoolean stopping = new AtomicBoolean(false);
|
94 | 103 |
|
@@ -122,8 +131,7 @@ public static void main(String[] args) {
|
122 | 131 |
|
123 | 132 | Collection<String> targetNamespaces = getTargetNamespaces(namespace);
|
124 | 133 |
|
125 |
| - ConfigMapConsumer cmc = new ConfigMapConsumer("/operator/config"); |
126 |
| - String serviceAccountName = cmc.get("serviceaccount"); |
| 134 | + String serviceAccountName = config.get("serviceaccount"); |
127 | 135 | if (serviceAccountName == null) {
|
128 | 136 | serviceAccountName = "default";
|
129 | 137 | }
|
@@ -409,6 +417,19 @@ private static void normalizeDomainSpec(DomainSpec spec) {
|
409 | 417 | }
|
410 | 418 | }
|
411 | 419 |
|
| 420 | + private static long readTuningParameter(String parameter, long defaultValue) { |
| 421 | + String val = config.get(parameter); |
| 422 | + if (val != null) { |
| 423 | + try { |
| 424 | + return Long.parseLong(val); |
| 425 | + } catch (NumberFormatException nfe) { |
| 426 | + LOGGER.warning(MessageKeys.EXCEPTION, nfe); |
| 427 | + } |
| 428 | + } |
| 429 | + |
| 430 | + return defaultValue; |
| 431 | + } |
| 432 | + |
412 | 433 | /**
|
413 | 434 | * Restarts the admin server, if already running
|
414 | 435 | * @param principal Service principal
|
@@ -458,43 +479,51 @@ public static void doRollingRestartClusters(String principal, String domainUID,
|
458 | 479 | }
|
459 | 480 | }
|
460 | 481 |
|
461 |
| - private static void scheduleDomainStatusUpdating(DomainPresenceInfo info, long initialShortDelay, long eventualLongDelay, TimeUnit timeUnit) { |
| 482 | + private static void scheduleDomainStatusUpdating(DomainPresenceInfo info) { |
462 | 483 | String domainUID = info.getDomain().getSpec().getDomainUID();
|
| 484 | + AtomicInteger unchangedCount = new AtomicInteger(0); |
463 | 485 | AtomicReference<ScheduledFuture<?>> statusUpdater = info.getStatusUpdater();
|
464 | 486 | ScheduledFuture<?> existing = statusUpdater.get();
|
465 |
| - if (existing == null || !validateExisting(initialShortDelay, timeUnit, existing)) { |
| 487 | + if (existing == null || !validateExisting(initialShortDelay, existing)) { |
466 | 488 | Runnable command = new Runnable() {
|
467 | 489 | public void run() {
|
468 |
| - Runnable r = this; // resolve visibility later |
| 490 | + Runnable r = this; // resolve visibility |
469 | 491 | Packet packet = new Packet();
|
470 | 492 | packet.getComponents().put(ProcessingConstants.DOMAIN_COMPONENT_NAME, Component.createFor(info, version));
|
471 |
| - Step strategy = DomainStatusUpdater.createStatusStep(10, null); // FIXME: configure |
| 493 | + Step strategy = DomainStatusUpdater.createStatusStep(statusUpdateTimeoutSeconds, null); |
472 | 494 | domainUpdaters.startFiberIfNoCurrentFiber(domainUID, strategy, packet, new CompletionCallback() {
|
473 | 495 | @Override
|
474 | 496 | public void onCompletion(Packet packet) {
|
475 |
| - Boolean isStatusClean = (Boolean) packet.get(ProcessingConstants.CLEAN_STATUS); |
476 |
| - long delay = Boolean.TRUE.equals(isStatusClean) ? eventualLongDelay : initialShortDelay; |
| 497 | + Boolean isStatusUnchanged = (Boolean) packet.get(ProcessingConstants.STATUS_UNCHANGED); |
| 498 | + long delay = initialShortDelay; |
| 499 | + if (Boolean.TRUE.equals(isStatusUnchanged)) { |
| 500 | + if (unchangedCount.incrementAndGet() > unchangedCountToDelayStatusRecheck) { |
| 501 | + delay = eventualLongDelay; |
| 502 | + } |
| 503 | + } else { |
| 504 | + unchangedCount.set(0); |
| 505 | + } |
477 | 506 | // retry after delay
|
478 |
| - statusUpdater.set(engine.getExecutor().schedule(r, delay, timeUnit)); |
| 507 | + statusUpdater.set(engine.getExecutor().schedule(r, delay, TimeUnit.SECONDS)); |
479 | 508 | }
|
480 | 509 |
|
481 | 510 | @Override
|
482 | 511 | public void onThrowable(Packet packet, Throwable throwable) {
|
483 | 512 | LOGGER.severe(MessageKeys.EXCEPTION, throwable);
|
484 | 513 | // retry after delay
|
485 |
| - statusUpdater.set(engine.getExecutor().schedule(r, initialShortDelay, timeUnit)); |
| 514 | + statusUpdater.set(engine.getExecutor().schedule(r, initialShortDelay, TimeUnit.SECONDS)); |
486 | 515 | }
|
487 | 516 | });
|
488 | 517 | }
|
489 | 518 | };
|
490 |
| - statusUpdater.set(engine.getExecutor().schedule(command, initialShortDelay, timeUnit)); |
| 519 | + statusUpdater.set(engine.getExecutor().schedule(command, initialShortDelay, TimeUnit.SECONDS)); |
491 | 520 | }
|
492 | 521 | }
|
493 | 522 |
|
494 |
| - private static boolean validateExisting(long initialShortDelay, TimeUnit timeUnit, ScheduledFuture<?> existing) { |
| 523 | + private static boolean validateExisting(long initialShortDelay, ScheduledFuture<?> existing) { |
495 | 524 | if (existing.isCancelled())
|
496 | 525 | return false;
|
497 |
| - return existing.getDelay(timeUnit) <= initialShortDelay; |
| 526 | + return existing.getDelay(TimeUnit.SECONDS) <= initialShortDelay; |
498 | 527 | }
|
499 | 528 |
|
500 | 529 | private static void cancelDomainStatusUpdating(DomainPresenceInfo info) {
|
@@ -542,7 +571,7 @@ private static void doCheckAndCreateDomainPresence(
|
542 | 571 | }
|
543 | 572 | info.setDomain(dom);
|
544 | 573 | }
|
545 |
| - scheduleDomainStatusUpdating(info, 3, 30, TimeUnit.SECONDS); // FIXME: configure |
| 574 | + scheduleDomainStatusUpdating(info); |
546 | 575 |
|
547 | 576 | LOGGER.info(MessageKeys.PROCESSING_DOMAIN, domainUID);
|
548 | 577 |
|
|
0 commit comments