Skip to content

Commit 0b00cba

Browse files
committed
Merge remote-tracking branch 'origin/main'
2 parents 7aeabf2 + 80eb045 commit 0b00cba

File tree

8 files changed

+139
-57
lines changed

8 files changed

+139
-57
lines changed

dispatcher/api/constants/mvp.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
# coding: utf-8
2+
3+
from enum import IntEnum
4+
5+
class AppRejectionCode(IntEnum):
6+
FILE_TOO_BIG_FOR_SEEDING = 1
7+
INSUFFICIENT_SPACE = 2
8+
HTTP_404 = 3 # HTTP : Not found
9+
HTTP_403 = 4 # HTTP : Forbidden
10+
FETCH_FAILED = 5 # javascript fetch failure
11+
NOT_SUITABLE_FOR_BETA = 100000

dispatcher/api/models/logic.py

Lines changed: 64 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,14 @@
11
import json
2-
import os
32
from pathlib import Path
43
from typing import List, Dict, Optional
5-
from datetime import datetime # Import spécifique de la classe datetime
64
import logging
75
import uuid
86

97
from sqlalchemy.orm import Session
108

119
from rescue_api.models import Asset, Rescue, Rescuer
10+
from rescue_api.models.mvp_downloader_library import MvpDownloaderLibrary
11+
1212
from .payload import AssetModel
1313
from .priorizer_client import PriorizerClient
1414

@@ -78,6 +78,14 @@ def _load_json(self, file: Path) -> List[Dict]:
7878
def _save_json(file: Path, data: List[Dict]):
7979
file.write_text(json.dumps(data, indent=2))
8080

81+
@staticmethod
82+
def _sort_helper(asset: Dict) -> tuple:
83+
"""Helper function to sort by priority (desc) and size (asc)"""
84+
return (
85+
asset['priority'],
86+
asset.get('size', float('inf')) if asset.get('size') is not None else float('inf')
87+
)
88+
8189
async def get_available_assets(self) -> List[Dict]:
8290
"""
8391
Retrieve all assets (without filtering by allocation).
@@ -109,12 +117,12 @@ async def allocate_assets(self, free_space_mb: float, node_id: str = None) -> Di
109117
remaining_space = free_space_mb
110118

111119
# Tri par priorité (desc) puis taille (asc)
112-
for asset in sorted(available, key=lambda x: (x['priority'], x['size_mb'])):
120+
for asset in sorted(available, key=self._sort_helper):
113121
# Asset size may not be known yet
114-
if asset['size_mb'] is None or asset['size_mb'] <= remaining_space:
122+
if asset['size'] is None or asset['size'] <= remaining_space:
115123
selected.append(asset)
116-
if asset['size_mb'] is not None:
117-
remaining_space -= asset['size_mb']
124+
if asset['size'] is not None:
125+
remaining_space -= asset['size']
118126

119127
if not selected:
120128
return None
@@ -130,7 +138,7 @@ async def allocate_assets(self, free_space_mb: float, node_id: str = None) -> Di
130138
"res_id": a['res_id'],
131139
"asset_id": a['asset_id'],
132140
"name": a['name'],
133-
"size_mb": float(a['size_mb']) if a['size_mb'] is not None else None,
141+
"size": float(a['size']) if a['size'] is not None else None,
134142
"priority": int(a['priority']),
135143
"url": a['url']
136144
} for a in selected]
@@ -140,7 +148,7 @@ async def allocate_assets(self, free_space_mb: float, node_id: str = None) -> Di
140148

141149
return {
142150
"node_id": node_id,
143-
"allocated_size_mb": sum(a['size_mb'] for a in selected if a['size_mb'] is not None),
151+
"allocated_size": sum(a['size'] for a in selected if a['size'] is not None),
144152
"assets": selected,
145153
"allocation_id": str(uuid.uuid4())
146154
}
@@ -180,6 +188,11 @@ def upsert_rescues_to_db(self, rescuer_id: int, assets: List[AssetModel], db: Se
180188
rescue.magnet_link = asset.magnet_link
181189
rescue.status = asset.status.value.lower()
182190

191+
# Updating MVP downloader library table with file size
192+
db.query(MvpDownloaderLibrary) \
193+
.filter(MvpDownloaderLibrary.resource_id == asset.res_id) \
194+
.update({'deeplink_file_size': asset.size})
195+
183196
try:
184197
db.commit()
185198
except Exception as e:
@@ -212,6 +225,49 @@ def upsert_rescues_to_db(self, rescuer_id: int, assets: List[AssetModel], db: Se
212225
}
213226

214227

228+
def update_mvp_resource_link_as_defective(self, rescuer_id: int, resource_id: int, db: Session) -> bool:
229+
result = False
230+
231+
if not self._rescuer_exists(rescuer_id=rescuer_id, db=db):
232+
logger.error(f"Rescuer with id={rescuer_id} doesn't exist in the database.")
233+
return False
234+
235+
db.query(MvpDownloaderLibrary) \
236+
.filter(MvpDownloaderLibrary.resource_id == resource_id) \
237+
.update({'defective_link_flag': True})
238+
239+
# @todo Log update in some table : rescuer, resource, defective, created_at
240+
241+
try:
242+
db.commit()
243+
result = True
244+
except Exception as e:
245+
print(f'Error while updating res {resource_id} with defective link in MVP Downloader Library', e)
246+
247+
return result
248+
249+
250+
def update_mvp_resource_link_size(self, rescuer_id: int, resource_id: int, size: int, db: Session) -> bool:
251+
result = False
252+
253+
if not self._rescuer_exists(rescuer_id=rescuer_id, db=db):
254+
logger.error(f"Rescuer with id={rescuer_id} doesn't exist in the database.")
255+
return False
256+
257+
db.query(MvpDownloaderLibrary) \
258+
.filter(MvpDownloaderLibrary.resource_id == resource_id) \
259+
.update({'deeplink_file_size': size})
260+
261+
# @todo Log update in some table : rescuer, resource, size, created_at
262+
263+
try:
264+
db.commit()
265+
result = True
266+
except Exception as e:
267+
print(f'Error while updating res {resource_id} with size {size} in MVP Downloader Library', e)
268+
269+
return result
270+
215271
@staticmethod
216272
def _rescuer_exists(rescuer_id: int, db: Session) -> bool:
217273
rescuer = db.query(Rescuer).filter(Rescuer.id == rescuer_id).first()

dispatcher/api/models/payload.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ class BaseAssetModel(BaseModel):
2222
# @todo : remove name ?
2323
name: str = Field(..., description="Asset name")
2424
priority: int = Field(..., description="Asset priority according to ranker")
25-
size_mb: Optional[float] = Field(..., description="Asset estimated size")
25+
size: Optional[float] = Field(..., description="Asset size (may be estimated)")
2626
ds_id: int = Field(..., description="Dataset id")
2727
res_id: int = Field(..., description="Resource id")
2828
asset_id: Optional[int] = Field(None, description="Asset id")
@@ -75,3 +75,10 @@ class RescuesResponse(DispatchResponse):
7575
default=None,
7676
description="List of rescued assets for which an error happened when we tried to commit it to the database.",
7777
)
78+
79+
class RejectRequest(BaseModel):
80+
rescuer_id: int = Field(..., description="Rescuer id")
81+
code: int = Field(..., description="Rejection code")
82+
# Resource id for Beta. To be replaced by asset id.
83+
res_id: int = Field(..., description="Resource id")
84+
size: Optional[int] = Field(default=None, description="Asset size")

dispatcher/api/routers/dispatch.py

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
import json
22
from os.path import join, dirname
33

4-
from models.payload import DispatchRequest, DispatchResponse, RescuesRequest, RescuesResponse
4+
from constants.mvp import AppRejectionCode as MVPAppRejectionCode
5+
from models.payload import DispatchRequest, DispatchResponse, RejectRequest, RescuesRequest, RescuesResponse
56
from models.state import app_state
67
from fastapi import HTTPException, APIRouter, Depends
78
from fastapi.responses import JSONResponse
@@ -107,3 +108,21 @@ async def upsert_rescues(request: RescuesRequest, db: Session = Depends(get_db))
107108
print(f"Dispatch response: {response}")
108109

109110
return response
111+
112+
@router.post('/asset/reject')
113+
async def update_mvp_rejected_resource(request: RejectRequest, db: Session = Depends(get_db)):
114+
result = False
115+
116+
app_state._logger.info(f"________Rejected MVP resource: {request.code}")
117+
118+
if request.code in [MVPAppRejectionCode.FILE_TOO_BIG_FOR_SEEDING, MVPAppRejectionCode.NOT_SUITABLE_FOR_BETA]:
119+
result = app_state._dispatcher.update_mvp_resource_link_size(request.rescuer_id, request.res_id, request.size, db)
120+
elif request.code in [MVPAppRejectionCode.HTTP_404, MVPAppRejectionCode.HTTP_403, MVPAppRejectionCode.FETCH_FAILED]:
121+
result = app_state._dispatcher.update_mvp_resource_link_as_defective(request.rescuer_id, request.res_id, db)
122+
123+
if not result:
124+
raise HTTPException(
125+
status_code=422,
126+
detail="The error can be the following: the rescuer doesn't exist in the database; or the code provided is not valid"
127+
" or the resource provided does not exist",
128+
)

priorizer/api/models/logic.py

Lines changed: 30 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -10,13 +10,15 @@
1010
from rescue_api.models.mvp_downloader_library import MvpDownloaderLibrary
1111
from rescue_api.models.rescues import Rescue
1212
from rescue_api.database import get_db
13-
from sqlalchemy import func, case, desc, and_
13+
from sqlalchemy import func, case, desc, and_, or_
1414
from datetime import datetime, timezone
1515
from typing import List
1616

17-
_RANKING_LIMIT = 100
17+
_RANKING_LIMIT = 100
1818
_MVP_RANKING_ID = 8 # broija 2025-09-20 : MVP default ranking id
1919

20+
_MVP_BETA_LIMIT_SIZE = 400 * 1024 * 1024
21+
2022
logging.basicConfig(format='%(asctime)s - %(levelname)s - %(message)s')
2123
logger = logging.getLogger(__name__)
2224
logger.setLevel(logging.INFO)
@@ -73,25 +75,34 @@ def get_rank(self) -> dict:
7375
.join(Resource, Resource.id == MvpDownloaderLibrary.resource_id)
7476
.join(asset_resource, asset_resource.c.resource_id == MvpDownloaderLibrary.resource_id)
7577
.outerjoin(Rescue, Rescue.asset_id == asset_resource.c.asset_id)
76-
.where(DatasetRank.ranking_id == last_ranking_id).where(Rescue.asset_id.is_(None))
78+
.where(DatasetRank.ranking_id == last_ranking_id)
79+
.where(Rescue.asset_id.is_(None))
80+
.where(and_(
81+
MvpDownloaderLibrary.defective_link_flag.is_(False),
82+
or_(MvpDownloaderLibrary.deeplink_file_size.is_(None), MvpDownloaderLibrary.deeplink_file_size < _MVP_BETA_LIMIT_SIZE)
83+
))
7784
.order_by(DatasetRank.rank, Resource.id)
7885
.limit(_RANKING_LIMIT)
7986
)
87+
# self._write_query(no_magnet_ranks)
8088

8189
results = [{
8290
"path": "",
8391
"name": "",
8492
"priority": r.rank,
85-
"size_mb": r.deeplink_file_size,
93+
"size": r.deeplink_file_size,
8694
"ds_id": r.dataset_id,
8795
"res_id": r.resource_id,
8896
"asset_id": r.asset_id,
8997
"url": r.deeplink
9098
}
9199
for r in no_magnet_ranks]
92100

101+
# results = [] # Forcing torrent downloads
102+
103+
remaining_count = _RANKING_LIMIT - len(results)
93104
# If not enough results, complete with results with magnet link
94-
if len(results) < _RANKING_LIMIT:
105+
if remaining_count > 0:
95106
magnet_ranks = (
96107
session.query(
97108
MvpDownloaderLibrary.dataset_id,
@@ -106,22 +117,22 @@ def get_rank(self) -> dict:
106117
.join(asset_resource, asset_resource.c.resource_id == MvpDownloaderLibrary.resource_id)
107118
.outerjoin(Rescue, Rescue.asset_id == asset_resource.c.asset_id)
108119
.where(DatasetRank.ranking_id == last_ranking_id)
109-
.where(Rescue.asset_id.is_not_(None))
120+
.where(Rescue.asset_id.is_not(None))
110121
.order_by(DatasetRank.rank, Resource.id)
111-
.limit(_RANKING_LIMIT)
122+
.limit(remaining_count)
112123
)
113-
while len(results) < _RANKING_LIMIT:
114-
for r in magnet_ranks:
115-
results.append({
116-
"path": "",
117-
"name": "",
118-
"priority": r.rank,
119-
"size_mb": r.deeplink_file_size,
120-
"ds_id": r.dataset_id,
121-
"res_id": r.resource_id,
122-
"asset_id": r.asset_id,
123-
"url": r.magnet_link
124-
})
124+
for r in magnet_ranks:
125+
results.append({
126+
"path": "",
127+
"name": "",
128+
"priority": r.rank,
129+
"size": r.deeplink_file_size,
130+
"ds_id": r.dataset_id,
131+
"res_id": r.resource_id,
132+
"asset_id": r.asset_id,
133+
"url": r.magnet_link
134+
})
135+
125136
return {"assets": results}
126137

127138
def compute_rank(self) -> List[dict]:

priorizer/api/models/priorizer.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ class AssetModel(BaseModel):
88
path: str = Field(..., description="Asset path")
99
name: str = Field(..., description="Asset name")
1010
priority: int = Field(..., description="Asset priority according to ranker")
11-
size_mb: Optional[float] = Field(..., description="Asset estimated size")
11+
size: Optional[float] = Field(..., description="Asset size (may be estimated)")
1212
ds_id: int = Field(..., description="Dataset id")
1313
res_id: int = Field(..., description="Resource id")
1414
asset_id: Optional[int] = Field(None, description="Asset id")

priorizer/dev/pyproject.toml

Lines changed: 2 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -16,24 +16,10 @@ dependencies = [
1616
"uvicorn==0.35.0",
1717
"psycopg2-binary>=2.9.9",
1818
"sqlalchemy>=2.0.0",
19+
"rescue-api",
1920
]
2021

21-
[dependency-groups]
22-
dev = ["rescue-api"]
23-
prod = ["rescue-api"]
24-
2522
[tool.uv.sources]
2623
rescue-api = [
27-
{ git = "https://github.com/dataforgoodfr/offseason-shelter-for-science-rescue_db.git", rev = "v0.0.3", group = "prod" },
2824
{ path = "/lib/rescue_db/", editable = true, group = "dev" },
29-
]
30-
31-
[tool.uv]
32-
default-groups = ["prod"]
33-
34-
conflicts = [
35-
[
36-
{ group = "dev" },
37-
{ group = "prod" },
38-
]
39-
]
25+
]

priorizer/pyproject.toml

Lines changed: 3 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -16,18 +16,10 @@ dependencies = [
1616
"uvicorn==0.35.0",
1717
"psycopg2-binary>=2.9.9",
1818
"sqlalchemy>=2.0.0",
19+
"rescue-api"
1920
]
2021

21-
[dependency-groups]
22-
prod = ["rescue-api"]
23-
2422
[tool.uv.sources]
2523
rescue-api = [
26-
{ git = "https://github.com/dataforgoodfr/offseason-shelter-for-science-rescue_db.git", rev = "v0.0.4", group = "prod" },
27-
]
28-
29-
[tool.uv]
30-
default-groups = ["prod"]
31-
32-
conflicts = [
33-
]
24+
{ git = "https://github.com/dataforgoodfr/offseason-shelter-for-science-rescue_db.git", rev = "v0.0.4" },
25+
]

0 commit comments

Comments
 (0)