Skip to content

Commit 137c718

Browse files
authored
Merge pull request #590 : Fix memory leak on upsert SA/MEMO requests
2 parents 34ae898 + 17a9fee commit 137c718

File tree

8 files changed

+29
-31
lines changed

8 files changed

+29
-31
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,3 +20,4 @@ Thumbs.db
2020
rr
2121
temporal-test-server
2222
.ai
23+
.context

psalm-baseline.xml

Lines changed: 0 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -966,19 +966,6 @@
966966
</PossiblyInvalidArgument>
967967
</file>
968968
<file src="src/Internal/Transport/Client.php">
969-
<InternalClass>
970-
<code><![CDATA[self::ERROR_REQUEST_ID_DUPLICATION]]></code>
971-
<code><![CDATA[self::ERROR_REQUEST_NOT_FOUND]]></code>
972-
</InternalClass>
973-
<InternalMethod>
974-
<code><![CDATA[fetch]]></code>
975-
<code><![CDATA[get]]></code>
976-
<code><![CDATA[reject]]></code>
977-
<code><![CDATA[request]]></code>
978-
</InternalMethod>
979-
<InternalProperty>
980-
<code><![CDATA[$this->requests]]></code>
981-
</InternalProperty>
982969
<PossiblyInvalidArgument>
983970
<code><![CDATA[$command->getID()]]></code>
984971
<code><![CDATA[$command->getID()]]></code>

src/Internal/Transport/Client.php

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525

2626
/**
2727
* @internal Client is an internal library class, please do not use it in your code.
28-
* @psalm-internal Temporal\Client\Internal\Transport
28+
* @psalm-internal Temporal\Internal\Transport
2929
*/
3030
final class Client implements ClientInterface
3131
{
@@ -49,7 +49,7 @@ public function dispatch(ServerResponseInterface $response): void
4949
{
5050
$id = $response->getID();
5151
if (!isset($this->requests[$id])) {
52-
$this->request(new UndefinedResponse(
52+
$this->send(new UndefinedResponse(
5353
\sprintf('Got the response to undefined request %s', $id),
5454
));
5555
return;
@@ -84,9 +84,9 @@ public function request(RequestInterface $request, ?WorkflowContextInterface $co
8484

8585
$id = $request->getID();
8686

87-
if (isset($this->requests[$id])) {
88-
throw new \OutOfBoundsException(\sprintf(self::ERROR_REQUEST_ID_DUPLICATION, $id));
89-
}
87+
\array_key_exists($id, $this->requests) and throw new \OutOfBoundsException(
88+
\sprintf(self::ERROR_REQUEST_ID_DUPLICATION, $id),
89+
);
9090

9191
$deferred = new Deferred();
9292
$this->requests[$id] = [$deferred, $context];

src/Internal/Workflow/ScopeContext.php

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
use Temporal\Exception\Failure\CanceledFailure;
1717
use Temporal\Internal\Transport\CompletableResult;
1818
use Temporal\Internal\Workflow\Process\Scope;
19-
use Temporal\Promise;
2019
use Temporal\Worker\Transport\Command\RequestInterface;
2120
use Temporal\Workflow\CancellationScopeInterface;
2221
use Temporal\Workflow\ScopedContextInterface;
@@ -75,11 +74,11 @@ public function request(
7574
'Attempt to send request to cancelled scope',
7675
);
7776

78-
$promise = $this->parent->request($request);
7977
if (!$waitResponse) {
80-
return Promise::resolve();
78+
return $this->parent->request($request, $cancellable, false);
8179
}
8280

81+
$promise = $this->parent->request($request);
8382
($this->onRequest)($request, $promise);
8483

8584
return new CompletableResult(

src/Internal/Workflow/WorkflowContext.php

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -444,13 +444,21 @@ public function timer($interval): PromiseInterface
444444
)(new TimerInput($dateInterval));
445445
}
446446

447-
public function request(RequestInterface $request, bool $cancellable = true): PromiseInterface
448-
{
447+
public function request(
448+
RequestInterface $request,
449+
bool $cancellable = true,
450+
bool $waitResponse = true,
451+
): PromiseInterface {
449452
$this->recordTrace();
450453

451454
// Intercept workflow outbound calls
452455
return $this->requestInterceptor->with(
453-
function (RequestInterface $request): PromiseInterface {
456+
function (RequestInterface $request) use ($waitResponse): PromiseInterface {
457+
if (!$waitResponse) {
458+
$this->client->send($request);
459+
return Promise::resolve();
460+
}
461+
454462
return $this->client->request($request, $this);
455463
},
456464
/** @see WorkflowOutboundRequestInterceptor::handleOutboundRequest() */
@@ -476,7 +484,7 @@ function (UpsertMemoInput $input): PromiseInterface {
476484
return resolve();
477485
}
478486

479-
$result = $this->request(new UpsertMemo($input->memo), false);
487+
$result = $this->request(new UpsertMemo($input->memo), false, false);
480488

481489
/** @psalm-suppress UnsupportedPropertyReferenceUsage $memo */
482490
$memo = &$this->input->info->memo;
@@ -505,7 +513,7 @@ function (UpsertSearchAttributesInput $input): PromiseInterface {
505513
return resolve();
506514
}
507515

508-
$result = $this->request(new UpsertSearchAttributes($input->searchAttributes), false);
516+
$result = $this->request(new UpsertSearchAttributes($input->searchAttributes), false, false);
509517

510518
/** @psalm-suppress UnsupportedPropertyReferenceUsage $sa */
511519
$sa = &$this->input->info->searchAttributes;
@@ -533,7 +541,7 @@ function (UpsertTypedSearchAttributesInput $input): PromiseInterface {
533541
return resolve();
534542
}
535543

536-
$result = $this->request(new UpsertTypedSearchAttributes($input->updates), false);
544+
$result = $this->request(new UpsertTypedSearchAttributes($input->updates), false, false);
537545

538546
// Merge changes
539547
$tsa = $this->input->info->typedSearchAttributes;

src/Workflow/WorkflowContextInterface.php

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,11 @@ public function registerUpdate(string $name, callable $handler, ?callable $valid
128128
*
129129
* @internal This is an internal method
130130
*/
131-
public function request(RequestInterface $request, bool $cancellable = true): PromiseInterface;
131+
public function request(
132+
RequestInterface $request,
133+
bool $cancellable = true,
134+
bool $waitResponse = true,
135+
): PromiseInterface;
132136

133137
/**
134138
* Updates the behavior of an existing workflow to resolve inconsistency errors during replay.

tests/Acceptance/App/Attribute/Client.php

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,5 @@ public function __construct(
1616
public float|null $timeout = null,
1717
public \Closure|array|string|null $pipelineProvider = null,
1818
public array $payloadConverters = [],
19-
) {
20-
}
19+
) {}
2120
}

tests/Unit/Framework/ClientMock.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ public function __construct(QueueInterface $queue)
4545
public function dispatch(ServerResponseInterface $response): void
4646
{
4747
if (!isset($this->requests[$response->getID()])) {
48-
$this->request(new UndefinedResponse(
48+
$this->send(new UndefinedResponse(
4949
\sprintf('Got the response to undefined request %s', $response->getID()),
5050
));
5151
return;

0 commit comments

Comments
 (0)