1515from trio ._util import final
1616
1717
18+ class SuppressDecorator (contextlib .ContextDecorator , contextlib .suppress ):
19+ pass
20+
21+
1822@final
1923class TrioInteractiveConsole (InteractiveConsole ):
2024 # code.InteractiveInterpreter defines locals as Mapping[str, Any]
2125 # but when we pass this to FunctionType it expects a dict. So
2226 # we make the type more specific on our subclass
2327 locals : dict [str , object ]
2428
25- def __init__ (self , repl_locals : dict [str , object ] | None = None ) -> None :
29+ def __init__ (
30+ self , repl_locals : dict [str , object ], trio_token : trio .lowlevel .TrioToken
31+ ) -> None :
2632 super ().__init__ (locals = repl_locals )
33+ self .trio_token = trio_token
2734 self .compile .compiler .flags |= ast .PyCF_ALLOW_TOP_LEVEL_AWAIT
2835
2936 def runcode (self , code : types .CodeType ) -> None :
@@ -70,17 +77,24 @@ def raw_input(self, prompt: str = "") -> str:
7077 else :
7178
7279 def raw_input (self , prompt : str = "" ) -> str :
73- import fcntl
74- import termios
7580 from signal import SIGINT , signal
7681
7782 interrupted = False
7883
84+ @SuppressDecorator (KeyboardInterrupt )
85+ @trio .lowlevel .disable_ki_protection
86+ def newline ():
87+ import fcntl
88+ import termios
89+
90+ # Fake up a newline char as if user had typed it at
91+ # os.write(sys.stdin.buffer.fileno(), b"\n")
92+ fcntl .ioctl (sys .stdin , termios .TIOCSTI , b"\n " )
93+
7994 def handler (sig : int , frame : types .FrameType | None ) -> None :
8095 nonlocal interrupted
8196 interrupted = True
82- # Fake up a newline char as if user had typed it at terminal
83- fcntl .ioctl (sys .stdin , termios .TIOCSTI , b"\n " )
97+ self .trio_token .run_sync_soon (newline , idempotent = True )
8498
8599 prev_handler = trio .from_thread .run_sync (signal , SIGINT , handler )
86100 try :
@@ -91,14 +105,16 @@ def handler(sig: int, frame: types.FrameType | None) -> None:
91105 raise KeyboardInterrupt
92106
93107
94- async def run_repl (console : TrioInteractiveConsole ) -> None :
108+ async def run_repl (repl_locals : dict [ str , object ] ) -> None :
95109 banner = (
96110 f"trio REPL { sys .version } on { sys .platform } \n "
97111 f'Use "await" directly instead of "trio.run()".\n '
98112 f'Type "help", "copyright", "credits" or "license" '
99113 f"for more information.\n "
100114 f'{ getattr (sys , "ps1" , ">>> " )} import trio'
101115 )
116+ token = trio .lowlevel .current_trio_token ()
117+ console = TrioInteractiveConsole (repl_locals , token )
102118 try :
103119 await trio .to_thread .run_sync (console .interact , banner )
104120 finally :
@@ -124,5 +140,4 @@ def main(original_locals: dict[str, object]) -> None:
124140 }:
125141 repl_locals [key ] = original_locals [key ]
126142
127- console = TrioInteractiveConsole (repl_locals )
128- trio .run (run_repl , console )
143+ trio .run (run_repl , repl_locals )
0 commit comments