Skip to content

Commit b5f64e3

Browse files
committed
feature #47112 [Messenger] Add a scheduler component (upyx, fabpot)
This PR was merged into the 6.3 branch. Discussion ---------- [Messenger] Add a scheduler component | Q | A | ------------- | --- | Branch? | 6.3 | Bug fix? | no | New feature? | yes | Deprecations? | no | Tickets | no | License | MIT | Doc PR | TBD ### Introdution There is no easy way to schedule periodical tasks. There are few useful tools which I touched: - https://github.com/Guikingone/SchedulerBundle - https://github.com/Cron/Symfony-Bundle - https://github.com/zenstruck/schedule-bundle - https://github.com/lavary/crunz They are complicated. They doesn't allow to set time with precision in seconds (at least it isn't easy). They require difficult tricks to set not linear periods (like on sunset in Tokyo). ~They are~ Part of them inefficient with infrequent tasks because resources are needed on every run (e.g. Kubernetes [CronJob](https://kubernetes.io/docs/concepts/workloads/controllers/cron-jobs/) creates and destroes an entire container). ### Proposal I use a custom transport of Messenger that **generates** messages periodically. It's a simple solution to run periodical jobs without external software. It's especially helpful in environments where additional runners are impossible or very painful (like Kubernetes). Configuration is very flexible and accurate, it's possible to configure any rules when to run or not to run. Compared to `crond`: there is no need to install something external, precision of schedule in microseconds instead of *minutes*, it's possible to set any periods. #### Simple example ```yaml # messenger.yaml framework: messenger: transports: schedule_default: 'schedule://default' ``` Few types of messages: ```php class SomeJob {} class OtherJob {} ``` A handlers: ```php #[AsMessageHandler] class SomeJobHandler { public function __invoke(SomeJob $job) { // do job or delegate it to other service } } #[AsMessageHandler] class OtherJobHandler { public function __invoke(OtherJob $job) { // do job or delegate it to other service } } ``` A schedules are provided by locators. It's possible to create many locators and/or provide many schedules by the same locator: ```php class ExampleLocator implements ScheduleLocatorInterface { public function get(string $id): ScheduleConfig { // once after an hour from now $deferForAnHour = new \DatePeriod( new \DateTimeImmutable('now'), new \DateInterval('PT1H'), 1, \DatePeriod::EXCLUDE_START_DATE ); return (new ScheduleConfig()) ->add(new OnceTrigger(new \DateTimeImmutable()), new WarmUpJob()) // once on every worker's start ->add(PeriodicalTrigger::create('P1D', '2022-01-01T03:00:00Z'), new SomeJob()) // every night at 3 a.m. (UTC) ->add(PeriodicalTrigger::fromPeriod($deferForAnHour), new OtherJob()) ; } public function has(string $id): bool { return 'default' === $id; } } ``` To run schedule: ```sh bin/console messenger:consume schedule_default ``` It's easy to run jobs manually: ```php #[AsCommand(name: 'some', description: 'Manually runs SomeJob')] class SomeCommand extends Command { public function __construct(private MessageBusInterface $bus) { } protected function execute(InputInterface $input, OutputInterface $output): int { $this->bus->dispatch(new SomeJob()); } } ``` #### Example with returning a result ```php class GoodJob { public function __construct(public readonly ?LoggerInterface $logger) { } } ``` ```php #[AsMessageHandler] class GoodHandler { public function __construct(private readonly LoggerInterface $logger) { } public function __invoke(GoodJob $job){ // compute $result ($job->logger ?? $this->logger)->info('The result is: {result}', ['result' => $result]) } } ``` ```php #[AsCommand(name: 'job', description: 'Manually runs job')] class SomeCommand extends Command { public function __construct(private MessageBusInterface $bus) { } protected function execute(InputInterface $input, OutputInterface $output): int { $this->bus->dispatch(new GoodJob(new ConsoleLogger($output))); // result will be printed in console } } ``` #### Configuring A minimal configuration: ```yaml # messenger.yaml framework: messenger: transports: schedule_default: 'schedule://default' ``` More complex example: ```yaml # messenger.yaml framework: messenger: transports: schedule_default: dsn: 'schedule://default' options: cache: 'app.cache' lock: 'default' ``` Example HA configuration with redis: ```yaml framework: cache: default_redis_provider: '%env(REDIS_DSN)%' lock: redis: '%env(REDIS_DSN)%' messenger: transports: schedule_default: dsn: 'schedule://default' options: cache: 'cache.redis' lock: resource: 'redis' ttl: 60 auto_release: true ``` ### Deprecations None ### Implementation This PR contains an implementation. ### ToDo - [x] Remove obsolete code - [x] Add a configuration to the Framework - [x] Specialize exceptions and improve messages - [x] Cover with tests - [ ] Add acceptance tests for HA - [x] Fix `CHANGELOG`s & `README`s - [ ] Add documentation Commits ------- a18127b789 [Scheduler] Rework the component 6d9311f087 Add the Scheduler component
2 parents 96860f1 + 6907079 commit b5f64e3

23 files changed

+268
-11
lines changed

DependencyInjection/Compiler/UnusedTagsPass.php

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@ class UnusedTagsPass implements CompilerPassInterface
8080
'routing.expression_language_provider',
8181
'routing.loader',
8282
'routing.route_loader',
83+
'scheduler.schedule_provider',
8384
'security.authenticator.login_linker',
8485
'security.expression_language_provider',
8586
'security.remember_me_aware',

DependencyInjection/Configuration.php

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
use Symfony\Component\PropertyInfo\PropertyInfoExtractorInterface;
3838
use Symfony\Component\RateLimiter\Policy\TokenBucketLimiter;
3939
use Symfony\Component\RemoteEvent\RemoteEvent;
40+
use Symfony\Component\Scheduler\Messenger\SchedulerTransportFactory;
4041
use Symfony\Component\Semaphore\Semaphore;
4142
use Symfony\Component\Serializer\Serializer;
4243
use Symfony\Component\Translation\Translator;
@@ -173,6 +174,7 @@ public function getConfigTreeBuilder(): TreeBuilder
173174
$this->addLockSection($rootNode, $enableIfStandalone);
174175
$this->addSemaphoreSection($rootNode, $enableIfStandalone);
175176
$this->addMessengerSection($rootNode, $enableIfStandalone);
177+
$this->addSchedulerSection($rootNode, $enableIfStandalone);
176178
$this->addRobotsIndexSection($rootNode);
177179
$this->addHttpClientSection($rootNode, $enableIfStandalone);
178180
$this->addMailerSection($rootNode, $enableIfStandalone);
@@ -1606,6 +1608,18 @@ function ($a) {
16061608
;
16071609
}
16081610

1611+
private function addSchedulerSection(ArrayNodeDefinition $rootNode, callable $enableIfStandalone): void
1612+
{
1613+
$rootNode
1614+
->children()
1615+
->arrayNode('scheduler')
1616+
->info('Scheduler configuration')
1617+
->{$enableIfStandalone('symfony/scheduler', SchedulerTransportFactory::class)}()
1618+
->end()
1619+
->end()
1620+
;
1621+
}
1622+
16091623
private function addRobotsIndexSection(ArrayNodeDefinition $rootNode): void
16101624
{
16111625
$rootNode

DependencyInjection/FrameworkExtension.php

Lines changed: 31 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -216,6 +216,8 @@
216216
use Symfony\Component\RemoteEvent\Attribute\AsRemoteEventConsumer;
217217
use Symfony\Component\RemoteEvent\RemoteEvent;
218218
use Symfony\Component\Routing\Loader\Psr4DirectoryLoader;
219+
use Symfony\Component\Scheduler\Attribute\AsSchedule;
220+
use Symfony\Component\Scheduler\Messenger\SchedulerTransportFactory;
219221
use Symfony\Component\Security\Core\AuthenticationEvents;
220222
use Symfony\Component\Security\Core\Exception\AuthenticationException;
221223
use Symfony\Component\Security\Csrf\CsrfTokenManagerInterface;
@@ -527,9 +529,18 @@ public function load(array $configs, ContainerBuilder $container)
527529
// validation depends on form, annotations being registered
528530
$this->registerValidationConfiguration($config['validation'], $container, $loader, $propertyInfoEnabled);
529531

532+
$messengerEnabled = $this->readConfigEnabled('messenger', $container, $config['messenger']);
533+
534+
if ($this->readConfigEnabled('scheduler', $container, $config['scheduler'])) {
535+
if (!$messengerEnabled) {
536+
throw new LogicException('Scheduler support cannot be enabled as the Messenger component is not '.(interface_exists(MessageBusInterface::class) ? 'enabled.' : 'installed. Try running "composer require symfony/messenger".'));
537+
}
538+
$this->registerSchedulerConfiguration($config['scheduler'], $container, $loader);
539+
}
540+
530541
// messenger depends on validation being registered
531-
if ($this->readConfigEnabled('messenger', $container, $config['messenger'])) {
532-
$this->registerMessengerConfiguration($config['messenger'], $container, $loader, $config['validation']);
542+
if ($messengerEnabled) {
543+
$this->registerMessengerConfiguration($config['messenger'], $container, $loader, $this->readConfigEnabled('validation', $container, $config['validation']));
533544
} else {
534545
$container->removeDefinition('console.command.messenger_consume_messages');
535546
$container->removeDefinition('console.command.messenger_stats');
@@ -707,10 +718,12 @@ public function load(array $configs, ContainerBuilder $container)
707718
}
708719
$definition->addTag('messenger.message_handler', $tagAttributes);
709720
});
710-
711721
$container->registerAttributeForAutoconfiguration(AsTargetedValueResolver::class, static function (ChildDefinition $definition, AsTargetedValueResolver $attribute): void {
712722
$definition->addTag('controller.targeted_value_resolver', $attribute->name ? ['name' => $attribute->name] : []);
713723
});
724+
$container->registerAttributeForAutoconfiguration(AsSchedule::class, static function (ChildDefinition $definition, AsSchedule $attribute): void {
725+
$definition->addTag('scheduler.schedule_provider', ['name' => $attribute->name]);
726+
});
714727

715728
if (!$container->getParameter('kernel.debug')) {
716729
// remove tagged iterator argument for resource checkers
@@ -1996,7 +2009,20 @@ private function registerSemaphoreConfiguration(array $config, ContainerBuilder
19962009
}
19972010
}
19982011

1999-
private function registerMessengerConfiguration(array $config, ContainerBuilder $container, PhpFileLoader $loader, array $validationConfig): void
2012+
private function registerSchedulerConfiguration(array $config, ContainerBuilder $container, PhpFileLoader $loader): void
2013+
{
2014+
if (!class_exists(SchedulerTransportFactory::class)) {
2015+
throw new LogicException('Scheduler support cannot be enabled as the Scheduler component is not installed. Try running "composer require symfony/scheduler".');
2016+
}
2017+
2018+
if (!interface_exists(MessageBusInterface::class)) {
2019+
throw new LogicException('Scheduler support cannot be enabled as the Messenger component is not installed. Try running "composer require symfony/messenger".');
2020+
}
2021+
2022+
$loader->load('scheduler.php');
2023+
}
2024+
2025+
private function registerMessengerConfiguration(array $config, ContainerBuilder $container, PhpFileLoader $loader, bool $validationEnabled): void
20002026
{
20012027
if (!interface_exists(MessageBusInterface::class)) {
20022028
throw new LogicException('Messenger support cannot be enabled as the Messenger component is not installed. Try running "composer require symfony/messenger".');
@@ -2058,7 +2084,7 @@ private function registerMessengerConfiguration(array $config, ContainerBuilder
20582084
}
20592085

20602086
foreach ($middleware as $middlewareItem) {
2061-
if (!$validationConfig['enabled'] && \in_array($middlewareItem['id'], ['validation', 'messenger.middleware.validation'], true)) {
2087+
if (!$validationEnabled && \in_array($middlewareItem['id'], ['validation', 'messenger.middleware.validation'], true)) {
20622088
throw new LogicException('The Validation middleware is only available when the Validator component is installed and enabled. Try running "composer require symfony/validator".');
20632089
}
20642090
}

FrameworkBundle.php

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@
6060
use Symfony\Component\Mime\DependencyInjection\AddMimeTypeGuesserPass;
6161
use Symfony\Component\PropertyInfo\DependencyInjection\PropertyInfoPass;
6262
use Symfony\Component\Routing\DependencyInjection\RoutingResolverPass;
63+
use Symfony\Component\Scheduler\DependencyInjection\AddScheduleMessengerPass;
6364
use Symfony\Component\Serializer\DependencyInjection\SerializerPass;
6465
use Symfony\Component\Translation\DependencyInjection\TranslationDumperPass;
6566
use Symfony\Component\Translation\DependencyInjection\TranslationExtractorPass;
@@ -165,6 +166,7 @@ public function build(ContainerBuilder $container)
165166
$container->addCompilerPass(new TestServiceContainerWeakRefPass(), PassConfig::TYPE_BEFORE_REMOVING, -32);
166167
$container->addCompilerPass(new TestServiceContainerRealRefPass(), PassConfig::TYPE_AFTER_REMOVING);
167168
$this->addCompilerPassIfExists($container, AddMimeTypeGuesserPass::class);
169+
$this->addCompilerPassIfExists($container, AddScheduleMessengerPass::class);
168170
$this->addCompilerPassIfExists($container, MessengerPass::class);
169171
$this->addCompilerPassIfExists($container, HttpClientPass::class);
170172
$this->addCompilerPassIfExists($container, AddAutoMappingConfigurationPass::class);

Resources/config/cache.php

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,11 @@
7171
->private()
7272
->tag('cache.pool')
7373

74+
->set('cache.scheduler')
75+
->parent('cache.app')
76+
->private()
77+
->tag('cache.pool')
78+
7479
->set('cache.adapter.system', AdapterInterface::class)
7580
->abstract()
7681
->factory([AbstractAdapter::class, 'createSystemCache'])

Resources/config/messenger.php

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@
7575
->tag('serializer.normalizer', ['priority' => -880])
7676

7777
->set('messenger.transport.native_php_serializer', PhpSerializer::class)
78+
->alias('messenger.default_serializer', 'messenger.transport.native_php_serializer')
7879

7980
// Middleware
8081
->set('messenger.middleware.handle_message', HandleMessageMiddleware::class)

Resources/config/scheduler.php

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <[email protected]>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\DependencyInjection\Loader\Configurator;
13+
14+
use Symfony\Component\Scheduler\Messenger\SchedulerTransportFactory;
15+
16+
return static function (ContainerConfigurator $container) {
17+
$container->services()
18+
->set('scheduler.messenger_transport_factory', SchedulerTransportFactory::class)
19+
->args([
20+
tagged_locator('scheduler.schedule_provider', 'name'),
21+
service('clock'),
22+
])
23+
->tag('messenger.transport_factory')
24+
;
25+
};

Resources/config/schema/symfony-1.0.xsd

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
<xsd:element name="validation" type="validation" minOccurs="0" maxOccurs="1" />
2525
<xsd:element name="annotations" type="annotations" minOccurs="0" maxOccurs="1" />
2626
<xsd:element name="property-access" type="property_access" minOccurs="0" maxOccurs="1" />
27+
<xsd:element name="scheduler" type="scheduler" minOccurs="0" maxOccurs="1" />
2728
<xsd:element name="serializer" type="serializer" minOccurs="0" maxOccurs="1" />
2829
<xsd:element name="property-info" type="property_info" minOccurs="0" maxOccurs="1" />
2930
<xsd:element name="cache" type="cache" minOccurs="0" maxOccurs="1" />
@@ -271,6 +272,10 @@
271272
<xsd:attribute name="throw-exception-on-invalid-property-path" type="xsd:boolean" />
272273
</xsd:complexType>
273274

275+
<xsd:complexType name="scheduler">
276+
<xsd:attribute name="enabled" type="xsd:boolean" />
277+
</xsd:complexType>
278+
274279
<xsd:complexType name="serializer">
275280
<xsd:choice minOccurs="0" maxOccurs="unbounded">
276281
<xsd:element name="mapping" type="file_mapping" />

Tests/DependencyInjection/ConfigurationTest.php

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
use Symfony\Component\Messenger\MessageBusInterface;
2727
use Symfony\Component\Notifier\Notifier;
2828
use Symfony\Component\RateLimiter\Policy\TokenBucketLimiter;
29+
use Symfony\Component\Scheduler\Messenger\SchedulerTransportFactory;
2930
use Symfony\Component\Uid\Factory\UuidFactory;
3031

3132
class ConfigurationTest extends TestCase
@@ -687,6 +688,9 @@ class_exists(SemaphoreStore::class) && SemaphoreStore::isSupported() ? 'semaphor
687688
'enabled' => !class_exists(FullStack::class) && class_exists(HtmlSanitizer::class),
688689
'sanitizers' => [],
689690
],
691+
'scheduler' => [
692+
'enabled' => !class_exists(FullStack::class) && class_exists(SchedulerTransportFactory::class),
693+
],
690694
'exceptions' => [],
691695
'webhook' => [
692696
'enabled' => false,

Tests/DependencyInjection/Fixtures/php/messenger.php

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55

66
$container->loadFromExtension('framework', [
77
'http_method_override' => false,
8+
'scheduler' => true,
89
'messenger' => [
910
'routing' => [
1011
FooMessage::class => ['sender.bar', 'sender.biz'],

0 commit comments

Comments
 (0)