@@ -11,10 +11,11 @@ import javax.inject.Inject
 import java.util.{Date, TimeZone}
 
 import services._
-import models.{Collection, Dataset, File, ProjectSpace, UUID, User, UserStatus}
-import util.Parsers
+import models.{Collection, Dataset, File, ProjectSpace, UUID, User, UserStatus, ExtractionJob}
 
-import scala.collection.mutable.ListBuffer
+import org.apache.commons.lang3.Range.between
+import scala.collection.mutable.{ListBuffer, Map => MutaMap}
+import util.Parsers
 
 
 /**
@@ -25,7 +26,8 @@ class Reporting @Inject()(selections: SelectionService,
                           files: FileService,
                           collections: CollectionService,
                           spaces: SpaceService,
-                          users: UserService) extends Controller with ApiController {
+                          users: UserService,
+                          extractions: ExtractionService) extends Controller with ApiController {
 
   val dateFormat = new java.text.SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")
   dateFormat.setTimeZone(TimeZone.getTimeZone("GMT"))
@@ -500,4 +502,165 @@ class Reporting @Inject()(selections: SelectionService,
       "Content-Disposition" -> ("attachment; filename=SpaceStorage" + id.stringify + ".csv")
     )
   }
+
+  /** Classify an extraction event as queue time (SUBMITTED) or work time (anything else). */
+  private def determineJobType(jobMsg: String): String = {
+    if (jobMsg == "SUBMITTED")
+      "queue"
+    else
+      "work" // TODO: Better solution?
+  }
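+  // For example, given the status strings seen elsewhere in this file (only
+  // SUBMITTED and DONE appear here; any other status is treated as work):
+  //   determineJobType("SUBMITTED") == "queue"
+  //   determineJobType("DONE")      == "work"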
+
+  def extractorUsage(since: Option[String], until: Option[String]) = ServerAdminAction { implicit request =>
+    Logger.debug("Generating extraction metrics report")
+
+    /** This mapping is used to aggregate jobs.
+     * A job is considered some countable extraction duration. It has a jobType so
+     * we can attempt to differentiate "time in queue" from "time being processed".
+     *
+     * jobLookup: [
+     *   UserID -> [
+     *     UniqueJobKey -> {
+     *       jobs: [ list of jobs identical to current_job below ]
+     *       current_job: {
+     *         target      event.file_id (but can be a dataset ID or metadata ID in reality)
+     *         targetType  file/dataset/metadata
+     *         extractor   extractor id (e.g. ncsa.file.digest)
+     *         spaceId     id of space containing target
+     *         jobId       official job_id, if available
+     *         jobType     is this a queue event or an actual work event on a node? see determineJobType()
+     *         lastStatus  most recent event.status for the job
+     *         start       earliest event.start time from events in this job (event.end is often blank)
+     *         end         latest event.start time from events in this job (event.end is often blank)
+     *       }
+     *     }
+     *   ]
+     * ]
+     */
+    val jobLookup: MutaMap[UUID,
+      MutaMap[String, (List[ExtractionJob], Option[ExtractionJob])]] = MutaMap.empty
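+    // A sketch of the populated structure (keys abbreviated, IDs hypothetical):
+    //   jobLookup(<user UUID>)("<file_id> - <extractor_id>") =
+    //     (List(<closed "queue" job>), Some(<ongoing "work" job>))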
+
+    val results = extractions.getIterator(true, since, until, None)
+    while (results.hasNext) {
+      val event = results.next
+
+      // Collect info to associate this event with a job if possible
+      val jobId = event.job_id match {
+        case Some(jid) => jid.stringify
+        case None => ""
+      }
+      val jobType = determineJobType(event.status)
+      val uniqueKey = event.file_id + " - " + event.extractor_id
+
+      // Add user and uniqueKey if they don't exist yet
+      if (!jobLookup.contains(event.user_id))
+        jobLookup(event.user_id) = MutaMap.empty
+      if (!jobLookup(event.user_id).contains(uniqueKey))
+        jobLookup(event.user_id)(uniqueKey) = (List.empty, None)
+
+      // If we don't have an ongoing job, or it's not the same jobType, start a new ongoing job
+      var jobList = jobLookup(event.user_id)(uniqueKey)._1
+      val currentJob = jobLookup(event.user_id)(uniqueKey)._2
+      val newJobBeginning = currentJob match {
+        case Some(currJob) => currJob.jobType != jobType
+        case None => true
+      }
+
+      if (newJobBeginning) {
+        // Determine parent details for new job - quick dataset check first, then file search
+        var spaces = ""
+        var resourceType = "file"
+        val parentDatasets = datasets.findByFileIdAllContain(event.file_id)
+        if (parentDatasets.nonEmpty) {
+          // file_id is a true file ID; gather the spaces of every dataset containing it
+          spaces = parentDatasets.flatMap(_.spaces).distinct.mkString(",")
+        } else {
+          // Otherwise file_id may actually be a dataset ID
+          datasets.get(event.file_id) match {
+            case Some(ds) =>
+              spaces = ds.spaces.mkString(",")
+              resourceType = "dataset"
+            case None => // leave defaults
+          }
+        }
+
+        // Push current job to jobs list (saying it ended at start of next stage) and make new job entry
+        if (currentJob.isDefined) {
+          jobList = jobList ::: List(currentJob.get.copy(end = event.start))
+        }
+        val newJob = ExtractionJob(event.file_id.stringify, resourceType, event.extractor_id, spaces, jobId, jobType, 1,
+          event.status, event.start, event.start)
+        jobLookup(event.user_id)(uniqueKey) = (jobList, Some(newJob))
+      } else {
+        // Don't overwrite DONE as final message in case we have small differences in timing of last extractor msg
+        var status = currentJob.get.lastStatus
+        if (status != "DONE") status = event.status
+        val updatedJob = currentJob.get.copy(statusCount = currentJob.get.statusCount + 1, lastStatus = status, end = event.start)
+        jobLookup(event.user_id)(uniqueKey) = (jobList, Some(updatedJob))
+      }
+    }
+
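+    // At this point jobLookup holds every closed job plus, typically, one
+    // still-open current job per (user, file+extractor) key. Stream the report
+    // out as chunked CSV: one chunk for the header row, then one per user.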
+    var headerRow = true
+    val keyiter = jobLookup.keysIterator
+    val enum = Enumerator.generateM({
+      // First chunk is the quoted header row: "userid","username",...
+      val chunk = if (headerRow) {
+        val headers = List("userid", "username", "email", "resource_id", "resource_type", "space_id", "extractor",
+          "job_id", "job_type", "status_count", "last_status", "start", "end", "duration_ms")
+        val header = "\"" + headers.mkString("\",\"") + "\"\n"
+        headerRow = false
+        Some(header.getBytes("UTF-8"))
+      } else {
+        scala.concurrent.blocking {
+          if (keyiter.hasNext) {
+            val userid = keyiter.next
+
+            // Get pretty user info
+            var username = ""
+            var email = ""
+            users.get(userid) match {
+              case Some(u) =>
+                username = u.fullName
+                email = u.email.getOrElse("")
+              case None =>
+            }
+
+            var content = ""
+            // Build one quoted CSV row per job; zero-duration jobs are skipped
+            def appendRow(job: ExtractionJob): Unit = {
+              val duration = job.end.getTime - job.start.getTime
+              val row = List(userid.stringify, username, email, job.target, job.targetType, job.spaces, job.extractor,
+                job.jobId, job.jobType, job.statusCount, job.lastStatus, job.start, job.end, duration)
+              if (duration > 0)
+                content += "\"" + row.mkString("\",\"") + "\"\n"
+            }
+            val userRecords = jobLookup(userid)
+            userRecords.keysIterator.foreach(jobkey => {
+              val (jobList, currJob) = userRecords(jobkey)
+              jobList.foreach(appendRow)
+              // current job if it was never "closed" and pushed to the jobList (most common case)
+              currJob.foreach(appendRow)
+            })
+            Some(content.getBytes("UTF-8"))
+          } else None
+        }
+      }
+      Future(chunk)
+    })
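+    // Enumerator.generateM keeps requesting chunks until the block yields None
+    // (all users consumed); Enumerator.eof below then terminates the response.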
+
+    Ok.chunked(enum.andThen(Enumerator.eof)).withHeaders(
+      "Content-Type" -> "text/csv",
+      "Content-Disposition" -> "attachment; filename=ExtractorMetrics.csv"
+    )
+  }
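+
+  // Hypothetical invocation, assuming a matching GET entry in conf/routes
+  // (the path below is an illustration, not necessarily the project's route):
+  //   GET /api/reports/metrics/extractors?since=2021-01-01T00:00:00.000Z
+  // Requires server admin rights (ServerAdminAction) and streams ExtractorMetrics.csv.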
 }