Skip to content

Commit 9d5fb5c

Browse files
committed
Merge pull request #590 : Fix memory leak on upsert SA/MEMO requests
(cherry picked from commit 137c718)
1 parent a11699a commit 9d5fb5c

File tree

8 files changed

+29
-31
lines changed

8 files changed

+29
-31
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,3 +20,4 @@ Thumbs.db
2020
rr
2121
temporal-test-server
2222
.ai
23+
.context

psalm-baseline.xml

Lines changed: 0 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -921,19 +921,6 @@
921921
</PossiblyInvalidArgument>
922922
</file>
923923
<file src="src/Internal/Transport/Client.php">
924-
<InternalClass>
925-
<code><![CDATA[self::ERROR_REQUEST_ID_DUPLICATION]]></code>
926-
<code><![CDATA[self::ERROR_REQUEST_NOT_FOUND]]></code>
927-
</InternalClass>
928-
<InternalMethod>
929-
<code><![CDATA[fetch]]></code>
930-
<code><![CDATA[get]]></code>
931-
<code><![CDATA[reject]]></code>
932-
<code><![CDATA[request]]></code>
933-
</InternalMethod>
934-
<InternalProperty>
935-
<code><![CDATA[$this->requests]]></code>
936-
</InternalProperty>
937924
<PossiblyInvalidArgument>
938925
<code><![CDATA[$command->getID()]]></code>
939926
<code><![CDATA[$command->getID()]]></code>

src/Internal/Transport/Client.php

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525

2626
/**
2727
* @internal Client is an internal library class, please do not use it in your code.
28-
* @psalm-internal Temporal\Client\Internal\Transport
28+
* @psalm-internal Temporal\Internal\Transport
2929
*/
3030
final class Client implements ClientInterface
3131
{
@@ -49,7 +49,7 @@ public function dispatch(ServerResponseInterface $response): void
4949
{
5050
$id = $response->getID();
5151
if (!isset($this->requests[$id])) {
52-
$this->request(new UndefinedResponse(
52+
$this->send(new UndefinedResponse(
5353
\sprintf('Got the response to undefined request %s', $id),
5454
));
5555
return;
@@ -84,9 +84,9 @@ public function request(RequestInterface $request, ?WorkflowContextInterface $co
8484

8585
$id = $request->getID();
8686

87-
if (isset($this->requests[$id])) {
88-
throw new \OutOfBoundsException(\sprintf(self::ERROR_REQUEST_ID_DUPLICATION, $id));
89-
}
87+
\array_key_exists($id, $this->requests) and throw new \OutOfBoundsException(
88+
\sprintf(self::ERROR_REQUEST_ID_DUPLICATION, $id),
89+
);
9090

9191
$deferred = new Deferred();
9292
$this->requests[$id] = [$deferred, $context];

src/Internal/Workflow/ScopeContext.php

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
use Temporal\Exception\Failure\CanceledFailure;
1717
use Temporal\Internal\Transport\CompletableResult;
1818
use Temporal\Internal\Workflow\Process\Scope;
19-
use Temporal\Promise;
2019
use Temporal\Worker\Transport\Command\RequestInterface;
2120
use Temporal\Workflow\CancellationScopeInterface;
2221
use Temporal\Workflow\ScopedContextInterface;
@@ -75,11 +74,11 @@ public function request(
7574
'Attempt to send request to cancelled scope',
7675
);
7776

78-
$promise = $this->parent->request($request);
7977
if (!$waitResponse) {
80-
return Promise::resolve();
78+
return $this->parent->request($request, $cancellable, false);
8179
}
8280

81+
$promise = $this->parent->request($request);
8382
($this->onRequest)($request, $promise);
8483

8584
return new CompletableResult(

src/Internal/Workflow/WorkflowContext.php

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -426,13 +426,21 @@ public function timer($interval): PromiseInterface
426426
)(new TimerInput($dateInterval));
427427
}
428428

429-
public function request(RequestInterface $request, bool $cancellable = true): PromiseInterface
430-
{
429+
public function request(
430+
RequestInterface $request,
431+
bool $cancellable = true,
432+
bool $waitResponse = true,
433+
): PromiseInterface {
431434
$this->recordTrace();
432435

433436
// Intercept workflow outbound calls
434437
return $this->requestInterceptor->with(
435-
function (RequestInterface $request): PromiseInterface {
438+
function (RequestInterface $request) use ($waitResponse): PromiseInterface {
439+
if (!$waitResponse) {
440+
$this->client->send($request);
441+
return Promise::resolve();
442+
}
443+
436444
return $this->client->request($request, $this);
437445
},
438446
/** @see WorkflowOutboundRequestInterceptor::handleOutboundRequest() */
@@ -458,7 +466,7 @@ function (UpsertMemoInput $input): PromiseInterface {
458466
return resolve();
459467
}
460468

461-
$result = $this->request(new UpsertMemo($input->memo), false);
469+
$result = $this->request(new UpsertMemo($input->memo), false, false);
462470

463471
/** @psalm-suppress UnsupportedPropertyReferenceUsage $memo */
464472
$memo = &$this->input->info->memo;
@@ -487,7 +495,7 @@ function (UpsertSearchAttributesInput $input): PromiseInterface {
487495
return resolve();
488496
}
489497

490-
$result = $this->request(new UpsertSearchAttributes($input->searchAttributes), false);
498+
$result = $this->request(new UpsertSearchAttributes($input->searchAttributes), false, false);
491499

492500
/** @psalm-suppress UnsupportedPropertyReferenceUsage $sa */
493501
$sa = &$this->input->info->searchAttributes;
@@ -515,7 +523,7 @@ function (UpsertTypedSearchAttributesInput $input): PromiseInterface {
515523
return resolve();
516524
}
517525

518-
$result = $this->request(new UpsertTypedSearchAttributes($input->updates), false);
526+
$result = $this->request(new UpsertTypedSearchAttributes($input->updates), false, false);
519527

520528
// Merge changes
521529
$tsa = $this->input->info->typedSearchAttributes;

src/Workflow/WorkflowContextInterface.php

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,11 @@ public function registerUpdate(string $name, callable $handler, ?callable $valid
8585
*
8686
* @internal This is an internal method
8787
*/
88-
public function request(RequestInterface $request, bool $cancellable = true): PromiseInterface;
88+
public function request(
89+
RequestInterface $request,
90+
bool $cancellable = true,
91+
bool $waitResponse = true,
92+
): PromiseInterface;
8993

9094
/**
9195
* Updates the behavior of an existing workflow to resolve inconsistency errors during replay.

tests/Acceptance/App/Attribute/Client.php

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,5 @@ public function __construct(
1616
public float|null $timeout = null,
1717
public \Closure|array|string|null $pipelineProvider = null,
1818
public array $payloadConverters = [],
19-
) {
20-
}
19+
) {}
2120
}

tests/Unit/Framework/ClientMock.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ public function __construct(QueueInterface $queue)
4545
public function dispatch(ServerResponseInterface $response): void
4646
{
4747
if (!isset($this->requests[$response->getID()])) {
48-
$this->request(new UndefinedResponse(
48+
$this->send(new UndefinedResponse(
4949
\sprintf('Got the response to undefined request %s', $response->getID()),
5050
));
5151
return;

0 commit comments

Comments
 (0)