2
2
import asyncio
3
3
import io
4
4
import sys
5
+ from concurrent .futures import ThreadPoolExecutor
5
6
from enum import Enum
6
7
from types import TracebackType
7
8
from typing import (
@@ -104,10 +105,13 @@ def __exit__(
104
105
def create_protocol (self ) -> TProtocol :
105
106
...
106
107
108
+ @_logger .call
107
109
def shutdown_protocol (self , protocol : TProtocol ) -> None :
108
110
if self .mode == JsonRpcServerMode .STDIO and self ._stdio_stop_event is not None :
109
111
self ._stdio_stop_event .set ()
110
112
113
+ stdio_executor : Optional [ThreadPoolExecutor ] = None
114
+
111
115
@_logger .call
112
116
def start_stdio (self ) -> None :
113
117
self .mode = JsonRpcServerMode .STDIO
@@ -121,12 +125,21 @@ def run_io_nonblocking() -> None:
121
125
122
126
async def aio_readline (rfile : BinaryIO , protocol : asyncio .Protocol ) -> None :
123
127
protocol .connection_made (transport )
124
-
125
- while self ._stdio_stop_event is not None and not self ._stdio_stop_event .is_set () and not rfile .closed :
126
- data = await self .loop .run_in_executor (None , cast (io .BufferedReader , rfile ).read1 , 1000 )
127
- protocol .data_received (data )
128
-
129
- self .loop .run_until_complete (aio_readline (sys .__stdin__ .buffer , protocol ))
128
+ stdio_executor = ThreadPoolExecutor (max_workers = 1 )
129
+ with stdio_executor :
130
+ while (
131
+ self ._stdio_stop_event is not None and not self ._stdio_stop_event .is_set () and not rfile .closed
132
+ ):
133
+ data = await self .loop .run_in_executor (
134
+ stdio_executor , cast (io .BufferedReader , rfile ).read1 , 1000
135
+ )
136
+ protocol .data_received (data )
137
+
138
+ self ._logger .debug ("starting run_io_nonblocking" )
139
+ try :
140
+ self .loop .run_until_complete (aio_readline (sys .__stdin__ .buffer , protocol ))
141
+ finally :
142
+ self ._logger .debug ("exiting run_io_nonblocking" )
130
143
131
144
self ._run_func = run_io_nonblocking
132
145
0 commit comments