Skip to content

Commit 4f17d8e

Browse files
authored
Merge pull request #593: Expose Activity and Workflow instance from context
2 parents e99abe7 + faac77a commit 4f17d8e

File tree

20 files changed

+600
-126
lines changed

20 files changed

+600
-126
lines changed

src/Activity.php

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -141,4 +141,15 @@ public static function heartbeat($details): void
141141

142142
$context->heartbeat($details);
143143
}
144+
145+
/**
146+
* Get the currently running activity instance.
147+
*/
148+
public static function getInstance(): object
149+
{
150+
/** @var ActivityContextInterface $context */
151+
$context = self::getCurrentContext();
152+
153+
return $context->getInstance();
154+
}
144155
}

src/Activity/ActivityContextInterface.php

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,4 +63,11 @@ public function doNotCompleteOnReturn(): void;
6363
* @param mixed $details
6464
*/
6565
public function heartbeat($details): void;
66+
67+
/**
68+
* Get the currently running activity instance.
69+
*
70+
* @see Activity::getInstance()
71+
*/
72+
public function getInstance(): object;
6673
}

src/Internal/Activity/ActivityContext.php

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ final class ActivityContext implements ActivityContextInterface, HeaderCarrier
3636
private ?ValuesInterface $heartbeatDetails;
3737
private ValuesInterface $input;
3838
private HeaderInterface $header;
39+
private ?\WeakReference $instance = null;
3940

4041
public function __construct(
4142
RPCConnectionInterface $rpc,
@@ -147,4 +148,26 @@ public function heartbeat($details): void
147148
throw ActivityCompletionException::fromActivityInfo($this->info, $e);
148149
}
149150
}
151+
152+
public function getInstance(): object
153+
{
154+
\assert($this->instance !== null, 'Activity instance is not available');
155+
$activity = $this->instance->get();
156+
\assert($activity !== null, 'Activity instance is not available');
157+
return $activity;
158+
}
159+
160+
/**
161+
* Set activity instance.
162+
*
163+
* @param object $instance Activity instance.
164+
* @return $this
165+
* @internal
166+
*/
167+
public function withInstance(object $instance): self
168+
{
169+
$clone = clone $this;
170+
$clone->instance = \WeakReference::create($instance);
171+
return $clone;
172+
}
150173
}

src/Internal/Declaration/Prototype/WorkflowPrototype.php

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
use Temporal\Common\CronSchedule;
1515
use Temporal\Common\MethodRetry;
1616
use Temporal\Workflow\ReturnType;
17+
use Temporal\Workflow\WorkflowInit;
1718

1819
final class WorkflowPrototype extends Prototype
1920
{
@@ -40,6 +41,20 @@ final class WorkflowPrototype extends Prototype
4041
private ?CronSchedule $cronSchedule = null;
4142
private ?MethodRetry $methodRetry = null;
4243
private ?ReturnType $returnType = null;
44+
private bool $hasInitializer = false;
45+
46+
/**
47+
* Indicates if the workflow has a constructor with {@see WorkflowInit} attribute.
48+
*/
49+
public function hasInitializer(): bool
50+
{
51+
return $this->hasInitializer;
52+
}
53+
54+
public function setHasInitializer(bool $hasInitializer): void
55+
{
56+
$this->hasInitializer = $hasInitializer;
57+
}
4358

4459
public function getCronSchedule(): ?CronSchedule
4560
{

src/Internal/Declaration/Reader/WorkflowReader.php

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
use Temporal\Workflow\SignalMethod;
2525
use Temporal\Workflow\UpdateMethod;
2626
use Temporal\Workflow\UpdateValidatorMethod;
27+
use Temporal\Workflow\WorkflowInit;
2728
use Temporal\Workflow\WorkflowInterface;
2829
use Temporal\Workflow\WorkflowMethod;
2930

@@ -125,6 +126,13 @@ private function withMethods(ClassNode $graph, WorkflowPrototype $prototype): Wo
125126
foreach ($class->getMethods() as $method) {
126127
$contextClass = $method->getDeclaringClass();
127128

129+
// Check WorkflowInit method
130+
if ($method->isConstructor()) {
131+
$this->getAttributedMethod($graph, $method, WorkflowInit::class);
132+
$prototype->setHasInitializer(true);
133+
continue;
134+
}
135+
128136
/** @var UpdateMethod|null $update */
129137
$update = $this->getAttributedMethod($graph, $method, UpdateMethod::class);
130138

src/Internal/Declaration/WorkflowInstance.php

Lines changed: 3 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
use Temporal\Internal\Declaration\Prototype\WorkflowPrototype;
2121
use Temporal\Internal\Declaration\WorkflowInstance\SignalQueue;
2222
use Temporal\Internal\Interceptor;
23-
use Temporal\Workflow\WorkflowInit;
2423

2524
/**
2625
* @psalm-type QueryHandler = \Closure(QueryInput): mixed
@@ -149,17 +148,7 @@ public function init(array $arguments = []): void
149148
return;
150149
}
151150

152-
if ($arguments === []) {
153-
$this->context->__construct();
154-
return;
155-
}
156-
157-
// Check InitMethod attribute
158-
$reflection = new \ReflectionMethod($this->context, '__construct');
159-
$attributes = $reflection->getAttributes(WorkflowInit::class);
160-
$attributes === []
161-
? $this->context->__construct()
162-
: $this->context->__construct(...$arguments);
151+
$this->context->__construct(...$arguments);
163152
}
164153

165154
public function getSignalQueue(): SignalQueue
@@ -296,15 +285,12 @@ public function setDynamicUpdateHandler(callable $handler, ?callable $validator
296285
static fn(ValuesInterface $arguments): mixed => $validator($input->updateName, $arguments),
297286
);
298287

299-
$this->updateDynamicHandler = $this->pipeline->with(
288+
$this->updateDynamicHandler =
300289
fn(UpdateInput $input, Deferred $deferred): mixed => ($this->updateExecutor)(
301290
$input,
302291
static fn(ValuesInterface $arguments): mixed => $handler($input->updateName, $arguments),
303292
$deferred,
304-
),
305-
/** @see WorkflowInboundCallsInterceptor::handleUpdate() */
306-
'handleUpdate',
307-
)(...);
293+
);
308294
}
309295

310296
public function clearSignalQueue(): void

src/Internal/Transport/Router/InvokeActivity.php

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,12 @@ public function handle(ServerRequestInterface $request, array $headers, Deferred
7777
$prototype = $this->findDeclarationOrFail($context->getInfo());
7878

7979
try {
80-
$handler = $prototype->getInstance()->getHandler();
80+
// Create ActivityInstance
81+
$instance = $prototype->getInstance();
82+
83+
// Register Activity instance in the context
84+
$context = $context->withInstance($instance->getContext());
85+
$handler = $instance->getHandler();
8186

8287
// Define Context for interceptors Pipeline
8388
Activity::setCurrentContext($context);

src/Internal/Transport/Router/StartWorkflow.php

Lines changed: 4 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,6 @@
1717
use Temporal\Common\TypedSearchAttributes;
1818
use Temporal\DataConverter\EncodedCollection;
1919
use Temporal\DataConverter\EncodedValues;
20-
use Temporal\Interceptor\WorkflowInbound\WorkflowInput;
21-
use Temporal\Interceptor\WorkflowInboundCallsInterceptor;
2220
use Temporal\Internal\Declaration\Instantiator\WorkflowInstantiator;
2321
use Temporal\Internal\Declaration\Prototype\WorkflowPrototype;
2422
use Temporal\Internal\ServiceContainer;
@@ -92,38 +90,11 @@ public function handle(ServerRequestInterface $request, array $headers, Deferred
9290
);
9391
$runId = $request->getID();
9492

95-
$starter = function (WorkflowInput $input) use (
96-
$resolver,
97-
$instance,
98-
$context,
99-
$runId,
100-
): void {
101-
$context = $context->withInput(new Input($input->info, $input->arguments, $input->header));
102-
$process = new Process($this->services, $context, $runId);
103-
$this->services->running->add($process);
104-
$resolver->resolve(EncodedValues::fromValues([null]));
105-
106-
$process->start($instance->getHandler(), $context->getInput(), $this->wfStartDeferred);
107-
};
108-
109-
// Define Context for interceptors Pipeline
11093
Workflow::setCurrentContext($context);
111-
112-
// Run workflow handler in an interceptor pipeline
113-
$this->services->interceptorProvider
114-
->getPipeline(WorkflowInboundCallsInterceptor::class)
115-
->with(
116-
$starter,
117-
/** @see WorkflowInboundCallsInterceptor::execute() */
118-
'execute',
119-
)(
120-
new WorkflowInput(
121-
$context->getInfo(),
122-
$context->getInput(),
123-
$context->getHeader(),
124-
$context->isReplaying(),
125-
),
126-
);
94+
$process = new Process($this->services, $runId, $instance);
95+
$this->services->running->add($process);
96+
$resolver->resolve(EncodedValues::fromValues([null]));
97+
$process->initAndStart($context, $instance, $this->wfStartDeferred);
12798
}
12899

129100
private function findWorkflowOrFail(WorkflowInfo $info): WorkflowPrototype

0 commit comments

Comments
 (0)