Skip to content

Commit b2bff90

Browse files
authored
Merge pull request #135 from dataforgoodfr/109_connect_dispatcher_to_db
feat: [DISPATCHER-109] - Update magnet link and status of rescues in database
2 parents fcc363d + 1dbcdfa commit b2bff90

File tree

3 files changed

+153
-20
lines changed

3 files changed

+153
-20
lines changed

dispatcher/api/models/logic.py

Lines changed: 92 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,9 @@
66
import logging
77
import uuid
88

9+
from sqlalchemy.orm import Session
10+
11+
from rescue_api.models import Asset, Rescue, Rescuer
912
from .payload import AssetModel
1013

1114
# Configuration du logging
@@ -157,7 +160,95 @@ def allocate_assets(self, free_space_mb: float, node_id: str = None) -> Dict:
157160
"allocation_id": str(uuid.uuid4())
158161
}
159162

160-
def upsert_rescues(self, rescuer_id: int, assets: List[AssetModel]) -> Dict:
163+
164+
def upsert_rescues_to_db(self, rescuer_id: int, assets: List[AssetModel], db: Session) -> Dict:
165+
if not self._rescuer_exists(rescuer_id=rescuer_id, db=db):
166+
return {}
167+
168+
if not self._are_assets_data_consistent(assets=assets, db=db):
169+
return {}
170+
171+
updated_rescues = []
172+
inserted_rescues = []
173+
not_committed_rescues = []
174+
175+
for asset in assets:
176+
asset_id = int(asset.asset_id)
177+
rescue = db.query(Rescue).filter(
178+
(Rescue.rescuer_id == rescuer_id) & (Rescue.asset_id == asset_id)
179+
).first()
180+
181+
is_insertion_operation = False
182+
if not rescue:
183+
is_insertion_operation = True
184+
rescue = Rescue(
185+
asset_id=asset_id,
186+
rescuer_id=rescuer_id,
187+
magnet_link=asset.magnet_link,
188+
status=asset.status.value.lower(),
189+
)
190+
db.add(rescue)
191+
else:
192+
rescue.magnet_link = asset.magnet_link
193+
rescue.status = asset.status.value.lower()
194+
195+
try:
196+
db.commit()
197+
except Exception as e:
198+
print(e)
199+
print(f"Rescue with rescuer_id='{rescuer_id}' and asset_id='{asset_id}' has not been committed to DB.")
200+
not_committed_rescues.append(
201+
{
202+
"asset_id": asset_id,
203+
"rescuer_id": rescuer_id,
204+
"magnet_link": asset.magnet_link,
205+
"status": asset.status.value.lower(),
206+
}
207+
)
208+
else:
209+
committed_rescue = {
210+
"asset_id": asset_id,
211+
"rescuer_id": rescuer_id,
212+
"magnet_link": asset.magnet_link,
213+
"status": asset.status.value.lower(),
214+
}
215+
if is_insertion_operation:
216+
inserted_rescues.append(committed_rescue)
217+
else:
218+
updated_rescues.append(committed_rescue)
219+
220+
return {
221+
"updated_rescues": updated_rescues,
222+
"inserted_rescues": inserted_rescues,
223+
"not_committed_rescues": not_committed_rescues,
224+
}
225+
226+
227+
@staticmethod
228+
def _rescuer_exists(rescuer_id: int, db: Session) -> bool:
229+
rescuer = db.query(Rescuer).filter(Rescuer.id == rescuer_id).first()
230+
return True if rescuer else False
231+
232+
233+
@staticmethod
234+
def _are_assets_data_consistent(assets: List[AssetModel], db: Session) -> bool:
235+
missing_assets_count = 0
236+
asset_inconsistencies_count = 0
237+
238+
for asset in assets:
239+
db_asset = db.query(Asset).filter(Asset.id == int(asset.asset_id)).first()
240+
if not db_asset:
241+
missing_assets_count += 1
242+
elif db_asset.url != asset.url:
243+
asset_inconsistencies_count += 1
244+
245+
if missing_assets_count > 0 or asset_inconsistencies_count > 0:
246+
return False
247+
248+
return True
249+
250+
251+
def upsert_rescues_to_json(self, rescuer_id: int, assets: List[AssetModel]) -> Dict:
161252
rescues_to_upsert = self._prepare_rescues_to_upsert(rescuer_id=rescuer_id, assets=assets)
162253
rescues_from_db = self._load_json(self.rescues_file)
163254

dispatcher/api/models/payload.py

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,23 @@ class DispatchResponse(BaseModel):
4848
asset: List[AssetModel] = Field(..., description="Dataset list")
4949

5050

51-
class Rescues(BaseModel):
51+
class RescuesRequest(BaseModel):
5252
rescuer_id: int = Field(..., description="Rescuer id")
5353
message: str = Field(..., description="Message given by the application")
5454
assets: List[AssetModel] = Field(..., description="List of assets for which the rescue is over")
55+
56+
57+
class RescuesResponse(DispatchResponse):
58+
asset: Optional[List[AssetModel]] = Field(default=None, description="Dataset list")
59+
updated_rescues: Optional[List[Dict[str, Any]]] = Field(
60+
default=None,
61+
description="List of rescued assets that were updated in the database.",
62+
)
63+
inserted_rescues: Optional[List[Dict[str, Any]]] = Field(
64+
default=None,
65+
description="List of rescued assets that were inserted in the database.",
66+
)
67+
not_committed_rescues: Optional[List[Dict[str, Any]]] = Field(
68+
default=None,
69+
description="List of rescued assets for which an error happened when we tried to commit it to the database.",
70+
)

dispatcher/api/routers/dispatch.py

Lines changed: 44 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,14 @@
1-
from models.state import app_state
2-
from fastapi import HTTPException, APIRouter
3-
from fastapi.responses import JSONResponse
4-
from models.payload import DispatchRequest, DispatchResponse, Rescues
51
import json
62
from os.path import join, dirname
73

4+
from models.payload import DispatchRequest, DispatchResponse, RescuesRequest, RescuesResponse
5+
from models.state import app_state
6+
from fastapi import HTTPException, APIRouter, Depends
7+
from fastapi.responses import JSONResponse
8+
from sqlalchemy.orm import Session
9+
10+
from rescue_api.database import get_db
11+
812
router = APIRouter()
913

1014
@router.get('/')
@@ -55,29 +59,51 @@ async def dispatch(request: DispatchRequest):
5559

5660
return dispatch_response
5761

58-
@router.put('/rescues', response_model=DispatchResponse)
59-
async def update_rescues(rescues: Rescues):
60-
app_state._logger.info("________Update rescues after asset downloads")
62+
@router.post('/assets-downloaded', response_model=RescuesResponse)
63+
async def upsert_rescues(request: RescuesRequest, db: Session = Depends(get_db)):
64+
app_state._logger.info("________Upsert rescues after downloading assets")
65+
66+
if not request.assets:
67+
raise HTTPException(
68+
status_code=422,
69+
detail="The rescues don't contain any asset, "
70+
"make sure they have at least one asset to be able to upsert them.",
71+
)
6172

6273
# Call dispatcher logic
63-
result = app_state._dispatcher.upsert_rescues(
64-
rescuer_id=rescues.rescuer_id,
65-
assets=rescues.assets,
74+
result = app_state._dispatcher.upsert_rescues_to_db(
75+
rescuer_id=request.rescuer_id,
76+
assets=request.assets,
77+
db=db,
6678
)
6779
app_state._logger.info(result)
6880

69-
if result["action_status"] == "FAIL":
70-
return HTTPException(
81+
if not result:
82+
raise HTTPException(
83+
status_code=422,
84+
detail="The error can be the following: the rescuer doesn't exist in the database; or at least one asset "
85+
"doesn't exist in the database; or the asset data you provided don't match the ones in the "
86+
"database. Make sure to provide an existing rescuer, existing assets and correct asset data.",
87+
)
88+
elif not result["updated_rescues"] and not result["inserted_rescues"]:
89+
raise HTTPException(
7190
status_code=500,
72-
detail="An error happened when saving the data, the rescues couldn't be updated, please retry later.",
91+
detail={
92+
"message": "Something went wrong when saving the data, the rescues below couldn't be upserted, "
93+
"please retry later.",
94+
"not_committed_rescues": result["not_committed_rescues"],
95+
},
7396
)
7497

75-
dispatch_response = DispatchResponse(
98+
response = RescuesResponse(
7699
status="success",
77100
message="Request received and processed",
78-
received_data=rescues.dict(),
79-
asset=rescues.assets,
101+
received_data=request.dict(),
102+
asset=request.assets,
103+
updated_rescues=result["updated_rescues"],
104+
inserted_rescues=result["inserted_rescues"],
105+
not_committed_rescues=result["not_committed_rescues"],
80106
)
81-
print(f"Dispatch response: {dispatch_response}")
107+
print(f"Dispatch response: {response}")
82108

83-
return dispatch_response
109+
return response

0 commit comments

Comments
 (0)