
Commit f75e559

ddey2, max-zilla, and lmarini authored
Initial implementation of metadata indexing (#173)
* Initial implementation of metadata indexing and also updated syntax for elasticsearch dependency injection
* fixes
* metadata search working
* Enabling metadata to search from UI
* move MongoDBRef
* fixing coroutine error related to async issue
* fixing test case
* adding await to async method
* fixing patch_metadata update
* fixing #201 issue
* fix to metadata insert_record
* Added delete metadata index to delete-data.sh script.

Co-authored-by: Max Burnette <[email protected]>
Co-authored-by: Luigi Marini <[email protected]>
1 parent 3fb1609 commit f75e559

15 files changed: +160 -38 lines


backend/app/database/errors.py

Lines changed: 1 addition & 1 deletion
@@ -8,7 +8,7 @@
 from app.config import settings
 from app.mongo import crete_mongo_indexes
 from app.models.errors import Error
-from app.models.metadata import MongoDBRef
+from app.models.mongomodel import MongoDBRef
 
 logger = logging.getLogger(__name__)
 

backend/app/dependencies.py

Lines changed: 3 additions & 2 deletions
@@ -63,5 +63,6 @@ def get_rabbitmq() -> BlockingChannel:
     return channel
 
 
-def get_elasticsearchclient():
-    return connect_elasticsearch()
+async def get_elasticsearchclient():
+    es = await connect_elasticsearch()
+    return es
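
Because get_elasticsearchclient is now a coroutine, FastAPI awaits it before invoking the route handler, so routes keep using Depends exactly as before. A minimal sketch of a consuming route, assuming a hypothetical /search/status path that is not part of this commit:

from elasticsearch import Elasticsearch
from fastapi import APIRouter, Depends

from app import dependencies

router = APIRouter()


@router.get("/search/status")  # hypothetical path, for illustration only
async def search_status(
    es: Elasticsearch = Depends(dependencies.get_elasticsearchclient),
):
    # FastAPI resolves the async dependency (awaiting connect_elasticsearch)
    # before this handler runs, so `es` is ready to use here.
    return {"available": es is not None}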

backend/app/main.py

Lines changed: 3 additions & 0 deletions
@@ -152,6 +152,9 @@ async def startup_elasticsearch():
     create_index(
         es, "dataset", settings.elasticsearch_setting, indexSettings.dataset_mappings
     )
+    create_index(
+        es, "metadata", settings.elasticsearch_setting, indexSettings.metadata_mappings
+    )
 
 
 @app.on_event("shutdown")
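
indexSettings.metadata_mappings is defined elsewhere in the codebase and is not shown in this diff. Purely as an illustration of the shape create_index expects, a hypothetical mapping matching the fields indexed in metadata_datasets.py below (the actual field names and types may differ):

# Hypothetical sketch only; the real mappings live in the project's indexSettings.
metadata_mappings = {
    "properties": {
        "resource_id": {"type": "keyword"},
        "resource_type": {"type": "keyword"},
        "created": {"type": "date"},
        "creator": {"type": "keyword"},
        "contents": {"type": "object"},  # arbitrary user-supplied metadata JSON
        "context_url": {"type": "keyword"},
        "context": {"type": "object", "enabled": False},  # stored, not indexed
    }
}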

backend/app/models/errors.py

Lines changed: 1 addition & 2 deletions
@@ -2,8 +2,7 @@
 from typing import Optional
 from pydantic import Field
 
-from app.models.mongomodel import MongoModel
-from app.models.metadata import MongoDBRef
+from app.models.mongomodel import MongoModel, MongoDBRef
 
 
 class ServiceUnreachable(Exception):

backend/app/models/metadata.py

Lines changed: 11 additions & 8 deletions
@@ -4,26 +4,24 @@
 from typing import Optional, List, Union
 from enum import Enum
 
+from elasticsearch import Elasticsearch
 from bson import ObjectId
 from bson.dbref import DBRef
+from fastapi.param_functions import Depends
 from pydantic import Field, validator, BaseModel, create_model
 from fastapi import HTTPException
 from pymongo import MongoClient
 
-from app.models.mongomodel import MongoModel
+from app import dependencies
+from app.models.mongomodel import MongoModel, MongoDBRef
 from app.models.pyobjectid import PyObjectId
 from app.models.users import UserOut
 from app.models.listeners import (
     EventListenerIn,
     LegacyEventListenerIn,
     EventListenerOut,
 )
-
-
-class MongoDBRef(BaseModel):
-    collection: str
-    resource_id: PyObjectId
-    version: Optional[int]
+from app.search.connect import update_record
 
 
 # List of valid types that can be specified for metadata fields
@@ -299,7 +297,9 @@ def deep_update(orig: dict, new: dict):
     return orig
 
 
-async def patch_metadata(metadata: dict, new_entries: dict, db: MongoClient):
+async def patch_metadata(
+    metadata: dict, new_entries: dict, db: MongoClient, es: Elasticsearch
+):
     """Convenience function for updating original metadata contents with new entries."""
     try:
         # TODO: For list-type definitions, should we append to list instead?
@@ -315,6 +315,9 @@ async def patch_metadata(metadata: dict, new_entries: dict, db: MongoClient):
         db["metadata"].replace_one(
             {"_id": metadata["_id"]}, MetadataDB(**metadata).to_mongo()
         )
+        # Update entry to the metadata index
+        doc = {"doc": {"contents": metadata["contents"]}}
+        update_record(es, "metadata", doc, metadata["_id"])
     except Exception as e:
         raise e
     return MetadataOut.from_mongo(metadata)
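
app/search/connect.py is not part of this diff. Since patch_metadata passes a partial document wrapped in {"doc": ...}, update_record presumably forwards it to Elasticsearch's partial-update API; a minimal sketch under that assumption, using the elasticsearch-py 7.x client:

from elasticsearch import Elasticsearch


def update_record(es: Elasticsearch, index: str, doc: dict, doc_id):
    """Sketch: merge the fields in doc["doc"] into the indexed document with this id."""
    es.update(index=index, id=str(doc_id), body=doc)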

backend/app/models/mongomodel.py

Lines changed: 7 additions & 0 deletions
@@ -1,3 +1,4 @@
+from typing import Optional
 from datetime import datetime
 from bson import ObjectId
 from bson.errors import InvalidId
@@ -61,3 +62,9 @@ def to_mongo(self, **kwargs):
         parsed["_id"] = parsed.pop("id")
 
         return parsed
+
+
+class MongoDBRef(BaseModel):
+    collection: str
+    resource_id: PyObjectId
+    version: Optional[int]
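
With MongoDBRef defined in mongomodel.py alongside MongoModel, the other modules in this commit import it from there. A small usage sketch; the ObjectId string below is a made-up example:

from app.models.mongomodel import MongoDBRef

# Reference version 1 of a dataset document by its ObjectId string.
ref = MongoDBRef(
    collection="datasets",
    resource_id="63a0f1c2e4b0a1b2c3d4e5f6",
    version=1,
)
print(ref.collection, ref.resource_id, ref.version)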

backend/app/routers/datasets.py

Lines changed: 4 additions & 5 deletions
@@ -9,6 +9,7 @@
 from typing import List, Optional, Union
 import json
 
+from elasticsearch import Elasticsearch
 import pika
 from bson import ObjectId
 from bson import json_util
@@ -190,10 +191,8 @@ async def save_dataset(
     dataset_in: DatasetIn,
     user=Depends(keycloak_auth.get_current_user),
     db: MongoClient = Depends(dependencies.get_db),
-    es=Depends(dependencies.get_elasticsearchclient),
+    es: Elasticsearch = Depends(dependencies.get_elasticsearchclient),
 ):
-    es = await connect_elasticsearch()
-
     # Check all connection and abort if any one of them is not available
     if db is None or es is None:
         raise HTTPException(status_code=503, detail="Service not available")
@@ -348,7 +347,7 @@ async def patch_dataset(
     dataset_info: DatasetPatch,
     user_id=Depends(get_user),
     db: MongoClient = Depends(dependencies.get_db),
-    es=Depends(dependencies.get_elasticsearchclient),
+    es: Elasticsearch = Depends(dependencies.get_elasticsearchclient),
 ):
     es = await connect_elasticsearch()
 
@@ -390,7 +389,7 @@ async def delete_dataset(
     dataset_id: str,
     db: MongoClient = Depends(dependencies.get_db),
     fs: Minio = Depends(dependencies.get_fs),
-    es=Depends(dependencies.get_elasticsearchclient),
+    es: Elasticsearch = Depends(dependencies.get_elasticsearchclient),
 ):
     es = await connect_elasticsearch()
 

backend/app/routers/elasticsearch.py

Lines changed: 10 additions & 3 deletions
@@ -25,8 +25,15 @@ async def search_dataset(request: Request):
     return search_index(es, "dataset", query)
 
 
-@router.post("/file,dataset/_msearch")
-async def search_file_and_dataset(request: Request):
+@router.post("/metadata/_msearch")
+async def search_metadata(request: Request):
     es = await connect_elasticsearch()
     query = await request.body()
-    return search_index(es, ["file", "dataset"], query)
+    return search_index(es, "metadata", query)
+
+
+@router.post("/file,dataset,metadata/_msearch")
+async def search_file_dataset_and_metadata(request: Request):
+    es = await connect_elasticsearch()
+    query = await request.body()
+    return search_index(es, ["file", "dataset", "metadata"], query)
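
These routes proxy Elasticsearch's multi-search (_msearch) API, whose body is newline-delimited JSON: a header line naming the target index followed by a query line, ending with a trailing newline. A sketch of posting such a body to the new metadata route; the base URL, route prefix, and the search term are assumptions, not part of this diff:

import json

import requests  # any HTTP client works; requests is just for illustration

# One header/query pair; _msearch bodies must end with a newline.
body = (
    json.dumps({"index": "metadata"}) + "\n"
    + json.dumps({"query": {"query_string": {"query": "soil moisture"}}}) + "\n"
)

resp = requests.post(
    "http://localhost:8000/api/v2/elasticsearch/metadata/_msearch",  # assumed prefix
    data=body,
    headers={"Content-Type": "application/x-ndjson"},
)
print(resp.json())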

backend/app/routers/files.py

Lines changed: 5 additions & 3 deletions
@@ -1,5 +1,7 @@
 import io
 import json
+
+from elasticsearch import Elasticsearch
 import pika
 import mimetypes
 from datetime import datetime
@@ -47,7 +49,7 @@ async def add_file_entry(
     fs: Minio,
     file: Optional[io.BytesIO] = None,
     content_type: Optional[str] = None,
-    es=Depends(dependencies.get_elasticsearchclient),
+    es: Elasticsearch = Depends(dependencies.get_elasticsearchclient),
 ):
     """Insert FileDB object into MongoDB (makes Clowder ID), then Minio (makes version ID), then update MongoDB with
     the version ID from Minio.
@@ -121,7 +123,7 @@ async def remove_file_entry(
     file_id: Union[str, ObjectId],
     db: MongoClient,
     fs: Minio,
-    es=Depends(dependencies.get_elasticsearchclient),
+    es: Elasticsearch = Depends(dependencies.get_elasticsearchclient),
 ):
     """Remove FileDB object into MongoDB, Minio, and associated metadata and version information."""
     # TODO: Deleting individual versions will require updating version_id in mongo, or deleting entire document
@@ -148,7 +150,7 @@ async def update_file(
     db: MongoClient = Depends(dependencies.get_db),
     fs: Minio = Depends(dependencies.get_fs),
     file: UploadFile = File(...),
-    es=Depends(dependencies.get_elasticsearchclient),
+    es: Elasticsearch = Depends(dependencies.get_elasticsearchclient),
 ):
     es = await connect_elasticsearch()
 

backend/app/routers/metadata_datasets.py

Lines changed: 24 additions & 1 deletion
@@ -3,6 +3,7 @@
 import os
 from typing import List, Optional
 
+from elasticsearch import Elasticsearch
 from bson import ObjectId
 from fastapi import APIRouter, HTTPException, Depends
 from fastapi import Form
@@ -26,6 +27,7 @@
     patch_metadata,
     MetadataDelete,
 )
+from app.search.connect import insert_record, update_record, delete_document_by_id
 
 router = APIRouter()
 
@@ -80,6 +82,7 @@ async def add_dataset_metadata(
     dataset_id: str,
     user=Depends(get_current_user),
     db: MongoClient = Depends(dependencies.get_db),
+    es: Elasticsearch = Depends(dependencies.get_elasticsearchclient),
 ):
     """Attach new metadata to a dataset. The body must include a contents field with the JSON metadata, and either a
     context JSON-LD object, context_url, or definition (name of a metadata definition) to be valid.
@@ -110,6 +113,18 @@ async def add_dataset_metadata(
     new_metadata = await db["metadata"].insert_one(md.to_mongo())
     found = await db["metadata"].find_one({"_id": new_metadata.inserted_id})
     metadata_out = MetadataOut.from_mongo(found)
+
+    # Add an entry to the metadata index
+    doc = {
+        "resource_id": dataset_id,
+        "resource_type": "dataset",
+        "created": metadata_out.created.utcnow(),
+        "creator": user.email,
+        "contents": metadata_out.contents,
+        "context_url": metadata_out.context_url,
+        "context": metadata_out.context,
+    }
+    insert_record(es, "metadata", doc, metadata_out.id)
     return metadata_out
 
 
@@ -119,6 +134,7 @@ async def replace_dataset_metadata(
     dataset_id: str,
     user=Depends(get_current_user),
     db: MongoClient = Depends(dependencies.get_db),
+    es: Elasticsearch = Depends(dependencies.get_elasticsearchclient),
 ):
     """Update metadata. Any fields provided in the contents JSON will be added or updated in the metadata. If context or
     agent should be changed, use PUT.
@@ -159,6 +175,9 @@ async def replace_dataset_metadata(
         )
         found = await db["metadata"].find_one({"_id": md["_id"]})
         metadata_out = MetadataOut.from_mongo(found)
+        # Update entry to the metadata index
+        doc = {"doc": {"contents": metadata_out["contents"]}}
+        update_record(es, "metadata", doc, metadata_out["_id"])
         return metadata_out
     else:
         raise HTTPException(status_code=404, detail=f"Dataset {dataset_id} not found")
@@ -170,6 +189,7 @@ async def update_dataset_metadata(
     dataset_id: str,
     user=Depends(get_current_user),
     db: MongoClient = Depends(dependencies.get_db),
+    es: Elasticsearch = Depends(dependencies.get_elasticsearchclient),
 ):
     """Update metadata. Any fields provided in the contents JSON will be added or updated in the metadata. If context or
     agent should be changed, use PUT.
@@ -227,7 +247,7 @@ async def update_dataset_metadata(
 
     if (md := await db["metadata"].find_one(query)) is not None:
         # TODO: Refactor this with permissions checks etc.
-        result = await patch_metadata(md, contents, db)
+        result = await patch_metadata(md, contents, db, es)
         return result
     else:
         raise HTTPException(
@@ -278,6 +298,7 @@ async def delete_dataset_metadata(
     dataset_id: str,
     user=Depends(get_current_user),
     db: MongoClient = Depends(dependencies.get_db),
+    es: Elasticsearch = Depends(dependencies.get_elasticsearchclient),
 ):
     if (
         dataset := await db["datasets"].find_one({"_id": ObjectId(dataset_id)})
@@ -327,5 +348,7 @@ async def delete_dataset_metadata(
             raise HTTPException(
                 status_code=404, detail=f"No metadata found with that criteria"
             )
+        # delete from elasticsearch
+        delete_document_by_id(es, "metadata", str(metadata_in.id))
     else:
         raise HTTPException(status_code=404, detail=f"Dataset {dataset_id} not found")
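
Once documents shaped like the one built in add_dataset_metadata are indexed, they can be queried directly against the metadata index. A minimal sketch with the elasticsearch-py client; the host and the search term are examples only:

from elasticsearch import Elasticsearch

es = Elasticsearch("http://localhost:9200")  # assumed local Elasticsearch

# Metadata attached to datasets whose contents mention "temperature".
resp = es.search(
    index="metadata",
    body={
        "query": {
            "bool": {
                "filter": [{"term": {"resource_type": "dataset"}}],
                "must": [{"query_string": {"query": "temperature"}}],
            }
        }
    },
)
for hit in resp["hits"]["hits"]:
    print(hit["_id"], hit["_source"]["resource_id"])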
