
Commit 7cdd04c

mveroedaniel authored and committed

Add support for task management in interactive sessions.

Refactor job and statement handling to include tasks; implement task submission, cancellation, and progress tracking. Extend the RPC layer to handle task requests. Add task eviction logic and test cases. Document task usage with examples.

1 parent 710c078 commit 7cdd04c

File tree

16 files changed: +1011, -33 lines

docs/rest-api.md

Lines changed: 114 additions & 0 deletions

@@ -377,6 +377,53 @@ Returns code completion candidates for the specified code in the session.

</tr>
</table>

### POST /sessions/{sessionId}/tasks

Submits a job (task) in an interactive session. This allows users to run pre-compiled Spark jobs within the context of an interactive session.

#### Request Body

<table class="table">
<tr><th>Name</th><th>Description</th><th>Type</th></tr>
<tr>
<td>job</td>
<td>Serialized job data (base64-encoded byte array)</td>
<td>byte array</td>
</tr>
<tr>
<td>jobType</td>
<td>The type of job (e.g., "spark", "pyspark", "sparkr")</td>
<td>string</td>
</tr>
</table>

#### Response Body

The <a href="#task">task</a> object.
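
A minimal sketch of calling this endpoint with Python's `requests`, assuming a Livy server at `localhost:8998`, an existing session `0`, and placeholder job bytes (the hostname, session id, and payload are illustrative, not part of this commit):

```python
import base64

import requests

# Placeholder standing in for a real serialized Spark job; not a working payload.
serialized_job = base64.b64encode(b'<serialized-job-bytes>').decode('utf-8')

resp = requests.post(
    'http://localhost:8998/sessions/0/tasks',
    json={'job': serialized_job, 'jobType': 'spark'},
)
task = resp.json()
print(task['id'], task['state'])  # e.g. 1 submitted
```
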
### GET /sessions/{sessionId}/tasks/{taskId}

Returns the status of a submitted task.

#### Response Body

The <a href="#task">task</a> object.

### POST /sessions/{sessionId}/tasks/{taskId}/cancel

Cancels the specified task in this session.

#### Response Body

<table class="table">
<tr><th>Name</th><th>Description</th><th>Type</th></tr>
<tr>
<td>msg</td>
<td>is always "canceled"</td>
<td>string</td>
</tr>
</table>
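
Continuing the sketch above, cancellation is a bare POST (same assumed host, session, and task id):

```python
import requests

resp = requests.post('http://localhost:8998/sessions/0/tasks/1/cancel')
print(resp.json())  # {'msg': 'canceled'}
```
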
### GET /batches

Returns all the active batch sessions.

@@ -893,6 +940,73 @@ A statement represents the result of an execution statement.

</tr>
</table>

### Task

A task represents a submitted job in an interactive session.

<table class="table">
<tr><th>Name</th><th>Description</th><th>Type</th></tr>
<tr>
<td>id</td>
<td>The task id</td>
<td>long</td>
</tr>
<tr>
<td>jobType</td>
<td>The type of job submitted</td>
<td>string</td>
</tr>
<tr>
<td>state</td>
<td>The task state</td>
<td>task state</td>
</tr>
<tr>
<td>result</td>
<td>The task result (if completed successfully)</td>
<td>string</td>
</tr>
<tr>
<td>error</td>
<td>The error message (if failed)</td>
<td>string</td>
</tr>
</table>

#### Task State

<table class="table">
<tr><th>Value</th><th>Description</th></tr>
<tr>
<td>submitted</td>
<td>Task has been submitted</td>
</tr>
<tr>
<td>sent</td>
<td>Task has been sent to the driver</td>
</tr>
<tr>
<td>queued</td>
<td>Task is queued for execution</td>
</tr>
<tr>
<td>started</td>
<td>Task is currently running</td>
</tr>
<tr>
<td>succeeded</td>
<td>Task completed successfully</td>
</tr>
<tr>
<td>failed</td>
<td>Task failed</td>
</tr>
<tr>
<td>cancelled</td>
<td>Task was cancelled</td>
</tr>
</table>
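
Of these states, only `succeeded`, `failed`, and `cancelled` are terminal, which suggests a simple polling loop. A sketch with Python's `requests`, using the same hypothetical host and session as the earlier examples:

```python
import time

import requests

TERMINAL_STATES = {'succeeded', 'failed', 'cancelled'}

def wait_for_task(session_id, task_id, interval=1.0):
    """Poll a task until it reaches a terminal state, then return the task object."""
    url = f'http://localhost:8998/sessions/{session_id}/tasks/{task_id}'
    while True:
        task = requests.get(url).json()
        if task['state'] in TERMINAL_STATES:
            return task
        time.sleep(interval)
```
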
### Batch

<table class="table">

examples/TASK_USAGE_EXAMPLE.md

Lines changed: 163 additions & 0 deletions

@@ -0,0 +1,163 @@

# Task Submission in Interactive Sessions

This document demonstrates how to use the new task submission feature in Livy interactive sessions.

## Overview

The task API allows you to submit pre-compiled Spark jobs (tasks) within an interactive session. This is useful when you want to:

- Run compiled Spark applications within the context of an interactive session
- Execute batch-like jobs while maintaining session state
- Submit jobs programmatically using the Livy client API

## Difference between Statements and Tasks

- **Statements**: Execute code snippets (Scala, Python, R, SQL) interactively in the REPL
- **Tasks**: Execute pre-compiled, serialized Spark jobs within the session context; the sketch below contrasts the two calls
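
A minimal side-by-side sketch with Python's `requests`, assuming a running session `0` on `localhost:8998` (the statements endpoint is Livy's existing API; the tasks endpoint is the one added here; the job bytes are placeholders):

```python
import base64

import requests

BASE = 'http://localhost:8998/sessions/0'

# A statement: source code interpreted by the session's REPL.
stmt = requests.post(BASE + '/statements', json={'code': '1 + 1'}).json()

# A task: an already-compiled, serialized job shipped as bytes.
job = base64.b64encode(b'<serialized-job-bytes>').decode('utf-8')
task = requests.post(BASE + '/tasks', json={'job': job, 'jobType': 'spark'}).json()
```
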
## REST API Usage

### 1. Create an Interactive Session

```bash
curl -X POST http://localhost:8998/sessions \
  -H "Content-Type: application/json" \
  -d '{
    "kind": "spark",
    "name": "my-session"
  }'
```

### 2. Submit a Task

```bash
curl -X POST http://localhost:8998/sessions/0/tasks \
  -H "Content-Type: application/json" \
  -d '{
    "job": "<base64-encoded-serialized-job>",
    "jobType": "spark"
  }'
```

Response:

```json
{
  "id": 1,
  "jobType": "spark",
  "state": "submitted",
  "result": null,
  "error": null
}
```

### 3. Check Task Status

```bash
curl http://localhost:8998/sessions/0/tasks/1
```

Response:

```json
{
  "id": 1,
  "jobType": "spark",
  "state": "succeeded",
  "result": "{\"value\": 42}",
  "error": null
}
```

### 4. Cancel a Task

```bash
curl -X POST http://localhost:8998/sessions/0/tasks/1/cancel
```

Response:

```json
{
  "msg": "canceled"
}
```
## Programmatic Usage with Livy Client

### Java Example

```java
import java.net.URI;

import org.apache.livy.Job;
import org.apache.livy.JobContext;
import org.apache.livy.LivyClient;
import org.apache.livy.LivyClientBuilder;
import org.apache.spark.api.java.JavaSparkContext;

// Create a job
public class MySparkJob implements Job<String> {
  @Override
  public String call(JobContext context) throws Exception {
    JavaSparkContext sc = context.sc();
    // Your Spark job logic here
    return "Job completed successfully";
  }
}

// Submit the job in an interactive session context
LivyClient client = new LivyClientBuilder()
  .setURI(new URI("http://localhost:8998/sessions/0"))
  .build();

String result = client.submit(new MySparkJob()).get();
System.out.println(result);
```
### Python Example

```python
import base64
import time

import requests

# Your serialized job (this is a simplified example)
# In practice, you would serialize your actual Spark job
serialized_job = base64.b64encode(b'<your-serialized-job-bytes>').decode('utf-8')

# Submit task
response = requests.post(
    'http://localhost:8998/sessions/0/tasks',
    json={
        'job': serialized_job,
        'jobType': 'spark'
    }
)

task = response.json()
task_id = task['id']

# Poll for task completion
while True:
    response = requests.get(f'http://localhost:8998/sessions/0/tasks/{task_id}')
    task = response.json()

    if task['state'] in ['succeeded', 'failed', 'cancelled']:
        break

    time.sleep(1)

if task['state'] == 'succeeded':
    print(f"Task completed: {task['result']}")
else:
    print(f"Task failed: {task['error']}")
```
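
The loop above polls indefinitely. If a task may hang, the cancel endpoint from step 4 composes naturally with polling; a sketch under the same assumed host and session (the `timeout` bound is our addition, not part of the API):

```python
import time

import requests

BASE = 'http://localhost:8998/sessions/0/tasks'

def run_with_timeout(task_id, timeout=300.0, interval=1.0):
    """Poll a task; cancel it if it has not finished within `timeout` seconds."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        task = requests.get(f'{BASE}/{task_id}').json()
        if task['state'] in ('succeeded', 'failed', 'cancelled'):
            return task
        time.sleep(interval)
    requests.post(f'{BASE}/{task_id}/cancel')
    return requests.get(f'{BASE}/{task_id}').json()
```
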
## Use Cases

1. **Hybrid Workflows**: Combine interactive exploration (statements) with batch processing (tasks) in the same session
2. **Reusable Jobs**: Submit pre-compiled jobs without writing code snippets
3. **Job Libraries**: Maintain a library of compiled Spark jobs that can be submitted to any session
4. **Performance**: Pre-compiled jobs can be more efficient than interpreted code snippets

## Notes

- Tasks share the same SparkContext as the interactive session
- Tasks are executed asynchronously by default
- Task IDs are session-scoped and start from 1
- Canceling a task will attempt to interrupt its execution

repl/src/main/scala/org/apache/livy/repl/ReplDriver.scala

Lines changed: 39 additions & 3 deletions

@@ -25,8 +25,8 @@ import org.apache.spark.SparkConf
 
 import org.apache.livy.{EOLUtils, Logging}
 import org.apache.livy.client.common.ClientConf
-import org.apache.livy.rsc.{BaseProtocol, ReplJobResults, RSCConf}
-import org.apache.livy.rsc.BaseProtocol.ReplState
+import org.apache.livy.rsc.{BaseProtocol, ReplJobResults, RSCConf, TaskResults}
+import org.apache.livy.rsc.BaseProtocol.{CancelTaskRequest, GetTaskResults, ReplState}
 import org.apache.livy.rsc.driver._
 import org.apache.livy.rsc.rpc.Rpc
 import org.apache.livy.sessions._

@@ -81,7 +81,7 @@ class ReplDriver(conf: SparkConf, livyConf: RSCConf)
         session.statements.get(msg.from).toArray
       } else {
         val until = msg.from + msg.size
-        session.statements.filterKeys(id => id >= msg.from && id < until).values.toArray
+        session.statements.filterKeys(id => id >= msg.from).take(until).values.toArray
       }
     }
 
@@ -93,6 +93,42 @@ class ReplDriver(conf: SparkConf, livyConf: RSCConf)
     new ReplJobResults(statements.sortBy(_.id))
   }
 
+  def handle(ctx: ChannelHandlerContext, msg: BaseProtocol.TaskJobRequest): Int = {
+    val context = jobContext()
+    val serializer = this.serializer()
+    val job = new BypassJob(serializer, msg.serializedJob)
+    session.submitTask(job, context)
+  }
+
+  /**
+   * Return a specific task by ID from the session's task registry
+   */
+  def handle(ctx: ChannelHandlerContext, msg: GetTaskResults): TaskResults = {
+    val tasks = if (msg.allResults) {
+      session.tasks.values.toArray
+    } else {
+      assert(msg.from != null)
+      assert(msg.size != null)
+      if (msg.size == 1) {
+        session.tasks.get(msg.from).toArray
+      } else {
+        val until = msg.from + msg.size
+        session.tasks.filterKeys(id => id >= msg.from).take(until).values.toArray
+      }
+    }
+
+    // Update progress of tasks when queried
+    tasks.foreach { s =>
+      s.updateProgress(session.progressOfTask(s.id))
+    }
+
+    new TaskResults(tasks.sortBy(_.id))
+  }
+
+  def handle(ctx: ChannelHandlerContext, msg: CancelTaskRequest): Unit = {
+    session.cancelTask(msg.taskId)
+  }
+
   override protected def createWrapper(msg: BaseProtocol.BypassJobRequest): BypassJobWrapper = {
     Kind(msg.jobType) match {
       case PySpark if session.interpreter(PySpark).isDefined =>
