Skip to content

Commit e4b2562

Browse files
authored
Extractors -> listeners, trigger listeners on saved feeds automatically (#114)
* Rename extractors to listeners
* Additional model work
* More implementation
* Auto-generation of feeds
* Listener assignment
* Standardizing search object and search criteria
* Add delete endpoints
* Name cleanup
* Refactor how RMQ message built
* Standardize some of the submission methods
* Resolve merges, clean up ds metadata
* Update metadata_datasets.py
* Listener -> Event Listener, add docstrings
* Fix remaining imports
* Docstrings
* Get listener name & version as default
* Remove version
* Update feeds.py
1 parent 8866c50 commit e4b2562

File tree

17 files changed

+850
-235
lines changed

17 files changed

+850
-235
lines changed

backend/app/main.py

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,9 @@
2525
collections,
2626
authentication,
2727
keycloak,
28-
extractors,
2928
elasticsearch,
29+
listeners,
30+
feeds,
3031
)
3132

3233
# setup loggers
@@ -114,7 +115,13 @@
114115
dependencies=[Depends(get_current_username)],
115116
)
116117
api_router.include_router(
117-
extractors.router,
118+
listeners.router,
119+
prefix="/listeners",
120+
tags=["listeners"],
121+
dependencies=[Depends(get_current_username)],
122+
)
123+
api_router.include_router(
124+
listeners.legacy_router,
118125
prefix="/extractors",
119126
tags=["extractors"],
120127
dependencies=[Depends(get_current_username)],
@@ -125,6 +132,12 @@
125132
tags=["elasticsearch"],
126133
dependencies=[Depends(get_current_username)],
127134
)
135+
api_router.include_router(
136+
feeds.router,
137+
prefix="/feeds",
138+
tags=["feeds"],
139+
dependencies=[Depends(get_current_username)],
140+
)
128141
api_router.include_router(keycloak.router, prefix="/auth", tags=["auth"])
129142
app.include_router(api_router, prefix=settings.API_V2_STR)
130143

backend/app/models/extractors.py

Lines changed: 0 additions & 43 deletions
This file was deleted.

backend/app/models/feeds.py

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
from datetime import datetime
2+
from pydantic import Field, BaseModel
3+
from typing import Optional, List, Union
4+
from app.models.mongomodel import MongoModel
5+
from app.models.users import UserOut
6+
from app.models.search import SearchObject
7+
from app.models.listeners import EventListenerOut, FeedListener
8+
9+
10+
class JobFeed(BaseModel):
11+
"""A Job Feed is a saved set of search criteria with some number of Event Listeners that can be triggered when new
12+
resources match the saved search criteria for the Feed."""
13+
14+
name: str
15+
search: SearchObject
16+
listeners: List[FeedListener] = []
17+
18+
19+
class FeedBase(JobFeed):
20+
description: str = ""
21+
22+
23+
class FeedIn(JobFeed):
24+
pass
25+
26+
27+
class FeedDB(JobFeed, MongoModel):
28+
author: UserOut
29+
updated: datetime = Field(default_factory=datetime.utcnow)
30+
31+
32+
class FeedOut(FeedDB):
33+
pass

backend/app/models/listeners.py

Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
from datetime import datetime
2+
from pydantic import Field, BaseModel
3+
from typing import Optional, List, Union
4+
from app.models.pyobjectid import PyObjectId
5+
from app.models.mongomodel import MongoModel
6+
from app.models.users import UserOut
7+
8+
9+
class Repository(MongoModel):
10+
"""Reference to a repository associated with Event Listener/Extractor."""
11+
12+
repository_type: str = "git"
13+
repository_url: str = ""
14+
15+
16+
class ExtractorInfo(BaseModel):
17+
"""Currently for extractor_info JSON from Clowder v1 extractors for use with to /api/extractors endpoint."""
18+
19+
author: str # Referring to author of listener script (e.g. name or email), not Clowder user
20+
process: dict
21+
maturity: str = "Development"
22+
contributors: List[str] = []
23+
contexts: List[dict] = []
24+
repository: List[Repository] = []
25+
external_services: List[str] = []
26+
libraries: List[str] = []
27+
bibtex: List[str] = []
28+
default_labels: List[str] = []
29+
categories: List[str] = []
30+
parameters: List[dict] = []
31+
32+
33+
class EventListenerBase(BaseModel):
34+
"""An Event Listener is the expanded version of v1 Extractors."""
35+
36+
name: str
37+
version: int = 1
38+
description: str = ""
39+
40+
41+
class EventListenerIn(EventListenerBase):
42+
"""On submission, minimum info for a listener is name, version and description. Clowder will use name and version to locate queue."""
43+
44+
pass
45+
46+
47+
class LegacyEventListenerIn(ExtractorInfo):
48+
"""v1 Extractors can submit data formatted as a LegacyEventListener (i.e. v1 format) and it will be converted to a v2 EventListener."""
49+
50+
name: str
51+
version: str = "1.0"
52+
description: str = ""
53+
54+
55+
class EventListenerDB(EventListenerBase, MongoModel):
56+
"""EventListeners have a name, version, author, description, and optionally properties where extractor_info will be saved."""
57+
58+
author: UserOut
59+
created: datetime = Field(default_factory=datetime.utcnow)
60+
modified: datetime = Field(default_factory=datetime.utcnow)
61+
properties: Optional[ExtractorInfo] = None
62+
63+
64+
class EventListenerOut(EventListenerDB):
65+
pass
66+
67+
68+
class FeedListener(BaseModel):
69+
"""This is a shorthand POST class for associating an existing EventListener with a Feed. The automatic flag determines
70+
whether the Feed will automatically send new matches to the Event Listener."""
71+
72+
listener_id: PyObjectId
73+
automatic: bool # Listeners can trigger automatically or not on a per-feed basis.
74+
75+
76+
class EventListenerMessage(BaseModel):
77+
"""This describes contents of JSON object that is submitted to RabbitMQ for the Event Listeners/Extractors to consume."""
78+
79+
host: str = "http://127.0.0.1:8000"
80+
secretKey: str = "secretKey"
81+
retry_count: int = 0
82+
resource_type: str = "file"
83+
flags: str = ""
84+
filename: str
85+
fileSize: int
86+
id: str
87+
datasetId: str
88+
token: str

backend/app/models/metadata.py

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,11 @@
1313
from app.models.mongomodel import MongoModel
1414
from app.models.pyobjectid import PyObjectId
1515
from app.models.users import UserOut
16-
from app.models.extractors import ExtractorIn, ExtractorOut, ExtractorIdentifier
16+
from app.models.listeners import (
17+
EventListenerIn,
18+
LegacyEventListenerIn,
19+
EventListenerOut,
20+
)
1721

1822

1923
class MongoDBRef(BaseModel):
@@ -57,7 +61,7 @@ class MetadataField(MongoModel):
5761

5862
class MetadataDefinitionBase(MongoModel):
5963
"""This describes a metadata object with a short name and description, predefined set of fields, and context.
60-
These provide a shorthand for use by extractors as well as a source for building GUI widgets to add new entries.
64+
These provide a shorthand for use by listeners as well as a source for building GUI widgets to add new entries.
6165
6266
Example: {
6367
"name" : "LatLon",
@@ -164,11 +168,11 @@ def validate_definition(contents: dict, metadata_def: MetadataDefinitionOut):
164168

165169

166170
class MetadataAgent(MongoModel):
167-
"""Describes the user who created a piece of metadata. If extractor is provided, user refers to the user who
168-
triggered the extraction."""
171+
"""Describes the user who created a piece of metadata. If listener is provided, user refers to the user who
172+
triggered the job."""
169173

170174
creator: UserOut
171-
extractor: Optional[ExtractorOut]
175+
listener: Optional[EventListenerOut]
172176

173177

174178
class MetadataBase(MongoModel):
@@ -199,7 +203,8 @@ def definition_is_valid(cls, v):
199203

200204
class MetadataIn(MetadataBase):
201205
file_version: Optional[int]
202-
extractor_info: Optional[ExtractorIn]
206+
listener: Optional[EventListenerIn]
207+
extractor: Optional[LegacyEventListenerIn]
203208

204209

205210
class MetadataPatch(MetadataIn):
@@ -225,7 +230,7 @@ class MetadataPatch(MetadataIn):
225230
class MetadataDelete(MongoModel):
226231
metadata_id: Optional[str] # specific metadata ID we are deleting
227232
definition: Optional[str]
228-
extractor_info: Optional[ExtractorIdentifier]
233+
listener: Optional[EventListenerIn]
229234

230235

231236
class MetadataDB(MetadataBase):

backend/app/models/search.py

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
from datetime import datetime
2+
from pydantic import BaseModel
3+
from typing import Optional, List
4+
5+
6+
# TODO: may eventually be split by index (resource type)
7+
class SearchIndexContents(BaseModel):
8+
"""This describes what is indexed in Elasticsearch for a given resource."""
9+
10+
id: str
11+
name: str
12+
creator: str # currently just email
13+
created: datetime
14+
download: int
15+
16+
17+
class SearchCriteria(BaseModel):
18+
field: str
19+
operator: str = "=="
20+
value: str
21+
22+
23+
class SearchObject(BaseModel):
24+
"""This is a way to save a search (i.e. as a Feed).
25+
26+
Parameters:
27+
index_name -- which ES index to search
28+
criteria -- some number of field/operator/value tuples describing the search requirements
29+
mode -- and/or determines whether all of the criteria must match, or any of them
30+
original -- if the user originally performed a string search, their original text entry is preserved here
31+
"""
32+
33+
index_name: str
34+
criteria: List[SearchCriteria] = []
35+
mode: str = "and" # and / or
36+
original: Optional[str] # original un-parsed search string

backend/app/rabbitmq/heartbeat_listener_sync.py

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,9 @@
33
from packaging import version
44
from app.config import settings
55
from pymongo import MongoClient
6-
from app.models.extractors import (
7-
ExtractorBase,
8-
ExtractorIn,
9-
ExtractorDB,
10-
ExtractorOut,
6+
from app.models.listeners import (
7+
EventListenerDB,
8+
EventListenerOut,
119
)
1210

1311

@@ -18,7 +16,7 @@ def callback(ch, method, properties, body):
1816
extractor_queue = statusBody["queue"]
1917
extractor_info = statusBody["extractor_info"]
2018
extractor_name = extractor_info["name"]
21-
extractor_db = ExtractorDB(**extractor_info)
19+
extractor_db = EventListenerDB(**extractor_info)
2220
client = MongoClient(settings.MONGODB_URL)
2321
db = client["clowder2"]
2422
existing_extractor = db["extractors"].find_one({"name": extractor_queue})
@@ -29,7 +27,7 @@ def callback(ch, method, properties, body):
2927
new_extractor = db["extractors"].insert_one(extractor_db.to_mongo())
3028
found = db["extractors"].find_one({"_id": new_extractor.inserted_id})
3129
removed = db["extractors"].delete_one({"_id": existing_extractor["_id"]})
32-
extractor_out = ExtractorOut.from_mongo(found)
30+
extractor_out = EventListenerOut.from_mongo(found)
3331
print(
3432
"extractor updated: "
3533
+ extractor_name
@@ -42,7 +40,7 @@ def callback(ch, method, properties, body):
4240
else:
4341
new_extractor = db["extractors"].insert_one(extractor_db.to_mongo())
4442
found = db["extractors"].find_one({"_id": new_extractor.inserted_id})
45-
extractor_out = ExtractorOut.from_mongo(found)
43+
extractor_out = EventListenerOut.from_mongo(found)
4644
print("new extractor registered: " + extractor_name)
4745
return extractor_out
4846

backend/app/rabbitmq/listeners.py

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
import json
2+
import pika
3+
from fastapi import Request, HTTPException, Depends
4+
from pymongo import MongoClient
5+
from bson import ObjectId
6+
from pika.adapters.blocking_connection import BlockingChannel
7+
8+
from app.keycloak_auth import get_token
9+
from app import dependencies
10+
from app.models.files import FileOut
11+
from app.models.listeners import EventListenerMessage
12+
13+
14+
def submit_file_message(
15+
file_out: FileOut,
16+
queue: str,
17+
routing_key: str,
18+
parameters: dict,
19+
token: str = Depends(get_token),
20+
db: MongoClient = Depends(dependencies.get_db),
21+
rabbitmq_client: BlockingChannel = Depends(dependencies.get_rabbitmq),
22+
):
23+
# TODO check if extractor is registered
24+
msg_body = EventListenerMessage(
25+
filename=file_out.name,
26+
fileSize=file_out.bytes,
27+
id=file_out.id,
28+
datasetId=file_out.dataset_id,
29+
secretKey=token,
30+
)
31+
32+
rabbitmq_client.queue_bind(
33+
exchange="extractors",
34+
queue=queue,
35+
routing_key=routing_key,
36+
)
37+
rabbitmq_client.basic_publish(
38+
exchange="extractors",
39+
routing_key=routing_key,
40+
body=json.dumps(msg_body.dict(), ensure_ascii=False),
41+
properties=pika.BasicProperties(
42+
content_type="application/json", delivery_mode=1
43+
),
44+
)
45+
return {"message": "testing", "file_id": file_out.id}

backend/app/routers/datasets.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -197,7 +197,6 @@ async def save_dataset(
197197
raise HTTPException(status_code=503, detail="Service not available")
198198
return
199199

200-
result = dataset_in.dict()
201200
dataset_db = DatasetDB(**dataset_in.dict(), author=user)
202201
new_dataset = await db["datasets"].insert_one(dataset_db.to_mongo())
203202
found = await db["datasets"].find_one({"_id": new_dataset.inserted_id})

0 commit comments

Comments (0)