22
33import com .redhat .podmortem .common .model .analysis .AnalysisResult ;
44import com .redhat .podmortem .common .model .kube .aiprovider .AIProvider ;
5+ import com .redhat .podmortem .common .model .kube .patternlibrary .PatternLibrary ;
56import com .redhat .podmortem .common .model .kube .podmortem .PodFailureData ;
67import com .redhat .podmortem .common .model .kube .podmortem .Podmortem ;
78import com .redhat .podmortem .common .model .kube .podmortem .PodmortemStatus ;
1516import jakarta .enterprise .context .ApplicationScoped ;
1617import jakarta .enterprise .event .Observes ;
1718import jakarta .inject .Inject ;
19+ import java .net .URI ;
20+ import java .net .http .HttpClient ;
21+ import java .net .http .HttpRequest ;
22+ import java .net .http .HttpResponse ;
23+ import java .time .Duration ;
1824import java .time .Instant ;
1925import java .util .ArrayList ;
2026import java .util .Arrays ;
2329import java .util .Optional ;
2430import java .util .Set ;
2531import java .util .concurrent .ConcurrentHashMap ;
32+ import java .util .concurrent .ConcurrentLinkedQueue ;
2633import java .util .concurrent .CopyOnWriteArrayList ;
34+ import java .util .concurrent .atomic .AtomicBoolean ;
2735import org .eclipse .microprofile .config .inject .ConfigProperty ;
2836import org .slf4j .Logger ;
2937import org .slf4j .LoggerFactory ;
@@ -55,6 +63,26 @@ public class PodFailureWatcher {
5563 private Set <String > allowedNamespaces = Set .of ();
5664 private final List <Watch > activeWatches = new CopyOnWriteArrayList <>();
5765
66+ @ ConfigProperty (name = "podmortem.processing.startup-delay-seconds" , defaultValue = "60" )
67+ int startupDelaySeconds ;
68+
69+ @ ConfigProperty (name = "quarkus.rest-client.log-parser.url" )
70+ Optional <String > logParserBaseUrlProperty ;
71+
72+ @ ConfigProperty (name = "quarkus.rest-client.ai-interface.url" )
73+ Optional <String > aiInterfaceBaseUrlProperty ;
74+
75+ private final AtomicBoolean systemReady = new AtomicBoolean (false );
76+ private Instant appStartupTime = Instant .now ();
77+ private final HttpClient httpClient =
78+ HttpClient .newBuilder ().connectTimeout (Duration .ofSeconds (2 )).build ();
79+
80+ // Queue pod failures until system is ready
81+ private static final int MAX_PENDING_QUEUE_SIZE = 500 ;
82+ private final ConcurrentLinkedQueue <String > pendingFailureQueue = new ConcurrentLinkedQueue <>();
83+ private final Set <String > queuedFailureKeys =
84+ java .util .Collections .newSetFromMap (new ConcurrentHashMap <>());
85+
5886 /**
5987 * Initializes the pod failure watcher on application startup.
6088 *
@@ -64,6 +92,7 @@ public class PodFailureWatcher {
6492 */
6593 public void onStartup (@ Observes StartupEvent event ) {
6694 log .info ("Starting real-time pod failure watcher" );
95+ appStartupTime = Instant .now ();
6796 // Parse configured namespaces (comma-separated)
6897 String namespaces = watchNamespacesProperty .orElse ("" );
6998 if (!namespaces .isBlank ()) {
@@ -78,6 +107,7 @@ public void onStartup(@Observes StartupEvent event) {
78107 log .info ("Configured to watch all namespaces (no namespace filter set)" );
79108 }
80109 startPodWatcher ();
110+ startReadinessGuard ();
81111 }
82112
83113 /**
@@ -112,6 +142,10 @@ public void eventReceived(Action action, Pod pod) {
112142 return ;
113143 }
114144 if (hasPodFailed (pod )) {
145+ if (!systemReady .get ()) {
146+ enqueuePendingFailure (pod );
147+ return ;
148+ }
115149 handlePodFailure (pod );
116150 }
117151 } catch (Exception e ) {
@@ -136,6 +170,204 @@ public void onClose(WatcherException cause) {
136170 };
137171 }
138172
173+ /**
174+ * Starts a background readiness guard.
175+ *
176+ * <p>This guard periodically checks whether the system is allowed to process pod failures. The
177+ * system becomes ready when all of the following are true:
178+ *
179+ * <ul>
180+ * <li>The configured startup delay (property: {@code
181+ * podmortem.processing.startup-delay-seconds}) has elapsed
182+ * <li>At least one {@code PatternLibrary} reports phase {@code Ready} (or none exist)
183+ * <li>The Log Parser service {@code /q/health/ready} endpoint returns HTTP 2xx
184+ * <li>The AI Interface {@code /q/health/ready} endpoint returns HTTP 2xx
185+ * </ul>
186+ *
187+ * <p>Once ready, any queued pod failure events are processed asynchronously.
188+ */
189+ private void startReadinessGuard () {
190+ Thread .ofVirtual ()
191+ .name ("podmortem-readiness-guard" )
192+ .start (
193+ () -> {
194+ while (!systemReady .get ()) {
195+ try {
196+ boolean ready = checkSystemReady ();
197+ if (ready ) {
198+ systemReady .set (true );
199+ log .info (
200+ "System dependencies ready; enabling failure processing" );
201+ processQueuedFailuresAsync ();
202+ break ;
203+ }
204+ Thread .sleep (5000 );
205+ } catch (InterruptedException ie ) {
206+ Thread .currentThread ().interrupt ();
207+ break ;
208+ } catch (Exception e ) {
209+ log .debug ("Readiness check failed: {}" , e .getMessage ());
210+ try {
211+ Thread .sleep (5000 );
212+ } catch (InterruptedException ie ) {
213+ Thread .currentThread ().interrupt ();
214+ break ;
215+ }
216+ }
217+ }
218+ });
219+ }
220+
221+ /**
222+ * Enqueues a failed pod event while the system is not yet ready to process failures.
223+ *
224+ * <p>Uses a bounded FIFO queue to avoid unbounded memory growth. Duplicate pod keys are
225+ * de-duplicated while pending. When the queue is full, the oldest entry is dropped and a warn
226+ * is logged.
227+ *
228+ * @param pod the pod associated with the failure event to buffer
229+ */
230+ private void enqueuePendingFailure (Pod pod ) {
231+ String podKey = pod .getMetadata ().getNamespace () + "/" + pod .getMetadata ().getName ();
232+ // Avoid duplicate entries
233+ if (queuedFailureKeys .add (podKey )) {
234+ if (pendingFailureQueue .size () >= MAX_PENDING_QUEUE_SIZE ) {
235+ String dropped = pendingFailureQueue .poll ();
236+ if (dropped != null ) {
237+ queuedFailureKeys .remove (dropped );
238+ log .warn ("Pending failure queue full; dropping oldest: {}" , dropped );
239+ }
240+ }
241+ pendingFailureQueue .offer (podKey );
242+ log .debug ("Queued pod failure event until ready: {}" , podKey );
243+ }
244+ }
245+
246+ /**
247+ * Drains the pending failure queue and processes each entry asynchronously.
248+ *
249+ * <p>For each queued item, the latest pod object is fetched to confirm the failure state before
250+ * processing; this avoids acting on stale data.
251+ */
252+ private void processQueuedFailuresAsync () {
253+ Thread .ofVirtual ()
254+ .name ("podmortem-queued-failure-drain" )
255+ .start (
256+ () -> {
257+ String podKey ;
258+ while ((podKey = pendingFailureQueue .poll ()) != null ) {
259+ try {
260+ queuedFailureKeys .remove (podKey );
261+ String [] parts = podKey .split ("/" , 2 );
262+ if (parts .length != 2 ) {
263+ continue ;
264+ }
265+ String ns = parts [0 ];
266+ String name = parts [1 ];
267+ Pod latest = client .pods ().inNamespace (ns ).withName (name ).get ();
268+ if (latest != null && hasPodFailed (latest )) {
269+ handlePodFailure (latest );
270+ }
271+ } catch (Exception e ) {
272+ log .debug (
273+ "Failed processing queued failure {}: {}" ,
274+ podKey ,
275+ e .getMessage ());
276+ }
277+ }
278+ });
279+ }
280+
281+ /**
282+ * Evaluates whether the operator may begin processing pod failures.
283+ *
284+ * <p>Readiness requires that the startup delay has elapsed, pattern libraries are ready (or not
285+ * present), and dependent services (log-parser and AI interface) report ready via their {@code
286+ * /q/health/ready} endpoints.
287+ *
288+ * @return {@code true} if failure processing can start; {@code false} otherwise
289+ */
290+ private boolean checkSystemReady () {
291+ if (Duration .between (appStartupTime , Instant .now ()).getSeconds () < startupDelaySeconds ) {
292+ return false ;
293+ }
294+
295+ // Pattern libraries: ready if none defined or any reports phase Ready
296+ try {
297+ List <com .redhat .podmortem .common .model .kube .patternlibrary .PatternLibrary > libs =
298+ client .resources (PatternLibrary .class ).inAnyNamespace ().list ().getItems ();
299+ boolean patternsReady =
300+ libs .isEmpty ()
301+ || libs .stream ()
302+ .anyMatch (
303+ l ->
304+ l .getStatus () != null
305+ && "Ready"
306+ .equalsIgnoreCase (
307+ l .getStatus ()
308+ .getPhase ()));
309+ if (!patternsReady ) {
310+ return false ;
311+ }
312+ } catch (Exception e ) {
313+ return false ;
314+ }
315+
316+ // log-parser ready
317+ String logParserUrl =
318+ logParserBaseUrlProperty .orElseGet (
319+ () -> System .getenv ("QUARKUS_REST_CLIENT_LOG_PARSER_URL" ));
320+ if (logParserUrl == null || logParserUrl .isBlank ()) {
321+ return false ;
322+ }
323+ if (!isServiceReady (logParserUrl )) {
324+ return false ;
325+ }
326+
327+ // ai-interface ready
328+ String aiUrl =
329+ aiInterfaceBaseUrlProperty .orElseGet (
330+ () -> System .getenv ("QUARKUS_REST_CLIENT_AI_INTERFACE_URL" ));
331+ if (aiUrl == null || aiUrl .isBlank ()) {
332+ return false ;
333+ }
334+ if (!isServiceReady (aiUrl )) {
335+ return false ;
336+ }
337+
338+ return true ;
339+ }
340+
341+ /**
342+ * Checks whether a dependent service is ready by querying its readiness endpoint.
343+ *
344+ * @param baseUrl the base URL of the service (without trailing path); {@code /q/health/ready}
345+ * will be appended
346+ * @return {@code true} if the service responds with HTTP 2xx, {@code false} otherwise
347+ */
348+ private boolean isServiceReady (String baseUrl ) {
349+ try {
350+ String healthUrl =
351+ baseUrl .endsWith ("/" )
352+ ? baseUrl + "q/health/ready"
353+ : baseUrl + "/q/health/ready" ;
354+ HttpRequest request =
355+ HttpRequest .newBuilder ()
356+ .GET ()
357+ .uri (URI .create (healthUrl ))
358+ .timeout (Duration .ofSeconds (2 ))
359+ .build ();
360+ HttpResponse <Void > response =
361+ httpClient .send (request , HttpResponse .BodyHandlers .discarding ());
362+ return response .statusCode () >= 200 && response .statusCode () < 300 ;
363+ } catch (InterruptedException ie ) {
364+ Thread .currentThread ().interrupt ();
365+ return false ;
366+ } catch (Exception e ) {
367+ return false ;
368+ }
369+ }
370+
139371 /**
140372 * Determines if a pod has failed by examining container statuses.
141373 *
@@ -568,17 +800,18 @@ private void restartWatcher() {
568800 }
569801 activeWatches .clear ();
570802
571- new Thread (
803+ Thread .ofVirtual ()
804+ .name ("podmortem-watcher-restart" )
805+ .start (
572806 () -> {
573807 try {
574- Thread .sleep (5000 ); // Wait 5 seconds before restart
808+ Thread .sleep (5000 );
575809 log .info ("Restarting pod failure watcher..." );
576810 startPodWatcher ();
577811 } catch (InterruptedException e ) {
578812 Thread .currentThread ().interrupt ();
579813 log .error ("Watcher restart interrupted" , e );
580814 }
581- })
582- .start ();
815+ });
583816 }
584817}
0 commit comments