1 change: 1 addition & 0 deletions client-http/pom.xml
@@ -112,6 +112,7 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.4.1</version>
<executions>
<execution>
<id>shade</id>
183 changes: 183 additions & 0 deletions docs/rest-api.md
@@ -377,6 +377,96 @@ Returns code completion candidates for the specified code in the session.
</tr>
</table>

### POST /sessions/{sessionId}/tasks

Submits a pre-compiled Spark job (task) to run in an interactive session. This endpoint executes compiled Java/Scala Spark jobs within the context of an existing interactive session, providing an alternative to submitting code snippets via statements.

Unlike statements, which execute code strings, tasks run pre-compiled Job implementations that have been serialized and sent to the session. This is useful for running complex, pre-compiled Spark applications while keeping the interactive session context.

#### Request Body

<table class="table">
<tr><th>Name</th><th>Description</th><th>Type</th></tr>
<tr>
<td>job</td>
<td>Serialized job data (base64-encoded byte array representing a compiled Job implementation)</td>
<td>byte array (required)</td>
</tr>
<tr>
<td>jobType</td>
<td>The type of job being submitted (e.g., "spark" for Scala/Java jobs)</td>
<td>string</td>
</tr>
</table>

#### Response Body

The <a href="#task">task</a> object.

### GET /sessions/{sessionId}/tasks

Returns all tasks submitted to this session.

#### Request Parameters

<table class="table">
<tr><th>Name</th><th>Description</th><th>Type</th></tr>
<tr>
<td>from</td>
<td>The start index to fetch tasks</td>
<td>int</td>
</tr>
<tr>
<td>size</td>
<td>Number of tasks to fetch</td>
<td>int</td>
</tr>
<tr>
<td>order</td>
<td>Provide value as "desc" to get tasks in descending order (by default, tasks are in ascending order)</td>
<td>string</td>
</tr>
</table>

#### Response Body

<table class="table">
<tr><th>Name</th><th>Description</th><th>Type</th></tr>
<tr>
<td>total_tasks</td>
<td>Total number of tasks in this session</td>
<td>int</td>
</tr>
<tr>
<td>tasks</td>
<td><a href="#task">Task</a> list</td>
<td>list</td>
</tr>
</table>
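
An illustrative listing request, pasted into a Scala REPL (Java 11+), using the pagination parameters from the table above; the host and session id are assumptions:

```scala
import java.net.URI
import java.net.http.{HttpClient, HttpRequest, HttpResponse}

// Fetch up to 10 tasks starting at index 10, newest first.
val listRequest = HttpRequest.newBuilder()
  .uri(URI.create("http://localhost:8998/sessions/0/tasks?from=10&size=10&order=desc"))
  .GET()
  .build()
val listResponse = HttpClient.newHttpClient()
  .send(listRequest, HttpResponse.BodyHandlers.ofString())
println(listResponse.body()) // {"total_tasks": ..., "tasks": [...]}
```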

### GET /sessions/{sessionId}/tasks/{taskId}

Returns the status and result of a specific submitted task.

#### Response Body

The <a href="#task">task</a> object.

### POST /sessions/{sessionId}/tasks/{taskId}/cancel

Cancels the specified task in this session. If the task is currently running, Livy will attempt to cancel the associated Spark job group. If the task is waiting, it will be cancelled immediately.

#### Response Body

<table class="table">
<tr><th>Name</th><th>Description</th><th>Type</th></tr>
<tr>
<td>msg</td>
<td>is always "canceled"</td>
<td>string</td>
</tr>
</table>
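
Cancellation follows the same pattern as submission; an illustrative request with the same assumed host and ids:

```scala
import java.net.URI
import java.net.http.{HttpClient, HttpRequest, HttpResponse}

val cancelRequest = HttpRequest.newBuilder()
  .uri(URI.create("http://localhost:8998/sessions/0/tasks/0/cancel"))
  .POST(HttpRequest.BodyPublishers.noBody())
  .build()
val cancelResponse = HttpClient.newHttpClient()
  .send(cancelRequest, HttpResponse.BodyHandlers.ofString())
println(cancelResponse.body()) // {"msg": "canceled"}
```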

### GET /batches

Returns all the active batch sessions.
@@ -893,6 +983,99 @@ A statement represents the result of an execution statement.
</tr>
</table>

### Task

A task represents a pre-compiled job submitted to an interactive session. Tasks provide a way to execute compiled Spark applications (implementing the `org.apache.livy.Job` interface) within an interactive session context, combining the benefits of pre-compiled code with the flexibility of interactive sessions.

**Key differences between Tasks and Statements:**
- **Statements** execute code strings (Scala, Python, R, or SQL) interactively
- **Tasks** execute pre-compiled, serialized Job implementations

Tasks are useful when you have complex Spark logic that has been compiled and tested, but you want to run it in the context of an existing interactive session without creating a separate batch job.
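
For reference, a pre-compiled job is simply a class implementing the `org.apache.livy.Job` interface. A minimal Scala sketch follows; the class name is illustrative, and the compiled class must be available on the session's classpath (for example via an uploaded jar):

```scala
import java.util.Arrays

import org.apache.livy.{Job, JobContext}

// Counts the elements of a small distributed collection; the return value
// becomes the task's serialized output.
class CountJob extends Job[java.lang.Long] {
  override def call(jc: JobContext): java.lang.Long = {
    val count = jc.sc().parallelize(Arrays.asList(1, 2, 3, 4, 5)).count()
    java.lang.Long.valueOf(count)
  }
}
```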

<table class="table">
<tr><th>Name</th><th>Description</th><th>Type</th></tr>
<tr>
<td>id</td>
<td>The task id (unique within the session)</td>
<td>integer</td>
</tr>
<tr>
<td>state</td>
<td>The current execution state of the task</td>
<td><a href="#task-state">task state</a></td>
</tr>
<tr>
<td>output</td>
<td>The serialized task result as a byte array (if completed successfully)</td>
<td>byte array</td>
</tr>
<tr>
<td>error</td>
<td>The error message (if the task failed)</td>
<td>string</td>
</tr>
<tr>
<td>serializedException</td>
<td>The serialized exception object (if the task failed)</td>
<td>byte array</td>
</tr>
<tr>
<td>progress</td>
<td>The execution progress (0.0 to 1.0)</td>
<td>double</td>
</tr>
<tr>
<td>submitted</td>
<td>Timestamp when the task was submitted (milliseconds since epoch)</td>
<td>long</td>
</tr>
<tr>
<td>completed</td>
<td>Timestamp when the task completed (milliseconds since epoch)</td>
<td>long</td>
</tr>
</table>

#### Task State

<table class="table">
<tr><th>Value</th><th>Description</th></tr>
<tr>
<td>waiting</td>
<td>Task has been submitted and is waiting to start execution</td>
</tr>
<tr>
<td>running</td>
<td>Task is currently executing in the Spark context</td>
</tr>
<tr>
<td>available</td>
<td>Task completed successfully and results are available</td>
</tr>
<tr>
<td>failed</td>
<td>Task execution failed with an error</td>
</tr>
<tr>
<td>cancelling</td>
<td>Task cancellation has been requested and is in progress</td>
</tr>
<tr>
<td>cancelled</td>
<td>Task was successfully cancelled before completion</td>
</tr>
</table>

**Valid State Transitions:**
- `waiting` → `running` (task starts execution)
- `waiting` → `cancelled` (task cancelled before starting)
- `running` → `available` (task completes successfully)
- `running` → `failed` (task encounters an error)
- `running` → `cancelling` (cancellation requested)
- `cancelling` → `cancelled` (cancellation completes)
- `cancelling` → `failed` (task fails during cancellation)
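
The transition rules can be summarized as a small lookup table. The following Scala sketch is purely illustrative and not code from Livy itself:

```scala
// Illustrative model of the task state machine described above.
object TaskStateMachine {
  private val validTransitions: Map[String, Set[String]] = Map(
    "waiting"    -> Set("running", "cancelled"),
    "running"    -> Set("available", "failed", "cancelling"),
    "cancelling" -> Set("cancelled", "failed"),
    "available"  -> Set.empty,
    "failed"     -> Set.empty,
    "cancelled"  -> Set.empty
  )

  // True when the documented state machine allows moving from `from` to `to`.
  def canTransition(from: String, to: String): Boolean =
    validTransitions.getOrElse(from, Set.empty).contains(to)
}
```

For example, `TaskStateMachine.canTransition("running", "waiting")` is `false`: a running task can only complete, fail, or move to `cancelling`.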

### Batch

<table class="table">
11 changes: 9 additions & 2 deletions pom.xml
@@ -85,7 +85,7 @@
<spark.scala-2.11.version>2.4.5</spark.scala-2.11.version>
<spark.scala-2.12.version>2.4.5</spark.scala-2.12.version>
<spark.version>${spark.scala-2.11.version}</spark.version>
<kubernetes.client.version>5.6.0</kubernetes.client.version>
<kubernetes.client.version>6.8.1</kubernetes.client.version>
<hive.version>3.0.0</hive.version>
<commons-codec.version>1.15</commons-codec.version>
<commons-lang3.version>3.17.0</commons-lang3.version>
@@ -1169,6 +1169,13 @@
</reporting>

<profiles>
<profile>
<id>hadoop3</id>
<properties>
<hadoop.major-minor.version>3</hadoop.major-minor.version>
<hadoop.version>3.4.0</hadoop.version>
</properties>
</profile>
<profile>
<id>hadoop2</id>
<properties>
@@ -1222,7 +1229,7 @@
<java.version>1.8</java.version>
<py4j.version>0.10.9.7</py4j.version>
<json4s.version>3.7.0-M11</json4s.version>
<netty.version>4.1.96.Final</netty.version>
<netty.version>4.1.108.Final</netty.version>
<jackson.version>2.15.2</jackson.version>
<jackson-databind.version>2.15.2</jackson-databind.version>
<spark.bin.name>spark-${spark.version}-bin-hadoop${hadoop.major-minor.version}</spark.bin.name>
1 change: 1 addition & 0 deletions repl/pom.xml
@@ -176,6 +176,7 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.4.1</version>
<executions>
<execution>
<id>shade</id>
@@ -107,14 +107,14 @@ class SparkInterpreter(protected override val conf: SparkConf) extends AbstractS
sparkILoop.interpret(code)
}

override protected def completeCandidates(code: String, cursor: Int) : Array[String] = {
val completer : Completion = {
override protected def completeCandidates(code: String, cursor: Int): Array[String] = {
val completer: Completion = {
try {
val cls = Class.forName("scala.tools.nsc.interpreter.PresentationCompilerCompleter")
cls.getDeclaredConstructor(classOf[IMain]).newInstance(sparkILoop.intp)
.asInstanceOf[Completion]
} catch {
case e : ClassNotFoundException => NoCompletion
case e: ClassNotFoundException => NoCompletion
}
}
completer.complete(code, cursor).candidates.toArray
@@ -126,9 +126,9 @@
}

override protected def bind(name: String,
tpe: String,
value: Object,
modifier: List[String]): Unit = {
tpe: String,
value: Object,
modifier: List[String]): Unit = {
sparkILoop.beQuietDuring {
sparkILoop.bind(name, tpe, value, modifier)
}
42 changes: 39 additions & 3 deletions repl/src/main/scala/org/apache/livy/repl/ReplDriver.scala
@@ -25,8 +25,8 @@ import org.apache.spark.SparkConf

import org.apache.livy.{EOLUtils, Logging}
import org.apache.livy.client.common.ClientConf
import org.apache.livy.rsc.{BaseProtocol, ReplJobResults, RSCConf}
import org.apache.livy.rsc.BaseProtocol.ReplState
import org.apache.livy.rsc.{BaseProtocol, ReplJobResults, RSCConf, TaskResults}
import org.apache.livy.rsc.BaseProtocol.{CancelTaskRequest, GetTaskResults, ReplState}
import org.apache.livy.rsc.driver._
import org.apache.livy.rsc.rpc.Rpc
import org.apache.livy.sessions._
@@ -81,7 +81,7 @@
session.statements.get(msg.from).toArray
} else {
val until = msg.from + msg.size
session.statements.filterKeys(id => id >= msg.from && id < until).values.toArray
session.statements.filterKeys(id => id >= msg.from).take(until).values.toArray
}
}

@@ -93,6 +93,42 @@
new ReplJobResults(statements.sortBy(_.id))
}

def handle(ctx: ChannelHandlerContext, msg: BaseProtocol.TaskJobRequest): Int = {
val context = jobContext()
val serializer = this.serializer()
val job = new BypassJob(serializer, msg.serializedJob)
session.submitTask(job, context)
}

/**
* Return task results from the session's task registry: either all tasks or the requested range
*/
def handle(ctx: ChannelHandlerContext, msg: GetTaskResults): TaskResults = {
val tasks = if (msg.allResults) {
session.tasks.values.toArray
} else {
assert(msg.from != null)
assert(msg.size != null)
if (msg.size == 1) {
session.tasks.get(msg.from).toArray
} else {
val until = msg.from + msg.size
session.tasks.filterKeys(id => id >= msg.from).take(until).values.toArray
}
}

// Update progress of tasks when queried
tasks.foreach { s =>
s.updateProgress(session.progressOfTask(s.id))
}

new TaskResults(tasks.sortBy(_.id))
}

def handle(ctx: ChannelHandlerContext, msg: CancelTaskRequest): Unit = {
session.cancelTask(msg.taskId)
}

override protected def createWrapper(msg: BaseProtocol.BypassJobRequest): BypassJobWrapper = {
Kind(msg.jobType) match {
case PySpark if session.interpreter(PySpark).isDefined =>