Skip to content

Commit 875dfe8

Browse files
authored
Enable/disable listeners (#1022)
* draft * removing overhead of calling fetchListeners * fixing alive status on toggling active flag on backend * Adding active check on listener endpoints and adding test cases * fixing codegen * added Listener Authorization as depends and added it as check for all routes * addressed feedbacks * initial implementation of Feeds page * Revert "initial implementation of Feeds page" This reverts commit 275ccc7. * Used toggle switch button
1 parent c113f9b commit 875dfe8

File tree

16 files changed

+771
-29
lines changed

16 files changed

+771
-29
lines changed

backend/app/deps/authorization_deps.py

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
from app.models.datasets import DatasetDB, DatasetStatus
44
from app.models.files import FileDB, FileStatus
55
from app.models.groups import GroupDB
6+
from app.models.listeners import EventListenerDB
67
from app.models.metadata import MetadataDB
78
from app.routers.authentication import get_admin, get_admin_mode
89
from beanie import PydanticObjectId
@@ -389,6 +390,41 @@ async def __call__(
389390
raise HTTPException(status_code=404, detail=f"Group {group_id} not found")
390391

391392

393+
class ListenerAuthorization:
394+
"""We use class dependency so that we can provide the `permission` parameter to the dependency.
395+
For more info see https://fastapi.tiangolo.com/advanced/advanced-dependencies/.
396+
Regular users are not allowed to run non-active listeners"""
397+
398+
# def __init__(self, optional_arg: str = None):
399+
# self.optional_arg = optional_arg
400+
401+
async def __call__(
402+
self,
403+
listener_id: str,
404+
current_user: str = Depends(get_current_username),
405+
admin_mode: bool = Depends(get_admin_mode),
406+
admin: bool = Depends(get_admin),
407+
):
408+
# If the current user is admin and has turned on admin_mode, user has access irrespective of any role assigned
409+
if admin and admin_mode:
410+
return True
411+
412+
# Else check if listener is active or current user is the creator of the extractor
413+
if (
414+
listener := await EventListenerDB.get(PydanticObjectId(listener_id))
415+
) is not None:
416+
if listener.active is True or (
417+
listener.creator and listener.creator.email == current_user
418+
):
419+
return True
420+
else:
421+
raise HTTPException(
422+
status_code=403,
423+
detail=f"User `{current_user} does not have permission on listener `{listener_id}`",
424+
)
425+
raise HTTPException(status_code=404, detail=f"Listener {listener_id} not found")
426+
427+
392428
class CheckStatus:
393429
"""We use class dependency so that we can provide the `permission` parameter to the dependency.
394430
For more info see https://fastapi.tiangolo.com/advanced/advanced-dependencies/."""

backend/app/models/listeners.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@ class EventListenerDB(Document, EventListenerBase):
6969
modified: datetime = Field(default_factory=datetime.now)
7070
lastAlive: datetime = None
7171
alive: Optional[bool] = None # made up field to indicate if extractor is alive
72+
active: bool = False
7273
properties: Optional[ExtractorInfo] = None
7374

7475
class Settings:

backend/app/routers/feeds.py

Lines changed: 23 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,13 @@
11
from typing import List, Optional
22

3+
from app.deps.authorization_deps import ListenerAuthorization
34
from app.keycloak_auth import get_current_user, get_current_username
45
from app.models.feeds import FeedDB, FeedIn, FeedOut
56
from app.models.files import FileOut
67
from app.models.listeners import EventListenerDB, FeedListener
78
from app.models.users import UserOut
89
from app.rabbitmq.listeners import submit_file_job
10+
from app.routers.authentication import get_admin, get_admin_mode
911
from app.search.connect import check_search_result
1012
from beanie import PydanticObjectId
1113
from fastapi import APIRouter, Depends, HTTPException
@@ -23,7 +25,7 @@ async def disassociate_listener_db(feed_id: str, listener_id: str):
2325
if (feed := await FeedDB.get(PydanticObjectId(feed_id))) is not None:
2426
new_listeners = []
2527
for feed_listener in feed.listeners:
26-
if feed_listener.listener_id != listener_id:
28+
if feed_listener.listener_id != PydanticObjectId(listener_id):
2729
new_listeners.append(feed_listener)
2830
feed.listeners = new_listeners
2931
await feed.save()
@@ -122,6 +124,8 @@ async def associate_listener(
122124
feed_id: str,
123125
listener: FeedListener,
124126
user=Depends(get_current_user),
127+
admin=Depends(get_admin),
128+
admin_mode=Depends(get_admin_mode),
125129
):
126130
"""Associate an existing Event Listener with a Feed, e.g. so it will be triggered on new Feed results.
127131
@@ -131,22 +135,35 @@ async def associate_listener(
131135
"""
132136
if (feed := await FeedDB.get(PydanticObjectId(feed_id))) is not None:
133137
if (
134-
await EventListenerDB.get(PydanticObjectId(listener.listener_id))
138+
listener_db := await EventListenerDB.get(
139+
PydanticObjectId(listener.listener_id)
140+
)
135141
) is not None:
136-
feed.listeners.append(listener)
137-
await feed.save()
138-
return feed.dict()
142+
if (
143+
(admin and admin_mode)
144+
or (listener_db.creator and listener_db.creator.email == user.email)
145+
or listener_db.active
146+
):
147+
feed.listeners.append(listener)
148+
await feed.save()
149+
return feed.dict()
150+
else:
151+
raise HTTPException(
152+
status_code=403,
153+
detail=f"User {user} doesn't have permission to submit job to listener {listener.listener_id}",
154+
)
139155
raise HTTPException(
140156
status_code=404, detail=f"listener {listener.listener_id} not found"
141157
)
142158
raise HTTPException(status_code=404, detail=f"feed {feed_id} not found")
143159

144160

145-
@router.delete("/{feed_id}/listeners/{listener_id}", response_model=FeedOut)
161+
@router.delete("/{feed_id}/listeners/{listener_id}")
146162
async def disassociate_listener(
147163
feed_id: str,
148164
listener_id: str,
149165
user=Depends(get_current_user),
166+
allow: bool = Depends(ListenerAuthorization()),
150167
):
151168
"""Disassociate an Event Listener from a Feed.
152169

backend/app/routers/listeners.py

Lines changed: 72 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
from typing import List, Optional
66

77
from app.config import settings
8+
from app.deps.authorization_deps import ListenerAuthorization
89
from app.keycloak_auth import get_current_user, get_current_username, get_user
910
from app.models.config import ConfigEntryDB
1011
from app.models.feeds import FeedDB, FeedListener
@@ -18,6 +19,7 @@
1819
from app.models.pages import Paged, _construct_page_metadata, _get_page_query
1920
from app.models.search import SearchCriteria
2021
from app.models.users import UserOut
22+
from app.routers.authentication import get_admin, get_admin_mode
2123
from app.routers.feeds import disassociate_listener_db
2224
from beanie import PydanticObjectId
2325
from beanie.operators import Or, RegEx
@@ -198,6 +200,8 @@ async def search_listeners(
198200
heartbeat_interval: Optional[int] = settings.listener_heartbeat_interval,
199201
user=Depends(get_current_username),
200202
process: Optional[str] = None,
203+
admin=Depends(get_admin),
204+
admin_mode=Depends(get_admin_mode),
201205
):
202206
"""Search all Event Listeners in the db based on text.
203207
@@ -221,6 +225,8 @@ async def search_listeners(
221225
aggregation_pipeline.append(
222226
{"$match": {"properties.process.dataset": {"$exists": True}}}
223227
)
228+
if not admin or not admin_mode:
229+
aggregation_pipeline.append({"$match": {"active": True}})
224230
# Add pagination
225231
aggregation_pipeline.append(
226232
_get_page_query(skip, limit, sort_field="name", ascending=True)
@@ -260,7 +266,11 @@ async def list_default_labels(user=Depends(get_current_username)):
260266

261267

262268
@router.get("/{listener_id}", response_model=EventListenerOut)
263-
async def get_listener(listener_id: str, user=Depends(get_current_username)):
269+
async def get_listener(
270+
listener_id: str,
271+
user=Depends(get_current_username),
272+
allow: bool = Depends(ListenerAuthorization()),
273+
):
264274
"""Return JSON information about an Event Listener if it exists."""
265275
if (
266276
listener := await EventListenerDB.get(PydanticObjectId(listener_id))
@@ -274,6 +284,7 @@ async def check_listener_livelihood(
274284
listener_id: str,
275285
heartbeat_interval: Optional[int] = settings.listener_heartbeat_interval,
276286
user=Depends(get_current_username),
287+
allow: bool = Depends(ListenerAuthorization()),
277288
):
278289
"""Return JSON information about an Event Listener if it exists."""
279290
if (
@@ -293,6 +304,9 @@ async def get_listeners(
293304
label: Optional[str] = None,
294305
alive_only: Optional[bool] = False,
295306
process: Optional[str] = None,
307+
all: Optional[bool] = False,
308+
admin=Depends(get_admin),
309+
admin_mode=Depends(get_admin_mode),
296310
):
297311
"""Get a list of all Event Listeners in the db.
298312
@@ -303,6 +317,7 @@ async def get_listeners(
303317
category -- filter by category has to be exact match
304318
label -- filter by label has to be exact match
305319
alive_only -- filter by alive status
320+
all -- boolean stating if we want to show all listeners irrespective of admin and admin_mode
306321
"""
307322
# First compute alive flag for all listeners
308323
aggregation_pipeline = [
@@ -325,6 +340,9 @@ async def get_listeners(
325340
aggregation_pipeline.append(
326341
{"$match": {"properties.process.dataset": {"$exists": True}}}
327342
)
343+
# Non admin users can access only active listeners unless all is turned on for Extractor page
344+
if not all and (not admin or not admin_mode):
345+
aggregation_pipeline.append({"$match": {"active": True}})
328346
# Add pagination
329347
aggregation_pipeline.append(
330348
_get_page_query(skip, limit, sort_field="name", ascending=True)
@@ -351,6 +369,7 @@ async def edit_listener(
351369
listener_id: str,
352370
listener_in: EventListenerIn,
353371
user_id=Depends(get_user),
372+
allow: bool = Depends(ListenerAuthorization()),
354373
):
355374
"""Update the information about an existing Event Listener..
356375
@@ -374,10 +393,62 @@ async def edit_listener(
374393
raise HTTPException(status_code=404, detail=f"listener {listener_id} not found")
375394

376395

396+
@router.put("/{listener_id}/enable", response_model=EventListenerOut)
397+
async def enable_listener(
398+
listener_id: str,
399+
user_id=Depends(get_user),
400+
allow: bool = Depends(ListenerAuthorization()),
401+
):
402+
"""Enable an Event Listener. Only admins can enable listeners.
403+
404+
Arguments:
405+
listener_id -- UUID of the listener to be enabled
406+
"""
407+
return await _set_active_flag(listener_id, True)
408+
409+
410+
@router.put("/{listener_id}/disable", response_model=EventListenerOut)
411+
async def disable_listener(
412+
listener_id: str,
413+
user_id=Depends(get_user),
414+
allow: bool = Depends(ListenerAuthorization()),
415+
):
416+
"""Disable an Event Listener. Only admins can enable listeners.
417+
418+
Arguments:
419+
listener_id -- UUID of the listener to be enabled
420+
"""
421+
return await _set_active_flag(listener_id, False)
422+
423+
424+
async def _set_active_flag(
425+
listener_id: str,
426+
active: bool,
427+
allow: bool = Depends(ListenerAuthorization()),
428+
):
429+
"""Set the active flag of an Event Listener. Only admins can enable/disable listeners.
430+
431+
Arguments:
432+
listener_id -- UUID of the listener to be enabled/disabled
433+
"""
434+
listener = await EventListenerDB.find_one(
435+
EventListenerDB.id == ObjectId(listener_id)
436+
)
437+
if listener:
438+
try:
439+
listener.active = active
440+
await listener.save()
441+
return listener.dict()
442+
except Exception as e:
443+
raise HTTPException(status_code=500, detail=e.args[0])
444+
raise HTTPException(status_code=404, detail=f"listener {listener_id} not found")
445+
446+
377447
@router.delete("/{listener_id}")
378448
async def delete_listener(
379449
listener_id: str,
380450
user=Depends(get_current_username),
451+
allow: bool = Depends(ListenerAuthorization()),
381452
):
382453
"""Remove an Event Listener from the database. Will not clear event history for the listener."""
383454
listener = await EventListenerDB.find_one(

backend/app/tests/test_extractors.py

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,3 +60,23 @@ def test_v1_mime_trigger(client: TestClient, headers: dict):
6060
)
6161
assert response.status_code == 200
6262
assert len(response.json()) > 0
63+
64+
65+
def test_enable_disable_extractor(client: TestClient, headers: dict):
66+
# create a new extractor
67+
ext_name = "test.v1_extractor"
68+
extractor_id = register_v1_extractor(client, headers, ext_name).get("id")
69+
70+
# enable the extractor
71+
response = client.put(
72+
f"{settings.API_V2_STR}/listeners/{extractor_id}/enable", headers=headers
73+
)
74+
assert response.status_code == 200
75+
assert response.json()["active"] is True
76+
77+
# disable the extractor
78+
response = client.put(
79+
f"{settings.API_V2_STR}/listeners/{extractor_id}/disable", headers=headers
80+
)
81+
assert response.status_code == 200
82+
assert response.json()["active"] is False

frontend/src/actions/listeners.js

Lines changed: 43 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,8 @@ export function fetchListeners(
1010
category = null,
1111
label = null,
1212
aliveOnly = false,
13-
process = null
13+
process = null,
14+
all = false
1415
) {
1516
return (dispatch) => {
1617
// TODO: Parameters for dates? paging?
@@ -21,7 +22,8 @@ export function fetchListeners(
2122
category,
2223
label,
2324
aliveOnly,
24-
process
25+
process,
26+
all
2527
)
2628
.then((json) => {
2729
dispatch({
@@ -49,6 +51,45 @@ export function fetchListeners(
4951
};
5052
}
5153

54+
export const TOGGLE_ACTIVE_FLAG_LISTENER = "TOGGLE_ACTIVE_FLAG_LISTENER";
55+
export function enableListener(id) {
56+
return (dispatch) => {
57+
return V2.ListenersService.enableListenerApiV2ListenersListenerIdEnablePut(
58+
id
59+
)
60+
.then((json) => {
61+
// We could have called fetchListeners but it would be an overhead since we are just toggling the active flag for one listener.
62+
// Hence we create a separate action to update the particular listener in state
63+
dispatch({
64+
type: TOGGLE_ACTIVE_FLAG_LISTENER,
65+
listener: json,
66+
});
67+
})
68+
.catch((reason) => {
69+
dispatch(handleErrors(reason, enableListener(id)));
70+
});
71+
};
72+
}
73+
74+
export function disableListener(id) {
75+
return (dispatch) => {
76+
return V2.ListenersService.disableListenerApiV2ListenersListenerIdDisablePut(
77+
id
78+
)
79+
.then((json) => {
80+
// We could have called fetchListeners but it would be an overhead since we are just toggling the active flag for one listener.
81+
// Hence we create a separate action to update the particular listener in state
82+
dispatch({
83+
type: TOGGLE_ACTIVE_FLAG_LISTENER,
84+
listener: json,
85+
});
86+
})
87+
.catch((reason) => {
88+
dispatch(handleErrors(reason, disableListener(id)));
89+
});
90+
};
91+
}
92+
5293
export const SEARCH_LISTENERS = "SEARCH_LISTENERS";
5394

5495
export function queryListeners(

0 commit comments

Comments
 (0)