@@ -11,10 +11,11 @@ import javax.inject.Inject
 import java.util.{Date, TimeZone}
 
 import services._
-import models.{Collection, Dataset, File, ProjectSpace, UUID, User, UserStatus}
-import util.Parsers
+import models.{Collection, Dataset, File, ProjectSpace, UUID, User, UserStatus, ExtractionJob}
 
-import scala.collection.mutable.ListBuffer
+import org.apache.commons.lang3.Range.between
+import scala.collection.mutable.{ListBuffer, Map => MutaMap}
+import util.Parsers
 
 
 /**
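
The new ExtractionJob import refers to a model that is not shown in this diff. As a rough sketch only, its shape can be inferred from how extractorUsage below constructs it, copies it, and reads its fields; the actual case class in models/ may differ:

import java.util.Date

// Inferred shape, not the real definition: fields follow the positional
// ExtractionJob(...) construction and the job.* reads in the report code below.
case class ExtractionJob(target: String,       // file/dataset/metadata id the extraction ran against
                         targetType: String,   // "file", "dataset", or "metadata"
                         extractor: String,    // extractor id, e.g. "ncsa.file.digest"
                         spaces: String,       // comma-separated space ids containing the target
                         jobId: String,        // official job_id when available, else ""
                         jobType: String,      // "queue" or "work", see determineJobType()
                         statusCount: Int,     // number of events folded into this job
                         lastStatus: String,   // most recent event.status
                         start: Date,          // earliest event.start seen for this job
                         end: Date)            // latest event.start seen for this job
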
@@ -25,7 +26,8 @@ class Reporting @Inject()(selections: SelectionService,
                           files: FileService,
                           collections: CollectionService,
                           spaces: SpaceService,
-                          users: UserService) extends Controller with ApiController {
+                          users: UserService,
+                          extractions: ExtractionService) extends Controller with ApiController {
 
   val dateFormat = new java.text.SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")
   dateFormat.setTimeZone(TimeZone.getTimeZone("GMT"))
@@ -394,9 +396,9 @@ class Reporting @Inject()(selections: SelectionService,
     return contents
   }
 
-  def spaceStorage(id: UUID, since: Option[String], until: Option[String]) = ServerAdminAction { implicit request =>
+  def spaceStorage(space: Option[String], since: Option[String], until: Option[String]) = ServerAdminAction { implicit request =>
     // Iterate over the files of every dataset in the space
-    val results = datasets.getIterator(Some(id), None, None) // TODO: Can't use time filters here if user intends files
+    val results = datasets.getIterator(space, None, None) // TODO: Can't use time filters here if user intends files
 
     var headerRow = true
     val enum = Enumerator.generateM({
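
Both spaceStorage and the new extractorUsage report stream their CSV output through Play's Enumerator.generateM, which keeps re-evaluating its block until it yields None: the first evaluation emits the quoted header row, later evaluations emit data rows, and None ends the stream before Enumerator.eof is appended. A minimal sketch of that pattern, using a hypothetical csvEnumerator helper that is not part of this change:

import play.api.libs.iteratee.Enumerator
import play.api.libs.concurrent.Execution.Implicits.defaultContext
import scala.concurrent.Future

// Streams a quoted CSV: header chunk first, then one chunk per row; None terminates the enumerator.
def csvEnumerator(headers: List[String], rows: Iterator[List[String]]): Enumerator[Array[Byte]] = {
  var headerRow = true
  Enumerator.generateM(Future {
    if (headerRow) {
      headerRow = false
      Some(("\"" + headers.mkString("\",\"") + "\"\n").getBytes("UTF-8"))
    } else if (rows.hasNext) {
      Some(("\"" + rows.next.mkString("\",\"") + "\"\n").getBytes("UTF-8"))
    } else None
  })
}
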
@@ -495,9 +497,174 @@ class Reporting @Inject()(selections: SelectionService,
       Future(chunk)
     })
 
+    val filename = space match {
+      case Some(spid) => "SpaceStorage_" + spid + ".csv"
+      case None => "SpaceStorage.csv"
+    }
+    Ok.chunked(enum.andThen(Enumerator.eof)).withHeaders(
+      "Content-Type" -> "text/csv",
+      "Content-Disposition" -> ("attachment; filename=" + filename)
+    )
+  }
+
+  private def determineJobType(jobMsg: String): String = {
+    if (jobMsg == "SUBMITTED")
+      "queue"
+    else
+      "work" // TODO: Better solution?
+  }
+
+  def extractorUsage(since: Option[String], until: Option[String]) = ServerAdminAction { implicit request =>
+    Logger.debug("Generating extraction metrics report")
+
+    /** This mapping is used to aggregate jobs.
+      * A job is considered some countable extraction duration. It has a jobType so
+      * we can attempt to differentiate "time in queue" from "time being processed".
+      *
+      * jobLookup: [
+      *   UserID -> [
+      *     UniqueJobKey -> {
+      *       jobs: [ list of jobs identical to current_job below ]
+      *       current_job: {
+      *         target      event.file_id (but can be a dataset ID or metadata ID in reality)
+      *         targetType  file/dataset/metadata
+      *         extractor   extractor id (e.g. ncsa.file.digest)
+      *         spaceId     id of space containing target
+      *         jobId       official job_id, if available
+      *         jobType     is this a queue event or an actual work event on a node? see determineJobType()
+      *         lastStatus  most recent event.status for the job
+      *         start       earliest event.start time from events in this job (event.end is often blank)
+      *         end         latest event.start time from events in this job (event.end is often blank)
+      *       }
+      *     }
+      *   ]
+      * ]
+      */
+    val jobLookup: MutaMap[UUID,
+      MutaMap[String, (List[ExtractionJob], Option[ExtractionJob])]] = MutaMap.empty
+
+    val results = extractions.getIterator(true, since, until, None)
+    while (results.hasNext) {
+      val event = results.next
+
+      // Collect info to associate this event with a job if possible
+      val jobId = event.job_id match {
+        case Some(jid) => jid.stringify
+        case None => ""
+      }
+      val jobType = determineJobType(event.status)
+      val uniqueKey = event.file_id + "-" + event.extractor_id
+
+      // Add user and uniqueKey if they don't exist yet
+      if (!jobLookup.get(event.user_id).isDefined)
+        jobLookup(event.user_id) = MutaMap.empty
+      if (!jobLookup.get(event.user_id).get.get(uniqueKey).isDefined)
+        jobLookup(event.user_id)(uniqueKey) = (List.empty, None)
+
+      // If we don't have an ongoing job, or it's not same jobType, start a new ongoing job
+      var jobList = jobLookup(event.user_id)(uniqueKey)._1
+      val currentJob = jobLookup(event.user_id)(uniqueKey)._2
+      val newJobBeginning = currentJob match {
+        case Some(currJob) => currJob.jobType != jobType
+        case None => true
+      }
+
+      if (newJobBeginning) {
+        // Determine parent details for new job - quick dataset check first, then file search
+        var spaces = ""
+        var resourceType = "file"
+        val parentDatasets = datasets.findByFileIdAllContain(event.file_id)
+        if (parentDatasets.length > 0) {
+          parentDatasets.foreach(ds => {
+            spaces = ds.spaces.mkString(",")
+            resourceType = "file"
+          })
+        } else {
+          datasets.get(event.file_id) match {
+            case Some(ds) => {
+              spaces = ds.spaces.mkString(",")
+              resourceType = "dataset"
+            }
+            case None => {}
+          }
+        }
+
+        // Push current job to jobs list (saying it ended at start of next stage) and make new job entry
+        if (currentJob.isDefined) {
+          jobList = jobList ::: List(currentJob.get.copy(end=event.start))
+        }
+        val newJob = ExtractionJob(event.file_id.stringify, resourceType, event.extractor_id, spaces, jobId, jobType, 1,
+          event.status, event.start, event.start)
+        jobLookup(event.user_id)(uniqueKey) = (jobList, Some(newJob))
+      } else {
+        // Don't overwrite DONE as final message in case we have small differences in timing of last extractor msg
+        var status = currentJob.get.lastStatus
+        if (status != "DONE") status = event.status
+        val updatedJob = currentJob.get.copy(statusCount=currentJob.get.statusCount+1, lastStatus=status, end=event.start)
+        jobLookup(event.user_id)(uniqueKey) = (jobList, Some(updatedJob))
+      }
+    }
+
+    var headerRow = true
+    val keyiter = jobLookup.keysIterator
+    val enum = Enumerator.generateM({
+      val chunk = if (headerRow) {
+        val headers = List("userid", "username", "email", "resource_id", "resource_type", "space_id", "extractor",
+          "job_id", "job_type", "status_count", "last_status", "start", "end", "duration_ms")
+        val header = "\"" + headers.mkString("\",\"") + "\"\n"
+        headerRow = false
+        Some(header.getBytes("UTF-8"))
+      } else {
+        scala.concurrent.blocking {
+          if (keyiter.hasNext) {
+            val userid = keyiter.next
+
+            // Get pretty user info
+            var username = ""
+            var email = ""
+            users.get(userid) match {
+              case Some(u) => {
+                username = u.fullName
+                email = u.email.getOrElse("")
+              }
+              case None => {}
+            }
+
+            var content = ""
+            val userRecords = jobLookup(userid)
+            userRecords.keysIterator.foreach(jobkey => {
+              val jobHistory = userRecords(jobkey)
+              val jobList = jobHistory._1
+              val currJob = jobHistory._2
+              jobList.foreach(job => {
+                val duration = (job.end.getTime - job.start.getTime)
+                val row = List(userid.stringify, username, email, job.target, job.targetType, job.spaces, job.extractor,
+                  job.jobId, job.jobType, job.statusCount, job.lastStatus, job.start, job.end, duration)
+                if (duration > 0)
+                  content += "\"" + row.mkString("\",\"") + "\"\n"
+              })
+              // current job if it was never "closed" and pushed to the jobList (most common case)
+              currJob match {
+                case Some(job) => {
+                  val duration = (job.end.getTime - job.start.getTime)
+                  val row = List(userid.stringify, username, email, job.target, job.targetType, job.spaces, job.extractor,
+                    job.jobId, job.jobType, job.statusCount, job.lastStatus, job.start, job.end, duration)
+                  if (duration > 0)
+                    content += "\"" + row.mkString("\",\"") + "\"\n"
+                }
+                case None => {}
+              }
+            })
+            Some(content.getBytes("UTF-8"))
+          }
+          else None
+        }
+      }
+      Future(chunk)
+    })
+
     Ok.chunked(enum.andThen(Enumerator.eof)).withHeaders(
       "Content-Type" -> "text/csv",
-      "Content-Disposition" -> ("attachment; filename=SpaceStorage " + id.stringify + ".csv")
+      "Content-Disposition" -> "attachment; filename=ExtractorMetrics.csv"
     )
   }
 }
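
To make the aggregation in extractorUsage concrete, the following self-contained sketch (simplified Ev/Job types standing in for the Clowder event and ExtractionJob models) shows how a stream of events for one file/extractor pair collapses into a "queue" job followed by a "work" job, mirroring determineJobType() and the currentJob/jobList handling above:

import java.util.Date

case class Ev(status: String, start: Date)
case class Job(jobType: String, statusCount: Int, lastStatus: String, start: Date, end: Date)

// Same rule as determineJobType(): SUBMITTED counts as time in queue, everything else as work.
def jobType(status: String): String = if (status == "SUBMITTED") "queue" else "work"

def aggregate(events: List[Ev]): List[Job] = {
  var closed = List.empty[Job]
  var current: Option[Job] = None
  events.foreach { ev =>
    val jt = jobType(ev.status)
    current match {
      case Some(j) if j.jobType == jt =>
        // Same phase: fold the event into the ongoing job and extend its end time.
        current = Some(j.copy(statusCount = j.statusCount + 1, lastStatus = ev.status, end = ev.start))
      case Some(j) =>
        // Phase change (e.g. queue -> work): close the old job at this event's start, open a new one.
        closed = closed :+ j.copy(end = ev.start)
        current = Some(Job(jt, 1, ev.status, ev.start, ev.start))
      case None =>
        current = Some(Job(jt, 1, ev.status, ev.start, ev.start))
    }
  }
  // The report emits the still-open job separately (currJob above); here we simply append it.
  closed ++ current.toList
}

// e.g. SUBMITTED@t0, STARTED@t1, PROCESSING@t2, DONE@t3 yields
//   Job("queue", 1, "SUBMITTED", t0, t1) and Job("work", 3, "DONE", t1, t3)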