20
20
handler infrastructure, ensuring consistent behavior.
21
21
"""
22
22
23
+ import asyncio
24
+ import socket
23
25
import threading
26
+ import time
27
+ import warnings
24
28
from typing import Optional
25
29
26
- import prometheus_client
27
30
import tornado .httpserver
28
31
import tornado .ioloop
32
+ import tornado .web
29
33
30
34
from jupyter_server ._version import __version__
31
35
from jupyter_server .base .handlers import PrometheusMetricsHandler
41
45
42
46
43
47
class PrometheusMetricsServer :
44
- """A separate server for exposing Prometheus metrics."""
48
+ """A separate Tornado server for serving Prometheus metrics."""
45
49
46
- def __init__ (self , server_app ):
47
- """Initialize the metrics server.
48
-
49
- Parameters
50
- ----------
51
- server_app : ServerApp
52
- The main Jupyter server application instance
53
- """
54
- self .server_app = server_app
50
+ def __init__ (self , app ):
51
+ """Initialize the metrics server."""
52
+ self .app = app
55
53
self .port = None
56
- self .http_server = None
54
+ self .server = None
55
+ self .ioloop = None
57
56
self .thread = None
57
+ self ._running = False
58
58
59
59
def initialize_metrics (self ):
60
60
"""Initialize Jupyter-specific metrics for this server instance."""
61
61
# Set server version info
62
62
SERVER_INFO .info ({"version" : __version__ })
63
63
64
64
# Set up extension info
65
- for ext in self .server_app .extension_manager .extensions .values ():
65
+ for ext in self .app .extension_manager .extensions .values ():
66
66
SERVER_EXTENSION_INFO .labels (
67
67
name = ext .name , version = ext .version , enabled = str (ext .enabled ).lower ()
68
68
).info ({})
69
69
70
70
# Set server start time
71
- started = self .server_app .web_app .settings ["started" ]
71
+ started = self .app .web_app .settings ["started" ]
72
72
SERVER_STARTED .set (started .timestamp ())
73
73
74
74
# Set up activity tracking
75
- LAST_ACTIVITY .set_function (lambda : self .server_app .web_app .last_activity ().timestamp ())
75
+ LAST_ACTIVITY .set_function (lambda : self .app .web_app .last_activity ().timestamp ())
76
76
ACTIVE_DURATION .set_function (
77
77
lambda : (
78
- self .server_app .web_app .last_activity ()
79
- - self .server_app .web_app .settings ["started" ]
78
+ self .app .web_app .last_activity ()
79
+ - self .app .web_app .settings ["started" ]
80
80
).total_seconds ()
81
81
)
82
82
@@ -94,7 +94,7 @@ def _setup_runtime_metrics(self):
94
94
# Set up kernel count tracking
95
95
def update_kernel_metrics ():
96
96
try :
97
- kernel_manager = self .server_app .kernel_manager
97
+ kernel_manager = self .app .kernel_manager
98
98
if hasattr (kernel_manager , "list_kernel_ids" ):
99
99
kernel_ids = kernel_manager .list_kernel_ids ()
100
100
# Reset all kernel type metrics to 0
@@ -118,19 +118,19 @@ def update_kernel_metrics():
118
118
for kernel_type , count in kernel_types .items ():
119
119
KERNEL_CURRENTLY_RUNNING_TOTAL .labels (type = kernel_type ).set (count )
120
120
except Exception as e :
121
- self .server_app .log .debug (f"Error updating kernel metrics: { e } " )
121
+ self .app .log .debug (f"Error updating kernel metrics: { e } " )
122
122
123
123
# Set up terminal count tracking
124
124
def update_terminal_metrics ():
125
125
try :
126
- terminal_manager = getattr (self .server_app , "terminal_manager" , None )
126
+ terminal_manager = getattr (self .app , "terminal_manager" , None )
127
127
if terminal_manager and hasattr (terminal_manager , "list" ):
128
128
terminal_count = len (terminal_manager .list ())
129
129
TERMINAL_CURRENTLY_RUNNING_TOTAL .set (terminal_count )
130
130
else :
131
131
TERMINAL_CURRENTLY_RUNNING_TOTAL .set (0 )
132
132
except Exception as e :
133
- self .server_app .log .debug (f"Error updating terminal metrics: { e } " )
133
+ self .app .log .debug (f"Error updating terminal metrics: { e } " )
134
134
135
135
# Set up periodic updates
136
136
def periodic_update ():
@@ -143,130 +143,122 @@ def periodic_update():
143
143
# Store the periodic update function to be called from the metrics server thread
144
144
self ._periodic_update = periodic_update
145
145
146
- def start (self , port : int ) -> None :
147
- """Start the metrics server on the specified port.
146
+ def start (self , port : int = 9090 ) -> None :
147
+ """Start the metrics server on the specified port."""
148
+ if self ._running :
149
+ return
148
150
149
- Parameters
150
- ----------
151
- port : int
152
- The port to listen on for metrics requests
153
- """
154
151
# Initialize Jupyter metrics
155
152
self .initialize_metrics ()
156
153
157
- # Reuse the main server's web application and settings
158
- # This ensures identical behavior and eliminates duplication
159
- main_app = self .server_app .web_app
160
-
161
- # Create a new application that shares the same settings and handlers
162
- # but only serves the metrics endpoint
163
- metrics_app = tornado .web .Application (
164
- [
165
- (r"/metrics" , PrometheusMetricsHandler ),
166
- ],
167
- ** main_app .settings ,
168
- )
169
-
170
- # Determine authentication status for logging
171
- authenticate_metrics = main_app .settings .get ("authenticate_prometheus" , True )
172
- auth_info = "with authentication" if authenticate_metrics else "without authentication"
173
-
174
- # Create and start the HTTP server with port retry logic
175
- self .http_server = tornado .httpserver .HTTPServer (metrics_app )
176
-
177
- # Try to bind to the requested port, with fallback to random ports
178
- actual_port = port
179
- max_retries = 10
180
-
181
- for attempt in range (max_retries ):
182
- try :
183
- self .http_server .listen (actual_port )
184
- self .port = actual_port
185
- break
186
- except OSError as e :
187
- if e .errno == 98 : # Address already in use
188
- if attempt == 0 :
189
- # First attempt failed, try random ports
190
- import random
191
-
192
- actual_port = random .randint (49152 , 65535 ) # Use dynamic port range
193
- else :
194
- # Subsequent attempts, try next random port
195
- actual_port = random .randint (49152 , 65535 )
196
-
197
- if attempt == max_retries - 1 :
198
- # Last attempt failed
199
- self .server_app .log .warning (
200
- f"Could not start metrics server on any port after { max_retries } attempts. "
201
- )
202
- return
203
- else :
204
- # Non-port-related error, re-raise
205
- raise
206
-
207
- # Start the IOLoop in a separate thread
208
- def start_metrics_loop ():
154
+ # Create Tornado application with metrics handler
155
+ app = tornado .web .Application ([
156
+ (r"/metrics" , PrometheusMetricsHandler ),
157
+ ])
158
+
159
+ # Create HTTP server
160
+ self .server = tornado .httpserver .HTTPServer (app )
161
+
162
+ # Try to bind to the specified port
163
+ try :
164
+ self .server .bind (port )
165
+ self .port = port
166
+ except OSError :
167
+ # If port is in use, try alternative ports
168
+ for alt_port in range (port + 1 , port + 10 ):
169
+ try :
170
+ self .server .bind (alt_port )
171
+ self .port = alt_port
172
+ break
173
+ except OSError :
174
+ continue
175
+ else :
176
+ raise RuntimeError (f"Could not bind to any port starting from { port } " )
177
+
178
+ # Start the server in a separate thread
179
+ self .thread = threading .Thread (target = self ._start_metrics_loop , daemon = True )
180
+ self .thread .start ()
181
+
182
+ # Wait for server to be ready
183
+ self ._wait_for_server_ready ()
184
+ self ._running = True
185
+
186
def _start_metrics_loop(self) -> None:
    """Run the metrics server's IOLoop; used as the background thread target.

    Creates a fresh IOLoop, installs its asyncio loop as the thread's event
    loop, starts the already-bound HTTP server on it and schedules the
    periodic metrics refresh every 30 seconds. Blocks until the loop is
    stopped. Exceptions are logged rather than raised so the thread never
    dies with an unhandled exception.
    """
    loop = None
    try:
        # Keep a local reference: stop() may reset self.ioloop to None while
        # callbacks scheduled below are still pending.
        loop = tornado.ioloop.IOLoop()
        self.ioloop = loop

        # Set as current event loop for this thread
        asyncio.set_event_loop(loop.asyncio_loop)

        # Start the server
        self.server.start(1)  # Single process

        def periodic_update_wrapper():
            try:
                if hasattr(self, "_periodic_update"):
                    self._periodic_update()
            finally:
                # Always reschedule so one failing update does not end the
                # periodic refresh for good.
                loop.call_later(30, periodic_update_wrapper)

        # Start periodic updates
        loop.call_later(30, periodic_update_wrapper)

        # Blocks until the loop is stopped.
        loop.start()
    except Exception as e:
        # Log error but don't raise to avoid unhandled thread exceptions
        self.app.log.error("Metrics server error: %s", e, exc_info=True)
    finally:
        # Release the loop's resources once it is no longer running.
        if loop is not None:
            loop.close()
213
+
214
+ def _wait_for_server_ready (self , timeout : float = 5.0 ) -> None :
215
+ """Wait for the server to be ready to accept connections."""
216
+ start_time = time .time ()
217
+ while time .time () - start_time < timeout :
218
+ try :
219
+ with socket .socket (socket .AF_INET , socket .SOCK_STREAM ) as s :
220
+ s .settimeout (0.1 )
221
+ s .connect (('localhost' , self .port ))
222
+ return
223
+ except (socket .error , OSError ):
224
+ time .sleep (0.1 )
225
+ raise TimeoutError (f"Server not ready after { timeout } seconds" )
230
226
231
227
def stop (self ) -> None :
232
228
"""Stop the metrics server."""
233
- if self .http_server :
234
- self .http_server .stop ()
235
- self .http_server = None
229
+ if not self ._running :
230
+ return
236
231
237
- if hasattr (self , "ioloop" ) and self .ioloop :
238
- # Stop the IOLoop
232
+ self ._running = False
233
+
234
+ # Stop the server
235
+ if self .server :
236
+ self .server .stop ()
237
+
238
+ # Stop the IOLoop
239
+ if self .ioloop :
239
240
try :
240
241
self .ioloop .add_callback (self .ioloop .stop )
241
- except RuntimeError :
242
- # IOLoop might already be stopped
242
+ except Exception :
243
243
pass
244
- self .ioloop = None
245
244
245
+ # Wait for thread to finish
246
246
if self .thread and self .thread .is_alive ():
247
- # Wait for thread to finish (with timeout)
248
- self .thread .join (timeout = 1.0 )
247
+ self .thread .join (timeout = 2.0 )
249
248
250
- self .server_app .log .info (
251
- f"Metrics server stopped on port { getattr (self , 'port' , 'unknown' )} "
252
- )
249
+ # Clean up
250
+ self .server = None
251
+ self .ioloop = None
252
+ self .thread = None
253
+ self .port = None
253
254
254
255
255
def start_metrics_server(app, port: int = 9090) -> Optional[PrometheusMetricsServer]:
    """Create and start a metrics server for ``app``.

    Parameters
    ----------
    app
        Application object handed to ``PrometheusMetricsServer``.
    port : int
        Preferred port for the metrics endpoint.

    Returns
    -------
    Optional[PrometheusMetricsServer]
        The running server, or ``None`` if it could not be started. Startup
        failures are logged instead of raised.
    """
    import logging

    try:
        server = PrometheusMetricsServer(app)
        server.start(port)
    except Exception as e:
        # Report through the application's logger when it has one, so the
        # failure lands in the server log instead of bare stdout.
        log = getattr(app, "log", None) or logging.getLogger(__name__)
        log.warning("Failed to start metrics server: %s", e)
        return None
    return server
0 commit comments