diff --git a/conf/docker-config.json b/conf/docker-config.json index f346045bc..d2927f044 100644 --- a/conf/docker-config.json +++ b/conf/docker-config.json @@ -40,7 +40,7 @@ "enclave_platform": null, "failure_shutdown_wait_hours": 120, "salts_expired_shutdown_hours": 12, - "keysetkeys_failed_shutdown_hours": 168, + "store_refresh_stale_shutdown_hours": 12, "operator_type": "public", "disable_optout_token": true, "enable_remote_config": true, diff --git a/conf/integ-config.json b/conf/integ-config.json index a9b0b9049..f7b16e833 100644 --- a/conf/integ-config.json +++ b/conf/integ-config.json @@ -16,7 +16,7 @@ "cloud_encryption_keys_metadata_path": "http://localhost:8088/cloud_encryption_keys/retrieve", "runtime_config_metadata_path": "http://localhost:8088/operator/config", "salts_expired_shutdown_hours": 12, - "keysetkeys_failed_shutdown_hours": 168, + "store_refresh_stale_shutdown_hours": 12, "operator_type": "public", "disable_optout_token": true, "enable_remote_config": false, diff --git a/conf/local-config.json b/conf/local-config.json index 52697a1c0..0d8720483 100644 --- a/conf/local-config.json +++ b/conf/local-config.json @@ -38,7 +38,7 @@ "key_sharing_endpoint_provide_app_names": true, "client_side_token_generate_log_invalid_http_origins": true, "salts_expired_shutdown_hours": 12, - "keysetkeys_failed_shutdown_hours": 168, + "store_refresh_stale_shutdown_hours": 12, "operator_type": "public", "encrypted_files": false, "disable_optout_token": true, diff --git a/conf/local-e2e-docker-private-config.json b/conf/local-e2e-docker-private-config.json index bdf259c84..629a2c998 100644 --- a/conf/local-e2e-docker-private-config.json +++ b/conf/local-e2e-docker-private-config.json @@ -30,7 +30,7 @@ "optout_delta_rotate_interval": 60, "cloud_refresh_interval": 30, "salts_expired_shutdown_hours": 12, - "keysetkeys_failed_shutdown_hours": 168, + "store_refresh_stale_shutdown_hours": 12, "operator_type": "private", "enable_remote_config": true, "uid_instance_id_prefix": "local-private-operator" diff --git a/conf/local-e2e-docker-public-config.json b/conf/local-e2e-docker-public-config.json index c4c899a63..7610e9bc7 100644 --- a/conf/local-e2e-docker-public-config.json +++ b/conf/local-e2e-docker-public-config.json @@ -36,7 +36,7 @@ "optout_status_api_enabled": true, "cloud_refresh_interval": 30, "salts_expired_shutdown_hours": 12, - "keysetkeys_failed_shutdown_hours": 168, + "store_refresh_stale_shutdown_hours": 12, "operator_type": "public", "disable_optout_token": true, "enable_remote_config": true, diff --git a/conf/local-e2e-private-config.json b/conf/local-e2e-private-config.json index a69994d47..5e3c39a0e 100644 --- a/conf/local-e2e-private-config.json +++ b/conf/local-e2e-private-config.json @@ -41,7 +41,7 @@ "client_side_token_generate_domain_name_check_enabled": false, "client_side_token_generate_log_invalid_http_origins": true, "salts_expired_shutdown_hours": 12, - "keysetkeys_failed_shutdown_hours": 168, + "store_refresh_stale_shutdown_hours": 12, "operator_type": "private", "enable_remote_config": true, "uid_instance_id_prefix": "local-private-operator" diff --git a/conf/local-e2e-public-config.json b/conf/local-e2e-public-config.json index 075481496..80459743b 100644 --- a/conf/local-e2e-public-config.json +++ b/conf/local-e2e-public-config.json @@ -42,7 +42,7 @@ "key_sharing_endpoint_provide_app_names": true, "client_side_token_generate_log_invalid_http_origins": true, "salts_expired_shutdown_hours": 12, - "keysetkeys_failed_shutdown_hours": 168, + "store_refresh_stale_shutdown_hours": 12, "operator_type": "public", "disable_optout_token": true, "enable_remote_config": true, diff --git a/pom.xml b/pom.xml index 85f1dec90..3174ee817 100644 --- a/pom.xml +++ b/pom.xml @@ -22,7 +22,7 @@ 2.1.0 2.1.13 2.1.0 - 11.1.80 + 11.1.91 ${project.version} 21 21 diff --git a/src/main/java/com/uid2/operator/Main.java b/src/main/java/com/uid2/operator/Main.java index c46a080a0..44f2fbe4a 100644 --- a/src/main/java/com/uid2/operator/Main.java +++ b/src/main/java/com/uid2/operator/Main.java @@ -117,7 +117,7 @@ public Main(Vertx vertx, JsonObject config) throws Exception { this.encryptedCloudFilesEnabled = config.getBoolean(Const.Config.EncryptedFiles, false); this.shutdownHandler = new OperatorShutdownHandler(Duration.ofHours(12), Duration.ofHours(config.getInteger(Const.Config.SaltsExpiredShutdownHours, 12)), - Duration.ofHours(config.getInteger(Const.Config.KeysetKeysFailedShutdownHours, 168)), + Duration.ofHours(config.getInteger(Const.Config.StoreRefreshStaleShutdownHours, 12)), Clock.systemUTC(), new ShutdownService()); this.uidInstanceIdProvider = new UidInstanceIdProvider(config); @@ -423,14 +423,16 @@ private Future createStoreVerticles() throws Exception { fs.add(createAndDeployRotatingStoreVerticle("runtime_config", (RuntimeConfigStore) configStore, Const.Config.ConfigScanPeriodMsProp)); } fs.add(createAndDeployRotatingStoreVerticle("auth", clientKeyProvider, "auth_refresh_ms")); - fs.add(createAndDeployRotatingStoreVerticle("keyset", keysetProvider, "keyset_refresh_ms")); - fs.add(createAndDeployRotatingStoreVerticle("keysetkey", keysetKeyStore, "keysetkey_refresh_ms", - this.shutdownHandler::handleKeysetKeyRefreshResponse)); - fs.add(createAndDeployRotatingStoreVerticle("salt", saltProvider, "salt_refresh_ms")); + fs.add(createAndDeployRotatingStoreVerticle("keyset", keysetProvider, "keyset_refresh_ms")); + fs.add(createAndDeployRotatingStoreVerticle("keysetkey", keysetKeyStore, "keysetkey_refresh_ms")); + fs.add(createAndDeployRotatingStoreVerticle("salt", saltProvider, "salt_refresh_ms")); fs.add(createAndDeployCloudSyncStoreVerticle("optout", fsOptOut, optOutCloudSync)); CompositeFuture.all(fs).onComplete(ar -> { if (ar.failed()) promise.fail(new Exception(ar.cause())); - else promise.complete(); + else { + promise.complete(); + this.shutdownHandler.startPeriodicStaleCheck(this.vertx); + } }); @@ -438,11 +440,9 @@ private Future createStoreVerticles() throws Exception { } private Future createAndDeployRotatingStoreVerticle(String name, IMetadataVersionedStore store, String storeRefreshConfigMs) { - return createAndDeployRotatingStoreVerticle(name, store, storeRefreshConfigMs, null); - } - - private Future createAndDeployRotatingStoreVerticle(String name, IMetadataVersionedStore store, String storeRefreshConfigMs, Consumer refreshCallback) { - final int intervalMs = config.getInteger(storeRefreshConfigMs, 10000); + final long intervalMs = config.getInteger(storeRefreshConfigMs, 10000); + + Runnable refreshCallback = () -> this.shutdownHandler.handleStoreRefresh(name); RotatingStoreVerticle rotatingStoreVerticle = new RotatingStoreVerticle(name, intervalMs, store, refreshCallback); return vertx.deployVerticle(rotatingStoreVerticle); diff --git a/src/main/java/com/uid2/operator/vertx/OperatorShutdownHandler.java b/src/main/java/com/uid2/operator/vertx/OperatorShutdownHandler.java index bba8f8540..bd2f5f243 100644 --- a/src/main/java/com/uid2/operator/vertx/OperatorShutdownHandler.java +++ b/src/main/java/com/uid2/operator/vertx/OperatorShutdownHandler.java @@ -2,7 +2,7 @@ import com.uid2.operator.service.ShutdownService; import com.uid2.shared.attest.AttestationResponseCode; -import lombok.extern.java.Log; +import io.vertx.core.Vertx; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import software.amazon.awssdk.utils.Pair; @@ -11,28 +11,30 @@ import java.time.Duration; import java.time.Instant; import java.time.temporal.ChronoUnit; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicReference; public class OperatorShutdownHandler { private static final Logger LOGGER = LoggerFactory.getLogger(OperatorShutdownHandler.class); private static final int SALT_FAILURE_LOG_INTERVAL_MINUTES = 10; - private static final int KEYSET_KEY_FAILURE_LOG_INTERVAL_MINUTES = 10; + private static final int STORE_REFRESH_STALENESS_CHECK_INTERVAL_MINUTES = 60; private final Duration attestShutdownWaitTime; private final Duration saltShutdownWaitTime; - private final Duration keysetKeyShutdownWaitTime; + private final Duration storeRefreshStaleTimeout; private final AtomicReference attestFailureStartTime = new AtomicReference<>(null); private final AtomicReference saltFailureStartTime = new AtomicReference<>(null); - private final AtomicReference keysetKeyFailureStartTime = new AtomicReference<>(null); private final AtomicReference lastSaltFailureLogTime = new AtomicReference<>(null); - private final AtomicReference lastKeysetKeyFailureLogTime = new AtomicReference<>(null); + private final Map> lastSuccessfulRefreshTimes = new ConcurrentHashMap<>(); private final Clock clock; private final ShutdownService shutdownService; + private boolean isStalenessCheckScheduled = false; public OperatorShutdownHandler(Duration attestShutdownWaitTime, Duration saltShutdownWaitTime, - Duration keysetKeyShutdownWaitTime, Clock clock, ShutdownService shutdownService) { + Duration storeRefreshStaleTimeout, Clock clock, ShutdownService shutdownService) { this.attestShutdownWaitTime = attestShutdownWaitTime; this.saltShutdownWaitTime = saltShutdownWaitTime; - this.keysetKeyShutdownWaitTime = keysetKeyShutdownWaitTime; + this.storeRefreshStaleTimeout = storeRefreshStaleTimeout; this.clock = clock; this.shutdownService = shutdownService; } @@ -60,37 +62,6 @@ public void logSaltFailureAtInterval() { } } - public void handleKeysetKeyRefreshResponse(Boolean success) { - if (success) { - keysetKeyFailureStartTime.set(null); - lastKeysetKeyFailureLogTime.set(null); - LOGGER.debug("keyset keys sync successful"); - } else { - Instant t = keysetKeyFailureStartTime.get(); - if (t == null) { - keysetKeyFailureStartTime.set(clock.instant()); - lastKeysetKeyFailureLogTime.set(clock.instant()); - LOGGER.warn("keyset keys sync started failing. shutdown timer started"); - } else { - Duration elapsed = Duration.between(t, clock.instant()); - if (elapsed.compareTo(this.keysetKeyShutdownWaitTime) > 0) { - LOGGER.error("keyset keys have been failing to sync for too long. shutting down operator"); - this.shutdownService.Shutdown(1); - } else { - logKeysetKeyFailureProgressAtInterval(t, elapsed); - } - } - } - } - - private void logKeysetKeyFailureProgressAtInterval(Instant failureStartTime, Duration elapsed) { - Instant lastLogTime = lastKeysetKeyFailureLogTime.get(); - if (lastLogTime == null || clock.instant().isAfter(lastLogTime.plus(KEYSET_KEY_FAILURE_LOG_INTERVAL_MINUTES, ChronoUnit.MINUTES))) { - LOGGER.warn("keyset keys sync still failing - elapsed time: {}d {}h {}m", elapsed.toDays(), elapsed.toHoursPart(), elapsed.toMinutesPart()); - lastKeysetKeyFailureLogTime.set(clock.instant()); - } - } - public void handleAttestResponse(Pair response) { if (response.left() == AttestationResponseCode.AttestationFailure) { LOGGER.error("core attestation failed with AttestationFailure, shutting down operator, core response: {}", response.right()); @@ -108,4 +79,50 @@ public void handleAttestResponse(Pair response) } } } + + public void handleStoreRefresh(String storeName) { + lastSuccessfulRefreshTimes.computeIfAbsent(storeName, k -> new AtomicReference<>()) + .set(clock.instant()); + } + + public void checkStoreRefreshStaleness() { + Instant now = clock.instant(); + for (Map.Entry> entry : lastSuccessfulRefreshTimes.entrySet()) { + String storeName = entry.getKey(); + Instant lastSuccess = entry.getValue().get(); + + if (lastSuccess == null) { + // Store hasn't had a successful refresh yet + // This should rarely happen since startup success also records timestamp, but keep as defensive guard for edge cases + LOGGER.warn("Store '{}' has no recorded successful refresh - skipping staleness check", storeName); + continue; + } + + Duration timeSinceLastRefresh = Duration.between(lastSuccess, now); + LOGGER.debug("Store '{}' last successful refresh {} ago", storeName, timeSinceLastRefresh); + if (timeSinceLastRefresh.compareTo(storeRefreshStaleTimeout) > 0) { + LOGGER.error("Store '{}' has not refreshed successfully for {} hours ({}). Shutting down operator", + storeName, timeSinceLastRefresh.toHours(), timeSinceLastRefresh); + this.shutdownService.Shutdown(1); + return; // Exit after triggering shutdown for first stale store + } + } + } + + public void startPeriodicStaleCheck(Vertx vertx) { + if (isStalenessCheckScheduled) { + LOGGER.warn("Periodic store staleness check already started"); + return; + } + + long intervalMs = STORE_REFRESH_STALENESS_CHECK_INTERVAL_MINUTES * 60 * 1000L; + vertx.setPeriodic(intervalMs, id -> { + LOGGER.debug("Running periodic store staleness check"); + checkStoreRefreshStaleness(); + }); + isStalenessCheckScheduled = true; + LOGGER.info("Started periodic store staleness check (interval: {} minutes, timeout: {} hours)", + STORE_REFRESH_STALENESS_CHECK_INTERVAL_MINUTES, + storeRefreshStaleTimeout.toHours()); + } } diff --git a/src/test/java/com/uid2/operator/OperatorShutdownHandlerTest.java b/src/test/java/com/uid2/operator/OperatorShutdownHandlerTest.java index e719fc8e1..6cfa22a49 100644 --- a/src/test/java/com/uid2/operator/OperatorShutdownHandlerTest.java +++ b/src/test/java/com/uid2/operator/OperatorShutdownHandlerTest.java @@ -43,7 +43,7 @@ void beforeEach() { mocks = MockitoAnnotations.openMocks(this); when(clock.instant()).thenAnswer(i -> Instant.now()); doThrow(new RuntimeException()).when(shutdownService).Shutdown(1); - this.operatorShutdownHandler = new OperatorShutdownHandler(Duration.ofHours(12), Duration.ofHours(12), Duration.ofHours(168), clock, shutdownService); + this.operatorShutdownHandler = new OperatorShutdownHandler(Duration.ofHours(12), Duration.ofHours(12), Duration.ofHours(12), clock, shutdownService); } @AfterEach @@ -168,72 +168,165 @@ void saltsLogErrorAtInterval(VertxTestContext testContext) { } @Test - void shutdownOnKeysetKeyFailedTooLong(VertxTestContext testContext) { + void storeRefreshRecordsSuccessTimestamp(VertxTestContext testContext) { + this.operatorShutdownHandler.handleStoreRefresh("test_store"); + + when(clock.instant()).thenAnswer(i -> Instant.now().plus(11, ChronoUnit.HOURS)); + assertDoesNotThrow(() -> { + this.operatorShutdownHandler.checkStoreRefreshStaleness(); + }); + verify(shutdownService, never()).Shutdown(anyInt()); + + when(clock.instant()).thenAnswer(i -> Instant.now().plus(13, ChronoUnit.HOURS)); + try { + this.operatorShutdownHandler.checkStoreRefreshStaleness(); + } catch (RuntimeException e) { + verify(shutdownService).Shutdown(1); + testContext.completeNow(); + } + } + + @Test + void storeRefreshFailureDoesNotResetTimestamp(VertxTestContext testContext) { + this.operatorShutdownHandler.handleStoreRefresh("test_store"); + + when(clock.instant()).thenAnswer(i -> Instant.now().plus(2, ChronoUnit.HOURS)); + + // Simulate multiple refresh failures by NOT calling handleStoreRefresh + + when(clock.instant()).thenAnswer(i -> Instant.now().plus(13, ChronoUnit.HOURS)); + + try { + this.operatorShutdownHandler.checkStoreRefreshStaleness(); + } catch (RuntimeException e) { + verify(shutdownService).Shutdown(1); + testContext.completeNow(); + } + } + + @Test + void storeRefreshStaleShutdown(VertxTestContext testContext) { ListAppender logWatcher = new ListAppender<>(); logWatcher.start(); ((Logger) LoggerFactory.getLogger(OperatorShutdownHandler.class)).addAppender(logWatcher); - this.operatorShutdownHandler.handleKeysetKeyRefreshResponse(false); - Assertions.assertTrue(logWatcher.list.get(0).getFormattedMessage().contains("keyset keys sync started failing")); + this.operatorShutdownHandler.handleStoreRefresh("test_store"); + + when(clock.instant()).thenAnswer(i -> Instant.now().plus(12, ChronoUnit.HOURS).plusSeconds(1)); + + try { + this.operatorShutdownHandler.checkStoreRefreshStaleness(); + } catch (RuntimeException e) { + verify(shutdownService).Shutdown(1); + Assertions.assertTrue(logWatcher.list.stream().anyMatch(log -> + log.getFormattedMessage().contains("has not refreshed successfully") && + log.getFormattedMessage().contains("test_store"))); + testContext.completeNow(); + } + } - when(clock.instant()).thenAnswer(i -> Instant.now().plus(7, ChronoUnit.DAYS).plusSeconds(60)); + @Test + void storeRefreshRecoverBeforeStale(VertxTestContext testContext) { + this.operatorShutdownHandler.handleStoreRefresh("test_store"); + + when(clock.instant()).thenAnswer(i -> Instant.now().plus(11, ChronoUnit.HOURS)); + + this.operatorShutdownHandler.handleStoreRefresh("test_store"); + + when(clock.instant()).thenAnswer(i -> Instant.now().plus(12, ChronoUnit.HOURS)); + + assertDoesNotThrow(() -> { + this.operatorShutdownHandler.checkStoreRefreshStaleness(); + }); + verify(shutdownService, never()).Shutdown(anyInt()); + testContext.completeNow(); + } + + @Test + void multipleStoresOneStaleTriggers(VertxTestContext testContext) { + ListAppender logWatcher = new ListAppender<>(); + logWatcher.start(); + ((Logger) LoggerFactory.getLogger(OperatorShutdownHandler.class)).addAppender(logWatcher); + + + this.operatorShutdownHandler.handleStoreRefresh("store1"); + this.operatorShutdownHandler.handleStoreRefresh("store2"); + this.operatorShutdownHandler.handleStoreRefresh("store3"); + + when(clock.instant()).thenAnswer(i -> Instant.now().plus(6, ChronoUnit.HOURS)); + + this.operatorShutdownHandler.handleStoreRefresh("store1"); + this.operatorShutdownHandler.handleStoreRefresh("store2"); + + when(clock.instant()).thenAnswer(i -> Instant.now().plus(12, ChronoUnit.HOURS).plusSeconds(1)); + try { - this.operatorShutdownHandler.handleKeysetKeyRefreshResponse(false); + this.operatorShutdownHandler.checkStoreRefreshStaleness(); } catch (RuntimeException e) { verify(shutdownService).Shutdown(1); Assertions.assertTrue(logWatcher.list.stream().anyMatch(log -> - log.getFormattedMessage().contains("keyset keys have been failing to sync for too long"))); + log.getFormattedMessage().contains("store3") && + log.getFormattedMessage().contains("has not refreshed successfully"))); testContext.completeNow(); } } @Test - void keysetKeyRecoverOnSuccess(VertxTestContext testContext) { - this.operatorShutdownHandler.handleKeysetKeyRefreshResponse(false); - when(clock.instant()).thenAnswer(i -> Instant.now().plus(3, ChronoUnit.DAYS)); + void multipleStoresFailSimultaneouslyTriggersShutdown(VertxTestContext testContext) { + ListAppender logWatcher = new ListAppender<>(); + logWatcher.start(); + ((Logger) LoggerFactory.getLogger(OperatorShutdownHandler.class)).addAppender(logWatcher); + + this.operatorShutdownHandler.handleStoreRefresh("keyset"); + this.operatorShutdownHandler.handleStoreRefresh("keysetkey"); + this.operatorShutdownHandler.handleStoreRefresh("salt"); + this.operatorShutdownHandler.handleStoreRefresh("auth"); + + when(clock.instant()).thenAnswer(i -> Instant.now().plus(12, ChronoUnit.HOURS).plusSeconds(1)); - this.operatorShutdownHandler.handleKeysetKeyRefreshResponse(true); + try { + this.operatorShutdownHandler.checkStoreRefreshStaleness(); + } catch (RuntimeException e) { + verify(shutdownService).Shutdown(1); + Assertions.assertTrue(logWatcher.list.stream().anyMatch(log -> + log.getFormattedMessage().contains("has not refreshed successfully"))); + testContext.completeNow(); + } + } + + @Test + void noShutdownWhenStoreNeverInitialized(VertxTestContext testContext) { - when(clock.instant()).thenAnswer(i -> Instant.now().plus(7, ChronoUnit.DAYS)); assertDoesNotThrow(() -> { - this.operatorShutdownHandler.handleKeysetKeyRefreshResponse(false); + this.operatorShutdownHandler.checkStoreRefreshStaleness(); }); verify(shutdownService, never()).Shutdown(anyInt()); testContext.completeNow(); } @Test - void keysetKeyNoShutdownWhenAlwaysSuccessful(VertxTestContext testContext) { - this.operatorShutdownHandler.handleKeysetKeyRefreshResponse(true); - this.operatorShutdownHandler.handleKeysetKeyRefreshResponse(true); - this.operatorShutdownHandler.handleKeysetKeyRefreshResponse(true); - + void periodicCheckStartsSuccessfully(Vertx vertx, VertxTestContext testContext) { + // Start the periodic check + assertDoesNotThrow(() -> { + this.operatorShutdownHandler.startPeriodicStaleCheck(vertx); + }); + + // Verify it doesn't crash and doesn't trigger shutdown immediately verify(shutdownService, never()).Shutdown(anyInt()); testContext.completeNow(); } @Test - void keysetKeyLogProgressAtInterval(VertxTestContext testContext) { + void periodicCheckOnlyStartsOnce(Vertx vertx, VertxTestContext testContext) { ListAppender logWatcher = new ListAppender<>(); logWatcher.start(); ((Logger) LoggerFactory.getLogger(OperatorShutdownHandler.class)).addAppender(logWatcher); - this.operatorShutdownHandler.handleKeysetKeyRefreshResponse(false); - long warnLogCount1 = logWatcher.list.stream().filter(log -> - log.getFormattedMessage().contains("keyset keys sync still failing")).count(); - - when(clock.instant()).thenAnswer(i -> Instant.now().plus(5, ChronoUnit.MINUTES)); - this.operatorShutdownHandler.handleKeysetKeyRefreshResponse(false); - long warnLogCount2 = logWatcher.list.stream().filter(log -> - log.getFormattedMessage().contains("keyset keys sync still failing")).count(); - Assertions.assertEquals(warnLogCount1, warnLogCount2); + this.operatorShutdownHandler.startPeriodicStaleCheck(vertx); + this.operatorShutdownHandler.startPeriodicStaleCheck(vertx); - when(clock.instant()).thenAnswer(i -> Instant.now().plus(11, ChronoUnit.MINUTES)); - this.operatorShutdownHandler.handleKeysetKeyRefreshResponse(false); - long warnLogCount3 = logWatcher.list.stream().filter(log -> - log.getFormattedMessage().contains("keyset keys sync still failing")).count(); - Assertions.assertTrue(warnLogCount3 > warnLogCount2); - + Assertions.assertTrue(logWatcher.list.stream().anyMatch(log -> + log.getFormattedMessage().contains("already started"))); testContext.completeNow(); } }