22
33import com .uid2 .operator .service .ShutdownService ;
44import com .uid2 .shared .attest .AttestationResponseCode ;
5- import lombok . extern . java . Log ;
5+ import io . vertx . core . Vertx ;
66import org .slf4j .Logger ;
77import org .slf4j .LoggerFactory ;
88import software .amazon .awssdk .utils .Pair ;
1111import java .time .Duration ;
1212import java .time .Instant ;
1313import java .time .temporal .ChronoUnit ;
14+ import java .util .Map ;
15+ import java .util .concurrent .ConcurrentHashMap ;
1416import java .util .concurrent .atomic .AtomicReference ;
1517
1618public class OperatorShutdownHandler {
1719 private static final Logger LOGGER = LoggerFactory .getLogger (OperatorShutdownHandler .class );
1820 private static final int SALT_FAILURE_LOG_INTERVAL_MINUTES = 10 ;
19- private static final int KEYSET_KEY_FAILURE_LOG_INTERVAL_MINUTES = 10 ;
21+ private static final int STORE_REFRESH_STALENESS_CHECK_INTERVAL_MINUTES = 60 ;
2022 private final Duration attestShutdownWaitTime ;
2123 private final Duration saltShutdownWaitTime ;
22- private final Duration keysetKeyShutdownWaitTime ;
24+ private final Duration storeRefreshStaleTimeout ;
2325 private final AtomicReference <Instant > attestFailureStartTime = new AtomicReference <>(null );
2426 private final AtomicReference <Instant > saltFailureStartTime = new AtomicReference <>(null );
25- private final AtomicReference <Instant > keysetKeyFailureStartTime = new AtomicReference <>(null );
2627 private final AtomicReference <Instant > lastSaltFailureLogTime = new AtomicReference <>(null );
27- private final AtomicReference <Instant > lastKeysetKeyFailureLogTime = new AtomicReference <>(null );
28+ private final Map < String , AtomicReference <Instant >> lastSuccessfulRefreshTimes = new ConcurrentHashMap <>();
2829 private final Clock clock ;
2930 private final ShutdownService shutdownService ;
31+ private boolean isStalenessCheckScheduled = false ;
3032
3133 public OperatorShutdownHandler (Duration attestShutdownWaitTime , Duration saltShutdownWaitTime ,
32- Duration keysetKeyShutdownWaitTime , Clock clock , ShutdownService shutdownService ) {
34+ Duration storeRefreshStaleTimeout , Clock clock , ShutdownService shutdownService ) {
3335 this .attestShutdownWaitTime = attestShutdownWaitTime ;
3436 this .saltShutdownWaitTime = saltShutdownWaitTime ;
35- this .keysetKeyShutdownWaitTime = keysetKeyShutdownWaitTime ;
37+ this .storeRefreshStaleTimeout = storeRefreshStaleTimeout ;
3638 this .clock = clock ;
3739 this .shutdownService = shutdownService ;
3840 }
@@ -60,37 +62,6 @@ public void logSaltFailureAtInterval() {
6062 }
6163 }
6264
63- public void handleKeysetKeyRefreshResponse (Boolean success ) {
64- if (success ) {
65- keysetKeyFailureStartTime .set (null );
66- lastKeysetKeyFailureLogTime .set (null );
67- LOGGER .debug ("keyset keys sync successful" );
68- } else {
69- Instant t = keysetKeyFailureStartTime .get ();
70- if (t == null ) {
71- keysetKeyFailureStartTime .set (clock .instant ());
72- lastKeysetKeyFailureLogTime .set (clock .instant ());
73- LOGGER .warn ("keyset keys sync started failing. shutdown timer started" );
74- } else {
75- Duration elapsed = Duration .between (t , clock .instant ());
76- if (elapsed .compareTo (this .keysetKeyShutdownWaitTime ) > 0 ) {
77- LOGGER .error ("keyset keys have been failing to sync for too long. shutting down operator" );
78- this .shutdownService .Shutdown (1 );
79- } else {
80- logKeysetKeyFailureProgressAtInterval (t , elapsed );
81- }
82- }
83- }
84- }
85-
86- private void logKeysetKeyFailureProgressAtInterval (Instant failureStartTime , Duration elapsed ) {
87- Instant lastLogTime = lastKeysetKeyFailureLogTime .get ();
88- if (lastLogTime == null || clock .instant ().isAfter (lastLogTime .plus (KEYSET_KEY_FAILURE_LOG_INTERVAL_MINUTES , ChronoUnit .MINUTES ))) {
89- LOGGER .warn ("keyset keys sync still failing - elapsed time: {}d {}h {}m" , elapsed .toDays (), elapsed .toHoursPart (), elapsed .toMinutesPart ());
90- lastKeysetKeyFailureLogTime .set (clock .instant ());
91- }
92- }
93-
9465 public void handleAttestResponse (Pair <AttestationResponseCode , String > response ) {
9566 if (response .left () == AttestationResponseCode .AttestationFailure ) {
9667 LOGGER .error ("core attestation failed with AttestationFailure, shutting down operator, core response: {}" , response .right ());
@@ -108,4 +79,50 @@ public void handleAttestResponse(Pair<AttestationResponseCode, String> response)
10879 }
10980 }
11081 }
82+
83+ public void handleStoreRefresh (String storeName ) {
84+ lastSuccessfulRefreshTimes .computeIfAbsent (storeName , k -> new AtomicReference <>())
85+ .set (clock .instant ());
86+ }
87+
88+ public void checkStoreRefreshStaleness () {
89+ Instant now = clock .instant ();
90+ for (Map .Entry <String , AtomicReference <Instant >> entry : lastSuccessfulRefreshTimes .entrySet ()) {
91+ String storeName = entry .getKey ();
92+ Instant lastSuccess = entry .getValue ().get ();
93+
94+ if (lastSuccess == null ) {
95+ // Store hasn't had a successful refresh yet
96+ // This should rarely happen since startup success also records timestamp, but keep as defensive guard for edge cases
97+ LOGGER .warn ("Store '{}' has no recorded successful refresh - skipping staleness check" , storeName );
98+ continue ;
99+ }
100+
101+ Duration timeSinceLastRefresh = Duration .between (lastSuccess , now );
102+ LOGGER .debug ("Store '{}' last successful refresh {} ago" , storeName , timeSinceLastRefresh );
103+ if (timeSinceLastRefresh .compareTo (storeRefreshStaleTimeout ) > 0 ) {
104+ LOGGER .error ("Store '{}' has not refreshed successfully for {} hours ({}). Shutting down operator" ,
105+ storeName , timeSinceLastRefresh .toHours (), timeSinceLastRefresh );
106+ this .shutdownService .Shutdown (1 );
107+ return ; // Exit after triggering shutdown for first stale store
108+ }
109+ }
110+ }
111+
112+ public void startPeriodicStaleCheck (Vertx vertx ) {
113+ if (isStalenessCheckScheduled ) {
114+ LOGGER .warn ("Periodic store staleness check already started" );
115+ return ;
116+ }
117+
118+ long intervalMs = STORE_REFRESH_STALENESS_CHECK_INTERVAL_MINUTES * 60 * 1000L ;
119+ vertx .setPeriodic (intervalMs , id -> {
120+ LOGGER .debug ("Running periodic store staleness check" );
121+ checkStoreRefreshStaleness ();
122+ });
123+ isStalenessCheckScheduled = true ;
124+ LOGGER .info ("Started periodic store staleness check (interval: {} minutes, timeout: {} hours)" ,
125+ STORE_REFRESH_STALENESS_CHECK_INTERVAL_MINUTES ,
126+ storeRefreshStaleTimeout .toHours ());
127+ }
111128}
0 commit comments