Skip to content

Commit d12ccb8

Browse files
committed
Inline streaming invoke mode implementation
1 parent cf8ed5b commit d12ccb8

File tree

4 files changed

+90
-137
lines changed

4 files changed

+90
-137
lines changed

src/main/php/com/amazon/aws/lambda/Handler.class.php

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ public function __construct(Environment $environment) {
2020
/** @return com.amazon.aws.lambda.Environment */
2121
public function environment() { return $this->environment; }
2222

23-
/** @return com.amazon.aws.lambda.Lambda|callable */
23+
/** @return com.amazon.aws.lambda.Lambda|com.amazon.aws.lambda.XXX|callable */
2424
public abstract function target();
2525

2626
/**
@@ -34,11 +34,13 @@ public final function invokeable($api) {
3434
$target= $this->target();
3535
if ($target instanceof Lambda) {
3636
return new Invokeable([$target, 'process'], $api->buffered());
37+
} else if ($target instanceof XXX) {
38+
return new Invokeable([$target, 'yyy'], $api->streaming());
3739
} else if (is_callable($target)) {
3840
$n= (new ReflectionFunction($target))->getNumberOfParameters();
3941
return new Invokeable($target, $n < 3 ? $api->buffered() : $api->streaming());
4042
} else {
41-
throw new IllegalArgumentException('Expected either a callable or a Lambda instance, have '.typeof($target));
43+
throw new IllegalArgumentException('Expected callable|Lambda|XXX, have '.typeof($target));
4244
}
4345
}
4446
}

src/main/php/com/amazon/aws/lambda/RuntimeApi.class.php

Lines changed: 85 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
<?php namespace com\amazon\aws\lambda;
22

33
use Throwable as Any;
4-
use lang\Throwable;
4+
use io\Channel;
5+
use io\streams\InputStream;
6+
use lang\{Throwable, IllegalStateException, IllegalArgumentException};
57
use peer\http\{HttpConnection, HttpRequest, RequestData};
68
use text\json\Json;
79

@@ -11,6 +13,7 @@
1113
* @test com.amazon.aws.lambda.unittest.RuntimeApiTest
1214
* @test com.amazon.aws.lambda.unittest.ExceptionTest
1315
* @test com.amazon.aws.lambda.unittest.BufferedTest
16+
* @test com.amazon.aws.lambda.unittest.StreamedTest
1417
* @see https://docs.aws.amazon.com/lambda/latest/dg/runtimes-api.html
1518
*/
1619
class RuntimeApi {
@@ -43,7 +46,87 @@ public function invoke($lambda, $event, $context) {
4346
}
4447

4548
/** Returns the streaming invoke mode */
46-
public function streaming(): InvokeMode { return new Streaming($this); }
49+
public function streaming(): InvokeMode {
50+
return new class($this) extends InvokeMode implements Stream {
51+
private $request= null;
52+
private $response= null;
53+
private $stream= null;
54+
55+
private function start() {
56+
$this->request->setHeader('Lambda-Runtime-Function-Response-Mode', 'streaming');
57+
$this->request->setHeader('Transfer-Encoding', 'chunked');
58+
return $this->api->stream($this->request);
59+
}
60+
61+
public function transmit($source, $mimeType= null) {
62+
if ($this->response) throw new IllegalStateException('Streaming ended');
63+
64+
if ($source instanceof InputStream) {
65+
$in= $source;
66+
} else if ($source instanceof Channel) {
67+
$in= $source->in();
68+
} else {
69+
throw new IllegalArgumentException('Expected either a channel or an input stream, have '.typeof($source));
70+
}
71+
72+
if (null !== $mimeType) {
73+
$this->request->setHeader('Content-Type', $mimeType);
74+
}
75+
76+
$this->stream ?? $this->stream= $this->start();
77+
try {
78+
while ($in->available()) {
79+
$this->stream->write($in->read());
80+
$this->stream->flush();
81+
}
82+
} finally {
83+
$in->close();
84+
$this->end();
85+
}
86+
}
87+
88+
public function use($mimeType) {
89+
if ($this->response) throw new IllegalStateException('Streaming ended');
90+
91+
$this->request->setHeader('Content-Type', $mimeType);
92+
}
93+
94+
public function write($bytes) {
95+
if ($this->response) throw new IllegalStateException('Streaming ended');
96+
97+
$this->stream ?? $this->stream= $this->start();
98+
$this->stream->write($bytes);
99+
$this->stream->flush();
100+
}
101+
102+
public function end() {
103+
if ($this->response) return; // Already ended
104+
105+
$this->stream ?? $this->stream= $this->start();
106+
$this->response= $this->api->finish($this->stream);
107+
$this->response->closeStream();
108+
}
109+
110+
public function invoke($lambda, $event, $context) {
111+
try {
112+
$this->request= $this->api->request("invocation/{$context->awsRequestId}/response");
113+
$lambda($event, $context, $this);
114+
$this->end();
115+
return $this->response;
116+
} catch (Throwable $t) {
117+
118+
// We can only report errors before starting to stream.
119+
if (null === $this->stream) {
120+
return $this->api->report("invocation/{$context->awsRequestId}/error", $t);
121+
}
122+
123+
// TODO: Use HTTP trailers to report back errors
124+
$this->end();
125+
throw $t;
126+
}
127+
}
128+
};
129+
}
47130

48131
/**
49132
* Marshals an exception according to the AWS specification.

src/main/php/com/amazon/aws/lambda/Streaming.class.php

Lines changed: 0 additions & 132 deletions
This file was deleted.

src/test/php/com/amazon/aws/lambda/unittest/StreamingTest.class.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ public function reports_exceptions_before_streaming_via_error() {
5555
"Connection: close\r\n".
5656
"Host: test\r\n".
5757
"Content-Type: application/json\r\n".
58-
"Content-Length: 760\r\n".
58+
"Content-Length: 882\r\n".
5959
"\r\n",
6060
substr($response, 0, strpos($response, "\r\n\r\n") + 4)
6161
);

0 commit comments

Comments
 (0)