Skip to content

Commit bb2ea95

Browse files
authored
Interceptable errors
- update to latest roadrunner interfaces - added the ability to retry errors
1 parent cd5b476 commit bb2ea95

File tree

11 files changed

+225
-74
lines changed

11 files changed

+225
-74
lines changed
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
<?php
2+
3+
/**
4+
* This file is part of Temporal package.
5+
*
6+
* For the full copyright and license information, please view the LICENSE
7+
* file that was distributed with this source code.
8+
*/
9+
10+
declare(strict_types=1);
11+
12+
namespace Temporal\Exception;
13+
14+
/**
15+
* Exception interceptor provides the ability to let workflow know if exception should be treated as fatal (stops execution)
16+
* or must only fail the task execution (with consecutive retry).
17+
*/
18+
class ExceptionInterceptor implements ExceptionInterceptorInterface
19+
{
20+
/**
21+
* @var array
22+
*/
23+
private array $retryableErrors;
24+
25+
/**
26+
* @param array $retryableErrors
27+
*/
28+
public function __construct(array $retryableErrors)
29+
{
30+
$this->retryableErrors = $retryableErrors;
31+
}
32+
33+
/**
34+
* @param \Throwable $e
35+
* @return bool
36+
*/
37+
public function isRetryable(\Throwable $e): bool
38+
{
39+
foreach ($this->retryableErrors as $retryableError) {
40+
if ($e instanceof $retryableError) {
41+
return true;
42+
}
43+
}
44+
45+
return false;
46+
}
47+
48+
/**
49+
* @return static
50+
*/
51+
public static function createDefault(): self
52+
{
53+
return new self([\Error::class]);
54+
}
55+
}
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
<?php
2+
3+
/**
4+
* This file is part of Temporal package.
5+
*
6+
* For the full copyright and license information, please view the LICENSE
7+
* file that was distributed with this source code.
8+
*/
9+
10+
declare(strict_types=1);
11+
12+
namespace Temporal\Exception;
13+
14+
/**
15+
* Exception interceptor provides the ability to let workflow know if exception should be treated as fatal (stops execution)
16+
* or must only fail the task execution (with consecutive retry).
17+
*/
18+
interface ExceptionInterceptorInterface
19+
{
20+
/**
21+
* @param \Throwable $e
22+
* @return bool
23+
*/
24+
public function isRetryable(\Throwable $e): bool;
25+
}

src/Internal/ServiceContainer.php

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@
1414
use JetBrains\PhpStorm\Immutable;
1515
use Spiral\Attributes\ReaderInterface;
1616
use Temporal\DataConverter\DataConverterInterface;
17+
use Temporal\Exception\ExceptionInterceptor;
18+
use Temporal\Exception\ExceptionInterceptorInterface;
1719
use Temporal\Internal\Declaration\Prototype\ActivityPrototype;
1820
use Temporal\Internal\Declaration\Prototype\ActivityCollection;
1921
use Temporal\Internal\Declaration\Prototype\WorkflowCollection;
@@ -105,6 +107,11 @@ final class ServiceContainer
105107
#[Immutable]
106108
public ActivityReader $activitiesReader;
107109

110+
/**
111+
* @var ExceptionInterceptorInterface
112+
*/
113+
public ExceptionInterceptorInterface $exceptionInterceptor;
114+
108115
/**
109116
* @param LoopInterface $loop
110117
* @param EnvironmentInterface $env
@@ -113,6 +120,7 @@ final class ServiceContainer
113120
* @param QueueInterface $queue
114121
* @param MarshallerInterface $marshaller
115122
* @param DataConverterInterface $dataConverter
123+
* @param ExceptionInterceptorInterface $exceptionInterceptor
116124
*/
117125
public function __construct(
118126
LoopInterface $loop,
@@ -121,7 +129,8 @@ public function __construct(
121129
ReaderInterface $reader,
122130
QueueInterface $queue,
123131
MarshallerInterface $marshaller,
124-
DataConverterInterface $dataConverter
132+
DataConverterInterface $dataConverter,
133+
ExceptionInterceptorInterface $exceptionInterceptor
125134
) {
126135
$this->env = $env;
127136
$this->loop = $loop;
@@ -137,22 +146,28 @@ public function __construct(
137146

138147
$this->workflowsReader = new WorkflowReader($this->reader);
139148
$this->activitiesReader = new ActivityReader($this->reader);
149+
$this->exceptionInterceptor = $exceptionInterceptor;
140150
}
141151

142152
/**
143-
* @param WorkerFactory $worker
153+
* @param WorkerFactory $worker
154+
* @param ExceptionInterceptorInterface $exceptionInterceptor
144155
* @return static
145156
*/
146-
public static function fromWorker(WorkerFactory $worker): self
157+
public static function fromWorkerFactory(
158+
WorkerFactory $worker,
159+
ExceptionInterceptorInterface $exceptionInterceptor
160+
): self
147161
{
148162
return new self(
149163
$worker,
150-
$worker->getEnviroment(),
164+
$worker->getEnvironment(),
151165
$worker->getClient(),
152166
$worker->getReader(),
153167
$worker->getQueue(),
154168
$worker->getMarshaller(),
155-
$worker->getDataConverter()
169+
$worker->getDataConverter(),
170+
$exceptionInterceptor
156171
);
157172
}
158173
}
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
<?php
2+
3+
/**
4+
* This file is part of Temporal package.
5+
*
6+
* For the full copyright and license information, please view the LICENSE
7+
* file that was distributed with this source code.
8+
*/
9+
10+
declare(strict_types=1);
11+
12+
namespace Temporal\Internal\Transport\Request;
13+
14+
use Temporal\DataConverter\ValuesInterface;
15+
use Temporal\Worker\Transport\Command\Request;
16+
17+
final class Panic extends Request
18+
{
19+
public const NAME = 'Panic';
20+
21+
/**
22+
* @param \Throwable|null $failure
23+
*/
24+
public function __construct(\Throwable $failure = null)
25+
{
26+
parent::__construct(self::NAME, [], null);
27+
$this->setFailure($failure);
28+
}
29+
}

src/Internal/Workflow/Process/Process.php

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -108,11 +108,16 @@ protected function complete($result): void
108108
return;
109109
}
110110

111+
if ($this->services->exceptionInterceptor->isRetryable($result)) {
112+
$this->scopeContext->panic($result);
113+
return;
114+
}
115+
111116
$this->scopeContext->complete([], $result);
112117
return;
113118
}
114119

115-
if ($this->context->isContinuedAsNew()) {
120+
if ($this->scopeContext->isContinuedAsNew()) {
116121
return;
117122
}
118123

src/Internal/Workflow/ScopeContext.php

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,8 @@ class ScopeContext extends WorkflowContext implements ScopedContextInterface
3131
* Creates scope specific context.
3232
*
3333
* @param WorkflowContext $context
34-
* @param Scope $scope
35-
* @param callable $onRequest
34+
* @param Scope $scope
35+
* @param callable $onRequest
3636
*
3737
* @return WorkflowContextInterface
3838
*/
@@ -78,11 +78,12 @@ public function asyncDetached(callable $handler): CancellationScopeInterface
7878

7979
/**
8080
* @param RequestInterface $request
81+
* @param bool $cancellable
8182
* @return PromiseInterface
8283
*/
83-
public function request(RequestInterface $request): PromiseInterface
84+
public function request(RequestInterface $request, bool $cancellable = true): PromiseInterface
8485
{
85-
if ($this->scope->isCancelled()) {
86+
if ($cancellable && $this->scope->isCancelled()) {
8687
throw new CanceledFailure('Attempt to send request to cancelled scope');
8788
}
8889

src/Internal/Workflow/WorkflowContext.php

Lines changed: 18 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
use Temporal\Internal\Transport\Request\ContinueAsNew;
2828
use Temporal\Internal\Transport\Request\GetVersion;
2929
use Temporal\Internal\Transport\Request\NewTimer;
30+
use Temporal\Internal\Transport\Request\Panic;
3031
use Temporal\Internal\Transport\Request\SideEffect;
3132
use Temporal\Promise;
3233
use Temporal\Worker\Transport\Command\RequestInterface;
@@ -57,11 +58,11 @@ class WorkflowContext implements WorkflowContextInterface
5758

5859
/**
5960
* WorkflowContext constructor.
60-
* @param ServiceContainer $services
61-
* @param ClientInterface $client
61+
* @param ServiceContainer $services
62+
* @param ClientInterface $client
6263
* @param WorkflowInstanceInterface $workflowInstance
63-
* @param Input $input
64-
* @param ValuesInterface|null $lastCompletionResult
64+
* @param Input $input
65+
* @param ValuesInterface|null $lastCompletionResult
6566
*/
6667
public function __construct(
6768
ServiceContainer $services,
@@ -221,9 +222,15 @@ public function complete(array $result = null, \Throwable $failure = null): Prom
221222
$values = EncodedValues::empty();
222223
}
223224

224-
return $this->request(
225-
new CompleteWorkflow($values, $failure)
226-
);
225+
return $this->request(new CompleteWorkflow($values, $failure), false);
226+
}
227+
228+
/**
229+
* {@inheritDoc}
230+
*/
231+
public function panic(\Throwable $failure = null): PromiseInterface
232+
{
233+
return $this->request(new Panic($failure), false);
227234
}
228235

229236
/**
@@ -243,7 +250,7 @@ public function continueAsNew(
243250
);
244251

245252
// must not be captured
246-
return $this->request($request);
253+
return $this->request($request, false);
247254
}
248255

249256
/**
@@ -276,8 +283,7 @@ public function executeChildWorkflow(
276283
$returnType = null
277284
): PromiseInterface {
278285
return $this->newUntypedChildWorkflowStub($type, $options)
279-
->execute($args, $returnType)
280-
;
286+
->execute($args, $returnType);
281287
}
282288

283289
/**
@@ -377,7 +383,7 @@ public function timer($interval): PromiseInterface
377383
/**
378384
* {@inheritDoc}
379385
*/
380-
public function request(RequestInterface $request): PromiseInterface
386+
public function request(RequestInterface $request, bool $cancellable = true): PromiseInterface
381387
{
382388
$this->recordTrace();
383389
return $this->client->request($request);
@@ -426,8 +432,7 @@ public function awaitWithTimeout($interval, ...$conditions): PromiseInterface
426432
$conditions[] = $timer;
427433

428434
return $this->await(...$conditions)
429-
->then(static fn(): bool => !$timer->isComplete())
430-
;
435+
->then(static fn(): bool => !$timer->isComplete());
431436
}
432437

433438
/**

src/Worker/Transport/RoadRunner.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ public function send(string $frame, array $headers = []): void
8585
$json = $this->encodeHeaders($headers);
8686

8787
try {
88-
$this->worker->send($frame, $json);
88+
$this->worker->respond(new Payload($frame, $json));
8989
} catch (\Throwable $e) {
9090
throw new TransportException($e->getMessage(), $e->getCode(), $e);
9191
}

src/WorkerFactory.php

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@
2121
use Spiral\Attributes\ReaderInterface;
2222
use Temporal\DataConverter\DataConverter;
2323
use Temporal\DataConverter\DataConverterInterface;
24+
use Temporal\Exception\ExceptionInterceptor;
25+
use Temporal\Exception\ExceptionInterceptorInterface;
2426
use Temporal\Internal\Marshaller\Mapper\AttributeMapperFactory;
2527
use Temporal\Internal\Marshaller\Marshaller;
2628
use Temporal\Internal\Marshaller\MarshallerInterface;
@@ -182,12 +184,17 @@ public static function create(
182184
*/
183185
public function newWorker(
184186
string $taskQueue = self::DEFAULT_TASK_QUEUE,
185-
WorkerOptions $options = null
187+
WorkerOptions $options = null,
188+
ExceptionInterceptorInterface $exceptionInterceptor = null
186189
): WorkerInterface {
190+
187191
$worker = new Worker(
188192
$taskQueue,
189193
$options ?? WorkerOptions::new(),
190-
ServiceContainer::fromWorker($this),
194+
ServiceContainer::fromWorkerFactory(
195+
$this,
196+
$exceptionInterceptor ?? ExceptionInterceptor::createDefault()
197+
),
191198
$this->rpc
192199
);
193200
$this->queues->add($worker);
@@ -238,7 +245,7 @@ public function getMarshaller(): MarshallerInterface
238245
/**
239246
* @return EnvironmentInterface
240247
*/
241-
public function getEnviroment(): EnvironmentInterface
248+
public function getEnvironment(): EnvironmentInterface
242249
{
243250
return $this->env;
244251
}
@@ -295,9 +302,9 @@ private function createReader(): ReaderInterface
295302
{
296303
if (\interface_exists(Reader::class)) {
297304
return new SelectiveReader([
298-
new AnnotationReader(),
299-
new AttributeReader(),
300-
]);
305+
new AnnotationReader(),
306+
new AttributeReader(),
307+
]);
301308
}
302309

303310
return new AttributeReader();

src/Workflow/WorkflowContextInterface.php

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,9 +73,10 @@ public function registerSignal(string $queryType, callable $handler): self;
7373
* @internal This is an internal method
7474
*
7575
* @param RequestInterface $request
76+
* @param bool $cancellable
7677
* @return PromiseInterface
7778
*/
78-
public function request(RequestInterface $request): PromiseInterface;
79+
public function request(RequestInterface $request, bool $cancellable = true): PromiseInterface;
7980

8081
/**
8182
* @see Workflow::getVersion()
@@ -107,6 +108,14 @@ public function sideEffect(callable $context): PromiseInterface;
107108
*/
108109
public function complete(array $result = null, \Throwable $failure = null): PromiseInterface;
109110

111+
/**
112+
* @internal This is an internal method
113+
*
114+
* @param \Throwable|null $failure
115+
* @return PromiseInterface
116+
*/
117+
public function panic(\Throwable $failure = null): PromiseInterface;
118+
110119
/**
111120
* @see Workflow::timer()
112121
*

0 commit comments

Comments
 (0)