Skip to content

Commit 6db18ee

Browse files
committed
Add function_id / jobs endpoint
1 parent dc371c7 commit 6db18ee

File tree

8 files changed

+137
-1
lines changed

8 files changed

+137
-1
lines changed

packages/service-library/src/servicelib/rabbitmq/rpc_interfaces/webserver/functions/functions_rpc_interface.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,13 +124,15 @@ async def list_function_jobs(
124124
*,
125125
pagination_limit: int,
126126
pagination_offset: int,
127+
filter_by_function_id: FunctionID | None = None,
127128
) -> tuple[list[RegisteredFunctionJob], PageMetaInfoLimitOffset]:
128129
result: tuple[list[RegisteredFunctionJob], PageMetaInfoLimitOffset] = (
129130
await rabbitmq_rpc_client.request(
130131
WEBSERVER_RPC_NAMESPACE,
131132
TypeAdapter(RPCMethodName).validate_python("list_function_jobs"),
132133
pagination_offset=pagination_offset,
133134
pagination_limit=pagination_limit,
135+
filter_by_function_id=filter_by_function_id,
134136
)
135137
)
136138
return TypeAdapter(

services/api-server/src/simcore_service_api_server/api/routes/functions_routes.py

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,29 @@ async def list_functions(
9999
)
100100

101101

102+
@function_router.get(
103+
"/{function_id:uuid}/jobs",
104+
response_model=Page[RegisteredFunctionJob],
105+
description="List function jobs for a function",
106+
)
107+
async def list_function_jobs(
108+
function_id: FunctionID,
109+
wb_api_rpc: Annotated[WbApiRpcClient, Depends(get_wb_api_rpc_client)],
110+
page_params: Annotated[PaginationParams, Depends()],
111+
):
112+
function_jobs_list, meta = await wb_api_rpc.list_function_jobs(
113+
pagination_offset=page_params.offset,
114+
pagination_limit=page_params.limit,
115+
filter_by_function_id=function_id,
116+
)
117+
118+
return create_page(
119+
function_jobs_list,
120+
total=meta.total,
121+
params=page_params,
122+
)
123+
124+
102125
@function_router.patch(
103126
"/{function_id:uuid}/title",
104127
response_model=RegisteredFunction,

services/api-server/src/simcore_service_api_server/services_rpc/wb_api_server.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -310,11 +310,13 @@ async def list_function_jobs(
310310
*,
311311
pagination_offset: PageOffsetInt = 0,
312312
pagination_limit: PageLimitInt = DEFAULT_NUMBER_OF_ITEMS_PER_PAGE,
313+
filter_by_function_id: FunctionID | None = None,
313314
) -> tuple[list[RegisteredFunctionJob], PageMetaInfoLimitOffset]:
314315
return await functions_rpc_interface.list_function_jobs(
315316
self._client,
316317
pagination_offset=pagination_offset,
317318
pagination_limit=pagination_limit,
319+
filter_by_function_id=filter_by_function_id,
318320
)
319321

320322
async def list_function_job_collections(

services/api-server/tests/unit/api_functions/test_api_routers_functions.py

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -307,6 +307,35 @@ async def test_list_function_jobs(
307307
)
308308

309309

310+
async def test_list_function_jobs_with_function_filter(
311+
client: AsyncClient,
312+
mock_handler_in_functions_rpc_interface: Callable[[str, Any], None],
313+
sample_registered_function_job: RegisteredProjectFunctionJob,
314+
sample_registered_function: RegisteredProjectFunction,
315+
) -> None:
316+
317+
mock_handler_in_functions_rpc_interface(
318+
"list_function_jobs",
319+
(
320+
[sample_registered_function_job for _ in range(5)],
321+
PageMetaInfoLimitOffset(total=5, count=5, limit=10, offset=0),
322+
),
323+
)
324+
325+
# Now, list function jobs with a filter
326+
response = await client.get(
327+
f"{API_VTAG}/functions/{sample_registered_function.uid}/jobs"
328+
)
329+
330+
assert response.status_code == status.HTTP_200_OK
331+
data = response.json()["items"]
332+
assert len(data) == 5
333+
assert (
334+
RegisteredProjectFunctionJob.model_validate(data[0])
335+
== sample_registered_function_job
336+
)
337+
338+
310339
async def test_delete_function_job(
311340
client: AsyncClient,
312341
mock_handler_in_functions_rpc_interface: Callable[[str, Any], None],

services/web/server/src/simcore_service_webserver/functions/_controller/_functions_rpc.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,11 +100,13 @@ async def list_function_jobs(
100100
app: web.Application,
101101
pagination_limit: int,
102102
pagination_offset: int,
103+
filter_by_function_id: FunctionID | None = None,
103104
) -> tuple[list[RegisteredFunctionJob], PageMetaInfoLimitOffset]:
104105
return await _functions_service.list_function_jobs(
105106
app=app,
106107
pagination_limit=pagination_limit,
107108
pagination_offset=pagination_offset,
109+
filter_by_function_id=filter_by_function_id,
108110
)
109111

110112

services/web/server/src/simcore_service_webserver/functions/_functions_repository.py

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -229,16 +229,33 @@ async def list_function_jobs(
229229
*,
230230
pagination_limit: int,
231231
pagination_offset: int,
232+
filter_by_function_id: FunctionID | None = None,
232233
) -> tuple[list[RegisteredFunctionJobDB], PageMetaInfoLimitOffset]:
233234

234235
async with transaction_context(get_asyncpg_engine(app), connection) as conn:
235236
total_count_result = await conn.scalar(
236-
func.count().select().select_from(function_jobs_table)
237+
func.count()
238+
.select()
239+
.select_from(function_jobs_table)
240+
.where(
241+
(
242+
function_jobs_table.c.function_uuid == filter_by_function_id
243+
if filter_by_function_id
244+
else True
245+
),
246+
)
237247
)
238248
result = await conn.stream(
239249
function_jobs_table.select()
240250
.offset(pagination_offset)
241251
.limit(pagination_limit)
252+
.where(
253+
(
254+
function_jobs_table.c.function_uuid == filter_by_function_id
255+
if filter_by_function_id
256+
else True
257+
),
258+
)
242259
)
243260
rows = await result.all()
244261
if rows is None:

services/web/server/src/simcore_service_webserver/functions/_functions_service.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -159,11 +159,13 @@ async def list_function_jobs(
159159
app: web.Application,
160160
pagination_limit: int,
161161
pagination_offset: int,
162+
filter_by_function_id: FunctionID | None = None,
162163
) -> tuple[list[RegisteredFunctionJob], PageMetaInfoLimitOffset]:
163164
returned_function_jobs, page = await _functions_repository.list_function_jobs(
164165
app=app,
165166
pagination_limit=pagination_limit,
166167
pagination_offset=pagination_offset,
168+
filter_by_function_id=filter_by_function_id,
167169
)
168170
return [
169171
_decode_functionjob(returned_function_job)

services/web/server/tests/unit/with_dbs/04/functions_rpc/test_functions_controller_rpc.py

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -449,6 +449,65 @@ async def test_list_function_jobs(
449449
assert any(j.uid == registered_job.uid for j in jobs)
450450

451451

452+
async def test_list_function_jobs_for_functionid(
453+
client: TestClient, rpc_client: RabbitMQRPCClient, mock_function: ProjectFunction
454+
):
455+
# Register the function first
456+
first_registered_function = await functions_rpc.register_function(
457+
rabbitmq_rpc_client=rpc_client, function=mock_function
458+
)
459+
second_registered_function = await functions_rpc.register_function(
460+
rabbitmq_rpc_client=rpc_client, function=mock_function
461+
)
462+
463+
first_registered_function_jobs = []
464+
second_registered_function_jobs = []
465+
for i_job in range(6):
466+
if i_job < 3:
467+
function_job = ProjectFunctionJob(
468+
function_uid=first_registered_function.uid,
469+
title="Test Function Job",
470+
description="A test function job",
471+
project_job_id=uuid4(),
472+
inputs={"input1": "value1"},
473+
outputs={"output1": "result1"},
474+
)
475+
# Register the function job
476+
first_registered_function_jobs.append(
477+
await functions_rpc.register_function_job(
478+
rabbitmq_rpc_client=rpc_client, function_job=function_job
479+
)
480+
)
481+
else:
482+
function_job = ProjectFunctionJob(
483+
function_uid=second_registered_function.uid,
484+
title="Test Function Job",
485+
description="A test function job",
486+
project_job_id=uuid4(),
487+
inputs={"input1": "value1"},
488+
outputs={"output1": "result1"},
489+
)
490+
# Register the function job
491+
second_registered_function_jobs.append(
492+
await functions_rpc.register_function_job(
493+
rabbitmq_rpc_client=rpc_client, function_job=function_job
494+
)
495+
)
496+
497+
# List function jobs for a specific function ID
498+
jobs, _ = await functions_rpc.list_function_jobs(
499+
rabbitmq_rpc_client=rpc_client,
500+
pagination_limit=10,
501+
pagination_offset=0,
502+
filter_by_function_id=first_registered_function.uid,
503+
)
504+
505+
# Assert the list contains the registered job
506+
assert len(jobs) > 0
507+
assert len(jobs) == 3
508+
assert all(j.function_uid == first_registered_function.uid for j in jobs)
509+
510+
452511
async def test_delete_function_job(
453512
client: TestClient, mock_function: ProjectFunction, rpc_client: RabbitMQRPCClient
454513
):

0 commit comments

Comments
 (0)