1616 File ,
1717 UploadFile ,
1818)
19- from fastapi .responses import StreamingResponse
19+ from fastapi .responses import StreamingResponse , FileResponse
2020from fastapi .security import HTTPAuthorizationCredentials , HTTPBearer
2121from minio import Minio
2222from pika .adapters .blocking_connection import BlockingChannel
2525from app .config import settings
2626from app .deps .authorization_deps import FileAuthorization
2727from app .keycloak_auth import get_current_user , get_token
28- from app .models .files import (
29- FileOut ,
30- FileVersion ,
31- FileDB ,
32- FileVersionDB ,
33- )
28+ from app .models .files import FileOut , FileVersion , FileDB , FileVersionDB , StorageType
3429from app .models .metadata import MetadataDB
35- from app .models .users import UserOut
3630from app .models .thumbnails import ThumbnailDB
31+ from app .models .users import UserOut
3732from app .rabbitmq .listeners import submit_file_job , EventListenerJobDB
3833from app .routers .feeds import check_feed_listeners
3934from app .routers .utils import get_content_type
@@ -155,17 +150,41 @@ async def add_file_entry(
155150 )
156151
157152
153+ async def add_local_file_entry (
154+ new_file : FileDB ,
155+ user : UserOut ,
156+ es : Elasticsearch ,
157+ rabbitmq_client : BlockingChannel ,
158+ content_type : Optional [str ] = None ,
159+ ):
160+ """Insert FileDB object into MongoDB (makes Clowder ID). Bytes are not stored in DB and versioning not supported
161+ for local files."""
162+
163+ content_type_obj = get_content_type (new_file .name , content_type )
164+ new_file .content_type = content_type_obj
165+ await new_file .insert ()
166+
167+ # Add entry to the file index
168+ await index_file (es , FileOut (** new_file .dict ()))
169+
170+ # TODO - timing issue here, check_feed_listeners needs to happen asynchronously.
171+ time .sleep (1 )
172+
173+ # Submit file job to any qualifying feeds
174+ await check_feed_listeners (
175+ es ,
176+ FileOut (** new_file .dict ()),
177+ user ,
178+ rabbitmq_client ,
179+ )
180+
181+
158182# TODO: Move this to MongoDB middle layer
159183async def remove_file_entry (
160184 file_id : Union [str , ObjectId ], fs : Minio , es : Elasticsearch
161185):
162186 """Remove FileDB object into MongoDB, Minio, and associated metadata and version information."""
163187 # TODO: Deleting individual versions will require updating version_id in mongo, or deleting entire document
164-
165- # Check all connection and abort if any one of them is not available
166- if fs is None or es is None :
167- raise HTTPException (status_code = 503 , detail = "Service not available" )
168- return
169188 fs .remove_object (settings .MINIO_BUCKET_NAME , str (file_id ))
170189 # delete from elasticsearch
171190 delete_document_by_id (es , settings .elasticsearch_index , str (file_id ))
@@ -175,6 +194,18 @@ async def remove_file_entry(
175194 await FileVersionDB .find (FileVersionDB .file_id == ObjectId (file_id )).delete ()
176195
177196
197+ async def remove_local_file_entry (file_id : Union [str , ObjectId ], es : Elasticsearch ):
198+ """Remove FileDB object into MongoDB, Minio, and associated metadata and version information."""
199+ # TODO: Deleting individual versions will require updating version_id in mongo, or deleting entire document
200+ # delete from elasticsearch
201+ delete_document_by_id (es , settings .elasticsearch_index , str (file_id ))
202+ if (file := await FileDB .get (PydanticObjectId (file_id ))) is not None :
203+ # TODO: delete from disk - should this be allowed if Clowder didn't originally write the file?
204+ # os.path.remove(file.storage_path)
205+ await file .delete ()
206+ await MetadataDB .find (MetadataDB .resource .resource_id == ObjectId (file_id )).delete ()
207+
208+
178209@router .put ("/{file_id}" , response_model = FileOut )
179210async def update_file (
180211 file_id : str ,
@@ -275,33 +306,53 @@ async def download_file(
275306):
276307 # If file exists in MongoDB, download from Minio
277308 if (file := await FileDB .get (PydanticObjectId (file_id ))) is not None :
278- if version is not None :
279- # Version is specified, so get the minio ID from versions table if possible
280- file_vers = await FileVersionDB .find_one (
281- FileVersionDB .file_id == ObjectId (file_id ),
282- FileVersionDB .version_num == version ,
283- )
284- if file_vers is not None :
285- vers = FileVersion (** file_vers .dict ())
286- content = fs .get_object (
287- settings .MINIO_BUCKET_NAME , file_id , version_id = vers .version_id
309+ if file .storage_type == StorageType .MINIO :
310+ if version is not None :
311+ # Version is specified, so get the minio ID from versions table if possible
312+ file_vers = await FileVersionDB .find_one (
313+ FileVersionDB .file_id == ObjectId (file_id ),
314+ FileVersionDB .version_num == version ,
288315 )
316+ if file_vers is not None :
317+ vers = FileVersion (** file_vers .dict ())
318+ content = fs .get_object (
319+ settings .MINIO_BUCKET_NAME , file_id , version_id = vers .version_id
320+ )
321+ else :
322+ raise HTTPException (
323+ status_code = 404 ,
324+ detail = f"File { file_id } version { version } not found" ,
325+ )
289326 else :
290- raise HTTPException (
291- status_code = 404 ,
292- detail = f"File { file_id } version { version } not found" ,
293- )
327+ # If no version specified, get latest version directly
328+ content = fs .get_object (settings .MINIO_BUCKET_NAME , file_id )
329+
330+ # Get content type & open file stream
331+ response = StreamingResponse (
332+ content .stream (settings .MINIO_UPLOAD_CHUNK_SIZE )
333+ )
334+ response .headers ["Content-Disposition" ] = (
335+ "attachment; filename=%s" % file .name
336+ )
337+
338+ elif file .storage_type == StorageType .LOCAL :
339+ response = FileResponse (
340+ path = file .storage_path ,
341+ filename = file .name ,
342+ media_type = file .content_type .content_type ,
343+ )
344+
294345 else :
295- # If no version specified, get latest version directly
296- content = fs .get_object (settings .MINIO_BUCKET_NAME , file_id )
346+ raise HTTPException (
347+ status_code = 400 , detail = f"Unable to download { file_id } ."
348+ )
349+
350+ if response :
351+ if increment :
352+ # Increment download count
353+ await file .update (Inc ({FileDB .downloads : 1 }))
354+ return response
297355
298- # Get content type & open file stream
299- response = StreamingResponse (content .stream (settings .MINIO_UPLOAD_CHUNK_SIZE ))
300- response .headers ["Content-Disposition" ] = "attachment; filename=%s" % file .name
301- if increment :
302- # Increment download count
303- await file .update (Inc ({FileDB .downloads : 1 }))
304- return response
305356 else :
306357 raise HTTPException (status_code = 404 , detail = f"File { file_id } not found" )
307358
@@ -365,8 +416,11 @@ async def delete_file(
365416 es : Elasticsearch = Depends (dependencies .get_elasticsearchclient ),
366417 allow : bool = Depends (FileAuthorization ("editor" )),
367418):
368- if (await FileDB .get (PydanticObjectId (file_id ))) is not None :
369- await remove_file_entry (file_id , fs , es )
419+ if (file := await FileDB .get (PydanticObjectId (file_id ))) is not None :
420+ if file .storage_type == StorageType .LOCAL :
421+ await remove_local_file_entry (file_id , es )
422+ else :
423+ await remove_file_entry (file_id , fs , es )
370424 return {"deleted" : file_id }
371425 else :
372426 raise HTTPException (status_code = 404 , detail = f"File { file_id } not found" )
@@ -415,13 +469,13 @@ async def get_file_versions(
415469 limit : int = 20 ,
416470 allow : bool = Depends (FileAuthorization ("viewer" )),
417471):
418- file = await FileDB .get (PydanticObjectId (file_id ))
419- if file is not None :
472+ if (file := await FileDB .get (PydanticObjectId (file_id ))) is not None :
420473 mongo_versions = []
421- async for ver in FileVersionDB .find (
422- FileVersionDB .file_id == ObjectId (file_id )
423- ).sort (- FileVersionDB .created ).skip (skip ).limit (limit ):
424- mongo_versions .append (FileVersion (** ver .dict ()))
474+ if file .storage_type == StorageType .MINIO :
475+ async for ver in FileVersionDB .find (
476+ FileVersionDB .file_id == ObjectId (file_id )
477+ ).sort (- FileVersionDB .created ).skip (skip ).limit (limit ):
478+ mongo_versions .append (FileVersion (** ver .dict ()))
425479 return mongo_versions
426480
427481 raise HTTPException (status_code = 404 , detail = f"File { file_id } not found" )
0 commit comments