Skip to content

Commit 552a809

Browse files
authored
Merge pull request #1338 from UdashFramework/streaming
Streaming support in Udash REST
2 parents bfe28f2 + 33366c6 commit 552a809

39 files changed

+3352
-345
lines changed
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
package io.udash.rest
2+
3+
import monix.execution.Scheduler
4+
import monix.reactive.Observable
5+
import org.openjdk.jmh.annotations.{Benchmark, BenchmarkMode, Fork, Mode, Scope, State}
6+
import com.avsystem.commons.concurrent.ObservableExtensions.*
7+
8+
import java.io.ByteArrayOutputStream
9+
import scala.util.Random
10+
11+
@Fork(1)
12+
@BenchmarkMode(Array(Mode.Throughput))
13+
@State(Scope.Benchmark)
14+
class MergeArraysBenchmark {
15+
16+
import Scheduler.Implicits.global
17+
18+
private final val data: Observable[Array[Byte]] =
19+
Observable.repeatEval(Random.nextBytes(1024)).take(32)
20+
21+
@Benchmark
22+
def mergeByteArrayOutputStream: Array[Byte] =
23+
data.foldLeftL(new ByteArrayOutputStream()) { case (acc, elem) =>
24+
acc.write(elem)
25+
acc
26+
}.map(_.toByteArray).runSyncUnsafe()
27+
28+
@Benchmark
29+
def mergeByteArrayToL: Array[Byte] =
30+
data.flatMap(Observable.fromIterable(_)).toL(Array).runSyncUnsafe()
31+
}
Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
package io.udash.rest
2+
3+
import com.avsystem.commons.serialization.json.JsonStringOutput
4+
import io.udash.rest.RestExampleData.RestResponseSize
5+
import io.udash.rest.raw.RawRest
6+
import monix.eval.Task
7+
import monix.execution.Scheduler
8+
import org.openjdk.jmh.annotations.*
9+
10+
import java.nio.charset.StandardCharsets
11+
import java.util.concurrent.TimeUnit
12+
import scala.concurrent.Await
13+
import scala.concurrent.duration.Duration
14+
15+
private object RestApiBenchmark {
16+
trait RestTestApi {
17+
@GET def exampleEndpoint(size: RestResponseSize): Task[List[RestExampleData]]
18+
@GET def exampleBinaryEndpoint(size: RestResponseSize): Task[List[Array[Byte]]]
19+
}
20+
21+
object RestTestApi extends DefaultRestApiCompanion[RestTestApi] {
22+
final class Impl extends RestTestApi {
23+
private var responses: Map[RestResponseSize, List[RestExampleData]] = Map.empty
24+
25+
def exampleEndpoint(size: RestResponseSize): Task[List[RestExampleData]] =
26+
Task.eval(getResponse(size))
27+
28+
override def exampleBinaryEndpoint(size: RestResponseSize): Task[List[Array[Byte]]] =
29+
Task.eval(getResponse(size).iterator.map(JsonStringOutput.write(_).getBytes(StandardCharsets.UTF_8)).toList)
30+
31+
private def getResponse(size: RestResponseSize): List[RestExampleData] =
32+
responses(size)
33+
34+
def generateResponses(): Unit =
35+
this.responses = RestResponseSize.values.map(size => size -> RestExampleData.generateRandomList(size)).toMap
36+
}
37+
}
38+
39+
private def createApiProxy(): (RestTestApi.Impl, RestTestApi) = {
40+
val apiImpl = new RestTestApi.Impl()
41+
val handler = RawRest.asHandleRequest[RestTestApi](apiImpl)
42+
(apiImpl, RawRest.fromHandleRequest[RestTestApi](handler))
43+
}
44+
}
45+
46+
47+
@OutputTimeUnit(TimeUnit.SECONDS)
48+
@BenchmarkMode(Array(Mode.Throughput))
49+
@State(Scope.Thread)
50+
@Fork(1)
51+
class RestApiBenchmark {
52+
implicit def scheduler: Scheduler = Scheduler.global
53+
54+
private final val (impl, proxy) = RestApiBenchmark.createApiProxy()
55+
56+
@Setup(Level.Trial)
57+
def setup(): Unit = {
58+
this.impl.generateResponses()
59+
}
60+
61+
@Benchmark
62+
def smallArrayJsonList(): Unit = {
63+
waitEndpoint(RestResponseSize.Small)
64+
}
65+
66+
@Benchmark
67+
def mediumArrayJsonList(): Unit = {
68+
waitEndpoint(RestResponseSize.Medium)
69+
}
70+
71+
@Benchmark
72+
def hugeArrayJsonList(): Unit = {
73+
waitEndpoint(RestResponseSize.Huge)
74+
}
75+
76+
@Benchmark
77+
def smallArrayBinary(): Unit = {
78+
waitEndpointBinary(RestResponseSize.Small)
79+
}
80+
81+
@Benchmark
82+
def mediumArrayBinary(): Unit = {
83+
waitEndpointBinary(RestResponseSize.Medium)
84+
}
85+
86+
@Benchmark
87+
def hugeArrayBinary(): Unit = {
88+
waitEndpointBinary(RestResponseSize.Huge)
89+
}
90+
91+
private def waitEndpoint(size: RestResponseSize): Unit =
92+
Await.result(this.proxy.exampleEndpoint(size).runToFuture, Duration.apply(10, TimeUnit.SECONDS))
93+
94+
private def waitEndpointBinary(size: RestResponseSize): Unit =
95+
Await.result(this.proxy.exampleBinaryEndpoint(size).runToFuture, Duration.apply(10, TimeUnit.SECONDS))
96+
}
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
package io.udash.rest
2+
3+
import com.avsystem.commons.misc.{AbstractValueEnum, EnumCtx}
4+
5+
import scala.util.Random
6+
7+
final case class RestExampleData(number: Long, string: String)
8+
object RestExampleData extends RestDataCompanion[RestExampleData] {
9+
final case class RestResponseSize(value: Int)(implicit enumCtx: EnumCtx) extends AbstractValueEnum
10+
object RestResponseSize extends RestValueEnumCompanion[RestResponseSize] {
11+
final val Small: Value = new RestResponseSize(10)
12+
final val Medium: Value = new RestResponseSize(500)
13+
final val Huge: Value = new RestResponseSize(10000)
14+
}
15+
16+
private def random() =
17+
RestExampleData(
18+
Random.nextLong(),
19+
Iterator.continually(Random.nextPrintableChar()).take(200).mkString
20+
)
21+
22+
def generateRandomList(size: RestResponseSize): List[RestExampleData] =
23+
Range(0, size.value).toList.map(_ => RestExampleData.random())
24+
}
Lines changed: 219 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,219 @@
1+
package io.udash.rest
2+
3+
import com.avsystem.commons.serialization.json.JsonStringOutput
4+
import io.udash.rest.RestExampleData.RestResponseSize
5+
import io.udash.rest.raw.{RawRest, RestRequest, RestResponse, StreamedRestResponse}
6+
import monix.eval.Task
7+
import monix.execution.Scheduler
8+
import monix.reactive.Observable
9+
import org.openjdk.jmh.annotations.*
10+
11+
import java.nio.charset.StandardCharsets
12+
import java.util.concurrent.TimeUnit
13+
import scala.concurrent.Await
14+
import scala.concurrent.duration.Duration
15+
16+
private object StreamingRestApiBenchmark {
17+
trait RestTestApi {
18+
@GET def exampleEndpoint(size: RestResponseSize): Observable[RestExampleData]
19+
@GET def exampleEndpointBinary(size: RestResponseSize): Observable[Array[Byte]]
20+
21+
@streamingResponseBatchSize(10)
22+
@GET def exampleEndpointBatch10(size: RestResponseSize): Observable[RestExampleData]
23+
24+
@streamingResponseBatchSize(10)
25+
@GET def exampleEndpointBatch10Binary(size: RestResponseSize): Observable[Array[Byte]]
26+
27+
@streamingResponseBatchSize(500)
28+
@GET def exampleEndpointBatch500(size: RestResponseSize): Observable[RestExampleData]
29+
30+
@streamingResponseBatchSize(500)
31+
@GET def exampleEndpointBatch500Binary(size: RestResponseSize): Observable[Array[Byte]]
32+
33+
@GET def exampleEndpointWithoutStreaming(size: RestResponseSize): Task[List[RestExampleData]]
34+
}
35+
36+
object RestTestApi extends DefaultRestApiCompanion[RestTestApi] {
37+
final class Impl extends RestTestApi {
38+
private var responses: Map[RestResponseSize, List[RestExampleData]] = Map.empty
39+
40+
def exampleEndpoint(size: RestResponseSize): Observable[RestExampleData] =
41+
Observable.fromIterable(getResponse(size))
42+
43+
def exampleEndpointBinary(size: RestResponseSize): Observable[Array[Byte]] =
44+
getResponseBinary(size)
45+
46+
def exampleEndpointBatch10(size: RestResponseSize): Observable[RestExampleData] =
47+
Observable.fromIterable(getResponse(size))
48+
49+
def exampleEndpointBatch10Binary(size: RestResponseSize): Observable[Array[Byte]] =
50+
getResponseBinary(size)
51+
52+
def exampleEndpointBatch500(size: RestResponseSize): Observable[RestExampleData] =
53+
Observable.fromIterable(getResponse(size))
54+
55+
def exampleEndpointBatch500Binary(size: RestResponseSize): Observable[Array[Byte]] =
56+
getResponseBinary(size)
57+
58+
def exampleEndpointWithoutStreaming(size: RestResponseSize): Task[List[RestExampleData]] =
59+
Task.eval(getResponse(size))
60+
61+
private def getResponse(size: RestResponseSize): List[RestExampleData] =
62+
responses(size)
63+
64+
private def getResponseBinary(size: RestResponseSize): Observable[Array[Byte]] =
65+
Observable.fromIterable(getResponse(size)).map(JsonStringOutput.write(_).getBytes(StandardCharsets.UTF_8))
66+
67+
def generateResponses(): Unit =
68+
this.responses = RestResponseSize.values.map(size => size -> RestExampleData.generateRandomList(size)).toMap
69+
}
70+
}
71+
72+
private def creteApiProxy(): (RestTestApi.Impl, RestTestApi) = {
73+
val apiImpl = new RestTestApi.Impl()
74+
val streamingServerHandle = RawRest.asHandleRequestWithStreaming[RestTestApi](apiImpl)
75+
val streamingClientHandler = new RawRest.RestRequestHandler {
76+
override def handleRequest(request: RestRequest): Task[RestResponse] =
77+
streamingServerHandle(request).map(_.asInstanceOf[RestResponse])
78+
79+
override def handleRequestStream(request: RestRequest): Task[StreamedRestResponse] =
80+
streamingServerHandle(request).map(_.asInstanceOf[StreamedRestResponse])
81+
}
82+
(apiImpl, RawRest.fromHandleRequestWithStreaming[RestTestApi](streamingClientHandler))
83+
}
84+
}
85+
86+
87+
@OutputTimeUnit(TimeUnit.SECONDS)
88+
@BenchmarkMode(Array(Mode.Throughput))
89+
@State(Scope.Thread)
90+
@Fork(1)
91+
class StreamingRestApiBenchmark {
92+
implicit def scheduler: Scheduler = Scheduler.global
93+
private final val (impl, proxy) = StreamingRestApiBenchmark.creteApiProxy()
94+
95+
@Setup(Level.Trial)
96+
def setup(): Unit = {
97+
this.impl.generateResponses()
98+
}
99+
100+
@Benchmark
101+
def smallArrayJsonList(): Unit = {
102+
waitStreamingEndpoint(RestResponseSize.Small)
103+
}
104+
105+
@Benchmark
106+
def mediumArrayJsonList(): Unit = {
107+
waitStreamingEndpoint(RestResponseSize.Medium)
108+
}
109+
110+
@Benchmark
111+
def hugeArrayJsonList(): Unit = {
112+
waitStreamingEndpoint(RestResponseSize.Huge)
113+
}
114+
115+
@Benchmark
116+
def smallArrayBinary(): Unit = {
117+
waitStreamingEndpointBinary(RestResponseSize.Small)
118+
}
119+
120+
@Benchmark
121+
def mediumArrayBinary(): Unit = {
122+
waitStreamingEndpointBinary(RestResponseSize.Medium)
123+
}
124+
125+
@Benchmark
126+
def hugeArrayBinary(): Unit = {
127+
waitStreamingEndpointBinary(RestResponseSize.Huge)
128+
}
129+
130+
@Benchmark
131+
def smallArrayBatch10JsonList(): Unit = {
132+
waitObservable(this.proxy.exampleEndpointBatch10(RestResponseSize.Small))
133+
}
134+
135+
@Benchmark
136+
def mediumArrayBatch10JsonList(): Unit = {
137+
waitObservable(this.proxy.exampleEndpointBatch10(RestResponseSize.Medium))
138+
}
139+
140+
@Benchmark
141+
def hugeArrayBatch10JsonList(): Unit = {
142+
waitObservable(this.proxy.exampleEndpointBatch10(RestResponseSize.Huge))
143+
}
144+
145+
@Benchmark
146+
def smallArrayBatch10Binary(): Unit = {
147+
waitObservable(this.proxy.exampleEndpointBatch10Binary(RestResponseSize.Small))
148+
}
149+
150+
@Benchmark
151+
def mediumArrayBatch10Binary(): Unit = {
152+
waitObservable(this.proxy.exampleEndpointBatch10Binary(RestResponseSize.Medium))
153+
}
154+
155+
@Benchmark
156+
def hugeArrayBatch10Binary(): Unit = {
157+
waitObservable(this.proxy.exampleEndpointBatch10Binary(RestResponseSize.Huge))
158+
}
159+
160+
@Benchmark
161+
def smallArrayBatch500JsonList(): Unit = {
162+
waitObservable(this.proxy.exampleEndpointBatch500(RestResponseSize.Small))
163+
}
164+
165+
@Benchmark
166+
def mediumArrayBatch500JsonList(): Unit = {
167+
waitObservable(this.proxy.exampleEndpointBatch500(RestResponseSize.Medium))
168+
}
169+
170+
@Benchmark
171+
def hugeArrayBatch500JsonList(): Unit = {
172+
waitObservable(this.proxy.exampleEndpointBatch500(RestResponseSize.Huge))
173+
}
174+
175+
@Benchmark
176+
def smallArrayBatch500Binary(): Unit = {
177+
waitObservable(this.proxy.exampleEndpointBatch500Binary(RestResponseSize.Small))
178+
}
179+
180+
@Benchmark
181+
def mediumArrayBatch500Binary(): Unit = {
182+
waitObservable(this.proxy.exampleEndpointBatch500Binary(RestResponseSize.Medium))
183+
}
184+
185+
@Benchmark
186+
def hugeArrayBatch500Binary(): Unit = {
187+
waitObservable(this.proxy.exampleEndpointBatch500Binary(RestResponseSize.Huge))
188+
}
189+
190+
@Benchmark
191+
def smallArrayWithoutStreaming(): Unit = {
192+
waitEndpointWithoutStreaming(RestResponseSize.Small)
193+
}
194+
195+
@Benchmark
196+
def mediumArrayWithoutStreaming(): Unit = {
197+
waitEndpointWithoutStreaming(RestResponseSize.Medium)
198+
}
199+
200+
@Benchmark
201+
def hugeArrayWithoutStreaming(): Unit = {
202+
waitEndpointWithoutStreaming(RestResponseSize.Huge)
203+
}
204+
205+
private def waitEndpointWithoutStreaming(samples: RestResponseSize): Unit =
206+
wait(this.proxy.exampleEndpointWithoutStreaming(samples))
207+
208+
private def waitStreamingEndpoint(samples: RestResponseSize): Unit =
209+
wait(this.proxy.exampleEndpoint(samples).completedL)
210+
211+
private def waitStreamingEndpointBinary(samples: RestResponseSize): Unit =
212+
wait(this.proxy.exampleEndpointBinary(samples).completedL)
213+
214+
private def wait[T](task: Task[T]): Unit =
215+
Await.result(task.runToFuture, Duration.apply(15, TimeUnit.SECONDS))
216+
217+
private def waitObservable[T](obs: Observable[T]): Unit =
218+
Await.result(obs.completedL.runToFuture, Duration.apply(15, TimeUnit.SECONDS))
219+
}

build.sbt

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ import org.openqa.selenium.firefox.{FirefoxDriverLogLevel, FirefoxOptions}
33
import org.scalajs.jsdependencies.sbtplugin.JSModuleID
44
import org.scalajs.jsenv.jsdomnodejs.JSDOMNodeJSEnv
55
import org.scalajs.jsenv.selenium.SeleniumJSEnv
6+
import pl.project13.scala.sbt.JmhPlugin
67

78
name := "udash"
89

@@ -460,4 +461,12 @@ lazy val `guide-selenium` =
460461
`guide-homepage` / Compile / compileStatics,
461462
`guide-guide` / Compile / compileStatics,
462463
).value
463-
)
464+
)
465+
466+
lazy val benchmarksJVM = project.in(file("benchmarks"))
467+
.enablePlugins(JmhPlugin)
468+
.dependsOn(jvmLibraries.map(p => p: ClasspathDep[ProjectReference]): _*)
469+
.settings(
470+
commonSettings,
471+
noPublishSettings,
472+
)

0 commit comments

Comments
 (0)