Skip to content

Commit 08596e0

Browse files
author
Leon Eller
committed
Actioned review comments
1 parent 9d609cd commit 08596e0

File tree

3 files changed

+72
-48
lines changed

3 files changed

+72
-48
lines changed
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
package spark.jobserver
2+
3+
import akka.actor.Props
4+
import spray.routing.{HttpService, RequestContext}
5+
6+
/**
 * Mixin that adds chunk-encoded (streaming) HTTP response support to a spray
 * [[HttpService]]. The heavy lifting is delegated to a [[ChunkEncodingActor]]
 * spawned per response, which drives the chunked-response lifecycle.
 */
trait ChunkEncodedStreamingSupport {
  this: HttpService =>

  /**
   * Streams data back to the client using HTTP chunked transfer encoding.
   *
   * Spawns a dedicated [[ChunkEncodingActor]] that sends the first chunk
   * immediately and then sends each subsequent chunk upon acknowledgement of
   * the previous one; the actor stops itself when the iterator is exhausted
   * or the connection closes.
   *
   * @param ctx          request context whose responder receives the chunks
   * @param chunkSize    maximum number of elements per chunk
   * @param byteIterator data to stream back (currently only Byte elements are supported)
   */
  protected def sendStreamingResponse(ctx: RequestContext,
                                      chunkSize: Int,
                                      byteIterator: Iterator[_]): Unit = {
    actorRefFactory.actorOf {
      Props {
        new ChunkEncodingActor(ctx, chunkSize, byteIterator)
      }
    }
  }
}
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
package spark.jobserver
2+
3+
import akka.actor.{Actor, ActorLogging}
4+
import spark.jobserver.ChunkEncodingActor.Ok
5+
import spray.can.Http
6+
import spray.http._
7+
import spray.routing.RequestContext
8+
9+
object ChunkEncodingActor {
  /**
   * Ack message confirming that a chunk was successfully sent; carries the
   * iterator holding the data that still remains to be streamed.
   */
  case class Ok(remaining: Iterator[_])
}

/**
 * Sends back a response in streaming fashion using HTTP chunk encoding.
 *
 * The first chunk is sent as soon as the actor is constructed; every further
 * chunk is sent only after spray acknowledges delivery of the previous one
 * (via the [[ChunkEncodingActor.Ok]] ack), providing back-pressure.
 *
 * @param ctx          RequestContext whose responder receives the chunks
 * @param chunkSize    maximum number of elements per chunk
 * @param byteIterator iterator of data to stream back (currently only Byte is
 *                     supported; any other element type raises a MatchError)
 */
class ChunkEncodingActor(ctx: RequestContext,
                         chunkSize: Int,
                         byteIterator: Iterator[_]) extends Actor with ActorLogging {
  // Kick off the chunked response with the first chunk. The Ok ack attached
  // here is what triggers sending of the next chunk in receive below.
  ctx.responder ! ChunkedResponseStart(
    HttpResponse(entity = HttpEntity(MediaTypes.`application/json`,
      byteIterator.take(chunkSize).map {
        case c: Byte => c // only Byte elements are supported (see scaladoc)
      }.toArray))).withAck(Ok(byteIterator))

  def receive: Receive = {
    case Ok(remaining) =>
      // Previous chunk was delivered; take the next slice off the iterator.
      val chunk = remaining.take(chunkSize).map {
        case c: Byte => c
      }.toArray
      if (chunk.nonEmpty) {
        ctx.responder ! MessageChunk(chunk).withAck(Ok(remaining))
      }
      else {
        // Iterator exhausted: close the chunked response and shut down.
        ctx.responder ! ChunkedMessageEnd
        context.stop(self)
      }
    case ev: Http.ConnectionClosed =>
      // Client went away mid-stream; stop to avoid writing to a dead connection.
      log.warning("Stopping response streaming due to {}", ev)
      context.stop(self)
  }
}
48+

job-server/src/spark.jobserver/WebApi.scala

Lines changed: 6 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ import java.util.NoSuchElementException
44
import java.util.concurrent.TimeUnit
55
import javax.net.ssl.SSLContext
66

7-
import akka.actor.{Actor, ActorLogging, ActorRef, ActorSystem, Props}
7+
import akka.actor.{ActorRef, ActorSystem}
88
import akka.pattern.ask
99
import akka.util.Timeout
1010
import com.typesafe.config.{Config, ConfigException, ConfigFactory, ConfigRenderOptions}
@@ -18,7 +18,6 @@ import spark.jobserver.auth._
1818
import spark.jobserver.io.JobInfo
1919
import spark.jobserver.routes.DataRoutes
2020
import spark.jobserver.util.{SSLContextFactory, SparkJobUtils}
21-
import spray.can.Http
2221
import spray.http._
2322
import spray.httpx.SprayJsonSupport.sprayJsonMarshaller
2423
import spray.io.ServerSSLEngineProvider
@@ -106,7 +105,8 @@ class WebApi(system: ActorSystem,
106105
dataManager: ActorRef,
107106
supervisor: ActorRef,
108107
jobInfo: ActorRef)
109-
extends HttpService with CommonRoutes with DataRoutes with SJSAuthenticator with CORSSupport {
108+
extends HttpService with CommonRoutes with DataRoutes with SJSAuthenticator with CORSSupport
109+
with ChunkEncodedStreamingSupport {
110110
import CommonMessages._
111111
import ContextSupervisor._
112112
import scala.concurrent.duration._
@@ -123,12 +123,8 @@ class WebApi(system: ActorSystem,
123123
val DefaultJobLimit = 50
124124
val StatusKey = "status"
125125
val ResultKey = "result"
126-
val ResultChunkSize = if (config.hasPath("spark.jobserver.result-chunk-size")) {
127-
config.getBytes("spark.jobserver.result-chunk-size").toInt
128-
}
129-
else {
130-
100 * 1024
131-
}
126+
val ResultChunkSize = Option("spark.jobserver.result-chunk-size").filter(config.hasPath)
127+
.fold(100 * 1024)(config.getBytes(_).toInt)
132128

133129
val contextTimeout = SparkJobUtils.getContextTimeout(config)
134130
val bindAddress = config.getString("spark.jobserver.bind-address")
@@ -489,13 +485,12 @@ class WebApi(system: ActorSystem,
489485
JobManagerActor.StartJob(appName, classPath, jobConfig, events))(timeout)
490486
respondWithMediaType(MediaTypes.`application/json`) { ctx =>
491487
future.map {
492-
case JobResult(_, res) => {
488+
case JobResult(_, res) =>
493489
res match {
494490
case s: Stream[_] => sendStreamingResponse(ctx, ResultChunkSize,
495491
resultToByteIterator(Map.empty, s.toIterator))
496492
case _ => ctx.complete(resultToTable(res))
497493
}
498-
}
499494
case JobErroredOut(_, _, ex) => ctx.complete(errMap(ex, "ERROR"))
500495
case JobStarted(jobId, context, _) =>
501496
jobInfo ! StoreJobConfig(jobId, postedJobConfig)
@@ -533,43 +528,6 @@ class WebApi(system: ActorSystem,
533528
}
534529
}
535530

536-
private def sendStreamingResponse(ctx: RequestContext,
537-
chunkSize: Int,
538-
byteIterator: Iterator[_]): Unit = {
539-
// simple case class whose instances we use as send confirmation message for streaming chunks
540-
case class Ok(remaining: Iterator[_])
541-
actorRefFactory.actorOf {
542-
Props {
543-
new Actor with ActorLogging {
544-
// we use the successful sending of a chunk as trigger for sending the next chunk
545-
ctx.responder ! ChunkedResponseStart(
546-
HttpResponse(entity = HttpEntity(MediaTypes.`application/json`,
547-
byteIterator.take(chunkSize).map {
548-
case c: Byte => c
549-
}.toArray))).withAck(Ok(byteIterator))
550-
551-
def receive: Receive = {
552-
case Ok(remaining) =>
553-
val arr = remaining.take(chunkSize).map {
554-
case c: Byte => c
555-
}.toArray
556-
if (arr.nonEmpty) {
557-
ctx.responder ! MessageChunk(arr).withAck(Ok(remaining))
558-
}
559-
else {
560-
ctx.responder ! ChunkedMessageEnd
561-
context.stop(self)
562-
}
563-
case ev: Http.ConnectionClosed => {
564-
log.warning("Stopping response streaming due to {}", ev)
565-
context.stop(self)
566-
}
567-
}
568-
}
569-
}
570-
}
571-
}
572-
573531
override def timeoutRoute: Route =
574532
complete(500, errMap("Request timed out. Try using the /jobs/<jobID>, /jobs APIs to get status/results"))
575533

0 commit comments

Comments
 (0)