Skip to content

Commit a555862

Browse files
longshuicylmarini
andauthored
769 leveraging the heartbeat info to show extractor status (#822)
* add version chips * update heart beat for keep the alive time * update heartbeat * update endpoints for livelihood * add test * black reformat * codegen * optional set to none * frontend logic wired in * I can't do math... * try utcnow * the logic of disable is correct now * add version chip to pop up as well * disable the description * take out the extractors at root * reformat * update the symbol for extractor livelihood * capitalize O * update the indicator * set time interval to pull etractor status every minute * reformat heartbeat listener lint * Updated comments. --------- Co-authored-by: Luigi Marini <[email protected]>
1 parent 9c64e8b commit a555862

File tree

15 files changed

+304
-98
lines changed

15 files changed

+304
-98
lines changed

backend/app/config.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,5 +79,8 @@ class Settings(BaseSettings):
7979
RABBITMQ_HOST: str = "127.0.0.1"
8080
HEARTBEAT_EXCHANGE: str = "extractors"
8181

82+
# defautl listener heartbeat time interval in seconds 5 minutes
83+
listener_heartbeat_interval = 5 * 60
84+
8285

8386
settings = Settings()

backend/app/models/listeners.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,8 @@ class EventListenerDB(Document, EventListenerBase):
6969
creator: Optional[UserOut] = None
7070
created: datetime = Field(default_factory=datetime.now)
7171
modified: datetime = Field(default_factory=datetime.now)
72+
lastAlive: datetime = None
73+
alive: Optional[bool] = None # made up field to indicate if extractor is alive
7274
properties: Optional[ExtractorInfo] = None
7375

7476
class Settings:

backend/app/routers/listeners.py

Lines changed: 53 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,23 @@ async def _process_incoming_v1_extractor_info(
7474
return new_feed
7575

7676

77+
async def _check_livelihood(
78+
listener: EventListenerDB, heartbeat_interval=settings.listener_heartbeat_interval
79+
):
80+
if heartbeat_interval == 0:
81+
heartbeat_interval = settings.listener_heartbeat_interval
82+
83+
if listener.lastAlive is None:
84+
return False
85+
86+
now = datetime.datetime.utcnow()
87+
elapsed = now - listener.lastAlive
88+
if elapsed.total_seconds() > heartbeat_interval:
89+
return False
90+
else:
91+
return True
92+
93+
7794
@router.get("/instance")
7895
async def get_instance_id(
7996
user=Depends(get_current_user),
@@ -150,7 +167,11 @@ async def save_legacy_listener(
150167

151168
@router.get("/search", response_model=List[EventListenerOut])
152169
async def search_listeners(
153-
text: str = "", skip: int = 0, limit: int = 2, user=Depends(get_current_username)
170+
text: str = "",
171+
skip: int = 0,
172+
limit: int = 2,
173+
heartbeat_interval: Optional[int] = settings.listener_heartbeat_interval,
174+
user=Depends(get_current_username),
154175
):
155176
"""Search all Event Listeners in the db based on text.
156177
@@ -171,7 +192,14 @@ async def search_listeners(
171192
.limit(limit)
172193
.to_list()
173194
)
174-
return [listener.dict() for listener in listeners]
195+
196+
# batch return listener statuses for easy consumption
197+
listenerResponse = []
198+
for listener in listeners:
199+
listener.alive = await _check_livelihood(listener, heartbeat_interval)
200+
listenerResponse.append(listener.dict())
201+
202+
return listenerResponse
175203

176204

177205
@router.get("/categories", response_model=List[str])
@@ -196,11 +224,26 @@ async def get_listener(listener_id: str, user=Depends(get_current_username)):
196224
raise HTTPException(status_code=404, detail=f"listener {listener_id} not found")
197225

198226

227+
@router.get("/{listener_id}/status", response_model=bool)
228+
async def check_listener_livelihood(
229+
listener_id: str,
230+
heartbeat_interval: Optional[int] = settings.listener_heartbeat_interval,
231+
user=Depends(get_current_username),
232+
):
233+
"""Return JSON information about an Event Listener if it exists."""
234+
if (
235+
listener := await EventListenerDB.get(PydanticObjectId(listener_id))
236+
) is not None:
237+
return await _check_livelihood(listener, heartbeat_interval)
238+
raise HTTPException(status_code=404, detail=f"listener {listener_id} not found")
239+
240+
199241
@router.get("", response_model=List[EventListenerOut])
200242
async def get_listeners(
201243
user_id=Depends(get_current_username),
202244
skip: int = 0,
203245
limit: int = 2,
246+
heartbeat_interval: Optional[int] = settings.listener_heartbeat_interval,
204247
category: Optional[str] = None,
205248
label: Optional[str] = None,
206249
):
@@ -219,7 +262,14 @@ async def get_listeners(
219262
query.append(EventListenerDB.properties.default_labels == label)
220263

221264
listeners = await EventListenerDB.find(*query, skip=skip, limit=limit).to_list()
222-
return [listener.dict() for listener in listeners]
265+
266+
# batch return listener statuses for easy consumption
267+
listenerResponse = []
268+
for listener in listeners:
269+
listener.alive = await _check_livelihood(listener, heartbeat_interval)
270+
listenerResponse.append(listener.dict())
271+
272+
return listenerResponse
223273

224274

225275
@router.put("/{listener_id}", response_model=EventListenerOut)

backend/app/tests/test_extractors.py

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
1-
import os
21
from fastapi.testclient import TestClient
2+
33
from app.config import settings
44
from app.tests.utils import create_dataset, upload_file, register_v1_extractor
55

@@ -18,6 +18,15 @@ def test_get_one(client: TestClient, headers: dict):
1818
assert response.json().get("id") is not None
1919

2020

21+
def test_get_status(client: TestClient, headers: dict):
22+
extractor_id = register_v1_extractor(client, headers).get("id")
23+
response = client.get(
24+
f"{settings.API_V2_STR}/listeners/{extractor_id}/status", headers=headers
25+
)
26+
assert response.status_code == 200
27+
assert response.json() is False
28+
29+
2130
def test_delete(client: TestClient, headers: dict):
2231
ext_name = "test.test_delete"
2332
extractor_id = register_v1_extractor(client, headers, ext_name).get("id")

backend/heartbeat_listener.py

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22
import json
33
import logging
44
import os
5+
from datetime import datetime
6+
57
from aio_pika import connect_robust
68
from aio_pika.abc import AbstractIncomingMessage
79
from packaging import version
@@ -30,27 +32,35 @@ async def callback(message: AbstractIncomingMessage):
3032
**extractor_info, properties=ExtractorInfo(**extractor_info)
3133
)
3234

33-
# check to see if extractor alredy exists and update if so
35+
# check to see if extractor already exists and update if so
3436
existing_extractor = await EventListenerDB.find_one(
3537
EventListenerDB.name == msg["queue"]
3638
)
3739
if existing_extractor is not None:
38-
# Update existing listener
40+
extractor_db.id = existing_extractor.id
41+
extractor_db.created = existing_extractor.created
42+
43+
# Update existing listener version
3944
existing_version = existing_extractor.version
4045
new_version = extractor_db.version
4146
if version.parse(new_version) > version.parse(existing_version):
42-
# if this is a new version, add it to the database
43-
extractor_db.id = existing_extractor.id
44-
extractor_db.created = existing_extractor.created
45-
new_extractor = await extractor_db.replace()
46-
extractor_out = EventListenerOut(**new_extractor.dict())
4747
logger.info(
4848
"%s updated from %s to %s"
4949
% (extractor_name, existing_version, new_version)
5050
)
51-
return extractor_out
51+
52+
extractor_db.lastAlive = datetime.utcnow()
53+
logger.info("%s is alive at %s" % (extractor_name, str(datetime.utcnow())))
54+
# Update existing listeners alive status
55+
new_extractor = await extractor_db.replace()
56+
extractor_out = EventListenerOut(**new_extractor.dict())
57+
58+
return extractor_out
59+
5260
else:
5361
# Register new listener
62+
extractor_db.lastAlive = datetime.utcnow()
63+
logger.info("%s is alive at %s" % (extractor_name, str(datetime.utcnow())))
5464
new_extractor = await extractor_db.insert()
5565
extractor_out = EventListenerOut(**new_extractor.dict())
5666
logger.info("New extractor registered: " + extractor_name)

frontend/src/actions/listeners.js

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ export const RECEIVE_LISTENERS = "RECEIVE_LISTENERS";
66
export function fetchListeners(
77
skip = 0,
88
limit = 21,
9+
heartbeatInterval = 0,
910
category = null,
1011
label = null
1112
) {
@@ -14,6 +15,7 @@ export function fetchListeners(
1415
return V2.ListenersService.getListenersApiV2ListenersGet(
1516
skip,
1617
limit,
18+
heartbeatInterval,
1719
category,
1820
label
1921
)
@@ -26,21 +28,30 @@ export function fetchListeners(
2628
})
2729
.catch((reason) => {
2830
dispatch(
29-
handleErrors(reason, fetchListeners(skip, limit, category, label))
31+
handleErrors(
32+
reason,
33+
fetchListeners(skip, limit, heartbeatInterval, category, label)
34+
)
3035
);
3136
});
3237
};
3338
}
3439

3540
export const SEARCH_LISTENERS = "SEARCH_LISTENERS";
3641

37-
export function queryListeners(text, skip = 0, limit = 21) {
42+
export function queryListeners(
43+
text,
44+
skip = 0,
45+
limit = 21,
46+
heartbeatInterval = 0
47+
) {
3848
return (dispatch) => {
3949
// TODO: Parameters for dates? paging?
4050
return V2.ListenersService.searchListenersApiV2ListenersSearchGet(
4151
text,
4252
skip,
43-
limit
53+
limit,
54+
heartbeatInterval
4455
)
4556
.then((json) => {
4657
dispatch({
@@ -50,7 +61,12 @@ export function queryListeners(text, skip = 0, limit = 21) {
5061
});
5162
})
5263
.catch((reason) => {
53-
dispatch(handleErrors(reason, queryListeners(text, skip, limit)));
64+
dispatch(
65+
handleErrors(
66+
reason,
67+
queryListeners(text, skip, limit, heartbeatInterval)
68+
)
69+
);
5470
});
5571
};
5672
}

frontend/src/app.config.ts

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,8 @@ interface Config {
1212
KeycloakRegister: string;
1313
searchEndpoint: string;
1414
refreshTokenInterval: number;
15-
extractorInterval: number;
15+
extractorStatusInterval: number;
16+
extractorLivelihoodInterval: number;
1617
eventListenerJobStatus: EventListenerJobStatus;
1718
streamingBytes: number;
1819
rawDataVisualizationThreshold: number;
@@ -36,18 +37,21 @@ config["GHIssueBaseURL"] =
3637

3738
// Backend Keycloak login url
3839
config["KeycloakBaseURL"] =
39-
process.env.KeycloakBaseURL || config.hostname + "/api/v2/auth";
40-
config["KeycloakLogin"] = config.KeycloakBaseURL + "/login";
41-
config["KeycloakLogout"] = config.KeycloakBaseURL + "/logout";
42-
config["KeycloakRefresh"] = config.KeycloakBaseURL + "/refresh_token";
43-
config["KeycloakRegister"] = config.KeycloakBaseURL + "/register";
40+
process.env.KeycloakBaseURL || `${config.hostname}/api/v2/auth`;
41+
config["KeycloakLogin"] = `${config.KeycloakBaseURL}/login`;
42+
config["KeycloakLogout"] = `${config.KeycloakBaseURL}/logout`;
43+
config["KeycloakRefresh"] = `${config.KeycloakBaseURL}/refresh_token`;
44+
config["KeycloakRegister"] = `${config.KeycloakBaseURL}/register`;
4445

4546
// elasticsearch
4647
config["searchEndpoint"] = `${hostname}/api/v2/elasticsearch`;
4748

48-
// refresh toekn time interval
49-
config["refreshTokenInterval"] = 1000 * 60; // milliseconds
50-
config["extractorInterval"] = 2000; // milliseconds
49+
// refresh token time interval
50+
config["refreshTokenInterval"] = 1000 * 60; // 1 minute
51+
// updated extractor logs
52+
config["extractorStatusInterval"] = 1000 * 10; // 10 seconds
53+
// update extractor stutus (offline/online)
54+
config["extractorLivelihoodInterval"] = 1000 * 120; // 2 minutes
5155

5256
config["eventListenerJobStatus"] = <EventListenerJobStatus>{};
5357
config["eventListenerJobStatus"]["created"] = "CREATED";

frontend/src/components/Explore.tsx

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,7 @@ import { a11yProps, TabPanel } from "./tabs/TabComponent";
99
import DatasetCard from "./datasets/DatasetCard";
1010
import { ArrowBack, ArrowForward } from "@material-ui/icons";
1111
import Layout from "./Layout";
12-
import { Link as RouterLink, useLocation } from "react-router-dom";
13-
import { Listeners } from "./listeners/Listeners";
12+
import { Link as RouterLink } from "react-router-dom";
1413
import { ErrorModal } from "./errors/ErrorModal";
1514

1615
const tab = {
@@ -96,12 +95,11 @@ export const Explore = (): JSX.Element => {
9695
aria-label="dashboard tabs"
9796
>
9897
<Tab sx={tab} label="Datasets" {...a11yProps(0)} />
99-
<Tab sx={tab} label="Extractors" {...a11yProps(1)} />
10098
</Tabs>
10199
</Box>
102100
<TabPanel value={selectedTabIndex} index={0}>
103101
<Grid container spacing={2}>
104-
{datasets !== undefined ? (
102+
{datasets !== undefined ? (
105103
datasets.map((dataset) => {
106104
return (
107105
<Grid item key={dataset.id} xs={12} sm={6} md={4} lg={3}>
@@ -122,12 +120,17 @@ export const Explore = (): JSX.Element => {
122120
{datasets.length === 0 ? (
123121
<Grid container justifyContent="center">
124122
<Box textAlign="center">
125-
<p>Nobody has created any datasets on this instance. Click below to create a dataset!</p>
126-
<Button component={RouterLink} to="/create-dataset"
123+
<p>
124+
Nobody has created any datasets on this instance. Click
125+
below to create a dataset!
126+
</p>
127+
<Button
128+
component={RouterLink}
129+
to="/create-dataset"
127130
variant="contained"
128131
sx={{ m: 2 }}
129132
>
130-
Create Dataset
133+
Create Dataset
131134
</Button>
132135
</Box>
133136
</Grid>
@@ -157,13 +160,10 @@ export const Explore = (): JSX.Element => {
157160
</Button>
158161
</ButtonGroup>
159162
</Box>
160-
): (
163+
) : (
161164
<></>
162165
)}
163166
</TabPanel>
164-
<TabPanel value={selectedTabIndex} index={1}>
165-
<Listeners />
166-
</TabPanel>
167167
<TabPanel value={selectedTabIndex} index={4} />
168168
<TabPanel value={selectedTabIndex} index={2} />
169169
<TabPanel value={selectedTabIndex} index={3} />

frontend/src/components/listeners/ExtractionHistory.tsx

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -77,9 +77,19 @@ export const ExtractionHistory = (): JSX.Element => {
7777
const listListeners = (
7878
skip: number | undefined,
7979
limit: number | undefined,
80+
heartbeatInterval: number | undefined,
8081
selectedCategory: string | null,
8182
selectedLabel: string | null
82-
) => dispatch(fetchListeners(skip, limit, selectedCategory, selectedLabel));
83+
) =>
84+
dispatch(
85+
fetchListeners(
86+
skip,
87+
limit,
88+
heartbeatInterval,
89+
selectedCategory,
90+
selectedLabel
91+
)
92+
);
8393
const listListenerJobs = (
8494
listenerId: string | null,
8595
status: string | null,
@@ -118,7 +128,7 @@ export const ExtractionHistory = (): JSX.Element => {
118128
const [selectedCreatedTime, setSelectedCreatedTime] = useState(null);
119129

120130
useEffect(() => {
121-
listListeners(skip, limit, null, null);
131+
listListeners(skip, limit, 0, null, null);
122132
listListenerJobs(null, null, null, null, null, null, 0, 100);
123133
}, []);
124134

@@ -180,7 +190,7 @@ export const ExtractionHistory = (): JSX.Element => {
180190

181191
useEffect(() => {
182192
if (skip !== null && skip !== undefined) {
183-
listListeners(skip, limit, null, null);
193+
listListeners(skip, limit, 0, null, null);
184194
if (skip === 0) setPrevDisabled(true);
185195
else setPrevDisabled(false);
186196
}

0 commit comments

Comments
 (0)