
Commit a2fccf0

mveroedaniel authored and committed

Add support for task management in interactive sessions.

Refactor job and statement handling to include tasks; implement task submission, cancellation, and progress tracking; extend the RPC layer to handle task requests; add task eviction logic and test cases; document task usage with examples.
1 parent 710c078 commit a2fccf0

File tree

15 files changed (+915, −28 lines)


docs/rest-api.md

Lines changed: 183 additions & 0 deletions
@@ -377,6 +377,96 @@ Returns code completion candidates for the specified code in the session.

  </tr>
</table>

### POST /sessions/{sessionId}/tasks

Submits a pre-compiled Spark job (task) to run in an interactive session. This endpoint lets you execute compiled Java/Scala Spark jobs within the context of an existing interactive session, as an alternative to submitting code snippets via statements.

Unlike statements, which execute code strings, tasks run pre-compiled Job implementations that have been serialized and sent to the session. This is useful for running complex, pre-compiled Spark applications while retaining the interactive session context.
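To make this concrete, here is a minimal sketch of what such a pre-compiled job could look like. It targets the standard `org.apache.livy.Job` interface (a single `call(JobContext)` method); the class name and logic are purely illustrative:

```scala
import org.apache.livy.{Job, JobContext}

// Hypothetical example of a compiled job submitted as a task.
// Job extends Serializable, so instances can be shipped to the session driver.
class RowCountJob extends Job[java.lang.Long] {
  override def call(jc: JobContext): java.lang.Long = {
    // jc.sc() exposes the session's JavaSparkContext
    val data = java.util.Arrays.asList(1, 2, 3, 4, 5)
    jc.sc().parallelize(data).count()
  }
}
```

The compiled class is serialized on the client side and sent as the `job` field of the request body described below.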
#### Request Body

<table class="table">
  <tr><th>Name</th><th>Description</th><th>Type</th></tr>
  <tr>
    <td>job</td>
    <td>Serialized job data (base64-encoded byte array representing a compiled Job implementation)</td>
    <td>byte array (required)</td>
  </tr>
  <tr>
    <td>jobType</td>
    <td>The type of job being submitted (e.g., "spark" for Scala/Java jobs)</td>
    <td>string</td>
  </tr>
</table>
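For illustration, a request body could look like the following sketch, with the base64 string standing in for the actual serialized job bytes:

```json
{
  "job": "<base64-encoded serialized Job>",
  "jobType": "spark"
}
```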
#### Response Body

The <a href="#task">task</a> object.

### GET /sessions/{sessionId}/tasks

Returns all tasks submitted to this session.

#### Request Parameters

<table class="table">
  <tr><th>Name</th><th>Description</th><th>Type</th></tr>
  <tr>
    <td>from</td>
    <td>The start index to fetch tasks</td>
    <td>int</td>
  </tr>
  <tr>
    <td>size</td>
    <td>Number of tasks to fetch</td>
    <td>int</td>
  </tr>
  <tr>
    <td>order</td>
    <td>Provide value as "desc" to get tasks in descending order (by default, tasks are in ascending order)</td>
    <td>string</td>
  </tr>
</table>

#### Response Body

<table class="table">
  <tr><th>Name</th><th>Description</th><th>Type</th></tr>
  <tr>
    <td>total_tasks</td>
    <td>Total number of tasks in this session</td>
    <td>int</td>
  </tr>
  <tr>
    <td>tasks</td>
    <td><a href="#task">Task</a> list</td>
    <td>list</td>
  </tr>
</table>
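A response might look like the following sketch (all values are illustrative):

```json
{
  "total_tasks": 1,
  "tasks": [
    {
      "id": 0,
      "state": "available",
      "output": "<base64-encoded serialized result>",
      "progress": 1.0,
      "submitted": 1700000000000,
      "completed": 1700000004000
    }
  ]
}
```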
### GET /sessions/{sessionId}/tasks/{taskId}

Returns the status and result of a specific submitted task.

#### Response Body

The <a href="#task">task</a> object.

### POST /sessions/{sessionId}/tasks/{taskId}/cancel

Cancels the specified task in this session. If the task is currently running, Livy will attempt to cancel the associated Spark job group. If the task is waiting, it will be cancelled immediately.

#### Response Body

<table class="table">
  <tr><th>Name</th><th>Description</th><th>Type</th></tr>
  <tr>
    <td>msg</td>
    <td>is always "canceled"</td>
    <td>string</td>
  </tr>
</table>
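Since the message is fixed, a successful cancellation response is simply:

```json
{
  "msg": "canceled"
}
```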
### GET /batches

Returns all the active batch sessions.
@@ -893,6 +983,99 @@ A statement represents the result of an execution statement.

  </tr>
</table>

### Task

A task represents a pre-compiled job submitted to an interactive session. Tasks provide a way to execute compiled Spark applications (implementing the `org.apache.livy.Job` interface) within an interactive session context, combining the benefits of pre-compiled code with the flexibility of interactive sessions.

**Key differences between Tasks and Statements:**

- **Statements** execute code strings (Scala, Python, R, or SQL) interactively
- **Tasks** execute pre-compiled, serialized Job implementations

Tasks are useful when you have complex Spark logic that has been compiled and tested, but you want to run it in the context of an existing interactive session without creating a separate batch job.

<table class="table">
  <tr><th>Name</th><th>Description</th><th>Type</th></tr>
  <tr>
    <td>id</td>
    <td>The task id (unique within the session)</td>
    <td>integer</td>
  </tr>
  <tr>
    <td>state</td>
    <td>The current execution state of the task</td>
    <td><a href="#task-state">task state</a></td>
  </tr>
  <tr>
    <td>output</td>
    <td>The serialized task result as a byte array (if completed successfully)</td>
    <td>byte array</td>
  </tr>
  <tr>
    <td>error</td>
    <td>The error message (if the task failed)</td>
    <td>string</td>
  </tr>
  <tr>
    <td>serializedException</td>
    <td>The serialized exception object (if the task failed)</td>
    <td>byte array</td>
  </tr>
  <tr>
    <td>progress</td>
    <td>The execution progress (0.0 to 1.0)</td>
    <td>double</td>
  </tr>
  <tr>
    <td>submitted</td>
    <td>Timestamp when the task was submitted (milliseconds since epoch)</td>
    <td>long</td>
  </tr>
  <tr>
    <td>completed</td>
    <td>Timestamp when the task completed (milliseconds since epoch)</td>
    <td>long</td>
  </tr>
</table>
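As an illustration, a failed task might look like the following sketch (all values hypothetical; `output` is absent because the task did not complete):

```json
{
  "id": 3,
  "state": "failed",
  "error": "java.lang.IllegalStateException: job aborted",
  "serializedException": "<base64-encoded exception bytes>",
  "progress": 0.75,
  "submitted": 1700000000000,
  "completed": 1700000003000
}
```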
#### Task State

<table class="table">
  <tr><th>Value</th><th>Description</th></tr>
  <tr>
    <td>waiting</td>
    <td>Task has been submitted and is waiting to start execution</td>
  </tr>
  <tr>
    <td>running</td>
    <td>Task is currently executing in the Spark context</td>
  </tr>
  <tr>
    <td>available</td>
    <td>Task completed successfully and results are available</td>
  </tr>
  <tr>
    <td>failed</td>
    <td>Task execution failed with an error</td>
  </tr>
  <tr>
    <td>cancelling</td>
    <td>Task cancellation has been requested and is in progress</td>
  </tr>
  <tr>
    <td>cancelled</td>
    <td>Task was successfully cancelled before completion</td>
  </tr>
</table>

**Valid State Transitions:**

- `waiting` → `running` (task starts execution)
- `waiting` → `cancelled` (task cancelled before starting)
- `running` → `available` (task completes successfully)
- `running` → `failed` (task encounters an error)
- `running` → `cancelling` (cancellation requested)
- `cancelling` → `cancelled` (cancellation completes)
- `cancelling` → `failed` (task fails during cancellation)

### Batch

<table class="table">

repl/src/main/scala/org/apache/livy/repl/ReplDriver.scala

Lines changed: 39 additions & 3 deletions
```diff
@@ -25,8 +25,8 @@ import org.apache.spark.SparkConf
 
 import org.apache.livy.{EOLUtils, Logging}
 import org.apache.livy.client.common.ClientConf
-import org.apache.livy.rsc.{BaseProtocol, ReplJobResults, RSCConf}
-import org.apache.livy.rsc.BaseProtocol.ReplState
+import org.apache.livy.rsc.{BaseProtocol, ReplJobResults, RSCConf, TaskResults}
+import org.apache.livy.rsc.BaseProtocol.{CancelTaskRequest, GetTaskResults, ReplState}
 import org.apache.livy.rsc.driver._
 import org.apache.livy.rsc.rpc.Rpc
 import org.apache.livy.sessions._
@@ -81,7 +81,7 @@ class ReplDriver(conf: SparkConf, livyConf: RSCConf)
         session.statements.get(msg.from).toArray
       } else {
         val until = msg.from + msg.size
-        session.statements.filterKeys(id => id >= msg.from && id < until).values.toArray
+        session.statements.filterKeys(id => id >= msg.from).take(until).values.toArray
       }
     }
 
```
```diff
@@ -93,6 +93,42 @@ class ReplDriver(conf: SparkConf, livyConf: RSCConf)
     new ReplJobResults(statements.sortBy(_.id))
   }
 
+  def handle(ctx: ChannelHandlerContext, msg: BaseProtocol.TaskJobRequest): Int = {
+    val context = jobContext()
+    val serializer = this.serializer()
+    val job = new BypassJob(serializer, msg.serializedJob)
+    session.submitTask(job, context)
+  }
+
+  /**
+   * Return tasks from the session's task registry: all of them, a single task by id, or a range.
+   */
+  def handle(ctx: ChannelHandlerContext, msg: GetTaskResults): TaskResults = {
+    val tasks = if (msg.allResults) {
+      session.tasks.values.toArray
+    } else {
+      assert(msg.from != null)
+      assert(msg.size != null)
+      if (msg.size == 1) {
+        session.tasks.get(msg.from).toArray
+      } else {
+        val until = msg.from + msg.size
+        session.tasks.filterKeys(id => id >= msg.from).take(until).values.toArray
+      }
+    }
+
+    // Update progress of tasks when queried
+    tasks.foreach { task =>
+      task.updateProgress(session.progressOfTask(task.id))
+    }
+
+    new TaskResults(tasks.sortBy(_.id))
+  }
+
+  def handle(ctx: ChannelHandlerContext, msg: CancelTaskRequest): Unit = {
+    session.cancelTask(msg.taskId)
+  }
+
   override protected def createWrapper(msg: BaseProtocol.BypassJobRequest): BypassJobWrapper = {
     Kind(msg.jobType) match {
       case PySpark if session.interpreter(PySpark).isDefined =>
```
