Skip to content

Commit b028483

Browse files
committed
Make internal transport Client forkable and destroyable
1 parent f8e7adc commit b028483

File tree

9 files changed

+166
-40
lines changed

9 files changed

+166
-40
lines changed

src/Internal/Transport/Client.php

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,21 @@ public function reject(CommandInterface $command, \Throwable $reason): void
116116
$this->fetch($command->getID())->reject($reason);
117117
}
118118

119+
public function fork(): ClientInterface
120+
{
121+
return new DetachedClient($this, function (array $ids): void {
122+
foreach ($ids as $id) {
123+
unset($this->requests[$id]);
124+
}
125+
});
126+
}
127+
128+
public function destroy(): void
129+
{
130+
$this->requests = [];
131+
unset($this->queue);
132+
}
133+
119134
private function fetch(int $id): Deferred
120135
{
121136
$request = $this->get($id);

src/Internal/Transport/ClientInterface.php

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,11 +12,13 @@
1212
namespace Temporal\Internal\Transport;
1313

1414
use React\Promise\PromiseInterface;
15+
use Temporal\Internal\Declaration\Destroyable;
1516
use Temporal\Worker\Transport\Command\CommandInterface;
1617
use Temporal\Worker\Transport\Command\RequestInterface;
18+
use Temporal\Worker\Transport\Command\ServerResponseInterface;
1719
use Temporal\Workflow\WorkflowContextInterface;
1820

19-
interface ClientInterface
21+
interface ClientInterface extends Destroyable
2022
{
2123
/**
2224
* Send a request and return a promise.
@@ -39,4 +41,14 @@ public function cancel(CommandInterface $command): void;
3941
* Reject pending promise.
4042
*/
4143
public function reject(CommandInterface $command, \Throwable $reason): void;
44+
45+
/**
46+
* Dispatch a response to the request.
47+
*/
48+
public function dispatch(ServerResponseInterface $response): void;
49+
50+
/**
51+
* Create a new client that can work with parent's request queue.
52+
*/
53+
public function fork(): ClientInterface;
4254
}
Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
<?php
2+
3+
/**
4+
* This file is part of Temporal package.
5+
*
6+
* For the full copyright and license information, please view the LICENSE
7+
* file that was distributed with this source code.
8+
*/
9+
10+
declare(strict_types=1);
11+
12+
namespace Temporal\Internal\Transport;
13+
14+
use React\Promise\PromiseInterface;
15+
use Temporal\Internal\Declaration\Destroyable;
16+
use Temporal\Worker\Transport\Command\CommandInterface;
17+
use Temporal\Worker\Transport\Command\RequestInterface;
18+
use Temporal\Worker\Transport\Command\ServerResponseInterface;
19+
use Temporal\Workflow\WorkflowContextInterface;
20+
21+
/**
22+
* @internal Client is an internal library class, please do not use it in your code.
23+
* @psalm-internal Temporal\Internal\Transport
24+
*/
25+
final class DetachedClient implements ClientInterface, Destroyable
26+
{
27+
/** @var list<int> */
28+
private array $requests = [];
29+
30+
/**
31+
* @param \Closure(list<int>): void $cleanup Handler that removes requests from the parent using their IDs.
32+
*/
33+
public function __construct(
34+
private ClientInterface $parent,
35+
private \Closure $cleanup,
36+
) {}
37+
38+
#[\Override]
39+
public function request(RequestInterface $request, ?WorkflowContextInterface $context = null): PromiseInterface
40+
{
41+
$this->requests[] = $request->getID();
42+
return $this->parent->request($request, $context);
43+
}
44+
45+
#[\Override]
46+
public function send(CommandInterface $request): void
47+
{
48+
$this->parent->send($request);
49+
}
50+
51+
#[\Override]
52+
public function isQueued(CommandInterface $command): bool
53+
{
54+
return $this->parent->isQueued($command);
55+
}
56+
57+
#[\Override]
58+
public function cancel(CommandInterface $command): void
59+
{
60+
$this->parent->cancel($command);
61+
}
62+
63+
#[\Override]
64+
public function reject(CommandInterface $command, \Throwable $reason): void
65+
{
66+
$this->parent->reject($command, $reason);
67+
}
68+
69+
#[\Override]
70+
public function dispatch(ServerResponseInterface $response): void
71+
{
72+
$this->parent->dispatch($response);
73+
}
74+
75+
#[\Override]
76+
public function fork(): ClientInterface
77+
{
78+
return $this->parent->fork();
79+
}
80+
81+
public function destroy(): void
82+
{
83+
$this->requests === [] or ($this->cleanup)($this->requests);
84+
$this->requests = [];
85+
unset($this->parent, $this->cleanup);
86+
}
87+
}

src/Internal/Transport/Router/StartWorkflow.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ public function handle(ServerRequestInterface $request, array $headers, Deferred
8585

8686
$context = new WorkflowContext(
8787
$this->services,
88-
$this->services->client,
88+
$this->services->client->fork(),
8989
$instance,
9090
$input,
9191
$lastCompletionResult,

src/Internal/Workflow/WorkflowContext.php

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -666,7 +666,8 @@ public function destroy(): void
666666
{
667667
$this->awaits = [];
668668
$this->workflowInstance->destroy();
669-
unset($this->workflowInstance);
669+
$this->client->destroy();
670+
unset($this->workflowInstance, $this->client);
670671
}
671672

672673
protected function awaitRequest(callable|Mutex|PromiseInterface ...$conditions): PromiseInterface

tests/Feature/Testing/CapturedClient.php

Lines changed: 37 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -52,33 +52,6 @@ public function send(CommandInterface $request): void
5252
$this->parent->send($request);
5353
}
5454

55-
/**
56-
* @param RequestInterface $request
57-
* @return \Closure
58-
*/
59-
private function onFulfilled(RequestInterface $request): \Closure
60-
{
61-
return function ($response) use ($request) {
62-
unset($this->requests[$request->getID()]);
63-
64-
return $response;
65-
};
66-
}
67-
68-
/**
69-
* @param RequestInterface $request
70-
* @return \Closure
71-
* @psalm-suppress UnusedClosureParam
72-
*/
73-
private function onRejected(RequestInterface $request): \Closure
74-
{
75-
return function (\Throwable $error) use ($request) {
76-
unset($this->requests[$request->getID()]);
77-
78-
throw $error;
79-
};
80-
}
81-
8255
/**
8356
* {@inheritDoc}
8457
*/
@@ -121,4 +94,41 @@ public function reject(CommandInterface $command, \Throwable $reason): void
12194
{
12295
$this->parent->reject($command, $reason);
12396
}
97+
98+
public function dispatch(CommandInterface $response): void
99+
{
100+
$this->parent->dispatch($response);
101+
}
102+
103+
public function fork(): ClientInterface
104+
{
105+
return $this->parent->fork();
106+
}
107+
108+
/**
109+
* @param RequestInterface $request
110+
* @return \Closure
111+
*/
112+
private function onFulfilled(RequestInterface $request): \Closure
113+
{
114+
return function ($response) use ($request) {
115+
unset($this->requests[$request->getID()]);
116+
117+
return $response;
118+
};
119+
}
120+
121+
/**
122+
* @param RequestInterface $request
123+
* @return \Closure
124+
* @psalm-suppress UnusedClosureParam
125+
*/
126+
private function onRejected(RequestInterface $request): \Closure
127+
{
128+
return function (\Throwable $error) use ($request): void {
129+
unset($this->requests[$request->getID()]);
130+
131+
throw $error;
132+
};
133+
}
124134
}

tests/Functional/WorkflowTestCase.php

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -363,12 +363,12 @@ public function testAwaitWithOneTimer_Leaks(): void
363363
$this->assertSame(0, $after - $before);
364364
}
365365

366-
public function testDetachedScope_Leaks(): void
366+
public function testDetachedScopeRequests_Leaks(): void
367367
{
368368
$worker = WorkerMock::createMock();
369369

370370
// Run the workflow $i times
371-
for ($id = 9000, $i = 0; $i < 100; ++$i) {
371+
for ($id = 9000, $i = 0; $i < 20; ++$i) {
372372
$uuid1 = Uuid::v4();
373373
$uuid2 = Uuid::v4();
374374
$id1 = ++$id;
@@ -383,19 +383,13 @@ public function testDetachedScope_Leaks(): void
383383
LOG;
384384

385385
$worker->run($this, Splitter::createFromString($log)->getQueue());
386-
if ($i === 3) {
387-
\gc_collect_cycles();
388-
$before = \memory_get_usage();
389-
}
390386
}
391-
$after = \memory_get_usage();
392387

388+
// Check there are no hanging requests
393389
$factory = self::getPrivate($worker, 'factory');
394390
$client = self::getPrivate($factory, 'client');
395391
$requests = self::getPrivate($client, 'requests');
396392
self::assertCount(0, $requests);
397-
398-
$this->assertSame(0, $after - $before);
399393
}
400394

401395
/**

tests/Unit/Framework/ClientMock.php

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,13 @@ public function reject(CommandInterface $command, \Throwable $reason): void
104104
$request->reject($reason);
105105
}
106106

107+
public function fork(): self
108+
{
109+
return new self($this->queue);
110+
}
111+
112+
public function destroy(): void {}
113+
107114
private function fetch(int $id): Deferred
108115
{
109116
$request = $this->get($id);

tests/Unit/Router/StartWorkflowTestCase.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,7 @@ protected function setUp(): void
132132
$this->router = new StartWorkflow($this->services);
133133
$this->workflowContext = new WorkflowContext(
134134
$this->services,
135-
$this->services->client,
135+
$this->services->client->fork(),
136136
$this->createMockForIntersectionOfInterfaces([WorkflowInstanceInterface::class, Destroyable::class]),
137137
new Input(),
138138
EncodedValues::empty(),

0 commit comments

Comments
 (0)