1+ import random
12import threading
3+ import time
24
5+ import structlog
36from flask import Flask , jsonify , request
7+ from flask .typing import ResponseReturnValue
48from pympler import muppy , summary # type: ignore[import-untyped]
59from werkzeug .serving import WSGIRequestHandler , run_simple
610
711from inbox .instrumentation import ProfileCollector
12+ from inbox .interruptible_threading import InterruptibleThread
13+
14+ log = structlog .get_logger ()
815
916
1017class ProfilingHTTPFrontend :
@@ -18,6 +25,11 @@ class ProfilingHTTPFrontend:
1825 def __init__ (self , port , profile ) -> None : # type: ignore[no-untyped-def]
1926 self .port = port
2027 self .profiler = ProfileCollector () if profile else None
28+ # Start reporting as unhealthy after 240-360 minutes to allow
29+ # this process to be restarted after this time.
30+ self .report_unhealthy_at = time .monotonic () + random .randint (
31+ 240 * 60 , 360 * 60
32+ )
2133
2234 def _create_app (self ): # type: ignore[no-untyped-def]
2335 app = Flask (__name__ )
@@ -50,6 +62,84 @@ def profile(): # type: ignore[no-untyped-def]
5062 def load () -> str :
5163 return "Load tracing disabled\n "
5264
65+ @app .route ("/health" )
66+ def health () -> ResponseReturnValue :
67+ now = time .monotonic ()
68+ threads = [
69+ thread
70+ for thread in threading .enumerate ()
71+ if isinstance (thread , InterruptibleThread )
72+ ]
73+ threads_count = len (threads )
74+ threads_delayed_5m_count = sum (
75+ 1
76+ for thread in threads
77+ if not thread .last_ping_time
78+ or now - thread .last_ping_time > 5 * 60
79+ )
80+ threads_delayed_20m_count = sum (
81+ 1
82+ for thread in threads
83+ if not thread .last_ping_time
84+ or now - thread .last_ping_time > 20 * 60
85+ )
86+ threads_delayed_60m_count = sum (
87+ 1
88+ for thread in threads
89+ if not thread .last_ping_time
90+ or now - thread .last_ping_time > 60 * 60
91+ )
92+
93+ longevity_deadline_reached = now >= self .report_unhealthy_at
94+ service_stuck = (
95+ # Treat as stuck if there are threads running, and:
96+ threads_count
97+ and (
98+ # Any of them are delayed by 60m+
99+ threads_delayed_60m_count
100+ or (
101+ # Or there are at least 50 threads, and 10%+ are
102+ # delayed by 20m+
103+ threads_count >= 50
104+ and threads_delayed_20m_count / threads_count >= 0.1
105+ )
106+ or (
107+ # Or there are at least 10 threads, and 40%+ are
108+ # delayed by 5m+
109+ threads_count >= 10
110+ and threads_delayed_5m_count / threads_count >= 0.4
111+ )
112+ )
113+ )
114+
115+ is_healthy = not longevity_deadline_reached and not service_stuck
116+ stats = {
117+ "threads_delayed_5m_count" : threads_delayed_5m_count ,
118+ "threads_delayed_20m_count" : threads_delayed_20m_count ,
119+ "threads_delayed_60m_count" : threads_delayed_60m_count ,
120+ "max_delay" : max (
121+ # XXX: Temporary. Remove me if everything's working fine in prod.
122+ (
123+ (
124+ now - thread .last_ping_time
125+ if thread .last_ping_time is not None
126+ else - 1
127+ ),
128+ thread .__class__ .__name__ ,
129+ )
130+ for thread in threads
131+ ),
132+ "threads_count" : threads_count ,
133+ "longevity_deadline_reached" : longevity_deadline_reached ,
134+ "is_healthy" : is_healthy ,
135+ }
136+
137+ if service_stuck :
138+ log .error ("The service is stuck" , stats = stats )
139+
140+ response_status = 200 if is_healthy else 503
141+ return jsonify (stats ), response_status
142+
53143 @app .route ("/mem" )
54144 def mem (): # type: ignore[no-untyped-def]
55145 objs = muppy .get_objects ()
0 commit comments