Skip to content

Commit eb956d6

Browse files
authored
Use special kind of failure on desync (#639)
2 parents 4a73bfa + 65b3130 commit eb956d6

File tree

11 files changed

+383
-62
lines changed

11 files changed

+383
-62
lines changed

psalm-baseline.xml

Lines changed: 7 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
<?xml version="1.0" encoding="UTF-8"?>
2-
<files psalm-version="6.12.1@e71404b0465be25cf7f8a631b298c01c5ddd864f">
2+
<files psalm-version="6.13.0@70cdf647255a1362b426bb0f522a85817b8c791c">
33
<file src="src/Activity.php">
44
<ImplicitToStringCast>
55
<code><![CDATA[$type]]></code>
@@ -346,9 +346,6 @@
346346
</MissingReturnType>
347347
</file>
348348
<file src="src/Exception/Client/ActivityCompletionException.php">
349-
<PossiblyInvalidArgument>
350-
<code><![CDATA[$code]]></code>
351-
</PossiblyInvalidArgument>
352349
<PossiblyNullReference>
353350
<code><![CDATA[getID]]></code>
354351
</PossiblyNullReference>
@@ -559,10 +556,10 @@
559556
</file>
560557
<file src="src/Internal/Declaration/Dispatcher/Dispatcher.php">
561558
<InvalidReturnType>
562-
<code><![CDATA[FunctionExecutor]]></code>
559+
<code><![CDATA[\Closure(object, array): mixed]]></code>
563560
</InvalidReturnType>
564561
<MismatchingDocblockReturnType>
565-
<code><![CDATA[FunctionExecutor]]></code>
562+
<code><![CDATA[\Closure(object, array): mixed]]></code>
566563
</MismatchingDocblockReturnType>
567564
<PropertyNotSetInConstructor>
568565
<code><![CDATA[$executor]]></code>
@@ -1066,11 +1063,6 @@
10661063
<code><![CDATA[PromiseInterface]]></code>
10671064
</TooManyTemplateParams>
10681065
</file>
1069-
<file src="src/Internal/Transport/Router/CancelWorkflow.php">
1070-
<DocblockTypeContradiction>
1071-
<code><![CDATA[$process === null]]></code>
1072-
</DocblockTypeContradiction>
1073-
</file>
10741066
<file src="src/Internal/Transport/Router/DestroyWorkflow.php">
10751067
<UndefinedInterfaceMethod>
10761068
<code><![CDATA[pull]]></code>
@@ -1084,6 +1076,7 @@
10841076
<file src="src/Internal/Transport/Router/InvokeQuery.php">
10851077
<ArgumentTypeCoercion>
10861078
<code><![CDATA[$request->getID()]]></code>
1079+
<code><![CDATA[$running]]></code>
10871080
</ArgumentTypeCoercion>
10881081
<UnusedVariable>
10891082
<code><![CDATA[$headers]]></code>
@@ -1115,17 +1108,10 @@
11151108
<code><![CDATA[Input]]></code>
11161109
</UnnecessaryVarAnnotation>
11171110
</file>
1118-
<file src="src/Internal/Transport/Router/WorkflowProcessAwareRoute.php">
1119-
<LessSpecificReturnStatement>
1120-
<code><![CDATA[$this->running->find($runId) ?? throw new \LogicException(
1121-
\sprintf(self::ERROR_PROCESS_NOT_FOUND, $runId),
1122-
)]]></code>
1123-
</LessSpecificReturnStatement>
1124-
<MoreSpecificReturnType>
1125-
<code><![CDATA[Process]]></code>
1126-
</MoreSpecificReturnType>
1127-
</file>
11281111
<file src="src/Internal/Transport/Server.php">
1112+
<ArgumentTypeCoercion>
1113+
<code><![CDATA[$e->getMessage()]]></code>
1114+
</ArgumentTypeCoercion>
11291115
<InvalidArgument>
11301116
<code><![CDATA[$request]]></code>
11311117
<code><![CDATA[$request]]></code>

src/Exception/Client/ActivityCompletionException.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ class ActivityCompletionException extends TemporalException
2323

2424
final public function __construct(string $message = "", string|int $code = 0, ?\Throwable $previous = null)
2525
{
26-
parent::__construct($message, $code, $previous);
26+
parent::__construct($message, (int) $code, $previous);
2727
}
2828

2929
/**

src/Internal/Declaration/Dispatcher/Dispatcher.php

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,6 @@
1313

1414
use JetBrains\PhpStorm\Pure;
1515

16-
/**
17-
* @psalm-type FunctionExecutor = \Closure(object, array): mixed
18-
*/
1916
class Dispatcher implements DispatcherInterface
2017
{
2118
/**
@@ -30,7 +27,6 @@ class Dispatcher implements DispatcherInterface
3027

3128
/**
3229
* @var \Closure(object, array): mixed
33-
* @psalm-var FunctionExecutor
3430
*/
3531
private \Closure $executor;
3632

@@ -88,9 +84,7 @@ private function scopeMatches(int $scope): bool
8884
}
8985

9086
/**
91-
*
9287
* @return \Closure(object, array): mixed
93-
* @psalm-return FunctionExecutor
9488
*/
9589
private function createExecutorFromMethod(\ReflectionMethod $fun): \Closure
9690
{
@@ -104,9 +98,7 @@ private function createExecutorFromMethod(\ReflectionMethod $fun): \Closure
10498
}
10599

106100
/**
107-
*
108101
* @return \Closure(object, array): mixed
109-
* @psalm-return FunctionExecutor
110102
*/
111103
private function createExecutorFromFunction(\ReflectionFunction $fun): \Closure
112104
{
@@ -131,7 +123,7 @@ private function createExecutorFromFunction(\ReflectionFunction $fun): \Closure
131123
}
132124

133125
/**
134-
* @psalm-return FunctionExecutor
126+
* @return \Closure(object, array): mixed
135127
*/
136128
private function boot(\ReflectionFunctionAbstract $fun): void
137129
{
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
<?php
2+
3+
/**
4+
* This file is part of Temporal package.
5+
*
6+
* For the full copyright and license information, please view the LICENSE
7+
* file that was distributed with this source code.
8+
*/
9+
10+
declare(strict_types=1);
11+
12+
namespace Temporal\Internal\Exception;
13+
14+
use Temporal\Internal\Transport\Request\UndefinedResponse;
15+
16+
/**
17+
* The exception is converted into {@see UndefinedResponse} and sent to the client.
18+
* This kind of failure raises panic in the Temporal Worker on the RoadRunner side.
19+
*
20+
* @internal
21+
*/
22+
final class UndefinedRequestException extends \LogicException {}

src/Internal/Transport/Router/CancelWorkflow.php

Lines changed: 8 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -13,14 +13,11 @@
1313

1414
use React\Promise\Deferred;
1515
use Temporal\DataConverter\EncodedValues;
16-
use Temporal\Internal\Workflow\Process\Process;
16+
use Temporal\Internal\Exception\UndefinedRequestException;
1717
use Temporal\Worker\Transport\Command\ServerRequestInterface;
1818

1919
class CancelWorkflow extends WorkflowProcessAwareRoute
2020
{
21-
/**
22-
* @var string
23-
*/
2421
private const ERROR_PROCESS_NOT_DEFINED = 'Unable to cancel workflow because workflow process #%s was not found';
2522

2623
public function handle(ServerRequestInterface $request, array $headers, Deferred $resolver): void
@@ -30,17 +27,15 @@ public function handle(ServerRequestInterface $request, array $headers, Deferred
3027
$resolver->resolve(EncodedValues::fromValues([null]));
3128
}
3229

33-
public function cancel(string $runId): array
30+
/**
31+
* @throws UndefinedRequestException
32+
*/
33+
private function cancel(string $runId): void
3434
{
35-
/** @var Process $process */
36-
$process = $this->running->find($runId);
37-
38-
if ($process === null) {
39-
throw new \InvalidArgumentException(\sprintf(self::ERROR_PROCESS_NOT_DEFINED, $runId));
40-
}
35+
$process = $this->running->find($runId) ?? throw new UndefinedRequestException(
36+
\sprintf(self::ERROR_PROCESS_NOT_DEFINED, $runId),
37+
);
4138

4239
$process->cancel();
43-
44-
return [];
4540
}
4641
}

src/Internal/Transport/Router/WorkflowProcessAwareRoute.php

Lines changed: 5 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111

1212
namespace Temporal\Internal\Transport\Router;
1313

14-
use Temporal\Internal\Declaration\WorkflowInstanceInterface;
14+
use Temporal\Internal\Exception\UndefinedRequestException;
1515
use Temporal\Internal\Repository\RepositoryInterface;
1616
use Temporal\Internal\Workflow\Process\Process;
1717

@@ -20,23 +20,18 @@ abstract class WorkflowProcessAwareRoute extends Route
2020
private const ERROR_PROCESS_NOT_FOUND = 'Workflow with the specified run identifier "%s" not found';
2121

2222
public function __construct(
23+
/**
24+
* @var RepositoryInterface<Process>
25+
*/
2326
protected RepositoryInterface $running,
2427
) {}
2528

26-
/**
27-
* @param non-empty-string $runId
28-
*/
29-
protected function findInstanceOrFail(string $runId): WorkflowInstanceInterface
30-
{
31-
return $this->findProcessOrFail($runId)->getWorkflowInstance();
32-
}
33-
3429
/**
3530
* @param non-empty-string $runId
3631
*/
3732
protected function findProcessOrFail(string $runId): Process
3833
{
39-
return $this->running->find($runId) ?? throw new \LogicException(
34+
return $this->running->find($runId) ?? throw new UndefinedRequestException(
4035
\sprintf(self::ERROR_PROCESS_NOT_FOUND, $runId),
4136
);
4237
}

src/Internal/Transport/Server.php

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,9 @@
1212
namespace Temporal\Internal\Transport;
1313

1414
use React\Promise\PromiseInterface;
15+
use Temporal\Internal\Exception\UndefinedRequestException;
1516
use Temporal\Internal\Queue\QueueInterface;
17+
use Temporal\Internal\Transport\Request\UndefinedResponse;
1618
use Temporal\Worker\Transport\Command\Client\FailedClientResponse;
1719
use Temporal\Worker\Transport\Command\Client\SuccessClientResponse;
1820
use Temporal\Worker\Transport\Command\FailureResponseInterface;
@@ -84,12 +86,19 @@ private function onFulfilled(ServerRequestInterface $request): \Closure
8486
}
8587

8688
/**
87-
* @return \Closure(\Throwable): FailureResponseInterface
89+
* @return \Closure(\Throwable): (FailureResponseInterface|never)
8890
*/
8991
private function onRejected(ServerRequestInterface $request): \Closure
9092
{
91-
return function (\Throwable $result) use ($request) {
92-
$response = new FailedClientResponse($request->getID(), $result);
93+
return function (\Throwable $e) use ($request) {
94+
if ($e::class === UndefinedRequestException::class) {
95+
// This is not a FailureResponseInterface, but it's a better place to handle it.
96+
$response = new UndefinedResponse($e->getMessage());
97+
$this->queue->push($response);
98+
throw $e;
99+
}
100+
101+
$response = new FailedClientResponse($request->getID(), $e);
93102
$this->queue->push($response);
94103

95104
return $response;

src/Internal/Workflow/Process/Scope.php

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -215,6 +215,7 @@ public function cancel(?\Throwable $reason = null): void
215215
if ($this->cancelled) {
216216
return;
217217
}
218+
218219
$this->cancelled = true;
219220

220221
foreach ($this->onCancel as $i => $handler) {
@@ -357,19 +358,19 @@ protected function callSignalOrUpdateHandler(callable $handler, ValuesInterface
357358
protected function onRequest(RequestInterface $request, PromiseInterface $promise): void
358359
{
359360
$this->onCancel[++$this->cancelID] = function (?\Throwable $reason = null) use ($request): void {
361+
$client = $this->context->getClient();
360362
if ($reason instanceof DestructMemorizedInstanceException) {
361363
// memory flush
362-
$this->context->getClient()->reject($request, $reason);
364+
$client->reject($request, $reason);
363365
return;
364366
}
365367

366-
if ($this->context->getClient()->isQueued($request)) {
367-
$this->context->getClient()->cancel($request);
368+
if ($client->isQueued($request)) {
369+
$client->cancel($request);
368370
return;
369371
}
370-
// todo ->context or ->scopeContext?
371372

372-
$this->context->getClient()->request(new Cancel($request->getID()), $this->scopeContext);
373+
$client->request(new Cancel($request->getID()), $this->scopeContext);
373374
};
374375

375376
$cancelID = $this->cancelID;
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Temporal\Tests\Acceptance\Extra\Stability\DynamicSignalWithPromises;
6+
7+
use PHPUnit\Framework\Attributes\Test;
8+
use React\Promise\Deferred;
9+
use React\Promise\PromiseInterface;
10+
use Temporal\Client\WorkflowStubInterface;
11+
use Temporal\Tests\Acceptance\App\Attribute\Stub;
12+
use Temporal\Tests\Acceptance\App\TestCase;
13+
use Temporal\Workflow;
14+
use Temporal\Workflow\WorkflowMethod;
15+
16+
class DynamicSignalWithPromisesTest extends TestCase
17+
{
18+
#[Test]
19+
public function steps(
20+
#[Stub('Extra_Stability_DynamicSignalWithPromises')] WorkflowStubInterface $stub,
21+
): void {
22+
# Send signals to the workflow to trigger steps
23+
$stub->signal('begin', 'foo');
24+
$stub->signal('next1', 'bar');
25+
26+
# Assert that the workflow has processed the signals and updated the value
27+
$this->assertSame(2, $stub->query('value')->getValue(0, 'int'));
28+
29+
# Send another signal to continue the workflow
30+
$stub->signal('next2', 'baz');
31+
32+
# Assert that the workflow has processed the final signal and returned the expected value
33+
$this->assertSame(3, $stub->query('value')->getValue(0, 'int'));
34+
35+
# Assert that the workflow has completed and returned the final result
36+
$this->assertSame(3, $stub->getResult());
37+
}
38+
}
39+
40+
#[Workflow\WorkflowInterface]
41+
class TestWorkflow
42+
{
43+
#[WorkflowMethod(name: 'Extra_Stability_DynamicSignalWithPromises')]
44+
public function handler()
45+
{
46+
$value = 0;
47+
Workflow::registerQuery('value', static function () use (&$value) {
48+
return $value;
49+
});
50+
51+
yield $this->promiseSignal('begin');
52+
$value++;
53+
54+
yield $this->promiseSignal('next1');
55+
$value++;
56+
57+
yield $this->promiseSignal('next2');
58+
$value++;
59+
60+
return $value;
61+
}
62+
63+
private function promiseSignal(string $name): PromiseInterface
64+
{
65+
$signal = new Deferred();
66+
Workflow::registerSignal($name, static function (mixed $value) use ($signal): void {
67+
$signal->resolve($value);
68+
});
69+
70+
return $signal->promise();
71+
}
72+
}

0 commit comments

Comments
 (0)