3434 from ..common .protos import ydb_topic_pb2 , ydb_issue_message_pb2
3535
3636from ... import issues , connection
37+ from ...settings import BaseRequestSettings
38+
39+
40+ DEFAULT_LONG_TIMEOUT = 31536000 # year
3741
3842
3943class IFromProto (abc .ABC ):
@@ -161,6 +165,13 @@ def __init__(self, convert_server_grpc_to_wrapper):
161165 self ._stream_call = None
162166 self ._wait_executor = None
163167
168+ self ._stream_settings : BaseRequestSettings = (
169+ BaseRequestSettings ()
170+ .with_operation_timeout (DEFAULT_LONG_TIMEOUT )
171+ .with_cancel_after (DEFAULT_LONG_TIMEOUT )
172+ .with_timeout (DEFAULT_LONG_TIMEOUT )
173+ )
174+
164175 def __del__ (self ):
165176 self ._clean_executor (wait = False )
166177
@@ -188,6 +199,7 @@ async def _start_asyncio_driver(self, driver: DriverIO, stub, method):
188199 requests_iterator ,
189200 stub ,
190201 method ,
202+ settings = self ._stream_settings ,
191203 )
192204 self ._stream_call = stream_call
193205 self .from_server_grpc = stream_call .__aiter__ ()
@@ -196,14 +208,27 @@ async def _start_sync_driver(self, driver: Driver, stub, method):
196208 requests_iterator = AsyncQueueToSyncIteratorAsyncIO (self .from_client_grpc )
197209 self ._wait_executor = concurrent .futures .ThreadPoolExecutor (max_workers = 1 )
198210
199- stream_call = await to_thread (driver , requests_iterator , stub , method , executor = self ._wait_executor )
211+ stream_call = await to_thread (
212+ driver ,
213+ requests_iterator ,
214+ stub ,
215+ method ,
216+ executor = self ._wait_executor ,
217+ settings = self ._stream_settings ,
218+ )
200219 self ._stream_call = stream_call
201220 self .from_server_grpc = SyncToAsyncIterator (stream_call .__iter__ (), self ._wait_executor )
202221
203- async def receive (self ) -> Any :
222+ async def receive (self , timeout : Optional [ int ] = None ) -> Any :
204223 # todo handle grpc exceptions and convert it to internal exceptions
205224 try :
206- grpc_message = await self .from_server_grpc .__anext__ ()
225+ if timeout is None :
226+ grpc_message = await self .from_server_grpc .__anext__ ()
227+ else :
228+ async def get_response ():
229+ return await self .from_server_grpc .__anext__ ()
230+ grpc_message = await asyncio .wait_for (get_response (), timeout )
231+
207232 except (grpc .RpcError , grpc .aio .AioRpcError ) as e :
208233 raise connection ._rpc_error_handler (self ._connection_state , e )
209234
0 commit comments