Skip to content

Commit 34ae898

Browse files
authored
Merge pull request #581: Add methods to register dynamic handlers
2 parents bd24c8d + b991335 commit 34ae898

File tree

13 files changed

+548
-73
lines changed

13 files changed

+548
-73
lines changed

composer.json

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@
5757
"dereuromark/composer-prefer-lowest": "^0.1.10",
5858
"doctrine/annotations": "^1.14.4 || ^2.0.2",
5959
"internal/dload": "^1.0",
60-
"jetbrains/phpstorm-attributes": "dev-master@dev",
60+
"jetbrains/phpstorm-attributes": "dev-master",
6161
"laminas/laminas-code": "^4.16",
6262
"phpunit/phpunit": "^10.5.41",
6363
"spiral/code-style": "~2.1.2",
@@ -77,8 +77,9 @@
7777
}
7878
},
7979
"suggest": {
80-
"doctrine/annotations": "For Doctrine metadata driver support",
81-
"ext-grpc": "For Client calls"
80+
"ext-grpc": "For Client calls",
81+
"ext-protobuf": "For better performance",
82+
"buggregator/trap": "For better debugging"
8283
},
8384
"scripts": {
8485
"post-update-cmd": "Temporal\\Worker\\Transport\\RoadRunnerVersionChecker::postUpdate",

psalm-baseline.xml

Lines changed: 5 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
<?xml version="1.0" encoding="UTF-8"?>
2-
<files psalm-version="6.9.1@81c8a77c0793d450fee40265cfe68891df11d505">
2+
<files psalm-version="6.10.0@9c0add4eb88d4b169ac04acb7c679918cbb9c252">
33
<file src="src/Activity.php">
44
<ImplicitToStringCast>
55
<code><![CDATA[$type]]></code>
@@ -606,23 +606,16 @@
606606
</RedundantConditionGivenDocblockType>
607607
</file>
608608
<file src="src/Internal/Declaration/WorkflowInstance.php">
609-
<MissingClosureReturnType>
610-
<code><![CDATA[function (QueryInput $input) use ($fn) {]]></code>
611-
</MissingClosureReturnType>
609+
<LessSpecificImplementedReturnType>
610+
<code><![CDATA[UpdateHandler|null]]></code>
611+
</LessSpecificImplementedReturnType>
612612
<PropertyNotSetInConstructor>
613613
<code><![CDATA[$queryExecutor]]></code>
614614
<code><![CDATA[$updateExecutor]]></code>
615615
<code><![CDATA[$updateValidator]]></code>
616616
</PropertyNotSetInConstructor>
617-
<PropertyTypeCoercion>
618-
<code><![CDATA[$this->queryHandlers]]></code>
619-
<code><![CDATA[$this->signalHandlers]]></code>
620-
</PropertyTypeCoercion>
621617
</file>
622618
<file src="src/Internal/Declaration/WorkflowInstance/SignalQueue.php">
623-
<ArgumentTypeCoercion>
624-
<code><![CDATA[$signal]]></code>
625-
</ArgumentTypeCoercion>
626619
<MissingConstructor>
627620
<code><![CDATA[$onSignal]]></code>
628621
</MissingConstructor>
@@ -1187,9 +1180,6 @@
11871180
</UnevaluatedCode>
11881181
</file>
11891182
<file src="src/Internal/Workflow/Process/Process.php">
1190-
<MissingClosureParamType>
1191-
<code><![CDATA[$result]]></code>
1192-
</MissingClosureParamType>
11931183
<PropertyNotSetInConstructor>
11941184
<code><![CDATA[Process]]></code>
11951185
</PropertyNotSetInConstructor>
@@ -1521,7 +1511,7 @@
15211511
<InvalidReturnStatement>
15221512
<code><![CDATA[self::getCurrentContext()->newActivityStub($class, $options)]]></code>
15231513
<code><![CDATA[self::getCurrentContext()->registerQuery($queryType, $handler)]]></code>
1524-
<code><![CDATA[self::getCurrentContext()->registerSignal($queryType, $handler)]]></code>
1514+
<code><![CDATA[self::getCurrentContext()->registerSignal($name, $handler)]]></code>
15251515
</InvalidReturnStatement>
15261516
<InvalidReturnType>
15271517
<code><![CDATA[ScopedContextInterface]]></code>

src/Interceptor/WorkflowInbound/QueryInput.php

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ class QueryInput
2424
* @internal Don't use the constructor. Use {@see self::with()} instead.
2525
*/
2626
public function __construct(
27+
/** @var non-empty-string */
2728
public readonly string $queryName,
2829
public readonly ValuesInterface $arguments,
2930
public readonly WorkflowInfo $info,

src/Internal/Declaration/WorkflowInstance.php

Lines changed: 61 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -24,35 +24,36 @@
2424

2525
/**
2626
* @psalm-type QueryHandler = \Closure(QueryInput): mixed
27-
* @psalm-type UpdateHandler = \Closure(UpdateInput, Deferred): PromiseInterface
27+
* @psalm-type UpdateHandler = \Closure(UpdateInput, Deferred): mixed
2828
* @psalm-type ValidateUpdateHandler = \Closure(UpdateInput): void
2929
* @psalm-type QueryExecutor = \Closure(QueryInput, callable(ValuesInterface): mixed): mixed
3030
* @psalm-type UpdateExecutor = \Closure(UpdateInput, callable(ValuesInterface): mixed, Deferred): PromiseInterface
31-
* @psalm-type ValidateUpdateExecutor = \Closure(UpdateInput, callable(ValuesInterface): mixed): mixed
31+
* @psalm-type ValidateUpdateExecutor = \Closure(UpdateInput, callable(ValuesInterface): mixed): void
3232
* @psalm-type UpdateValidator = \Closure(UpdateInput, UpdateHandler): void
3333
*
3434
* @internal
3535
*/
3636
final class WorkflowInstance extends Instance implements WorkflowInstanceInterface
3737
{
38-
/**
39-
* @var array<non-empty-string, QueryHandler>
40-
*/
38+
/** @var array<non-empty-string, QueryHandler> */
4139
private array $queryHandlers = [];
4240

43-
/**
44-
* @var array<non-empty-string, MethodHandler>
45-
*/
41+
/** @var null|QueryHandler */
42+
private ?\Closure $queryDynamicHandler = null;
43+
44+
/** @var null|UpdateHandler */
45+
private ?\Closure $updateDynamicHandler = null;
46+
47+
/** @var null|ValidateUpdateHandler */
48+
private ?\Closure $updateDynamicValidator = null;
49+
50+
/** @var array<non-empty-string, MethodHandler> */
4651
private array $signalHandlers = [];
4752

48-
/**
49-
* @var array<non-empty-string, UpdateHandler>
50-
*/
53+
/** @var array<non-empty-string, UpdateHandler> */
5154
private array $updateHandlers = [];
5255

53-
/**
54-
* @var array<non-empty-string, null|ValidateUpdateHandler>
55-
*/
56+
/** @var array<non-empty-string, null|ValidateUpdateHandler> */
5657
private array $validateUpdateHandlers = [];
5758

5859
private SignalQueue $signalQueue;
@@ -174,7 +175,7 @@ public function getSignalQueue(): SignalQueue
174175
*/
175176
public function findQueryHandler(string $name): ?\Closure
176177
{
177-
return $this->queryHandlers[$name] ?? null;
178+
return $this->queryHandlers[$name] ?? $this->queryDynamicHandler;
178179
}
179180

180181
/**
@@ -185,7 +186,7 @@ public function findQueryHandler(string $name): ?\Closure
185186
*/
186187
public function findUpdateHandler(string $name): ?\Closure
187188
{
188-
return $this->updateHandlers[$name] ?? null;
189+
return $this->updateHandlers[$name] ?? $this->updateDynamicHandler;
189190
}
190191

191192
/**
@@ -196,7 +197,11 @@ public function findUpdateHandler(string $name): ?\Closure
196197
*/
197198
public function findValidateUpdateHandler(string $name): ?\Closure
198199
{
199-
return $this->validateUpdateHandlers[$name] ?? null;
200+
return $this->validateUpdateHandlers[$name] ?? (
201+
\array_key_exists($name, $this->updateHandlers)
202+
? null
203+
: $this->updateDynamicValidator
204+
);
200205
}
201206

202207
/**
@@ -207,16 +212,13 @@ public function addQueryHandler(string $name, callable $handler): void
207212
$fn = $this->createCallableHandler($handler);
208213

209214
$this->queryHandlers[$name] = $this->pipeline->with(
210-
function (QueryInput $input) use ($fn) {
211-
return ($this->queryExecutor)($input, $fn);
212-
},
215+
fn(QueryInput $input): mixed => ($this->queryExecutor)($input, $fn),
213216
/** @see WorkflowInboundCallsInterceptor::handleQuery() */
214217
'handleQuery',
215218
)(...);
216219
}
217220

218221
/**
219-
* @param non-empty-string $name
220222
* @throws \ReflectionException
221223
*/
222224
public function addUpdateHandler(string $name, callable $handler): void
@@ -233,7 +235,6 @@ function (UpdateInput $input, Deferred $deferred) use ($fn) {
233235
}
234236

235237
/**
236-
* @param non-empty-string $name
237238
* @throws \ReflectionException
238239
*/
239240
public function addValidateUpdateHandler(string $name, callable $handler): void
@@ -263,15 +264,49 @@ public function getSignalHandler(string $name): \Closure
263264
return fn(ValuesInterface $values) => $this->signalQueue->push($name, $values);
264265
}
265266

266-
/**
267-
* @throws \ReflectionException
268-
*/
269267
public function addSignalHandler(string $name, callable $handler): void
270268
{
271269
$this->signalHandlers[$name] = $this->createCallableHandler($handler);
272270
$this->signalQueue->attach($name, $this->signalHandlers[$name]);
273271
}
274272

273+
public function setDynamicSignalHandler(callable $handler): void
274+
{
275+
$this->signalQueue->setFallback($handler(...));
276+
}
277+
278+
public function setDynamicQueryHandler(callable $handler): void
279+
{
280+
$this->queryDynamicHandler = $this->pipeline->with(
281+
fn(QueryInput $input): mixed => ($this->queryExecutor)(
282+
$input,
283+
static fn(ValuesInterface $arguments): mixed => $handler($input->queryName, $arguments),
284+
),
285+
/** @see WorkflowInboundCallsInterceptor::handleQuery() */
286+
'handleQuery',
287+
)(...);
288+
}
289+
290+
public function setDynamicUpdateHandler(callable $handler, ?callable $validator = null): void
291+
{
292+
$this->updateDynamicValidator = $validator === null
293+
? null
294+
: fn(UpdateInput $input): mixed => ($this->updateValidator)(
295+
$input,
296+
static fn(ValuesInterface $arguments): mixed => $validator($input->updateName, $arguments),
297+
);
298+
299+
$this->updateDynamicHandler = $this->pipeline->with(
300+
fn(UpdateInput $input, Deferred $deferred): mixed => ($this->updateExecutor)(
301+
$input,
302+
static fn(ValuesInterface $arguments): mixed => $handler($input->updateName, $arguments),
303+
$deferred,
304+
),
305+
/** @see WorkflowInboundCallsInterceptor::handleUpdate() */
306+
'handleUpdate',
307+
)(...);
308+
}
309+
275310
public function clearSignalQueue(): void
276311
{
277312
$this->signalQueue->clear();
@@ -290,6 +325,7 @@ public function destroy(): void
290325
$this->updateValidator,
291326
$this->prototype,
292327
$this->pipeline,
328+
$this->queryDynamicHandler,
293329
);
294330
parent::destroy();
295331
}

src/Internal/Declaration/WorkflowInstance/SignalQueue.php

Lines changed: 60 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -16,54 +16,90 @@
1616
/**
1717
* @psalm-type Consumer = callable(ValuesInterface): mixed
1818
*
19-
* @psalm-type OnSignalCallable = callable(non-empty-string $name, callable $handler, ValuesInterface $arguments): void
19+
* @psalm-type OnSignalCallable = \Closure(non-empty-string $name, Consumer $handler, ValuesInterface $arguments): void
2020
*/
2121
final class SignalQueue
2222
{
2323
/**
24-
* @var array<string, list<ValuesInterface>>
24+
* @var array<int, SignalQueueItem>
2525
*/
2626
private array $queue = [];
2727

2828
/**
29-
* @var array<Consumer>
29+
* @var array<non-empty-string, Consumer>
3030
*/
3131
private array $consumers = [];
3232

3333
/**
3434
* @var OnSignalCallable
3535
*/
36-
private $onSignal;
36+
private \Closure $onSignal;
37+
38+
/**
39+
* A fallback consumer to handle signals when no consumer is attached.
40+
*
41+
* @var null|\Closure(non-empty-string, ValuesInterface): mixed
42+
*/
43+
private ?\Closure $dynamicConsumer = null;
3744

3845
/**
3946
* @param non-empty-string $signal
4047
*/
4148
public function push(string $signal, ValuesInterface $values): void
4249
{
4350
if (isset($this->consumers[$signal])) {
44-
($this->onSignal)($signal, $this->consumers[$signal], $values);
51+
$this->consume($signal, $values, $this->consumers[$signal]);
4552
return;
4653
}
4754

48-
$this->queue[$signal][] = $values;
49-
$this->flush($signal);
55+
if ($this->dynamicConsumer !== null) {
56+
$this->consumeFallback($signal, $values);
57+
return;
58+
}
59+
60+
$this->queue[] = new SignalQueueItem($signal, $values);
5061
}
5162

5263
/**
5364
* @param OnSignalCallable $handler
5465
*/
55-
public function onSignal(callable $handler): void
66+
public function onSignal(\Closure $handler): void
5667
{
5768
$this->onSignal = $handler;
5869
}
5970

6071
/**
72+
* @param non-empty-string $signal
6173
* @param Consumer $consumer
6274
*/
6375
public function attach(string $signal, callable $consumer): void
6476
{
6577
$this->consumers[$signal] = $consumer; // overwrite
66-
$this->flush($signal);
78+
79+
foreach ($this->queue as $k => $item) {
80+
if ($item->name === $signal) {
81+
unset($this->queue[$k]);
82+
$this->consume($signal, $item->values, $consumer);
83+
}
84+
}
85+
}
86+
87+
/**
88+
* @param \Closure(non-empty-string, ValuesInterface): mixed $consumer
89+
*/
90+
public function setFallback(\Closure $consumer): void
91+
{
92+
$this->dynamicConsumer = $consumer;
93+
94+
// Flush all signals that have no consumer
95+
foreach ($this->queue as $k => $item) {
96+
if (\array_key_exists($item->name, $this->consumers)) {
97+
continue;
98+
}
99+
100+
unset($this->queue[$k]);
101+
$this->consumeFallback($item->name, $item->values);
102+
}
67103
}
68104

69105
public function clear(): void
@@ -72,18 +108,24 @@ public function clear(): void
72108
}
73109

74110
/**
75-
* @psalm-suppress UnusedVariable
111+
* @param non-empty-string $signal
112+
* @param Consumer $consumer
76113
*/
77-
private function flush(string $signal): void
114+
private function consume(string $signal, ValuesInterface $values, callable $consumer): void
78115
{
79-
if (!isset($this->queue[$signal], $this->consumers[$signal])) {
80-
return;
81-
}
116+
($this->onSignal)($signal, $consumer, $values);
117+
}
82118

83-
while ($this->queue[$signal] !== []) {
84-
$args = \array_shift($this->queue[$signal]);
119+
/**
120+
* @param non-empty-string $signal
121+
*/
122+
private function consumeFallback(string $signal, ValuesInterface $values): void
123+
{
124+
$handler = $this->dynamicConsumer;
125+
\assert($handler !== null);
85126

86-
($this->onSignal)($signal, $this->consumers[$signal], $args);
87-
}
127+
// Wrap the fallback consumer to call interceptors
128+
$consumer = static fn(ValuesInterface $values): mixed => $handler($signal, $values);
129+
($this->onSignal)($signal, $consumer, $values);
88130
}
89131
}

0 commit comments

Comments
 (0)