99from urllib .parse import quote , unquote
1010
1111from aiohttp import ClientTimeout , web
12- from common_library .json_serialization import json_dumps
1312from models_library .api_schemas_long_running_tasks .tasks import (
1413 TaskGet ,
1514)
1615from models_library .api_schemas_rpc_async_jobs .async_jobs import (
1716 AsyncJobGet ,
18- AsyncJobId ,
1917)
2018from models_library .api_schemas_storage .storage_schemas import (
2119 FileUploadCompleteResponse ,
4543from servicelib .aiohttp .client_session import get_client_session
4644from servicelib .aiohttp .requests_validation import (
4745 parse_request_body_as ,
48- parse_request_headers_as ,
4946 parse_request_path_parameters_as ,
5047 parse_request_query_parameters_as ,
5148)
5249from servicelib .aiohttp .rest_responses import (
5350 create_data_response ,
54- create_event_stream_response ,
5551)
56- from servicelib .celery .models import TaskFilter
5752from servicelib .common_headers import X_FORWARDED_PROTO
5853from servicelib .rabbitmq .rpc_interfaces .storage .paths import (
5954 compute_path_size as remote_compute_path_size ,
6762)
6863from servicelib .request_keys import RQT_USERID_KEY
6964from servicelib .rest_responses import unwrap_envelope
70- from servicelib .sse .models import SSEEvent , SSEHeaders
7165from yarl import URL
7266
7367from .._meta import API_VTAG
74- from ..celery import get_task_manager
7568from ..login .decorators import login_required
7669from ..models import AuthenticatedRequestContext
7770from ..rabbitmq import get_rabbitmq_rpc_client
@@ -559,10 +552,11 @@ class _PathParams(BaseModel):
559552
560553 rabbitmq_rpc_client = get_rabbitmq_rpc_client (request .app )
561554 _req_ctx = AuthenticatedRequestContext .model_validate (request )
562- path_params = parse_request_path_parameters_as (_PathParams , request )
555+ parse_request_path_parameters_as (_PathParams , request )
563556 search_body = await parse_request_body_as (
564557 model_schema_cls = SearchBodyParams , request = request
565558 )
559+
566560 async_job_rpc_get , _ = await start_search (
567561 rabbitmq_rpc_client ,
568562 job_filter = get_job_filter (
@@ -580,47 +574,7 @@ class _PathParams(BaseModel):
580574 task_id = _job_id ,
581575 status_href = f"{ request .url .with_path (str (request .app .router ['get_async_job_status' ].url_for (task_id = _job_id )))} " ,
582576 abort_href = f"{ request .url .with_path (str (request .app .router ['cancel_async_job' ].url_for (task_id = _job_id )))} " ,
583- result_stream_href = f"{ request .url .with_path (str (request .app .router ['stream_search ' ].url_for (location_id = str ( path_params . location_id ), job_id = _job_id )))} " ,
577+ result_stream_href = f"{ request .url .with_path (str (request .app .router ['get_async_job_stream ' ].url_for (task_id = _job_id )))} " ,
584578 ),
585579 status = status .HTTP_202_ACCEPTED ,
586580 )
587-
588-
589- @routes .get (
590- _storage_locations_prefix + "/{location_id}/search/{job_id}/stream" ,
591- name = "stream_search" ,
592- )
593- @login_required
594- @permission_required ("storage.files.*" )
595- @handle_exceptions
596- async def stream_search (request : web .Request ) -> web .Response :
597- class _PathParams (BaseModel ):
598- location_id : Annotated [LocationID , AfterValidator (_allow_only_simcore )]
599- job_id : AsyncJobId
600-
601- _req_ctx = AuthenticatedRequestContext .model_validate (request )
602- path_params = parse_request_path_parameters_as (_PathParams , request )
603- header_params = parse_request_headers_as (SSEHeaders , request )
604-
605- task_manager = get_task_manager (request .app )
606- task_filter = get_job_filter (
607- user_id = _req_ctx .user_id ,
608- product_name = _req_ctx .product_name ,
609- )
610-
611- async def event_generator ():
612- async for event_id , event in task_manager .consume_task_events (
613- task_filter = TaskFilter .model_validate (task_filter .model_dump ()),
614- task_uuid = path_params .job_id ,
615- last_id = header_params .last_event_id ,
616- ):
617- yield SSEEvent (
618- id = event_id , event = event .type , data = [json_dumps (event .data )]
619- ).serialize ()
620- if event .type == "status" and getattr (event , "data" , None ) in (
621- "done" ,
622- "error" ,
623- ):
624- break
625-
626- return create_event_stream_response (event_generator = event_generator )
0 commit comments