Skip to content

Commit 9d609cd

Browse files
author
Leon Eller
committed
Added support for chunked encoding when job result is a Stream[Byte]
1 parent b77637c commit 9d609cd

File tree

5 files changed

+128
-32
lines changed

5 files changed

+128
-32
lines changed

README.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -620,6 +620,8 @@ serialized properly:
620620
- Arrays
621621
- Anything that implements Product (Option, case classes) -- they will be serialized as lists
622622
- Maps and Seqs may contain nested values of any of the above
623+
- If a job result is of Scala's `Stream[Byte]` type, it will be serialized directly as a chunk-encoded stream.
624+
This is useful if your job result payload is large and serializing it as objects may cause a timeout.
623625
624626
If we encounter a data type that is not supported, then the entire result will be serialized to a string.
625627

job-server/config/local.conf.template

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,9 @@ spark {
2222
filedao {
2323
rootdir = /tmp/spark-job-server/filedao/data
2424
}
25+
26+
# When using chunked transfer encoding with scala Stream job results, this is the size of each chunk
27+
result-chunk-size = 1m
2528
}
2629

2730
# predefined Spark contexts

job-server/src/spark.jobserver/WebApi.scala

Lines changed: 87 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,38 +1,41 @@
11
package spark.jobserver
22

3+
import java.util.NoSuchElementException
34
import java.util.concurrent.TimeUnit
5+
import javax.net.ssl.SSLContext
46

5-
import akka.actor.{ ActorSystem, ActorRef }
7+
import akka.actor.{Actor, ActorLogging, ActorRef, ActorSystem, Props}
68
import akka.pattern.ask
79
import akka.util.Timeout
8-
import com.typesafe.config.{ Config, ConfigFactory, ConfigException, ConfigRenderOptions }
9-
import java.util.NoSuchElementException
10-
import javax.net.ssl.SSLContext
11-
import ooyala.common.akka.web.{ WebService, CommonRoutes }
10+
import com.typesafe.config.{Config, ConfigException, ConfigFactory, ConfigRenderOptions}
11+
import ooyala.common.akka.web.JsonUtils.AnyJsonFormat
12+
import ooyala.common.akka.web.{CommonRoutes, WebService}
13+
import org.apache.shiro.SecurityUtils
14+
import org.apache.shiro.config.IniSecurityManagerFactory
1215
import org.joda.time.DateTime
1316
import org.slf4j.LoggerFactory
14-
import spark.jobserver.util.SparkJobUtils
15-
import spark.jobserver.util.SSLContextFactory
16-
import spark.jobserver.routes.DataRoutes
17-
import scala.concurrent.{Await, ExecutionContext, Future}
18-
import scala.util.Try
19-
import spark.jobserver.io.JobInfo
2017
import spark.jobserver.auth._
21-
import spray.http.HttpResponse
22-
import spray.http.MediaTypes
23-
import spray.http.StatusCodes
18+
import spark.jobserver.io.JobInfo
19+
import spark.jobserver.routes.DataRoutes
20+
import spark.jobserver.util.{SSLContextFactory, SparkJobUtils}
21+
import spray.can.Http
22+
import spray.http._
2423
import spray.httpx.SprayJsonSupport.sprayJsonMarshaller
24+
import spray.io.ServerSSLEngineProvider
2525
import spray.json.DefaultJsonProtocol._
26-
import spray.routing.{ HttpService, Route, RequestContext }
2726
import spray.routing.directives.AuthMagnet
28-
import spray.io.ServerSSLEngineProvider
29-
import org.apache.shiro.config.IniSecurityManagerFactory
30-
import org.apache.shiro.mgt.SecurityManager
31-
import org.apache.shiro.SecurityUtils
27+
import spray.routing.{HttpService, RequestContext, Route}
28+
29+
import scala.concurrent.{Await, ExecutionContext, Future}
30+
import scala.util.Try
31+
3232

3333
object WebApi {
3434
val StatusKey = "status"
3535
val ResultKey = "result"
36+
val ResultKeyStartBytes = "{\n".getBytes
37+
val ResultKeyEndBytes = "}".getBytes
38+
val ResultKeyBytes = ("\"" + ResultKey + "\":").getBytes
3639

3740
def badRequest(ctx: RequestContext, msg: String) {
3841
ctx.complete(StatusCodes.BadRequest, errMap(msg))
@@ -61,6 +64,14 @@ object WebApi {
6164
Map(ResultKey -> result)
6265
}
6366

67+
/**
 * Builds the byte sequence of a JSON object that carries the job report entries followed by
 * the streamed job result under the `ResultKey` member.
 *
 * The output is laid out as: `ResultKeyStartBytes`, the comma-separated report members (with a
 * trailing comma only when the report is non-empty), `ResultKeyBytes`, the elements of `result`,
 * and finally `ResultKeyEndBytes`.
 *
 * @param jobReport entries rendered as JSON members via `AnyJsonFormat`; may be empty
 * @param result    elements spliced in verbatim as the value of the result member; no escaping
 *                  or validation is applied here
 * @return a lazy iterator over the whole response body; `result` is only consumed as the
 *         returned iterator is advanced
 */
def resultToByteIterator(jobReport: Map[String, Any], result: Iterator[_]): Iterator[_] = {
  // Render every report entry as a JSON `key:value` member.
  val reportMembers = jobReport.map { case (key, value) =>
    AnyJsonFormat.write(key).toString() + ":" + AnyJsonFormat.write(value).toString()
  }.mkString(",")

  // A separator is only needed when report members precede the result member.
  val separator = if (jobReport.nonEmpty) "," else ""

  // Encode explicitly as UTF-8: the no-arg getBytes() uses the JVM default charset, which
  // would make non-ASCII report values platform dependent inside a JSON body.
  val reportBytes = (reportMembers + separator).getBytes("UTF-8")

  ResultKeyStartBytes.toIterator ++ reportBytes.toIterator ++
    ResultKeyBytes.toIterator ++ result ++ ResultKeyEndBytes.toIterator
}
74+
6475
def formatException(t: Throwable): Any =
6576
if (t.getCause != null) {
6677
Map("message" -> t.getMessage,
@@ -112,6 +123,12 @@ class WebApi(system: ActorSystem,
112123
val DefaultJobLimit = 50
113124
val StatusKey = "status"
114125
val ResultKey = "result"
126+
// Number of elements taken from a streamed job result for each HTTP chunk.
// Read from `spark.jobserver.result-chunk-size` when present, otherwise 100 KiB.
val ResultChunkSize = if (config.hasPath("spark.jobserver.result-chunk-size")) {
  val configuredBytes: Long = config.getBytes("spark.jobserver.result-chunk-size")
  // Clamp instead of truncating with toInt: a value such as 4g would wrap to 0 (and larger
  // values could wrap negative), which yields empty chunks and an empty response body.
  math.max(1L, math.min(configuredBytes, Int.MaxValue.toLong)).toInt
}
else {
  100 * 1024
}
115132

116133
val contextTimeout = SparkJobUtils.getContextTimeout(config)
117134
val bindAddress = config.getString("spark.jobserver.bind-address")
@@ -228,6 +245,7 @@ class WebApi(system: ActorSystem,
228245
*/
229246
def contextRoutes: Route = pathPrefix("contexts") {
230247
import ContextSupervisor._
248+
231249
import collection.JavaConverters._
232250
// user authentication
233251
authenticate(authenticator) { authInfo =>
@@ -384,7 +402,12 @@ class WebApi(system: ActorSystem,
384402
val resultFuture = jobInfo ? GetJobResult(jobId)
385403
resultFuture.map {
386404
case JobResult(_, result) =>
387-
ctx.complete(jobReport ++ resultToTable(result))
405+
result match {
406+
case s: Stream[_] =>
407+
sendStreamingResponse(ctx, ResultChunkSize,
408+
resultToByteIterator(jobReport, s.toIterator))
409+
case _ => ctx.complete(jobReport ++ resultToTable(result))
410+
}
388411
case _ =>
389412
ctx.complete(jobReport)
390413
}
@@ -466,7 +489,13 @@ class WebApi(system: ActorSystem,
466489
JobManagerActor.StartJob(appName, classPath, jobConfig, events))(timeout)
467490
respondWithMediaType(MediaTypes.`application/json`) { ctx =>
468491
future.map {
469-
case JobResult(_, res) => ctx.complete(resultToTable(res))
492+
case JobResult(_, res) => {
493+
res match {
494+
case s: Stream[_] => sendStreamingResponse(ctx, ResultChunkSize,
495+
resultToByteIterator(Map.empty, s.toIterator))
496+
case _ => ctx.complete(resultToTable(res))
497+
}
498+
}
470499
case JobErroredOut(_, _, ex) => ctx.complete(errMap(ex, "ERROR"))
471500
case JobStarted(jobId, context, _) =>
472501
jobInfo ! StoreJobConfig(jobId, postedJobConfig)
@@ -504,6 +533,43 @@ class WebApi(system: ActorSystem,
504533
}
505534
}
506535

536+
/**
 * Sends `byteIterator` to the client as a chunked `application/json` response.
 *
 * The first chunk travels with the `ChunkedResponseStart`; every further chunk is sent only
 * after the previous one has been acknowledged, so the iterator is consumed at the pace of
 * the connection. A short-lived actor is created per response to receive those acks.
 *
 * @param ctx          request context whose responder receives the response parts
 * @param chunkSize    maximum number of elements taken from the iterator per chunk
 * @param byteIterator source of the response body; every element must be a `Byte`
 */
private def sendStreamingResponse(ctx: RequestContext,
                                  chunkSize: Int,
                                  byteIterator: Iterator[_]): Unit = {
  // simple case class whose instances we use as send confirmation message for streaming chunks
  case class Ok(remaining: Iterator[_])

  // Pulls the next chunk from the source. A non-Byte element is reported with a descriptive
  // exception instead of an opaque MatchError from a partial pattern match.
  def nextChunk(source: Iterator[_]): Array[Byte] =
    source.take(chunkSize).map {
      case b: Byte => b
      case other =>
        throw new IllegalArgumentException(
          "Streamed job results must only contain bytes, but found: " + other)
    }.toArray

  // Build the first chunk before anything is sent: if it fails we can still answer with a
  // regular error response instead of failing inside the actor's constructor.
  Try(nextChunk(byteIterator)) match {
    case scala.util.Failure(e) =>
      ctx.complete(StatusCodes.InternalServerError, errMap(e, "ERROR"))

    case scala.util.Success(firstChunk) =>
      actorRefFactory.actorOf {
        Props {
          new Actor with ActorLogging {
            // we use the successful sending of a chunk as trigger for sending the next chunk
            ctx.responder ! ChunkedResponseStart(
              HttpResponse(entity = HttpEntity(MediaTypes.`application/json`, firstChunk))
            ).withAck(Ok(byteIterator))

            def receive: Receive = {
              case Ok(remaining) =>
                Try(nextChunk(remaining)) match {
                  case scala.util.Success(chunk) if chunk.nonEmpty =>
                    ctx.responder ! MessageChunk(chunk).withAck(Ok(remaining))
                  case scala.util.Success(_) =>
                    ctx.responder ! ChunkedMessageEnd
                    context.stop(self)
                  case scala.util.Failure(e) =>
                    // The response has already started, so the status can no longer change.
                    // Letting the exception escape could restart this actor and re-run the
                    // constructor (sending a second response start); end the stream instead.
                    log.error(e, "Aborting response streaming")
                    ctx.responder ! ChunkedMessageEnd
                    context.stop(self)
                }
              case ev: Http.ConnectionClosed =>
                log.warning("Stopping response streaming due to {}", ev)
                context.stop(self)
            }
          }
        }
      }
  }
}
572+
507573
override def timeoutRoute: Route =
508574
complete(500, errMap("Request timed out. Try using the /jobs/<jobID>, /jobs APIs to get status/results"))
509575

job-server/test/spark.jobserver/WebApiMainRoutesSpec.scala

Lines changed: 21 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,16 @@
11
package spark.jobserver
22

3-
import akka.actor.{Actor, Props}
43
import com.typesafe.config.ConfigFactory
5-
import spark.jobserver.io.{JobInfo, JarInfo}
6-
import org.joda.time.DateTime
7-
import org.scalatest.{Matchers, FunSpec, BeforeAndAfterAll}
84
import spray.http.StatusCodes._
9-
import spray.routing.HttpService
10-
import spray.testkit.ScalatestRouteTest
115

126

137
// Tests web response codes and formatting
148
// Does NOT test underlying Supervisor / JarManager functionality
159
// HttpService trait is needed for the sealRoute() which wraps exception handling
1610
class WebApiMainRoutesSpec extends WebApiSpec {
17-
import scala.collection.JavaConverters._
11+
import ooyala.common.akka.web.JsonUtils._
1812
import spray.httpx.SprayJsonSupport._
1913
import spray.json.DefaultJsonProtocol._
20-
import ooyala.common.akka.web.JsonUtils._
2114

2215
val getJobStatusInfoMap = {
2316
Map(
@@ -135,6 +128,17 @@ class WebApiMainRoutesSpec extends WebApiSpec {
135128
}
136129
}
137130

131+
it("adhoc job with Stream result of sync route should return 200 and chunked result") {
132+
val config2 = "foo.baz = booboo"
133+
Post("/jobs?appName=foo.stream&classPath=com.abc.meme&sync=true", config2) ~>
134+
sealRoute(routes) ~> check {
135+
status should be (OK)
136+
responseAs[Map[String, Any]] should be (Map(
137+
ResultKey -> "1, 2, 3, 4, 5, 6"
138+
))
139+
}
140+
}
141+
138142
it("should be able to take a timeout param") {
139143
val config2 = "foo.baz = booboo"
140144
Post("/jobs?appName=foo&classPath=com.abc.meme&sync=true&timeout=5", config2) ~>
@@ -269,6 +273,15 @@ class WebApiMainRoutesSpec extends WebApiSpec {
269273
}
270274
}
271275

276+
it("should be able to chunk serialize Stream with different types to JSON") {
277+
Get("/jobs/_stream") ~> sealRoute(routes) ~> check {
278+
status should be (OK)
279+
responseAs[Map[String, Any]] should be (
280+
getJobStatusInfoMap ++ Map(ResultKey -> "1, 2, 3, 4, 5, 6, 7")
281+
)
282+
}
283+
}
284+
272285
it("should be able to serialize base types (eg float, numbers) to JSON") {
273286
Get("/jobs/_num") ~> sealRoute(routes) ~> check {
274287
status should be (OK)

job-server/test/spark.jobserver/WebApiSpec.scala

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,8 @@ import akka.actor.{Actor, Props}
44
import com.typesafe.config.ConfigFactory
55
import spark.jobserver.io.{JobDAOActor, JobInfo, JarInfo}
66
import org.joda.time.DateTime
7-
import org.scalatest.{Matchers, FunSpec, BeforeAndAfterAll}
8-
import spray.http.StatusCodes._
7+
import org.scalatest.{BeforeAndAfterAll, FunSpec, Matchers}
8+
import spark.jobserver.io.{JarInfo, JobInfo}
99
import spray.routing.HttpService
1010
import spray.testkit.ScalatestRouteTest
1111

@@ -75,8 +75,12 @@ with ScalatestRouteTest with HttpService {
7575
sender ! finishedJobInfo
7676
case GetJobResult("_seq") =>
7777
sender ! JobResult("_seq", Seq(1, 2, Map("3" -> "three")))
78+
case GetJobResult("_stream") =>
79+
sender ! JobResult("_stream", "\"1, 2, 3, 4, 5, 6, 7\"".getBytes().toStream)
7880
case GetJobStatus("_num") =>
7981
sender ! finishedJobInfo
82+
case GetJobStatus("_stream") =>
83+
sender ! finishedJobInfo
8084
case GetJobResult("_num") =>
8185
sender ! JobResult("_num", 5000)
8286
case GetJobStatus("_unk") =>
@@ -117,14 +121,22 @@ with ScalatestRouteTest with HttpService {
117121
case StartJob("err", _, config, _) => sender ! JobErroredOut("foo", dt,
118122
new RuntimeException("oops",
119123
new IllegalArgumentException("foo")))
120-
case StartJob(_, _, config, events) =>
124+
case StartJob("foo", _, config, events) =>
121125
statusActor ! Subscribe("foo", sender, events)
122126
statusActor ! JobStatusActor.JobInit(JobInfo("foo", "context", null, "", dt, None, None))
123127
statusActor ! JobStarted("foo", "context1", dt)
124128
val map = config.entrySet().asScala.map { entry => (entry.getKey -> entry.getValue.unwrapped) }.toMap
125129
if (events.contains(classOf[JobResult])) sender ! JobResult("foo", map)
126130
statusActor ! Unsubscribe("foo", sender)
127131

132+
case StartJob("foo.stream", _, config, events) =>
133+
statusActor ! Subscribe("foo.stream", sender, events)
134+
statusActor ! JobStatusActor.JobInit(JobInfo("foo.stream", "context", null, "", dt, None, None))
135+
statusActor ! JobStarted("foo.stream", "context1", dt)
136+
val result = "\"1, 2, 3, 4, 5, 6\"".getBytes().toStream
137+
if (events.contains(classOf[JobResult])) sender ! JobResult("foo.stream", result)
138+
statusActor ! Unsubscribe("foo.stream", sender)
139+
128140
case GetJobConfig("badjobid") => sender ! NoSuchJobId
129141
case GetJobConfig(_) => sender ! config
130142
}

0 commit comments

Comments
 (0)