@@ -436,14 +436,14 @@ class RabbitmqPlugin(application: Application) extends Plugin {
436436 * @param file_id the UUID of file
437437 * @param extractor_id the extractor queue name to be submitted
438438 */
439- def postSubmissionEven (file_id : UUID , extractor_id : String ): (UUID , Option [UUID ]) = {
439+ def postSubmissionEvent (file_id : UUID , extractor_id : String , user_id : UUID ): (UUID , Option [UUID ]) = {
440440 val extractions : ExtractionService = DI .injector.getInstance(classOf [ExtractionService ])
441441
442442 import java .text .SimpleDateFormat
443443 val dateFormatter = new SimpleDateFormat (" yyyy-MM-dd'T'HH:mm:ssX" )
444444 val submittedDateConvert = new java.util.Date ()
445445 val job_id = Some (UUID .generate())
446- extractions.insert(Extraction (UUID .generate(), file_id, job_id, extractor_id, " SUBMITTED" , submittedDateConvert, None )) match {
446+ extractions.insert(Extraction (UUID .generate(), file_id, job_id, extractor_id, " SUBMITTED" , submittedDateConvert, None , user_id )) match {
447447 case Some (objectid) => (UUID (objectid.toString), job_id)
448448 case None => (UUID (" " ), job_id)
449449 }
@@ -477,6 +477,7 @@ class RabbitmqPlugin(application: Application) extends Plugin {
477477 val routingKey = exchange + " ." + " file." + contentTypeToRoutingKey(file.contentType)
478478 val extraInfo = Map (" filename" -> file.filename)
479479 val apiKey = requestAPIKey.getOrElse(globalAPIKey)
480+ val user = userService.findByKey(apiKey).getOrElse(User .anonymous)
480481 val sourceExtra = JsObject ((Seq (" filename" -> JsString (file.filename))))
481482 Logger .debug(s " Sending message $routingKey from $host with extraInfo $extraInfo" )
482483 var jobId : Option [UUID ] = None
@@ -487,7 +488,7 @@ class RabbitmqPlugin(application: Application) extends Plugin {
487488
488489 val notifies = getEmailNotificationEmailList(requestAPIKey)
489490
490- val (id, job_id) = postSubmissionEven (file.id, queue)
491+ val (id, job_id) = postSubmissionEvent (file.id, queue, user.id )
491492
492493 val msg = ExtractorMessage (id, file.id, job_id, notifies, file.id, host, queue, extraInfo, file.length.toString,
493494 d.id, " " , apiKey, routingKey, source, " created" , None )
@@ -513,11 +514,12 @@ class RabbitmqPlugin(application: Application) extends Plugin {
513514 val routingKey = exchange + " ." + " file." + contentTypeToRoutingKey(file.contentType)
514515 val extraInfo = Map (" filename" -> file.filename)
515516 val apiKey = requestAPIKey.getOrElse(globalAPIKey)
517+ val user = userService.findByKey(apiKey).getOrElse(User .anonymous)
516518 Logger .debug(s " Sending message $routingKey from $host with extraInfo $extraInfo" )
517519 val sourceExtra = JsObject ((Seq (" filename" -> JsString (file.filename))))
518520 val source = Entity (ResourceRef (ResourceRef .file, file.id), Some (file.contentType), sourceExtra)
519521 val notifies = getEmailNotificationEmailList(requestAPIKey)
520- val (id, job_id) = postSubmissionEven (file.id, routingKey)
522+ val (id, job_id) = postSubmissionEvent (file.id, routingKey, user.id )
521523 val msg = ExtractorMessage (id, file.id, job_id, notifies, file.id, host, routingKey, extraInfo, file.length.toString, null ,
522524 " " , apiKey, routingKey, source, " created" , None )
523525 extractWorkQueue(msg)
@@ -535,14 +537,15 @@ class RabbitmqPlugin(application: Application) extends Plugin {
535537 Logger .debug(s " Sending message $routingKey from $host" )
536538 val queues = getQueues(dataset, routingKey, file.contentType)
537539 val apiKey = requestAPIKey.getOrElse(globalAPIKey)
540+ val user = userService.findByKey(apiKey).getOrElse(User .anonymous)
538541 val sourceExtra = JsObject ((Seq (" filename" -> JsString (file.filename))))
539542 queues.foreach{ extractorId =>
540543 val source = Entity (ResourceRef (ResourceRef .file, file.id), Some (file.contentType), sourceExtra)
541544 val target = Entity (ResourceRef (ResourceRef .dataset, dataset.id), None , JsObject (Seq .empty))
542545
543546 val notifies = getEmailNotificationEmailList(requestAPIKey)
544547
545- val (id, job_id) = postSubmissionEven (file.id, extractorId)
548+ val (id, job_id) = postSubmissionEvent (file.id, extractorId, user.id )
546549
547550 val msg = ExtractorMessage (id, file.id, job_id, notifies, file.id, host, extractorId, Map .empty, file.length.toString, dataset.id,
548551 " " , apiKey, routingKey, source, " added" , Some (target))
@@ -562,14 +565,15 @@ class RabbitmqPlugin(application: Application) extends Plugin {
562565 Logger .debug(s " Sending message $routingKey from $host" )
563566 val queues = getQueues(dataset, routingKey, " " )
564567 val apiKey = requestAPIKey.getOrElse(globalAPIKey)
568+ val user = userService.findByKey(apiKey).getOrElse(User .anonymous)
565569 val sourceExtra = JsObject ((Seq (" filenames" -> JsArray (filelist.map(f=> JsString (f.filename)).toSeq))))
566570 val msgExtra = Map (" filenames" -> filelist.map(f=> f.filename))
567571 queues.foreach{ extractorId =>
568572 val source = Entity (ResourceRef (ResourceRef .dataset, dataset.id), None , sourceExtra)
569573
570574 val notifies = getEmailNotificationEmailList(requestAPIKey)
571575
572- val (id, job_id) = postSubmissionEven (dataset.id, extractorId)
576+ val (id, job_id) = postSubmissionEvent (dataset.id, extractorId, user.id )
573577
574578 var totalsize : Long = 0
575579 filelist.map(f => totalsize += f.length)
@@ -590,13 +594,14 @@ class RabbitmqPlugin(application: Application) extends Plugin {
590594 Logger .debug(s " Sending message $routingKey from $host" )
591595 val queues = getQueues(dataset, routingKey, file.contentType)
592596 val apiKey = requestAPIKey.getOrElse(globalAPIKey)
597+ val user = userService.findByKey(apiKey).getOrElse(User .anonymous)
593598 val sourceExtra = JsObject ((Seq (" filename" -> JsString (file.filename))))
594599 queues.foreach{ extractorId =>
595600 val source = Entity (ResourceRef (ResourceRef .file, file.id), Some (file.contentType), sourceExtra)
596601 val target = Entity (ResourceRef (ResourceRef .dataset, dataset.id), None , JsObject (Seq .empty))
597602
598603 val notifies = getEmailNotificationEmailList(requestAPIKey)
599- val (id, job_id) = postSubmissionEven (file.id, extractorId)
604+ val (id, job_id) = postSubmissionEvent (file.id, extractorId, user.id )
600605 val msg = ExtractorMessage (id, file.id, job_id, notifies, file.id, host, extractorId, Map .empty, file.length.toString, dataset.id,
601606 " " , apiKey, routingKey, source, " removed" , Some (target))
602607 extractWorkQueue(msg)
@@ -621,7 +626,7 @@ class RabbitmqPlugin(application: Application) extends Plugin {
621626 val source = Entity (ResourceRef (ResourceRef .file, originalId), Some (file.contentType), sourceExtra)
622627
623628 val notifies = getEmailNotificationEmailList(requestAPIKey)
624- val (id, job_id) = postSubmissionEven (file.id, queue)
629+ val (id, job_id) = postSubmissionEvent (file.id, queue, user.getOrElse( User .anonymous).id )
625630 val msg = ExtractorMessage (id, file.id, job_id, notifies, file.id, host, queue, extraInfo, file.length.toString, datasetId,
626631 " " , apiKey, " extractors." + queue, source, " submitted" , None )
627632 extractWorkQueue(msg)
@@ -644,7 +649,7 @@ class RabbitmqPlugin(application: Application) extends Plugin {
644649
645650 val notifies = getEmailNotificationEmailList(requestAPIKey)
646651
647- val (id, job_id) = postSubmissionEven (datasetId, queue)
652+ val (id, job_id) = postSubmissionEvent (datasetId, queue, user.getOrElse( User .anonymous).id )
648653 val msg = ExtractorMessage (id, datasetId, job_id, notifies, datasetId, host, queue, extraInfo, 0 .toString, datasetId,
649654 " " , apiKey, " extractors." + queue, source, " submitted" , None )
650655 extractWorkQueue(msg)
@@ -676,7 +681,7 @@ class RabbitmqPlugin(application: Application) extends Plugin {
676681 val source = Entity (ResourceRef (ResourceRef .metadata, metadataId), None , JsObject (Seq .empty))
677682 val target = Entity (resourceRef, None , JsObject (Seq .empty))
678683
679- val (id, job_id) = postSubmissionEven (resourceRef.id, extractorId)
684+ val (id, job_id) = postSubmissionEvent (resourceRef.id, extractorId, user.getOrElse( User .anonymous).id )
680685 val msg = ExtractorMessage (id, resourceRef.id, job_id, notifies, resourceRef.id, host, extractorId, extraInfo, 0 .toString, resourceRef.id,
681686 " " , apiKey, routingKey, source, " added" , Some (target))
682687 extractWorkQueue(msg)
@@ -693,7 +698,7 @@ class RabbitmqPlugin(application: Application) extends Plugin {
693698 val source = Entity (ResourceRef (ResourceRef .metadata, metadataId), None , JsObject (Seq .empty))
694699 val target = Entity (resourceRef, None , JsObject (Seq .empty))
695700
696- val (id, job_id) = postSubmissionEven (resourceRef.id, extractorId)
701+ val (id, job_id) = postSubmissionEvent (resourceRef.id, extractorId, user.getOrElse( User .anonymous).id )
697702 val msg = ExtractorMessage (id, resourceRef.id, job_id, notifies, resourceRef.id, host, extractorId, extraInfo, 0 .toString, null ,
698703 " " , apiKey, routingKey, source, " added" , Some (target))
699704 extractWorkQueue(msg)
@@ -728,7 +733,7 @@ class RabbitmqPlugin(application: Application) extends Plugin {
728733 val source = Entity (ResourceRef (ResourceRef .metadata, metadataId), None , JsObject (Seq .empty))
729734 val target = Entity (resourceRef, None , JsObject (Seq .empty))
730735
731- val (id, job_id) = postSubmissionEven (resourceRef.id, extractorId)
736+ val (id, job_id) = postSubmissionEvent (resourceRef.id, extractorId, user.getOrElse( User .anonymous).id )
732737 val msg = ExtractorMessage (id, resourceRef.id, job_id, notifies, resourceRef.id, host, extractorId, extraInfo, 0 .toString, resourceRef.id,
733738 " " , apiKey, routingKey, source, " removed" , Some (target))
734739 extractWorkQueue(msg)
@@ -745,7 +750,7 @@ class RabbitmqPlugin(application: Application) extends Plugin {
745750 val source = Entity (ResourceRef (ResourceRef .metadata, metadataId), None , JsObject (Seq .empty))
746751 val target = Entity (resourceRef, None , JsObject (Seq .empty))
747752
748- val (id, job_id) = postSubmissionEven (resourceRef.id, extractorId)
753+ val (id, job_id) = postSubmissionEvent (resourceRef.id, extractorId, user.getOrElse( User .anonymous).id )
749754 val msg = ExtractorMessage (id, resourceRef.id, job_id, notifies, resourceRef.id, host, extractorId, extraInfo, 0 .toString, null ,
750755 " " , apiKey, routingKey, source, " removed" , Some (target))
751756 extractWorkQueue(msg)
@@ -768,12 +773,13 @@ class RabbitmqPlugin(application: Application) extends Plugin {
768773 // since the thumbnail extractor during processing will need to upload to correct mongo collection.
769774 val routingKey = exchange + " .query." + contentType.replace(" /" , " ." )
770775 val apiKey = requestAPIKey.getOrElse(globalAPIKey)
776+ val user = userService.findByKey(apiKey).getOrElse(User .anonymous)
771777 Logger .debug(s " Sending message $routingKey from $host" )
772778
773779 val notifies = getEmailNotificationEmailList(requestAPIKey)
774780 val source = Entity (ResourceRef (ResourceRef .file, tempFileId), Some (contentType), JsObject (Seq .empty))
775781
776- val (id, job_id) = postSubmissionEven (tempFileId, routingKey)
782+ val (id, job_id) = postSubmissionEvent (tempFileId, routingKey, user.id )
777783 val msg = ExtractorMessage (id, tempFileId, job_id, notifies, tempFileId, host, routingKey, Map .empty[String , Any ], length, null ,
778784 " " , apiKey, routingKey, source, " created" , None )
779785 extractWorkQueue(msg)
@@ -788,12 +794,13 @@ class RabbitmqPlugin(application: Application) extends Plugin {
788794 def submitSectionPreviewManually (preview : Preview , sectionId : UUID , host : String , requestAPIKey : Option [String ]): Unit = {
789795 val routingKey = exchange + " .index." + contentTypeToRoutingKey(preview.contentType)
790796 val apiKey = requestAPIKey.getOrElse(globalAPIKey)
797+ val user = userService.findByKey(apiKey).getOrElse(User .anonymous)
791798 val extraInfo = Map (" section_id" -> sectionId)
792799 val source = Entity (ResourceRef (ResourceRef .preview, preview.id), None , JsObject (Seq .empty))
793800 val target = Entity (ResourceRef (ResourceRef .section, sectionId), None , JsObject (Seq .empty))
794801 val notifies = getEmailNotificationEmailList(requestAPIKey)
795802
796- val (id, job_id) = postSubmissionEven (sectionId, routingKey)
803+ val (id, job_id) = postSubmissionEvent (sectionId, routingKey, user.id )
797804 val msg = ExtractorMessage (id, sectionId, job_id, notifies, sectionId, host, routingKey, extraInfo, 0 .toString, null ,
798805 " " , apiKey, routingKey, source, " added" , Some (target))
799806 extractWorkQueue(msg)
@@ -961,24 +968,27 @@ class RabbitmqPlugin(application: Application) extends Plugin {
961968 * @param connection the connection to the rabbitmq
962969 * @param cancellationDownloadQueueName the queue name of the cancellation downloaded queue
963970 */
964- class PendingRequestCancellationActor (exchange : String , connection : Option [Connection ], cancellationDownloadQueueName : String , cancellationSearchTimeout : Long ) extends Actor {
971+ class PendingRequestCancellationActor (exchange : String , connection : Option [Connection ], cancellationDownloadQueueName : String ,
972+ cancellationSearchTimeout : Long ) extends Actor {
965973 val configuration = play.api.Play .configuration
966974 val CancellationSearchNumLimits : Integer = configuration.getString(" submission.cancellation.search.numlimits" ).getOrElse(" 100" ).toInt
967975 def receive = {
968976 case CancellationMessage (id, queueName, msg_id) => {
969977 val extractions : ExtractionService = DI .injector.getInstance(classOf [ExtractionService ])
970978 val dateFormatter = new SimpleDateFormat (" yyyy-MM-dd'T'HH:mm:ssX" )
971979 var startDate = new java.util.Date ()
980+ var user_id = User .anonymous.id
972981 val job_id : Option [UUID ] = extractions.get(msg_id) match {
973982 case Some (extraction) => {
983+ user_id = extraction.user_id
974984 extraction.job_id
975985 }
976986 case None => {
977987 Logger .warn(" Failed to lookup jobId.. no extraction message found with id=" + msg_id)
978988 None
979989 }
980990 }
981- extractions.insert(Extraction (UUID .generate(), id, job_id, queueName, " Cancel Requested" , startDate, None ))
991+ extractions.insert(Extraction (UUID .generate(), id, job_id, queueName, " Cancel Requested" , startDate, None , user_id ))
982992
983993 val channel : Channel = connection.get.createChannel()
984994 // 1. connect to the target rabbitmq queue
@@ -1046,9 +1056,9 @@ class PendingRequestCancellationActor(exchange: String, connection: Option[Conne
10461056 // update extraction event
10471057 startDate = new java.util.Date ()
10481058 if (foundCancellationRequest) {
1049- extractions.insert(Extraction (UUID .generate(), id, job_id, queueName, " Cancel Success" , startDate, None ))
1059+ extractions.insert(Extraction (UUID .generate(), id, job_id, queueName, " Cancel Success" , startDate, None , user_id ))
10501060 } else {
1051- extractions.insert(Extraction (UUID .generate(), id, job_id, queueName, " Cancel Failed" , startDate, None ))
1061+ extractions.insert(Extraction (UUID .generate(), id, job_id, queueName, " Cancel Failed" , startDate, None , user_id ))
10521062 }
10531063
10541064 try {
@@ -1194,6 +1204,7 @@ class EventFilter(channel: Channel, queue: String) extends Actor {
11941204 Logger .debug(" Received extractor status: " + statusBody)
11951205 val json = Json .parse(statusBody)
11961206 val file_id = UUID ((json \ " file_id" ).as[String ])
1207+ val user_id = UUID ((json \ " user_id" ).as[String ])
11971208 val job_id : Option [UUID ] = (json \ " job_id" ).asOpt[String ] match {
11981209 case Some (jid) => { Some (UUID (jid)) }
11991210 case None => { None }
@@ -1209,11 +1220,11 @@ class EventFilter(channel: Channel, queue: String) extends Actor {
12091220 // other detailed status updates to logs when we start implementing
12101221 // distributed logging
12111222 if (updatedStatus.contains(" DONE" )) {
1212- extractions.insert(Extraction (UUID .generate(), file_id, job_id, extractor_id, " DONE" , startDate.get, None ))
1223+ extractions.insert(Extraction (UUID .generate(), file_id, job_id, extractor_id, " DONE" , startDate.get, None , user_id ))
12131224 } else {
12141225 val commKey = " key=" + play.Play .application().configuration().getString(" commKey" )
12151226 val parsed_status = status.replace(commKey, " key=secretKey" )
1216- extractions.insert(Extraction (UUID .generate(), file_id, job_id, extractor_id, parsed_status, startDate.get, None ))
1227+ extractions.insert(Extraction (UUID .generate(), file_id, job_id, extractor_id, parsed_status, startDate.get, None , user_id ))
12171228 }
12181229 Logger .debug(" updatedStatus=" + updatedStatus + " status=" + status + " startDate=" + startDate)
12191230 models.ExtractionInfoSetUp .updateDTSRequests(file_id, extractor_id)
0 commit comments