|
1 | 1 | """ |
2 | 2 | Launch an image with a temporary user via JupyterHub |
3 | 3 | """ |
| 4 | +import asyncio |
4 | 5 | import base64 |
5 | 6 | import json |
6 | 7 | import random |
|
9 | 10 | from urllib.parse import urlparse, quote |
10 | 11 | import uuid |
11 | 12 | import os |
| 13 | +from datetime import timedelta |
12 | 14 |
|
13 | 15 | from tornado.log import app_log |
14 | 16 | from tornado import web, gen |
|
18 | 20 | from jupyterhub.traitlets import Callable |
19 | 21 | from jupyterhub.utils import maybe_future |
20 | 22 |
|
| 23 | +from .utils import url_path_join |
| 24 | + |
21 | 25 | # pattern for checking if it's an ssh repo and not a URL |
22 | 26 | # used only after verifying that `://` is not present |
23 | 27 | _ssh_repo_pat = re.compile(r'.*@.*\:') |
@@ -80,6 +84,13 @@ def _default_hub_url_local(self): |
80 | 84 | Receives 5 parameters: launcher, image, username, server_name, repo_url |
81 | 85 | """ |
82 | 86 | ) |
| 87 | + launch_timeout = Integer( |
| 88 | + 600, |
| 89 | + config=True, |
| 90 | + help=""" |
| 91 | + Wait this many seconds until server is ready, raise TimeoutError otherwise. |
| 92 | + """, |
| 93 | + ) |
83 | 94 |
|
84 | 95 | async def api_request(self, url, *args, **kwargs): |
85 | 96 | """Make an API request to JupyterHub""" |
@@ -148,7 +159,15 @@ def unique_name_from_repo(self, repo_url): |
148 | 159 | # add a random suffix to avoid collisions for users on the same image |
149 | 160 | return '{}-{}'.format(prefix, ''.join(random.choices(SUFFIX_CHARS, k=SUFFIX_LENGTH))) |
150 | 161 |
|
151 | | - async def launch(self, image, username, server_name='', repo_url='', extra_args=None): |
| 162 | + async def launch( |
| 163 | + self, |
| 164 | + image, |
| 165 | + username, |
| 166 | + server_name="", |
| 167 | + repo_url="", |
| 168 | + extra_args=None, |
| 169 | + event_callback=None, |
| 170 | + ): |
152 | 171 | """Launch a server for a given image |
153 | 172 |
|
154 | 173 | - creates a temporary user on the Hub if authentication is not enabled |
@@ -215,38 +234,100 @@ async def launch(self, image, username, server_name='', repo_url='', extra_args= |
215 | 234 |
|
216 | 235 | # start server |
217 | 236 | app_log.info(f"Starting server{_server_name} for user {username} with image {image}") |
| 237 | + ready_event_future = asyncio.Future() |
| 238 | + |
| 239 | + def _cancel_ready_event(f=None): |
| 240 | + if not ready_event_future.done(): |
| 241 | + if f and f.exception(): |
| 242 | + ready_event_future.set_exception(f.exception()) |
| 243 | + else: |
| 244 | + ready_event_future.cancel() |
218 | 245 | try: |
219 | 246 | resp = await self.api_request( |
220 | 247 | 'users/{}/servers/{}'.format(escaped_username, server_name), |
221 | 248 | method='POST', |
222 | 249 | body=json.dumps(data).encode('utf8'), |
223 | 250 | ) |
224 | | - if resp.code == 202: |
225 | | - # Server hasn't actually started yet |
226 | | - # We wait for it! |
227 | | - # NOTE: This ends up being about ten minutes |
228 | | - for i in range(64): |
229 | | - user_data = await self.get_user_data(escaped_username) |
230 | | - if user_data['servers'][server_name]['ready']: |
231 | | - break |
232 | | - if not user_data['servers'][server_name]['pending']: |
233 | | - raise web.HTTPError(500, "Image %s for user %s failed to launch" % (image, username)) |
234 | | - # FIXME: make this configurable |
235 | | - # FIXME: Measure how long it takes for servers to start |
236 | | - # and tune this appropriately |
237 | | - await gen.sleep(min(1.4 ** i, 10)) |
238 | | - else: |
239 | | - raise web.HTTPError(500, f"Image {image} for user {username} took too long to launch") |
| 251 | + # listen for pending spawn (launch) events until server is ready |
| 252 | + # do this even if previous request finished! |
| 253 | + buffer_list = [] |
| 254 | + |
| 255 | + async def handle_chunk(chunk): |
| 256 | + lines = b"".join(buffer_list + [chunk]).split(b"\n\n") |
| 257 | + # the last item in the list is usually an empty line ('') |
| 258 | + # but it can be the partial line after the last `\n\n`, |
| 259 | + # so put it back on the buffer to handle with the next chunk |
| 260 | + buffer_list[:] = [lines[-1]] |
| 261 | + for line in lines[:-1]: |
| 262 | + if line: |
| 263 | + line = line.decode("utf8", "replace") |
| 264 | + if line and line.startswith("data:"): |
| 265 | + event = json.loads(line.split(":", 1)[1]) |
| 266 | + if event_callback: |
| 267 | + await event_callback(event) |
| 268 | + |
| 269 | + # stream ends when server is ready or fails |
| 270 | + if event.get("ready", False): |
| 271 | + if not ready_event_future.done(): |
| 272 | + ready_event_future.set_result(event) |
| 273 | + elif event.get("failed", False): |
| 274 | + if not ready_event_future.done(): |
| 275 | + ready_event_future.set_exception( |
| 276 | + web.HTTPError( |
| 277 | + 500, event.get("message", "unknown error") |
| 278 | + ) |
| 279 | + ) |
| 280 | + |
| 281 | + url_parts = ["users", username] |
| 282 | + if server_name: |
| 283 | + url_parts.extend(["servers", server_name, "progress"]) |
| 284 | + else: |
| 285 | + url_parts.extend(["server/progress"]) |
| 286 | + progress_api_url = url_path_join(*url_parts) |
| 287 | + self.log.debug("Requesting progress for {username}: {progress_api_url}") |
| 288 | + resp_future = self.api_request( |
| 289 | + progress_api_url, |
| 290 | + streaming_callback=lambda chunk: asyncio.ensure_future( |
| 291 | + handle_chunk(chunk) |
| 292 | + ), |
| 293 | + request_timeout=self.launch_timeout, |
| 294 | + ) |
| 295 | + try: |
| 296 | + await gen.with_timeout( |
| 297 | + timedelta(seconds=self.launch_timeout), resp_future |
| 298 | + ) |
| 299 | + except (gen.TimeoutError, TimeoutError): |
| 300 | + _cancel_ready_event() |
| 301 | + raise web.HTTPError( |
| 302 | + 500, |
| 303 | + f"Image {image} for user {username} took too long to launch", |
| 304 | + ) |
240 | 305 |
|
241 | 306 | except HTTPError as e: |
| 307 | + _cancel_ready_event() |
242 | 308 | if e.response: |
243 | 309 | body = e.response.body |
244 | 310 | else: |
245 | 311 | body = '' |
246 | 312 |
|
247 | | - app_log.error("Error starting server{} for user {}: {}\n{}". |
248 | | - format(_server_name, username, e, body)) |
| 313 | + app_log.error( |
| 314 | + f"Error starting server{_server_name} for user {username}: {e}\n{body}" |
| 315 | + ) |
249 | 316 | raise web.HTTPError(500, f"Failed to launch image {image}") |
| 317 | + except Exception: |
| 318 | + _cancel_ready_event() |
| 319 | + raise |
| 320 | + |
| 321 | + # verify that the server is running! |
| 322 | + try: |
| 323 | + # this should already be done, but it's async so wait a finite time |
| 324 | + ready_event = await gen.with_timeout( |
| 325 | + timedelta(seconds=5), ready_event_future |
| 326 | + ) |
| 327 | + except (gen.TimeoutError, TimeoutError): |
| 328 | + raise web.HTTPError( |
| 329 | + 500, f"Image {image} for user {username} failed to launch" |
| 330 | + ) |
250 | 331 |
|
251 | | - data['url'] = self.hub_url + 'user/%s/%s' % (escaped_username, server_name) |
| 332 | + data["url"] = url_path_join(self.hub_url, ready_event["url"]) |
252 | 333 | return data |
0 commit comments