|
16 | 16 |
|
17 | 17 | package feral.lambda |
18 | 18 |
|
19 | | -import cats.data.OptionT |
20 | 19 | import cats.effect.IO |
21 | | -import cats.effect.kernel.Resource |
22 | 20 | import com.amazonaws.services.lambda.{runtime => lambdaRuntime} |
23 | | -import io.circe |
24 | 21 | import io.circe.Printer |
| 22 | +import io.circe.jawn |
25 | 23 | import io.circe.syntax._ |
26 | 24 |
|
27 | 25 | import java.io.InputStream |
28 | 26 | import java.io.OutputStream |
29 | 27 | import java.io.OutputStreamWriter |
| 28 | +import java.nio.channels.Channels |
30 | 29 | import scala.concurrent.Await |
31 | | -import scala.concurrent.duration.Duration |
| 30 | +import scala.concurrent.duration._ |
32 | 31 |
|
33 | 32 | private[lambda] abstract class IOLambdaPlatform[Event, Result] |
34 | 33 | extends lambdaRuntime.RequestStreamHandler { this: IOLambda[Event, Result] => |
35 | 34 |
|
36 | 35 | final def handleRequest( |
37 | 36 | input: InputStream, |
38 | 37 | output: OutputStream, |
39 | | - context: lambdaRuntime.Context): Unit = { |
| 38 | + runtimeContext: lambdaRuntime.Context): Unit = { |
40 | 39 | val (dispatcher, lambda) = |
41 | | - Await.result(setupMemo, Duration.Inf) |
| 40 | + Await.result(setupMemo, runtimeContext.getRemainingTimeInMillis().millis) |
42 | 41 |
|
43 | | - dispatcher.unsafeRunSync { |
44 | | - Resource |
45 | | - .eval { |
46 | | - for { |
47 | | - event <- fs2 |
48 | | - .io |
49 | | - .readInputStream(IO.pure(input), 8192, closeAfterUse = false) |
50 | | - .through(circe.fs2.byteStreamParser) |
51 | | - .through(circe.fs2.decoder[IO, Event]) |
52 | | - .head |
53 | | - .compile |
54 | | - .lastOrError |
55 | | - context <- IO(Context.fromJava[IO](context)) |
56 | | - _ <- OptionT(lambda(event, context)).foreachF { result => |
57 | | - Resource.fromAutoCloseable(IO(new OutputStreamWriter(output))).use { writer => |
58 | | - IO.blocking(Printer.noSpaces.unsafePrintToAppendable(result.asJson, writer)) |
59 | | - } |
60 | | - } |
61 | | - } yield () |
62 | | - } |
63 | | - .onFinalize(IO.blocking(input.close()) &> IO.blocking(output.close())) |
64 | | - .use_ |
65 | | - } |
| 42 | + val event = jawn.decodeChannel[Event](Channels.newChannel(input)).fold(throw _, identity(_)) |
| 43 | + val context = Context.fromJava[IO](runtimeContext) |
| 44 | + dispatcher |
| 45 | + .unsafeRunTimed( |
| 46 | + lambda(event, context), |
| 47 | + runtimeContext.getRemainingTimeInMillis().millis |
| 48 | + ) |
| 49 | + .foreach { result => |
| 50 | + val writer = new OutputStreamWriter(output) |
| 51 | + Printer.noSpaces.unsafePrintToAppendable(result.asJson, writer) |
| 52 | + writer.flush() |
| 53 | + } |
66 | 54 | } |
67 | 55 |
|
68 | 56 | } |
0 commit comments