diff --git a/cylc/uiserver/resolvers.py b/cylc/uiserver/resolvers.py index ca0c778d..2ce7f493 100644 --- a/cylc/uiserver/resolvers.py +++ b/cylc/uiserver/resolvers.py @@ -22,6 +22,7 @@ import errno from getpass import getuser import os +import re import signal from subprocess import ( DEVNULL, @@ -53,6 +54,8 @@ from cylc.flow.scripts.clean import CleanOptions, run from cylc.flow.util import natural_sort_key +from cylc.uiserver.utils import cast_non_null + if TYPE_CHECKING: from concurrent.futures import Executor from logging import Logger @@ -357,9 +360,17 @@ async def play( return cls._return('Workflow(s) started') @staticmethod - async def enqueue(stream, queue): - async for line in stream: - await queue.put(line.decode()) + async def enqueue( + stream: asyncio.StreamReader, queue: asyncio.Queue[str | Exception] + ): + try: + async for line in stream: + await queue.put(line.decode()) + except Exception as exc: + if re.match(r'Separator .+ chunk .+ limit', str(exc)): + await queue.put(ValueError('line too long')) + else: + await queue.put(exc) @classmethod async def cat_log(cls, id_: Tokens, app: 'CylcUIServer', info, file=None): @@ -382,15 +393,18 @@ async def cat_log(cls, id_: Tokens, app: 'CylcUIServer', info, file=None): # For info, below subprocess is safe (uses shell=false by default) proc = await asyncio.subprocess.create_subprocess_exec( *cmd, + limit=200 * 1024, # increase line limit to 200 kiB stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, ) buffer: List[str] = [] - queue: asyncio.Queue = asyncio.Queue() + queue: asyncio.Queue[str | Exception] = asyncio.Queue() # Farm out reading from stdout stream to a background task # This is to get around problem where stream is not EOF until # subprocess ends - enqueue_task = asyncio.create_task(cls.enqueue(proc.stdout, queue)) + enqueue_task = asyncio.create_task( + cls.enqueue(cast_non_null(proc.stdout), queue) + ) # GraphQL operation ID op_id = info.root_value @@ -445,18 +459,25 @@ async def cat_log(cls, id_: Tokens, app: 'CylcUIServer', info, file=None): } break - elif line_count == 0: + line = await queue.get() + + if isinstance(line, Exception): + yield {'lines': buffer} + yield {'error': f"Error reading file: {line}"} + app.log.warning(line) + break + + if line_count == 0: # this is the first line # (this is a special line contains the file path) line_count += 1 yield { 'connected': True, - 'path': (await queue.get())[2:].strip(), + 'path': line[2:].strip(), } continue # read in the log lines and add them to the buffer - line = await queue.get() line_count += 1 buffer.append(line) if len(buffer) >= 75: diff --git a/cylc/uiserver/utils.py b/cylc/uiserver/utils.py index 98f9cb66..5364aaae 100644 --- a/cylc/uiserver/utils.py +++ b/cylc/uiserver/utils.py @@ -14,16 +14,33 @@ # along with this program. If not, see . -from typing import TYPE_CHECKING +from typing import ( + TYPE_CHECKING, + TypeVar, + cast, +) from jupyter_server.auth.identity import PasswordIdentityProvider + if TYPE_CHECKING: - from cylc.uiserver.handlers import CylcAppHandler from jupyter_server.auth.identity import ( IdentityProvider as JPSIdentityProvider, ) + from cylc.uiserver.handlers import CylcAppHandler + + +_T = TypeVar('_T') + + +def cast_non_null(var: _T | None) -> _T: + """Type cast an "Optional" variable to its non-None type. + + Does not perform any runtime checks. + """ + return cast('_T', var) + def is_bearer_token_authenticated(handler: 'CylcAppHandler') -> bool: """Returns True if this request is bearer token authenticated.