2222import errno
2323from getpass import getuser
2424import os
25+ import re
2526import signal
2627from subprocess import (
2728 DEVNULL ,
5354from cylc .flow .scripts .clean import CleanOptions , run
5455from cylc .flow .util import natural_sort_key
5556
57+ from cylc .uiserver .utils import cast_non_null
58+
5659if TYPE_CHECKING :
5760 from concurrent .futures import Executor
5861 from logging import Logger
@@ -357,9 +360,17 @@ async def play(
357360 return cls ._return ('Workflow(s) started' )
358361
359362 @staticmethod
360- async def enqueue (stream , queue ):
361- async for line in stream :
362- await queue .put (line .decode ())
363+ async def enqueue (
364+ stream : asyncio .StreamReader , queue : asyncio .Queue [str | Exception ]
365+ ):
366+ try :
367+ async for line in stream :
368+ await queue .put (line .decode ())
369+ except Exception as exc :
370+ if re .match (r'Separator .+ chunk .+ limit' , str (exc )):
371+ await queue .put (ValueError ('line too long' ))
372+ else :
373+ await queue .put (exc )
363374
364375 @classmethod
365376 async def cat_log (cls , id_ : Tokens , app : 'CylcUIServer' , info , file = None ):
@@ -382,15 +393,18 @@ async def cat_log(cls, id_: Tokens, app: 'CylcUIServer', info, file=None):
382393 # For info, below subprocess is safe (uses shell=false by default)
383394 proc = await asyncio .subprocess .create_subprocess_exec (
384395 * cmd ,
396+ limit = 200 * 1024 , # increase line limit to 200 kiB
385397 stdout = asyncio .subprocess .PIPE ,
386398 stderr = asyncio .subprocess .PIPE ,
387399 )
388400 buffer : List [str ] = []
389- queue : asyncio .Queue = asyncio .Queue ()
401+ queue : asyncio .Queue [ str | Exception ] = asyncio .Queue ()
390402 # Farm out reading from stdout stream to a background task
391403 # This is to get around problem where stream is not EOF until
392404 # subprocess ends
393- enqueue_task = asyncio .create_task (cls .enqueue (proc .stdout , queue ))
405+ enqueue_task = asyncio .create_task (
406+ cls .enqueue (cast_non_null (proc .stdout ), queue )
407+ )
394408
395409 # GraphQL operation ID
396410 op_id = info .root_value
@@ -445,18 +459,25 @@ async def cat_log(cls, id_: Tokens, app: 'CylcUIServer', info, file=None):
445459 }
446460 break
447461
448- elif line_count == 0 :
462+ line = await queue .get ()
463+
464+ if isinstance (line , Exception ):
465+ yield {'lines' : buffer }
466+ yield {'error' : f"Error reading file: { line } " }
467+ app .log .warning (line )
468+ break
469+
470+ if line_count == 0 :
449471 # this is the first line
450472 # (this is a special line contains the file path)
451473 line_count += 1
452474 yield {
453475 'connected' : True ,
454- 'path' : ( await queue . get ()) [2 :].strip (),
476+ 'path' : line [2 :].strip (),
455477 }
456478 continue
457479
458480 # read in the log lines and add them to the buffer
459- line = await queue .get ()
460481 line_count += 1
461482 buffer .append (line )
462483 if len (buffer ) >= 75 :
0 commit comments