Skip to content

Commit c3cb532

Browse files
sdelicataAlptisAssurances
authored andcommitted
Add a new time limit receiver
1 parent 1bee13f commit c3cb532

File tree

3 files changed

+113
-1
lines changed

3 files changed

+113
-1
lines changed

Command/ConsumeMessagesCommand.php

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,9 @@
1919
use Symfony\Component\Console\Input\InputOption;
2020
use Symfony\Component\Console\Output\OutputInterface;
2121
use Symfony\Component\Messenger\MessageBusInterface;
22-
use Symfony\Component\Messenger\Transport\Enhancers\StopWhenMessageCountIsExceededReceiver;
2322
use Symfony\Component\Messenger\Transport\Enhancers\StopWhenMemoryUsageIsExceededReceiver;
23+
use Symfony\Component\Messenger\Transport\Enhancers\StopWhenMessageCountIsExceededReceiver;
24+
use Symfony\Component\Messenger\Transport\Enhancers\StopWhenTimeLimitIsReachedReceiver;
2425
use Symfony\Component\Messenger\Transport\ReceiverInterface;
2526
use Symfony\Component\Messenger\Worker;
2627

@@ -56,6 +57,7 @@ protected function configure(): void
5657
new InputArgument('receiver', InputArgument::REQUIRED, 'Name of the receiver'),
5758
new InputOption('limit', 'l', InputOption::VALUE_REQUIRED, 'Limit the number of received messages'),
5859
new InputOption('memory-limit', 'm', InputOption::VALUE_REQUIRED, 'The memory limit the worker can consume'),
60+
new InputOption('time-limit', 't', InputOption::VALUE_REQUIRED, 'The time limit in seconds the worker can run'),
5961
))
6062
->setDescription('Consumes messages')
6163
->setHelp(<<<'EOF'
@@ -70,6 +72,10 @@ protected function configure(): void
7072
Use the --memory-limit option to stop the worker if it exceeds a given memory usage limit. You can use shorthand byte values [K, M or G]:
7173
7274
<info>php %command.full_name% <receiver-name> --memory-limit=128M</info>
75+
76+
Use the --time-limit option to stop the worker when the given time limit (in seconds) is reached:
77+
78+
<info>php %command.full_name% <receiver-name> --time-limit=3600</info>
7379
EOF
7480
)
7581
;
@@ -96,6 +102,10 @@ protected function execute(InputInterface $input, OutputInterface $output): void
96102
$receiver = new StopWhenMemoryUsageIsExceededReceiver($receiver, $this->convertToBytes($memoryLimit), $this->logger);
97103
}
98104

105+
if ($timeLimit = $input->getOption('time-limit')) {
106+
$receiver = new StopWhenTimeLimitIsReachedReceiver($receiver, $timeLimit, $this->logger);
107+
}
108+
99109
$worker = new Worker($receiver, $this->bus);
100110
$worker->run();
101111
}
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <[email protected]>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Tests\Transport\Enhancers;
13+
14+
use PHPUnit\Framework\TestCase;
15+
use Psr\Log\LoggerInterface;
16+
use Symfony\Component\Messenger\Tests\Fixtures\CallbackReceiver;
17+
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
18+
use Symfony\Component\Messenger\Transport\Enhancers\StopWhenTimeLimitIsReachedReceiver;
19+
20+
class StopWhenTimeLimitIsReachedReceiverTest extends TestCase
21+
{
22+
/**
23+
* @group time-sensitive
24+
*/
25+
public function testReceiverStopsWhenTimeLimitIsReached()
26+
{
27+
$callable = function ($handler) {
28+
$handler(new DummyMessage('API'));
29+
};
30+
31+
$decoratedReceiver = $this->getMockBuilder(CallbackReceiver::class)
32+
->setConstructorArgs(array($callable))
33+
->enableProxyingToOriginalMethods()
34+
->getMock();
35+
36+
$decoratedReceiver->expects($this->once())->method('receive');
37+
$decoratedReceiver->expects($this->once())->method('stop');
38+
39+
$logger = $this->createMock(LoggerInterface::class);
40+
$logger->expects($this->once())->method('info')
41+
->with('Receiver stopped due to time limit of {timeLimit}s reached', array('timeLimit' => 1));
42+
43+
$timeoutReceiver = new StopWhenTimeLimitIsReachedReceiver($decoratedReceiver, 1, $logger);
44+
$timeoutReceiver->receive(function () {
45+
sleep(2);
46+
});
47+
}
48+
}
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <[email protected]>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Transport\Enhancers;
13+
14+
use Psr\Log\LoggerInterface;
15+
use Symfony\Component\Messenger\Transport\ReceiverInterface;
16+
17+
/**
18+
* @author Simon Delicata <[email protected]>
19+
*/
20+
class StopWhenTimeLimitIsReachedReceiver implements ReceiverInterface
21+
{
22+
private $decoratedReceiver;
23+
private $timeLimitInSeconds;
24+
private $logger;
25+
26+
public function __construct(ReceiverInterface $decoratedReceiver, int $timeLimitInSeconds, LoggerInterface $logger = null)
27+
{
28+
$this->decoratedReceiver = $decoratedReceiver;
29+
$this->timeLimitInSeconds = $timeLimitInSeconds;
30+
$this->logger = $logger;
31+
}
32+
33+
public function receive(callable $handler): void
34+
{
35+
$startTime = time();
36+
$endTime = $startTime + $this->timeLimitInSeconds;
37+
38+
$this->decoratedReceiver->receive(function ($message) use ($handler, $endTime) {
39+
$handler($message);
40+
41+
if ($endTime < time()) {
42+
$this->stop();
43+
if (null !== $this->logger) {
44+
$this->logger->info('Receiver stopped due to time limit of {timeLimit}s reached', array('timeLimit' => $this->timeLimitInSeconds));
45+
}
46+
}
47+
});
48+
}
49+
50+
public function stop(): void
51+
{
52+
$this->decoratedReceiver->stop();
53+
}
54+
}

0 commit comments

Comments
 (0)