@@ -6,12 +6,13 @@ import edu.uci.ics.texera.web.auth.SessionUser
66import edu .uci .ics .texera .dao .jooq .generated .Tables ._
77import edu .uci .ics .texera .web .resource .dashboard .admin .execution .AdminExecutionResource ._
88import io .dropwizard .auth .Auth
9+ import org .jooq .impl .DSL
910import org .jooq .types .UInteger
1011
1112import javax .annotation .security .RolesAllowed
1213import javax .ws .rs ._
1314import javax .ws .rs .core .MediaType
14- import scala .jdk .CollectionConverters .IterableHasAsScala
15+ import scala .jdk .CollectionConverters ._
1516
1617/**
1718 * This file handles various request related to saved-executions.
@@ -48,49 +49,130 @@ object AdminExecutionResource {
4849 }
4950 }
5051
52+ def mapToStatus (status : String ): Int = {
53+ status match {
54+ case " READY" => 0
55+ case " RUNNING" => 1
56+ case " PAUSED" => 2
57+ case " COMPLETED" => 3
58+ case " FAILED" => 4
59+ case " KILLED" => 5
60+ case _ => - 1 // or throw an exception, depends on your needs
61+ }
62+ }
63+
64+ val sortFieldMapping = Map (
65+ " workflow_name" -> WORKFLOW .NAME ,
66+ " execution_name" -> WORKFLOW_EXECUTIONS .NAME ,
67+ " initiator" -> USER .NAME ,
68+ " end_time" -> WORKFLOW_EXECUTIONS .LAST_UPDATE_TIME
69+ )
70+
5171}
5272
5373@ Produces (Array (MediaType .APPLICATION_JSON ))
5474@ Path (" /admin/execution" )
5575@ RolesAllowed (Array (" ADMIN" ))
5676class AdminExecutionResource {
5777
78+ @ GET
79+ @ Path (" /totalWorkflow" )
80+ @ Produces ()
81+ def getTotalWorkflows : Int = {
82+ context
83+ .select(
84+ DSL .countDistinct(WORKFLOW .WID )
85+ )
86+ .from(WORKFLOW_EXECUTIONS )
87+ .join(WORKFLOW_VERSION )
88+ .on(WORKFLOW_EXECUTIONS .VID .eq(WORKFLOW_VERSION .VID ))
89+ .join(USER )
90+ .on(WORKFLOW_EXECUTIONS .UID .eq(USER .UID ))
91+ .join(WORKFLOW )
92+ .on(WORKFLOW .WID .eq(WORKFLOW_VERSION .WID ))
93+ .fetchOne(0 , classOf [Int ])
94+ }
95+
5896 /**
59- * This method retrieves all existing executions
97+ * This method retrieves latest execution of each workflow for specified page.
98+ * The returned executions are sorted and filtered according to the parameters.
6099 */
61100 @ GET
62- @ Path (" /executionList" )
101+ @ Path (" /executionList/{pageSize}/{pageIndex}/{sortField}/{sortDirection} " )
63102 @ Produces (Array (MediaType .APPLICATION_JSON ))
64- def listWorkflows (@ Auth current_user : SessionUser ): List [dashboardExecution] = {
65- val workflowEntries = context
103+ def listWorkflows (
104+ @ Auth current_user : SessionUser ,
105+ @ PathParam (" pageSize" ) page_size : Int = 20 ,
106+ @ PathParam (" pageIndex" ) page_index : Int = 0 ,
107+ @ PathParam (" sortField" ) sortField : String = " end_time" ,
108+ @ PathParam (" sortDirection" ) sortDirection : String = " desc" ,
109+ @ QueryParam (" filter" ) filter : java.util.List [String ]
110+ ): List [dashboardExecution] = {
111+ val filter_status = filter.asScala.map(mapToStatus).toSeq.filter(_ != - 1 ).asJava
112+
113+ // Base query that retrieves latest execution info for each workflow without sorting and filtering.
114+ // Only retrieving executions in current page according to pageSize and pageIndex parameters.
115+ val executions_base_query = context
66116 .select(
67117 WORKFLOW_EXECUTIONS .UID ,
68118 USER .NAME ,
69119 WORKFLOW_VERSION .WID ,
70120 WORKFLOW .NAME ,
71121 WORKFLOW_EXECUTIONS .EID ,
72- WORKFLOW_EXECUTIONS .VID ,
73122 WORKFLOW_EXECUTIONS .STARTING_TIME ,
74123 WORKFLOW_EXECUTIONS .LAST_UPDATE_TIME ,
75124 WORKFLOW_EXECUTIONS .STATUS ,
76125 WORKFLOW_EXECUTIONS .NAME
77126 )
78127 .from(WORKFLOW_EXECUTIONS )
79- .leftJoin (WORKFLOW_VERSION )
128+ .join (WORKFLOW_VERSION )
80129 .on(WORKFLOW_EXECUTIONS .VID .eq(WORKFLOW_VERSION .VID ))
81- .leftJoin (USER )
130+ .join (USER )
82131 .on(WORKFLOW_EXECUTIONS .UID .eq(USER .UID ))
83- .leftJoin (WORKFLOW )
132+ .join (WORKFLOW )
84133 .on(WORKFLOW .WID .eq(WORKFLOW_VERSION .WID ))
85- .fetch()
134+ .naturalJoin(
135+ context
136+ .select(
137+ DSL .max(WORKFLOW_EXECUTIONS .EID ).as(" eid" )
138+ )
139+ .from(WORKFLOW_EXECUTIONS )
140+ .join(WORKFLOW_VERSION )
141+ .on(WORKFLOW_VERSION .VID .eq(WORKFLOW_EXECUTIONS .VID ))
142+ .groupBy(WORKFLOW_VERSION .WID )
143+ )
144+
145+ // Apply filter if the status are not empty.
146+ val executions_apply_filter = if (! filter_status.isEmpty) {
147+ executions_base_query.where(WORKFLOW_EXECUTIONS .STATUS .in(filter_status))
148+ } else {
149+ executions_base_query
150+ }
151+
152+ // Apply sorting if user specified.
153+ var executions_apply_order =
154+ executions_apply_filter.limit(page_size).offset(page_index * page_size)
155+ if (sortField != " NO_SORTING" ) {
156+ executions_apply_order = executions_apply_filter
157+ .orderBy(
158+ if (sortDirection == " desc" ) sortFieldMapping.getOrElse(sortField, WORKFLOW .NAME ).desc()
159+ else sortFieldMapping.getOrElse(sortField, WORKFLOW .NAME ).asc()
160+ )
161+ .limit(page_size)
162+ .offset(page_index * page_size)
163+ }
164+
165+ val executions = executions_apply_order.fetch()
86166
167+ // Retrieve the id of each workflow that the user has access to.
87168 val availableWorkflowIds = context
88169 .select(WORKFLOW_USER_ACCESS .WID )
89170 .from(WORKFLOW_USER_ACCESS )
90171 .where(WORKFLOW_USER_ACCESS .UID .eq(current_user.getUid))
91172 .fetchInto(classOf [UInteger ])
92173
93- workflowEntries
174+ // Calculate the statistics needed for each execution.
175+ executions
94176 .map(workflowRecord => {
95177 val startingTime =
96178 workflowRecord.get(WORKFLOW_EXECUTIONS .STARTING_TIME ).getTime
0 commit comments