Skip to content

Commit 0ec0c4c

Browse files
authored
Update tickTime and replay on batch only
1 parent 46b42b1 commit 0ec0c4c

File tree

3 files changed

+22
-4
lines changed

3 files changed

+22
-4
lines changed

src/Internal/ServiceContainer.php

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,7 @@ final class ServiceContainer
107107

108108
/**
109109
* @param LoopInterface $loop
110+
* @param EnvironmentInterface $env
110111
* @param ClientInterface $client
111112
* @param ReaderInterface $reader
112113
* @param QueueInterface $queue
@@ -115,20 +116,21 @@ final class ServiceContainer
115116
*/
116117
public function __construct(
117118
LoopInterface $loop,
119+
EnvironmentInterface $env,
118120
ClientInterface $client,
119121
ReaderInterface $reader,
120122
QueueInterface $queue,
121123
MarshallerInterface $marshaller,
122124
DataConverterInterface $dataConverter
123125
) {
126+
$this->env = $env;
124127
$this->loop = $loop;
125128
$this->client = $client;
126129
$this->reader = $reader;
127130
$this->queue = $queue;
128131
$this->marshaller = $marshaller;
129132
$this->dataConverter = $dataConverter;
130133

131-
$this->env = new Environment();
132134
$this->workflows = new WorkflowCollection();
133135
$this->activities = new ActivityCollection();
134136
$this->running = new ProcessCollection($client);
@@ -145,6 +147,7 @@ public static function fromWorker(WorkerFactory $worker): self
145147
{
146148
return new self(
147149
$worker,
150+
$worker->getEnviroment(),
148151
$worker->getClient(),
149152
$worker->getReader(),
150153
$worker->getQueue(),

src/Worker/Worker.php

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -90,8 +90,6 @@ public function getOptions(): WorkerOptions
9090
*/
9191
public function dispatch(RequestInterface $request, array $headers): PromiseInterface
9292
{
93-
$this->services->env->update($headers);
94-
9593
return $this->router->dispatch($request, $headers);
9694
}
9795

src/WorkerFactory.php

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@
2525
use Temporal\Internal\Marshaller\Marshaller;
2626
use Temporal\Internal\Marshaller\MarshallerInterface;
2727
use Temporal\Internal\ServiceContainer;
28+
use Temporal\Worker\Environment\Environment;
29+
use Temporal\Worker\Environment\EnvironmentInterface;
2830
use Temporal\Worker\Transport\Codec\CodecInterface;
2931
use Temporal\Internal\Events\EventEmitterTrait;
3032
use Temporal\Internal\Queue\ArrayQueue;
@@ -52,7 +54,7 @@
5254

5355
/**
5456
* WorkerFactory is primary entry point for the temporal application. This class is responsible for the communication
55-
* with parent RoadRunnner process and can be used to create taskQueue workflow and activity workers.
57+
* with parent RoadRunner process and can be used to create taskQueue workflow and activity workers.
5658
*
5759
* <code>
5860
* $factory = WorkerFactory::create();
@@ -150,6 +152,11 @@ final class WorkerFactory implements WorkerFactoryInterface, LoopInterface
150152
*/
151153
private MarshallerInterface $marshaller;
152154

155+
/**
156+
* @var EnvironmentInterface
157+
*/
158+
private EnvironmentInterface $env;
159+
153160
/**
154161
* @param DataConverterInterface $dataConverter
155162
* @param RPCConnectionInterface $rpc
@@ -235,6 +242,14 @@ public function getMarshaller(): MarshallerInterface
235242
return $this->marshaller;
236243
}
237244

245+
/**
246+
* @return EnvironmentInterface
247+
*/
248+
public function getEnviroment(): EnvironmentInterface
249+
{
250+
return $this->env;
251+
}
252+
238253
/**
239254
* {@inheritDoc}
240255
*/
@@ -277,6 +292,7 @@ private function boot(): void
277292
$this->responses = $this->createQueue();
278293
$this->client = $this->createClient();
279294
$this->server = $this->createServer();
295+
$this->env = new Environment();
280296
}
281297

282298
/**
@@ -375,6 +391,7 @@ private function createCodec(): CodecInterface
375391
private function dispatch(string $messages, array $headers): string
376392
{
377393
$commands = $this->codec->decode($messages);
394+
$this->env->update($headers);
378395

379396
foreach ($commands as $command) {
380397
if ($command instanceof RequestInterface) {

0 commit comments

Comments
 (0)