Skip to content

Commit d935445

Browse files
tcnicholmax-zilla
andauthored
127 add parameters to extractor submit add submit dataset (#128)
* changing parameters to dict working on adding submit dataset to extractor * not sure if parameters are being sent properly * handling parameters dict not list dict * black formatting * new pipfile lock * delete test * does this fix build errors? * new pipfile lock * new Pipfile.lock * adding tests for extractors also adding a sample extractor_info.json file for registration purposes * does this fix file not found error? * trying adding it as dict * adding user - will this fix the tests? * black formatting * comments in methods * black formatting * new classes * matching listener feed branch * more fields necessary for listener to match extractors from v1 * import pymongo * updating and adding models from codegen beginning of adding listener/extractor to front end * services added by codegen * adding to Explore * reverting changes * reverting * merging and fixing to work with existing code * pipenv run black app formatting * use listeners not extractors, routers have changed names * str for author for extractor * removing author for extractor info does this fix registration? * remove author from the extractor info * using Creator instead of Author to avoid conflict with old extractor fields. Making Optional in case not supplied (registration through rabbitmq heartbeat monitor instead of directly by user) * Change extractors to listeners consistently * remove renamed models file Co-authored-by: Max Burnette <[email protected]>
1 parent e4b2562 commit d935445

File tree

11 files changed

+172
-35
lines changed

11 files changed

+172
-35
lines changed

backend/app/models/listeners.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ class EventListenerBase(BaseModel):
3434
"""An Event Listener is the expanded version of v1 Extractors."""
3535

3636
name: str
37-
version: int = 1
37+
version: str = "1.0"
3838
description: str = ""
3939

4040

@@ -55,7 +55,7 @@ class LegacyEventListenerIn(ExtractorInfo):
5555
class EventListenerDB(EventListenerBase, MongoModel):
5656
"""EventListeners have a name, version, author, description, and optionally properties where extractor_info will be saved."""
5757

58-
author: UserOut
58+
creator: Optional[UserOut] = None
5959
created: datetime = Field(default_factory=datetime.utcnow)
6060
modified: datetime = Field(default_factory=datetime.utcnow)
6161
properties: Optional[ExtractorInfo] = None

backend/app/rabbitmq/heartbeat_listener_async.py

Lines changed: 10 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,7 @@
55
from app.config import settings
66
from aio_pika.abc import AbstractIncomingMessage
77
from pymongo import MongoClient
8-
from app.models.extractors import (
9-
ExtractorBase,
10-
ExtractorIn,
11-
ExtractorDB,
12-
ExtractorOut,
13-
)
8+
from app.models.listeners import LegacyEventListenerIn, EventListenerOut
149

1510

1611
async def on_message(message: AbstractIncomingMessage) -> None:
@@ -21,20 +16,18 @@ async def on_message(message: AbstractIncomingMessage) -> None:
2116
extractor_queue = statusBody["queue"]
2217
extractor_info = statusBody["extractor_info"]
2318
extractor_name = extractor_info["name"]
24-
extractor_db = ExtractorDB(**extractor_info)
19+
extractor_db = LegacyEventListenerIn(**extractor_info)
2520
client = MongoClient(settings.MONGODB_URL)
2621
db = client["clowder2"]
27-
existing_extractor = db["extractors"].find_one({"name": extractor_queue})
22+
existing_extractor = db["listeners"].find_one({"name": extractor_queue})
2823
if existing_extractor is not None:
2924
existing_version = existing_extractor["version"]
3025
new_version = extractor_db.version
3126
if version.parse(new_version) > version.parse(existing_version):
32-
new_extractor = db["extractors"].insert_one(extractor_db.to_mongo())
33-
found = db["extractors"].find_one({"_id": new_extractor.inserted_id})
34-
removed = db["extractors"].delete_one(
35-
{"_id": existing_extractor["_id"]}
36-
)
37-
extractor_out = ExtractorOut.from_mongo(found)
27+
new_extractor = db["listeners"].insert_one(extractor_db.to_mongo())
28+
found = db["listeners"].find_one({"_id": new_extractor.inserted_id})
29+
removed = db["listeners"].delete_one({"_id": existing_extractor["_id"]})
30+
extractor_out = EventListenerOut.from_mongo(found)
3831
print(
3932
"extractor updated: "
4033
+ extractor_name
@@ -45,9 +38,9 @@ async def on_message(message: AbstractIncomingMessage) -> None:
4538
)
4639
return extractor_out
4740
else:
48-
new_extractor = db["extractors"].insert_one(extractor_db.to_mongo())
49-
found = db["extractors"].find_one({"_id": new_extractor.inserted_id})
50-
extractor_out = ExtractorOut.from_mongo(found)
41+
new_extractor = db["listeners"].insert_one(extractor_db.to_mongo())
42+
found = db["listeners"].find_one({"_id": new_extractor.inserted_id})
43+
extractor_out = EventListenerOut.from_mongo(found)
5144
print("new extractor registered: " + extractor_name)
5245
return extractor_out
5346

backend/app/rabbitmq/heartbeat_listener_sync.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,14 +19,14 @@ def callback(ch, method, properties, body):
1919
extractor_db = EventListenerDB(**extractor_info)
2020
client = MongoClient(settings.MONGODB_URL)
2121
db = client["clowder2"]
22-
existing_extractor = db["extractors"].find_one({"name": extractor_queue})
22+
existing_extractor = db["listeners"].find_one({"name": extractor_queue})
2323
if existing_extractor is not None:
2424
existing_version = existing_extractor["version"]
2525
new_version = extractor_db.version
2626
if version.parse(new_version) > version.parse(existing_version):
27-
new_extractor = db["extractors"].insert_one(extractor_db.to_mongo())
28-
found = db["extractors"].find_one({"_id": new_extractor.inserted_id})
29-
removed = db["extractors"].delete_one({"_id": existing_extractor["_id"]})
27+
new_extractor = db["listeners"].insert_one(extractor_db.to_mongo())
28+
found = db["listeners"].find_one({"_id": new_extractor.inserted_id})
29+
removed = db["listeners"].delete_one({"_id": existing_extractor["_id"]})
3030
extractor_out = EventListenerOut.from_mongo(found)
3131
print(
3232
"extractor updated: "
@@ -38,8 +38,8 @@ def callback(ch, method, properties, body):
3838
)
3939
return extractor_out
4040
else:
41-
new_extractor = db["extractors"].insert_one(extractor_db.to_mongo())
42-
found = db["extractors"].find_one({"_id": new_extractor.inserted_id})
41+
new_extractor = db["listeners"].insert_one(extractor_db.to_mongo())
42+
found = db["listeners"].find_one({"_id": new_extractor.inserted_id})
4343
extractor_out = EventListenerOut.from_mongo(found)
4444
print("new extractor registered: " + extractor_name)
4545
return extractor_out

backend/app/rabbitmq/listeners.py

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
from app.keycloak_auth import get_token
99
from app import dependencies
1010
from app.models.files import FileOut
11+
from app.models.datasets import DatasetOut
1112
from app.models.listeners import EventListenerMessage
1213

1314

@@ -43,3 +44,37 @@ def submit_file_message(
4344
),
4445
)
4546
return {"message": "testing", "file_id": file_out.id}
47+
48+
49+
def submit_dataset_message(
50+
dataset_out: DatasetOut,
51+
queue: str,
52+
routing_key: str,
53+
parameters: dict,
54+
token: str = Depends(get_token),
55+
db: MongoClient = Depends(dependencies.get_db),
56+
rabbitmq_client: BlockingChannel = Depends(dependencies.get_rabbitmq),
57+
):
58+
# TODO check if extractor is registered
59+
msg_body = EventListenerMessage(
60+
filename=dataset_out.name,
61+
fileSize=dataset_out.bytes,
62+
id=dataset_out.id,
63+
datasetId=dataset_out.dataset_id,
64+
secretKey=token,
65+
)
66+
67+
rabbitmq_client.queue_bind(
68+
exchange="extractors",
69+
queue=queue,
70+
routing_key=routing_key,
71+
)
72+
rabbitmq_client.basic_publish(
73+
exchange="extractors",
74+
routing_key=routing_key,
75+
body=json.dumps(msg_body.dict(), ensure_ascii=False),
76+
properties=pika.BasicProperties(
77+
content_type="application/json", delivery_mode=1
78+
),
79+
)
80+
return {"message": "testing", "dataset_id": dataset_out.id}

backend/app/routers/datasets.py

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,8 @@
77
import zipfile
88
from collections.abc import Mapping, Iterable
99
from typing import List, Optional, Union
10+
import json
1011

11-
import pymongo
12-
from pymongo import MongoClient
1312
import pika
1413
from bson import ObjectId
1514
from bson import json_util
@@ -24,6 +23,7 @@
2423
)
2524
from minio import Minio
2625
from pika.adapters.blocking_connection import BlockingChannel
26+
import pymongo
2727
from pymongo import MongoClient
2828
from rocrate.model.person import Person
2929
from rocrate.rocrate import ROCrate
@@ -761,6 +761,8 @@ async def download_dataset(
761761
raise HTTPException(status_code=404, detail=f"Dataset {dataset_id} not found")
762762

763763

764+
# submits dataset to extractor
765+
# can handle parameters passed in as key/values in info
764766
@router.post("/{dataset_id}/extract")
765767
async def get_dataset_extract(
766768
dataset_id: str,
@@ -780,10 +782,11 @@ async def get_dataset_extract(
780782
token = token.lstrip("Bearer")
781783
token = token.lstrip(" ")
782784
# TODO check if extractor exists
783-
msg = {"message": "testing", "dataseet_id": dataset_id}
785+
msg = {"message": "testing", "dataset_id": dataset_id}
784786
body = {}
785787
body["secretKey"] = token
786788
body["token"] = token
789+
# TODO better solution for host
787790
body["host"] = "http://127.0.0.1:8000"
788791
body["retry_count"] = 0
789792
body["filename"] = dataset["name"]
@@ -794,6 +797,7 @@ async def get_dataset_extract(
794797
current_queue = req_info["extractor"]
795798
if "parameters" in req_info:
796799
current_parameters = req_info["parameters"]
800+
body["parameters"] = current_parameters
797801
current_routing_key = "extractors." + current_queue
798802
rabbitmq_client.queue_bind(
799803
exchange="extractors",

backend/app/routers/files.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -294,6 +294,8 @@ async def get_file_versions(
294294
raise HTTPException(status_code=404, detail=f"File {file_id} not found")
295295

296296

297+
# submits file to extractor
298+
# can handle parameters passed in as key/values in info
297299
@router.post("/{file_id}/extract")
298300
async def get_file_extract(
299301
file_id: str,

backend/app/routers/listeners.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ async def save_listener(
2929
db: MongoClient = Depends(get_db),
3030
):
3131
"""Register a new Event Listener with the system."""
32-
listener = EventListenerDB(**listener_in.dict(), author=user)
32+
listener = EventListenerDB(**listener_in.dict(), creator=user)
3333
# TODO: Check for duplicates somehow?
3434
new_listener = await db["listeners"].insert_one(listener.to_mongo())
3535
found = await db["listeners"].find_one({"_id": new_listener.inserted_id})
@@ -49,7 +49,7 @@ async def save_legacy_listener(
4949
name=legacy_in.name,
5050
version=int(legacy_in.version),
5151
description=legacy_in.description,
52-
author=user,
52+
creator=user,
5353
properties=listener_properties,
5454
)
5555
new_listener = await db["listeners"].insert_one(listener.to_mongo())

backend/app/routers/metadata_files.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ async def _build_metadata_db_obj(
7676
extractor_info = metadata_in.extractor_info
7777
if extractor_info is not None:
7878
if (
79-
extractor := await db["extractors"].find_one(
79+
extractor := await db["listeners"].find_one(
8080
{"name": extractor_info.name, "version": extractor_info.version}
8181
)
8282
) is not None:
@@ -176,7 +176,7 @@ async def replace_file_metadata(
176176
extractor_info = metadata_in.extractor_info
177177
if extractor_info is not None:
178178
if (
179-
extractor := await db["extractors"].find_one(
179+
extractor := await db["listeners"].find_one(
180180
{"name": extractor_info.name, "version": extractor_info.version}
181181
)
182182
) is not None:
@@ -267,7 +267,7 @@ async def update_file_metadata(
267267
extractor_info = metadata_in.extractor_info
268268
if extractor_info is not None:
269269
if (
270-
extractor := await db["extractors"].find_one(
270+
extractor := await db["listeners"].find_one(
271271
{"name": extractor_info.name, "version": extractor_info.version}
272272
)
273273
) is not None:
@@ -402,7 +402,7 @@ async def delete_file_metadata(
402402
extractor_info = metadata_in.extractor_info
403403
if extractor_info is not None:
404404
if (
405-
extractor := await db["extractors"].find_one(
405+
extractor := await db["listeners"].find_one(
406406
{"name": extractor_info.name, "version": extractor_info.version}
407407
)
408408
) is not None:
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
{
2+
"@context": "http://clowder.ncsa.illinois.edu/contexts/extractors.jsonld",
3+
"name": "ncsa.wordcount",
4+
"version": "2.0",
5+
"description": "WordCount extractor. Counts the number of characters, words and lines in the text file that was uploaded.",
6+
"author": "Rob Kooper <[email protected]>",
7+
"contributors": [],
8+
"contexts": [
9+
{
10+
"lines": "http://clowder.ncsa.illinois.edu/metadata/ncsa.wordcount#lines",
11+
"words": "http://clowder.ncsa.illinois.edu/metadata/ncsa.wordcount#words",
12+
"characters": "http://clowder.ncsa.illinois.edu/metadata/ncsa.wordcount#characters"
13+
}
14+
],
15+
"repository": [
16+
{
17+
"repType": "git",
18+
"repUrl": "https://opensource.ncsa.illinois.edu/stash/scm/cats/pyclowder.git"
19+
}
20+
],
21+
"process": {
22+
"file": [
23+
"text/*",
24+
"application/json"
25+
]
26+
},
27+
"external_services": [],
28+
"dependencies": [],
29+
"bibtex": []
30+
}
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
import os
2+
from fastapi.testclient import TestClient
3+
from app.config import settings
4+
from app.models.pyobjectid import PyObjectId
5+
6+
user = {
7+
"email": "[email protected]",
8+
"password": "not_a_password",
9+
"first_name": "Foo",
10+
"last_name": "Bar",
11+
}
12+
13+
extractor_info = {
14+
"@context": "http://clowder.ncsa.illinois.edu/contexts/extractors.jsonld",
15+
"name": "ncsa.wordcount",
16+
"version": "2.0",
17+
"description": "WordCount extractor. Counts the number of characters, words and lines in the text file that was uploaded.",
18+
"contributors": [],
19+
"contexts": [
20+
{
21+
"lines": "http://clowder.ncsa.illinois.edu/metadata/ncsa.wordcount#lines",
22+
"words": "http://clowder.ncsa.illinois.edu/metadata/ncsa.wordcount#words",
23+
"characters": "http://clowder.ncsa.illinois.edu/metadata/ncsa.wordcount#characters",
24+
}
25+
],
26+
"repository": [
27+
{
28+
"repType": "git",
29+
"repUrl": "https://opensource.ncsa.illinois.edu/stash/scm/cats/pyclowder.git",
30+
}
31+
],
32+
"process": {"file": ["text/*", "application/json"]},
33+
"external_services": [],
34+
"dependencies": [],
35+
"bibtex": [],
36+
}
37+
38+
# extractor_info_file = os.path.join(os.getcwd(), 'extractor_info.json')
39+
40+
41+
def test_register(client: TestClient, headers: dict):
42+
response = client.post(
43+
f"{settings.API_V2_STR}/listeners", json=extractor_info, headers=headers
44+
)
45+
assert response.json().get("id") is not None
46+
assert response.status_code == 200
47+
48+
49+
def test_get_one(client: TestClient, headers: dict):
50+
response = client.post(
51+
f"{settings.API_V2_STR}/listeners", json=extractor_info, headers=headers
52+
)
53+
assert response.status_code == 200
54+
assert response.json().get("id") is not None
55+
extractor_id = response.json().get("id")
56+
response = client.get(
57+
f"{settings.API_V2_STR}/listeners/{extractor_id}", headers=headers
58+
)
59+
assert response.status_code == 200
60+
assert response.json().get("id") is not None
61+
62+
63+
def test_delete(client: TestClient, headers: dict):
64+
response = client.post(
65+
f"{settings.API_V2_STR}/listeners", json=extractor_info, headers=headers
66+
)
67+
assert response.status_code == 200
68+
assert response.json().get("id") is not None
69+
extractor_id = response.json().get("id")
70+
response = client.delete(
71+
f"{settings.API_V2_STR}/listeners/{extractor_id}", headers=headers
72+
)
73+
assert response.status_code == 200

0 commit comments

Comments
 (0)