20 | 20 | from rawstatus import models as rm |
21 | 21 | from jobs import jobs as jj |
22 | 22 | from jobs import views as jv |
23 | | -from jobs.jobutil import create_job, jobmap |
| 23 | +from jobs.jobutil import create_job, check_job_error, jobmap, create_job_without_check |
24 | 24 | from jobs import models as jm |
25 | 25 |
26 | 26 |
@@ -1320,69 +1320,104 @@ def undelete_analysis(request): |
1320 | 1320 |
1321 | 1321 | @login_required |
1322 | 1322 | def delete_analysis(request): |
| 1323 | + # FIXME analysis can be properly deleted if it's backed up.
| 1324 | + # Right now this only marks for deletion; that dates from a previous
| 1325 | + # setup where delete meant gone forever.
| 1326 | + # There are a lot of old analyses that should be backed up or purged properly.
| 1327 | + # Maybe delete all those where no project is active anymore in delete_expired?
1323 | 1328 | if request.method != 'POST': |
1324 | 1329 | return JsonResponse({'error': 'Must use POST'}, status=405) |
1325 | 1330 | req = json.loads(request.body.decode('utf-8')) |
1326 | 1331 | try: |
1327 | | - analysis = am.Analysis.objects.select_related('nextflowsearch__job').get(pk=req['item_id']) |
| 1332 | + analysis = am.Analysis.objects.get(pk=req['item_id']) |
1328 | 1333 | except am.Analysis.DoesNotExist: |
1329 | 1334 | return JsonResponse({'error': 'Analysis does not exist'}, status=403) |
1330 | 1335 | except KeyError: |
1331 | 1336 | return JsonResponse({'error': 'Bad request'}, status=400) |
1332 | 1337 | if analysis.deleted: |
1333 | 1338 | return JsonResponse({'error': 'Analysis is already deleted'}, status=403) |
1334 | | - if analysis.user == request.user or request.user.is_staff: |
1335 | | - if not analysis.deleted: |
1336 | | - analysis.deleted = True |
1337 | | - analysis.save() |
1338 | | - am.AnalysisDeleted.objects.update_or_create(analysis=analysis) |
1339 | | - if hasattr(analysis, 'nextflowsearch'): |
1340 | | - jobq = jm.Job.objects.filter(nextflowsearch__analysis=analysis) |
1341 | | - jv.cancel_or_revoke_job(jobq) |
1342 | | - return JsonResponse({}) |
1343 | | - else: |
| 1339 | + if analysis.user != request.user and not request.user.is_staff: |
1344 | 1340 | return JsonResponse({'error': 'User is not authorized to delete this analysis'}, status=403) |
| 1341 | + if errmsg := do_analysis_deletion(analysis): |
| 1342 | + return JsonResponse({'error': f'Could not delete analysis, {errmsg}'}, status=409) |
| 1343 | + return JsonResponse({}) |
1345 | 1344 |
1346 | 1345 |
1347 | | -@login_required |
1348 | | -def purge_analysis(request): |
1349 | | - if request.method != 'POST': |
1350 | | - return JsonResponse({'error': 'Must use POST'}, status=405) |
1351 | | - elif not request.user.is_staff: |
1352 | | - return JsonResponse({'error': 'Only admin is authorized to purge analysis'}, status=403) |
1353 | | - req = json.loads(request.body.decode('utf-8')) |
1354 | | - try: |
1355 | | - analysis = am.Analysis.objects.get(pk=req['item_id']) |
1356 | | - except am.Analysis.DoesNotExist: |
1357 | | - return JsonResponse({'error': 'Analysis does not exist'}, status=403) |
1358 | | - except KeyError: |
1359 | | - return JsonResponse({'error': 'Bad request'}, status=400) |
1360 | | - if not analysis.deleted: |
1361 | | - return JsonResponse({'error': 'Analysis is not deleted, cannot purge'}, status=403) |
1362 | | - analysis.purged = True |
1363 | | - analysis.save() |
1364 | | - webshares = rm.ServerShare.objects.filter(function=rm.ShareFunction.REPORTS) |
1365 | | - # Delete files on web share here since the job tasks run on storage cannot do that |
1366 | | - webfiles = rm.StoredFileLoc.objects.filter(sfile__analysisresultfile__analysis__id=analysis.pk, |
1367 | | - servershare__in=webshares, active=True) |
1368 | | - for webfile in webfiles.values('path', 'sfile__filename'): |
1369 | | - fpath = os.path.join(settings.WEBSHARE, webfile['path'], webfile['sfile__filename']) |
1370 | | - os.unlink(fpath) |
1371 | | - webfiles.update(active=False, purged=True) |
1372 | | - sfiles = rm.StoredFile.objects.filter(analysisresultfile__analysis__id=analysis.pk) |
1373 | | - sfiles.update(deleted=True) |
1374 | | - sfls = rm.StoredFileLoc.objects.filter(sfile__in=sfiles, active=True) |
1375 | | - # One job per servershare as empty dir deleter needs that |
1376 | | - share_pks = defaultdict(list) |
1377 | | - for sfl in sfls.values('pk', 'servershare_id', 'path'): |
1378 | | - share_pks[f'{sfl["servershare_id"]}__{sfl["path"]}'].append(sfl['pk']) |
1379 | | - for share_path, sfloc_ids in share_pks.items(): |
1380 | | - shareid, path = share_path.split('__') |
1381 | | - purgejob = create_job('purge_files', sfloc_ids=sfloc_ids) |
1382 | | - if not purgejob['error']: |
| 1346 | +def do_analysis_deletion(analysis): |
| 1347 | + '''Delete an analysis. Also called when projects are deleted. Makes sure
| 1348 | + result files are backed up; returns an error message string on failure.'''
| 1349 | + # FIXME AnalysisDeleted tracking is no longer needed once we back up properly;
| 1350 | + # implement a log instead
| 1351 | + am.AnalysisDeleted.objects.update_or_create(analysis=analysis) |
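| | + # Cancel or revoke any jobs on this analysis that are not yet finished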
| 1352 | + if jobq := jm.Job.objects.filter(nextflowsearch__analysis=analysis).exclude(state__in=[ |
| 1353 | + jj.Jobstates.ERROR, jj.Jobstates.DONE, jj.Jobstates.REVOKING, jj.Jobstates.CANCELED]): |
| 1354 | + jv.cancel_or_revoke_job(jobq) |
| 1355 | + if analysis.success_completed: |
| 1356 | + # Back up files if that has for some reason (old analysis) not happened earlier
| 1357 | + backup_jobs = [] |
| 1358 | + for sf in rm.StoredFile.objects.filter(analysisresultfile__analysis=analysis).exclude( |
| 1359 | + pdcbackedupfile__success=True).values('pk', 'filetype__is_folder'): |
| 1360 | + sfl_pks = [x['pk'] for x in rm.StoredFileLoc.objects.filter(sfile_id=sf['pk']).values('pk')] |
| 1361 | + do_backup = True |
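| | + # Look at existing backup jobs for this file before queueing a new one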
| 1362 | + for exist_job in jm.Job.objects.filter(funcname='create_pdc_archive', |
| 1363 | + kwargs__sfloc_id__in=sfl_pks).values('state'): |
| 1364 | + match exist_job['state']: |
| 1365 | + case jj.Jobstates.WAITING | jj.Jobstates.HOLD:
| 1366 | + # Job exists but is not running: stop and ask the user to check
| 1367 | + return 'Result file queued for backing up but job not running, please check'
| 1368 | + case x if x in jj.JOBSTATES_JOB_ACTIVE: |
| 1369 | + do_backup = False |
| 1370 | + continue |
| 1371 | + case jj.Jobstates.ERROR: |
| 1372 | + return 'Result file queued for backing up but job errored, please check' |
| 1373 | + # If no backup exists and the job is somehow in done/revoking/canceled,
| 1374 | + # keep looking for more jobs; if none exist -> do a backup
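| | + # Queue the backup from an active copy on a server that is allowed to back up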
| 1375 | + if do_backup and (sfl := rm.StoredFileLoc.objects.filter(pk__in=sfl_pks, active=True, |
| 1376 | + servershare__fileservershare__server__can_backup=True).values('pk')): |
| 1377 | + jobkw = {'sfloc_id': sfl.first()['pk'], 'isdir': sf['filetype__is_folder']} |
| 1378 | + backup_jobs.append(jobkw) |
| 1379 | + if joberror := check_job_error('create_pdc_archive', **jobkw): |
| 1380 | + return f'errors backing up result file(s): {joberror}' |
| 1381 | + elif do_backup: |
| 1382 | + return 'could not find copy of result file to back up' |
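| | + # All result files checked OK, now queue the actual backup jobs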
| 1383 | + for bupjob in backup_jobs: |
| 1384 | + create_job_without_check('create_pdc_archive', **bupjob) |
| 1385 | + |
| 1386 | + # Delete files on the web share here, since the job tasks running on storage cannot do that
| 1387 | + webshares = rm.ServerShare.objects.filter(function=rm.ShareFunction.REPORTS) |
| 1388 | + webfiles = rm.StoredFileLoc.objects.filter(sfile__analysisresultfile__analysis__id=analysis.pk, |
| 1389 | + servershare__in=webshares, active=True) |
| 1390 | + for webfile in webfiles.values('path', 'sfile__filename'): |
| 1391 | + fpath = os.path.join(settings.WEBSHARE, webfile['path'], webfile['sfile__filename']) |
| 1392 | + os.unlink(fpath) |
| 1393 | + webfiles.update(active=False, purged=True) |
| 1394 | + sfiles = rm.StoredFile.objects.filter(analysisresultfile__analysis__id=analysis.pk) |
| 1395 | + sfls = rm.StoredFileLoc.objects.filter(sfile__in=sfiles, active=True) |
| 1396 | + # One job per servershare/path combination, as the empty dir deleter needs that
| 1397 | + share_pks = defaultdict(list) |
| 1398 | + for sfl in sfls.values('pk', 'servershare_id', 'path'): |
| 1399 | + share_pks[f'{sfl["servershare_id"]}__{sfl["path"]}'].append(sfl['pk']) |
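| | + # Check all jobs before creating any, so a failed check queues nothing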
| 1400 | + purgejobs, rmdirjobs = [], [] |
| 1401 | + for share_path, sfloc_ids in share_pks.items(): |
| 1402 | + shareid, path = share_path.split('__', 1)
| 1403 | + purgejobs.append(sfloc_ids) |
| 1404 | + if joberror := check_job_error('purge_files', sfloc_ids=sfloc_ids): |
| 1405 | + return f'error trying to queue delete files job: {joberror}' |
| 1406 | + rmdirkw = {'sfloc_ids': sfloc_ids, 'path': path, 'share_id': shareid} |
| 1407 | + rmdirjobs.append(rmdirkw) |
| 1408 | + if joberror := check_job_error('delete_empty_directory', **rmdirkw):
| 1409 | + return f'error trying to queue delete empty directory job: {joberror}'
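| | + # All checks passed: queue the jobs and deactivate the file locations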
| 1410 | + for sfloc_ids in purgejobs: |
| 1411 | + create_job_without_check('purge_files', sfloc_ids=sfloc_ids) |
1383 | 1412 | rm.StoredFileLoc.objects.filter(pk__in=sfloc_ids).update(active=False) |
1384 | | - rmdirjob = create_job('delete_empty_directory', sfloc_ids=sfloc_ids, path=path, share_id=shareid) |
1385 | | - return JsonResponse({}) |
| 1413 | + for rmdirkw in rmdirjobs: |
| 1414 | + create_job_without_check('delete_empty_directory', **rmdirkw) |
| 1415 | + sfiles.update(deleted=True) |
| 1416 | + |
| 1417 | + # No errors encountered, mark analysis as deleted and purged
| 1418 | + analysis.deleted, analysis.purged = True, True |
| 1419 | + analysis.save() |
| 1420 | + return None  # no error
1386 | 1421 |
1387 | 1422 |
1388 | 1423 | @login_required |
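A minimal sketch of the validate-then-create pattern `do_analysis_deletion` relies on: every job spec is passed through `check_job_error` first, and `create_job_without_check` only runs once all checks have passed, so a failing check never leaves a partial set of jobs queued. The stub bodies and the `queue_all_or_nothing` helper below are hypothetical stand-ins, assuming `check_job_error` returns an error string (or `None` when the kwargs are valid) and `create_job_without_check` enqueues without re-validating; the real implementations live in `jobs.jobutil`.

```python
def check_job_error(funcname, **kwargs):
    # Hypothetical validator: return an error string, or None when the kwargs are OK
    if funcname == 'purge_files' and not kwargs.get('sfloc_ids'):
        return 'no file locations given'
    return None

def create_job_without_check(funcname, **kwargs):
    # Hypothetical enqueue step; validation is assumed to have happened already
    print(f'queued {funcname}: {kwargs}')

def queue_all_or_nothing(jobspecs):
    '''Check every (funcname, kwargs) spec before queueing any of them.'''
    for funcname, kwargs in jobspecs:
        if joberror := check_job_error(funcname, **kwargs):
            return f'error trying to queue {funcname} job: {joberror}'
    for funcname, kwargs in jobspecs:
        create_job_without_check(funcname, **kwargs)
    return None

# Example: nothing is queued, since the second spec fails its check
print(queue_all_or_nothing([('purge_files', {'sfloc_ids': [1, 2]}),
                            ('purge_files', {'sfloc_ids': []})]))
```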