Skip to content

Commit 2c558af

Browse files
committed
Merge pull request spark-jobserver#418 from leone17/streaming
Added support for chunked encoding when job result is a Stream[Byte]
2 parents c52d4b0 + b2fd783 commit 2c558af

File tree

8 files changed

+165
-33
lines changed

8 files changed

+165
-33
lines changed

README.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -629,6 +629,11 @@ serialized properly:
629629
- Array's
630630
- Anything that implements Product (Option, case classes) -- they will be serialized as lists
631631
- Maps and Seqs may contain nested values of any of the above
632+
- If a job result is of Scala's Stream[Byte] type, it will be serialised directly as a chunk-encoded stream.
633+
This is useful if your job result payload is large and may cause a timeout serialising as objects. Beware, this
634+
will not currently work as desired with context-per-jvm=true configuration, since it would require serialising
635+
the Stream[_] blob between processes. For now, use Stream[_] job results in context-per-jvm=false configuration, pending
636+
potential future enhancements to support this in context-per-jvm=true mode.
632637
633638
If we encounter a data type that is not supported, then the entire result will be serialized to a string.
634639

job-server/config/local.conf.template

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,9 @@ spark {
2222
filedao {
2323
rootdir = /tmp/spark-job-server/filedao/data
2424
}
25+
26+
# When using chunked transfer encoding with Scala Stream job results, this is the size of each chunk
27+
result-chunk-size = 1m
2528
}
2629

2730
# predefined Spark contexts
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
package spark.jobserver
2+
3+
import akka.actor.Props
4+
import spray.routing.{HttpService, RequestContext}
5+
6+
/**
 * Mixin for spray [[HttpService]] routes that need to send a response back
 * using HTTP chunked transfer encoding instead of a single in-memory payload.
 */
trait ChunkEncodedStreamingSupport {
  this: HttpService =>

  /**
   * Starts streaming the given data back to the client as a chunked response.
   *
   * A dedicated [[ChunkEncodingActor]] is spawned per request; it sends the
   * first chunk immediately and drains the iterator as spray acks each chunk.
   *
   * @param ctx          the request context whose responder receives the chunks
   * @param chunkSize    size in bytes of each chunk
   * @param byteIterator data to stream back (only Byte elements are supported)
   */
  protected def sendStreamingResponse(ctx: RequestContext,
                                      chunkSize: Int,
                                      byteIterator: Iterator[_]): Unit = {
    val streamingProps = Props(new ChunkEncodingActor(ctx, chunkSize, byteIterator))
    actorRefFactory.actorOf(streamingProps)
  }
}
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
package spark.jobserver
2+
3+
import akka.actor.{Actor, ActorLogging}
4+
import spark.jobserver.ChunkEncodingActor.Ok
5+
import spray.can.Http
6+
import spray.http._
7+
import spray.routing.RequestContext
8+
9+
object ChunkEncodingActor {
  /** Ack message confirming a chunk was written; carries the data still left to stream. */
  case class Ok(remaining: Iterator[_])
}

/**
 * Sends a response back to the client in streaming fashion using chunked
 * transfer encoding.
 *
 * The first chunk is emitted eagerly from the constructor as a
 * ChunkedResponseStart; each following chunk is only sent once spray confirms
 * the previous one via the [[ChunkEncodingActor.Ok]] ack, so data is produced
 * no faster than the connection can absorb it. When the iterator is exhausted
 * a ChunkedMessageEnd is sent and the actor stops itself.
 *
 * @param ctx          RequestContext whose responder receives the chunks
 * @param chunkSize    size in bytes of each chunk
 * @param byteIterator data to stream back (only Byte elements are supported;
 *                     any other element type fails with a MatchError)
 */
class ChunkEncodingActor(ctx: RequestContext,
                         chunkSize: Int,
                         byteIterator: Iterator[_]) extends Actor with ActorLogging {

  // Pulls up to chunkSize bytes off the iterator, consuming it as it goes.
  // NOTE(review): assumes the iterator yields only Byte values — confirm callers.
  private def nextChunk(data: Iterator[_]): Array[Byte] =
    data.take(chunkSize).map { case b: Byte => b }.toArray

  // Kick off the chunked response immediately; the Ok ack drives the remainder.
  ctx.responder ! ChunkedResponseStart(
    HttpResponse(entity = HttpEntity(MediaTypes.`application/json`, nextChunk(byteIterator)))
  ).withAck(Ok(byteIterator))

  def receive: Receive = {
    case Ok(remaining) =>
      val chunk = nextChunk(remaining)
      if (chunk.nonEmpty) {
        ctx.responder ! MessageChunk(chunk).withAck(Ok(remaining))
      } else {
        ctx.responder ! ChunkedMessageEnd
        context.stop(self)
      }
    case ev: Http.ConnectionClosed =>
      log.warning("Stopping response streaming due to {}", ev)
      context.stop(self)
  }
}
48+

job-server/src/spark.jobserver/JobManagerActor.scala

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -307,6 +307,15 @@ class JobManagerActor(contextConfig: Config) extends InstrumentedActor {
307307
}(executionContext).andThen {
308308
case Success(result: Any) =>
309309
statusActor ! JobFinished(jobId, DateTime.now())
310+
// TODO: If the result is Stream[_] and this is running with context-per-jvm=true configuration
311+
// serializing a Stream[_] blob across process boundaries is not desirable.
312+
// In that scenario an enhancement is required here to chunk stream results back.
313+
// Something like ChunkedJobResultStart, ChunkJobResultMessage, and ChunkJobResultEnd messages
314+
// might be a better way to send results back and then on the other side use chunked encoding
315+
// transfer to send the chunks back. Alternatively the stream could be persisted here to HDFS
316+
// and then streamed out of an InputStream on the other side.
317+
// Either way an enhancement would be required here to make Stream[_] responses work
318+
// with context-per-jvm=true configuration
310319
resultActor ! JobResult(jobId, result)
311320
case Failure(error: Throwable) =>
312321
// Wrapping the error inside a RuntimeException to handle the case of throwing custom exceptions.

job-server/src/spark.jobserver/WebApi.scala

Lines changed: 46 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,38 +1,40 @@
11
package spark.jobserver
22

3+
import java.util.NoSuchElementException
34
import java.util.concurrent.TimeUnit
5+
import javax.net.ssl.SSLContext
46

5-
import akka.actor.{ ActorSystem, ActorRef }
7+
import akka.actor.{ActorRef, ActorSystem}
68
import akka.pattern.ask
79
import akka.util.Timeout
8-
import com.typesafe.config.{ Config, ConfigFactory, ConfigException, ConfigRenderOptions }
9-
import java.util.NoSuchElementException
10-
import javax.net.ssl.SSLContext
11-
import ooyala.common.akka.web.{ WebService, CommonRoutes }
10+
import com.typesafe.config.{Config, ConfigException, ConfigFactory, ConfigRenderOptions}
11+
import ooyala.common.akka.web.JsonUtils.AnyJsonFormat
12+
import ooyala.common.akka.web.{CommonRoutes, WebService}
13+
import org.apache.shiro.SecurityUtils
14+
import org.apache.shiro.config.IniSecurityManagerFactory
1215
import org.joda.time.DateTime
1316
import org.slf4j.LoggerFactory
14-
import spark.jobserver.util.SparkJobUtils
15-
import spark.jobserver.util.SSLContextFactory
16-
import spark.jobserver.routes.DataRoutes
17-
import scala.concurrent.{Await, ExecutionContext, Future}
18-
import scala.util.Try
19-
import spark.jobserver.io.JobInfo
2017
import spark.jobserver.auth._
21-
import spray.http.HttpResponse
22-
import spray.http.MediaTypes
23-
import spray.http.StatusCodes
18+
import spark.jobserver.io.JobInfo
19+
import spark.jobserver.routes.DataRoutes
20+
import spark.jobserver.util.{SSLContextFactory, SparkJobUtils}
21+
import spray.http._
2422
import spray.httpx.SprayJsonSupport.sprayJsonMarshaller
23+
import spray.io.ServerSSLEngineProvider
2524
import spray.json.DefaultJsonProtocol._
26-
import spray.routing.{ HttpService, Route, RequestContext }
2725
import spray.routing.directives.AuthMagnet
28-
import spray.io.ServerSSLEngineProvider
29-
import org.apache.shiro.config.IniSecurityManagerFactory
30-
import org.apache.shiro.mgt.SecurityManager
31-
import org.apache.shiro.SecurityUtils
26+
import spray.routing.{HttpService, RequestContext, Route}
27+
28+
import scala.concurrent.{Await, ExecutionContext, Future}
29+
import scala.util.Try
30+
3231

3332
object WebApi {
3433
val StatusKey = "status"
3534
val ResultKey = "result"
35+
val ResultKeyStartBytes = "{\n".getBytes
36+
val ResultKeyEndBytes = "}".getBytes
37+
val ResultKeyBytes = ("\"" + ResultKey + "\":").getBytes
3638

3739
def badRequest(ctx: RequestContext, msg: String) {
3840
ctx.complete(StatusCodes.BadRequest, errMap(msg))
@@ -61,6 +63,14 @@ object WebApi {
6163
Map(ResultKey -> result)
6264
}
6365

66+
/**
 * Assembles the byte stream for a chunk-encoded job-result response.
 *
 * Produces: '{' + the jobReport entries rendered as JSON "key":value pairs
 * (comma-terminated when non-empty) + the literal "result": key + the raw
 * result bytes + '}'. The result iterator is appended verbatim, so it must
 * already contain valid JSON bytes.
 *
 * @param jobReport status fields (e.g. jobId, status) to prepend to the result
 * @param result    iterator over the serialized result payload bytes
 * @return iterator over the full response body, suitable for chunked streaming
 */
def resultToByteIterator(jobReport: Map[String, Any], result: Iterator[_]): Iterator[_] = {
  val reportFields = jobReport
    .map { case (key, value) =>
      AnyJsonFormat.write(key).toString() + ":" + AnyJsonFormat.write(value).toString()
    }
    .mkString(",")
  // Trailing comma separates the report fields from the "result" entry.
  val reportPrefix = if (jobReport.nonEmpty) reportFields + "," else reportFields
  ResultKeyStartBytes.toIterator ++
    reportPrefix.getBytes().toIterator ++
    ResultKeyBytes.toIterator ++ result ++ ResultKeyEndBytes.toIterator
}
73+
6474
def formatException(t: Throwable): Any =
6575
if (t.getCause != null) {
6676
Map("message" -> t.getMessage,
@@ -95,7 +105,8 @@ class WebApi(system: ActorSystem,
95105
dataManager: ActorRef,
96106
supervisor: ActorRef,
97107
jobInfo: ActorRef)
98-
extends HttpService with CommonRoutes with DataRoutes with SJSAuthenticator with CORSSupport {
108+
extends HttpService with CommonRoutes with DataRoutes with SJSAuthenticator with CORSSupport
109+
with ChunkEncodedStreamingSupport {
99110
import CommonMessages._
100111
import ContextSupervisor._
101112
import scala.concurrent.duration._
@@ -112,6 +123,8 @@ class WebApi(system: ActorSystem,
112123
val DefaultJobLimit = 50
113124
val StatusKey = "status"
114125
val ResultKey = "result"
126+
val ResultChunkSize = Option("spark.jobserver.result-chunk-size").filter(config.hasPath)
127+
.fold(100 * 1024)(config.getBytes(_).toInt)
115128

116129
val contextTimeout = SparkJobUtils.getContextTimeout(config)
117130
val bindAddress = config.getString("spark.jobserver.bind-address")
@@ -228,6 +241,7 @@ class WebApi(system: ActorSystem,
228241
*/
229242
def contextRoutes: Route = pathPrefix("contexts") {
230243
import ContextSupervisor._
244+
231245
import collection.JavaConverters._
232246
// user authentication
233247
authenticate(authenticator) { authInfo =>
@@ -384,7 +398,12 @@ class WebApi(system: ActorSystem,
384398
val resultFuture = jobInfo ? GetJobResult(jobId)
385399
resultFuture.map {
386400
case JobResult(_, result) =>
387-
ctx.complete(jobReport ++ resultToTable(result))
401+
result match {
402+
case s: Stream[_] =>
403+
sendStreamingResponse(ctx, ResultChunkSize,
404+
resultToByteIterator(jobReport, s.toIterator))
405+
case _ => ctx.complete(jobReport ++ resultToTable(result))
406+
}
388407
case _ =>
389408
ctx.complete(jobReport)
390409
}
@@ -466,7 +485,12 @@ class WebApi(system: ActorSystem,
466485
JobManagerActor.StartJob(appName, classPath, jobConfig, events))(timeout)
467486
respondWithMediaType(MediaTypes.`application/json`) { ctx =>
468487
future.map {
469-
case JobResult(_, res) => ctx.complete(resultToTable(res))
488+
case JobResult(_, res) =>
489+
res match {
490+
case s: Stream[_] => sendStreamingResponse(ctx, ResultChunkSize,
491+
resultToByteIterator(Map.empty, s.toIterator))
492+
case _ => ctx.complete(resultToTable(res))
493+
}
470494
case JobErroredOut(_, _, ex) => ctx.complete(errMap(ex, "ERROR"))
471495
case JobStarted(jobId, context, _) =>
472496
jobInfo ! StoreJobConfig(jobId, postedJobConfig)

job-server/test/spark.jobserver/WebApiMainRoutesSpec.scala

Lines changed: 21 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,16 @@
11
package spark.jobserver
22

3-
import akka.actor.{Actor, Props}
43
import com.typesafe.config.ConfigFactory
5-
import spark.jobserver.io.{JobInfo, JarInfo}
6-
import org.joda.time.DateTime
7-
import org.scalatest.{Matchers, FunSpec, BeforeAndAfterAll}
84
import spray.http.StatusCodes._
9-
import spray.routing.HttpService
10-
import spray.testkit.ScalatestRouteTest
115

126

137
// Tests web response codes and formatting
148
// Does NOT test underlying Supervisor / JarManager functionality
159
// HttpService trait is needed for the sealRoute() which wraps exception handling
1610
class WebApiMainRoutesSpec extends WebApiSpec {
17-
import scala.collection.JavaConverters._
11+
import ooyala.common.akka.web.JsonUtils._
1812
import spray.httpx.SprayJsonSupport._
1913
import spray.json.DefaultJsonProtocol._
20-
import ooyala.common.akka.web.JsonUtils._
2114

2215
val getJobStatusInfoMap = {
2316
Map(
@@ -135,6 +128,17 @@ class WebApiMainRoutesSpec extends WebApiSpec {
135128
}
136129
}
137130

131+
it("adhoc job with Stream result of sync route should return 200 and chunked result") {
132+
val config2 = "foo.baz = booboo"
133+
Post("/jobs?appName=foo.stream&classPath=com.abc.meme&sync=true", config2) ~>
134+
sealRoute(routes) ~> check {
135+
status should be (OK)
136+
responseAs[Map[String, Any]] should be (Map(
137+
ResultKey -> "1, 2, 3, 4, 5, 6"
138+
))
139+
}
140+
}
141+
138142
it("should be able to take a timeout param") {
139143
val config2 = "foo.baz = booboo"
140144
Post("/jobs?appName=foo&classPath=com.abc.meme&sync=true&timeout=5", config2) ~>
@@ -269,6 +273,15 @@ class WebApiMainRoutesSpec extends WebApiSpec {
269273
}
270274
}
271275

276+
it("should be able to chunk serialize Stream with different types to JSON") {
277+
Get("/jobs/_stream") ~> sealRoute(routes) ~> check {
278+
status should be (OK)
279+
responseAs[Map[String, Any]] should be (
280+
getJobStatusInfoMap ++ Map(ResultKey -> "1, 2, 3, 4, 5, 6, 7")
281+
)
282+
}
283+
}
284+
272285
it("should be able to serialize base types (eg float, numbers) to JSON") {
273286
Get("/jobs/_num") ~> sealRoute(routes) ~> check {
274287
status should be (OK)

job-server/test/spark.jobserver/WebApiSpec.scala

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,8 @@ import akka.actor.{Actor, Props}
44
import com.typesafe.config.ConfigFactory
55
import spark.jobserver.io.{JobDAOActor, JobInfo, JarInfo}
66
import org.joda.time.DateTime
7-
import org.scalatest.{Matchers, FunSpec, BeforeAndAfterAll}
8-
import spray.http.StatusCodes._
7+
import org.scalatest.{BeforeAndAfterAll, FunSpec, Matchers}
8+
import spark.jobserver.io.{JarInfo, JobInfo}
99
import spray.routing.HttpService
1010
import spray.testkit.ScalatestRouteTest
1111

@@ -75,8 +75,12 @@ with ScalatestRouteTest with HttpService {
7575
sender ! finishedJobInfo
7676
case GetJobResult("_seq") =>
7777
sender ! JobResult("_seq", Seq(1, 2, Map("3" -> "three")))
78+
case GetJobResult("_stream") =>
79+
sender ! JobResult("_stream", "\"1, 2, 3, 4, 5, 6, 7\"".getBytes().toStream)
7880
case GetJobStatus("_num") =>
7981
sender ! finishedJobInfo
82+
case GetJobStatus("_stream") =>
83+
sender ! finishedJobInfo
8084
case GetJobResult("_num") =>
8185
sender ! JobResult("_num", 5000)
8286
case GetJobStatus("_unk") =>
@@ -117,14 +121,22 @@ with ScalatestRouteTest with HttpService {
117121
case StartJob("err", _, config, _) => sender ! JobErroredOut("foo", dt,
118122
new RuntimeException("oops",
119123
new IllegalArgumentException("foo")))
120-
case StartJob(_, _, config, events) =>
124+
case StartJob("foo", _, config, events) =>
121125
statusActor ! Subscribe("foo", sender, events)
122126
statusActor ! JobStatusActor.JobInit(JobInfo("foo", "context", null, "", dt, None, None))
123127
statusActor ! JobStarted("foo", "context1", dt)
124128
val map = config.entrySet().asScala.map { entry => (entry.getKey -> entry.getValue.unwrapped) }.toMap
125129
if (events.contains(classOf[JobResult])) sender ! JobResult("foo", map)
126130
statusActor ! Unsubscribe("foo", sender)
127131

132+
case StartJob("foo.stream", _, config, events) =>
133+
statusActor ! Subscribe("foo.stream", sender, events)
134+
statusActor ! JobStatusActor.JobInit(JobInfo("foo.stream", "context", null, "", dt, None, None))
135+
statusActor ! JobStarted("foo.stream", "context1", dt)
136+
val result = "\"1, 2, 3, 4, 5, 6\"".getBytes().toStream
137+
if (events.contains(classOf[JobResult])) sender ! JobResult("foo.stream", result)
138+
statusActor ! Unsubscribe("foo.stream", sender)
139+
128140
case GetJobConfig("badjobid") => sender ! NoSuchJobId
129141
case GetJobConfig(_) => sender ! config
130142
}

0 commit comments

Comments
 (0)