Skip to content

Commit ba2c90e

Browse files
committed
feat: add methods to check duplicate jobs
1 parent ee9a7dc commit ba2c90e

11 files changed

+215
-196
lines changed

Command/AddScheduleJobToQueueCommand.php

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,9 @@
22

33
namespace Markup\JobQueueBundle\Command;
44

5+
use Markup\JobQueueBundle\Entity\Repository\ScheduledJobRepository;
56
use Markup\JobQueueBundle\Entity\ScheduledJob;
67
use Markup\JobQueueBundle\Service\JobManager;
7-
use Markup\JobQueueBundle\Service\ScheduledJobService;
88
use Psr\Log\LoggerInterface;
99
use Symfony\Component\Console\Command\Command;
1010
use Symfony\Component\Console\Input\InputInterface;
@@ -29,18 +29,18 @@ class AddScheduleJobToQueueCommand extends Command
2929
private $logger;
3030

3131
/**
32-
* @var ScheduledJobService
32+
* @var ScheduledJobRepository
3333
*/
34-
private $scheduledJobService;
34+
private $scheduledJobRepository;
3535

3636
public function __construct(
3737
JobManager $jobManager,
38-
ScheduledJobService $scheduledJobService,
38+
ScheduledJobRepository $scheduledJobRepository,
3939
?LoggerInterface $logger = null
4040
) {
4141
$this->jobManager = $jobManager;
4242
$this->logger = $logger;
43-
$this->scheduledJobService = $scheduledJobService;
43+
$this->scheduledJobRepository = $scheduledJobRepository;
4444

4545
parent::__construct(null);
4646
}
@@ -56,7 +56,7 @@ protected function configure()
5656

5757
protected function execute(InputInterface $input, OutputInterface $output)
5858
{
59-
$jobs = $this->scheduledJobService->getUnqueuedJobs();
59+
$jobs = $this->scheduledJobRepository->fetchUnqueuedJobs();
6060

6161
if ($jobs) {
6262
foreach ($jobs as $job) {
@@ -74,7 +74,7 @@ protected function execute(InputInterface $input, OutputInterface $output)
7474
);
7575
$job->setQueued(true);
7676

77-
$this->scheduledJobService->save($job, $flush = true);
77+
$this->scheduledJobRepository->save($job, $flush = true);
7878
} catch (\Exception $e) {
7979
$this->logger->error(
8080
sprintf(
Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
<?php
2+
declare(strict_types=1);
3+
4+
namespace Markup\JobQueueBundle\Entity\Repository;
5+
6+
use Doctrine\ORM\EntityManager;
7+
use Doctrine\ORM\EntityManagerInterface;
8+
use Doctrine\ORM\EntityRepository;
9+
use Doctrine\Persistence\ManagerRegistry as Doctrine;
10+
11+
trait DoctrineOrmAwareRepositoryTrait
12+
{
13+
/**
14+
* @var Doctrine
15+
*/
16+
private $doctrine;
17+
18+
/**
19+
* @var string
20+
*/
21+
private $entity;
22+
23+
private function getEntityRepository(): EntityRepository
24+
{
25+
$this->ensureDatabaseConnectionIsOpen();
26+
27+
$repository = $this->doctrine->getRepository($this->entity);
28+
29+
if ($repository instanceof EntityRepository) {
30+
return $repository;
31+
}
32+
33+
throw new \RuntimeException(sprintf('Doctrine returned an invalid repository for entity %s', $this->entity));
34+
}
35+
36+
protected function getEntityManager(): EntityManager
37+
{
38+
$manager = $this->doctrine->getManager();
39+
40+
if ($manager instanceof EntityManagerInterface && !$manager->isOpen()) {
41+
$manager = $this->doctrine->resetManager();
42+
}
43+
44+
if (!$manager instanceof EntityManager) {
45+
throw new \RuntimeException('Doctrine returned an invalid manager');
46+
}
47+
48+
return $manager;
49+
}
50+
51+
private function ensureDatabaseConnectionIsOpen(): void
52+
{
53+
$manager = $this->doctrine->getManager();
54+
55+
if ($manager instanceof EntityManagerInterface && !$manager->isOpen()) {
56+
$this->doctrine->resetManager();
57+
}
58+
59+
return;
60+
}
61+
62+
/**
63+
* @param object $entity
64+
* @throws \Doctrine\ORM\ORMException
65+
*/
66+
protected function persist($entity): void
67+
{
68+
$this->getEntityManager()->persist($entity);
69+
}
70+
71+
protected function flush($entity): void
72+
{
73+
$this->getEntityManager()->flush($entity);
74+
}
75+
76+
/**
77+
* @param object $entity
78+
*/
79+
protected function remove($entity): void
80+
{
81+
$this->getEntityManager()->remove($entity);
82+
}
83+
}

Entity/Repository/ScheduledJobRepository.php

Lines changed: 71 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -3,20 +3,30 @@
33
namespace Markup\JobQueueBundle\Entity\Repository;
44

55
use Doctrine\ORM\EntityRepository;
6+
use Doctrine\ORM\NonUniqueResultException;
7+
use Doctrine\ORM\NoResultException;
8+
use Doctrine\Persistence\ManagerRegistry;
69
use Markup\JobQueueBundle\Entity\ScheduledJob;
710
use Markup\JobQueueBundle\Model\ScheduledJobRepositoryInterface;
811
use Symfony\Component\DependencyInjection\ContainerAwareTrait;
912

10-
class ScheduledJobRepository extends EntityRepository implements ScheduledJobRepositoryInterface
13+
class ScheduledJobRepository implements ScheduledJobRepositoryInterface
1114
{
12-
use ContainerAwareTrait;
15+
use DoctrineOrmAwareRepositoryTrait;
16+
17+
public function __construct(ManagerRegistry $doctrine)
18+
{
19+
$this->doctrine = $doctrine;
20+
$this->entity = ScheduledJob::class;
21+
}
1322

1423
/**
15-
* @return array|null
24+
* @return ?iterable<ScheduledJob>
1625
*/
1726
public function fetchUnqueuedJobs()
1827
{
19-
$qb = $this->createQueryBuilder('job');
28+
$qb = $this->getEntityRepository()
29+
->createQueryBuilder('job');
2030
$qb->andWhere($qb->expr()->eq('job.queued', ':queued'));
2131
$qb->andWhere($qb->expr()->lt('job.scheduledTime', ':now'));
2232
$qb->setParameter(':queued', false);
@@ -30,17 +40,65 @@ public function fetchUnqueuedJobs()
3040
return null;
3141
}
3242

33-
/**
34-
* @param ScheduledJob $scheduledJob
35-
* @param bool $flush
36-
*/
37-
public function save(ScheduledJob $scheduledJob, $flush = false)
38-
{
39-
$this->_em->persist($scheduledJob);
43+
public function isJobScheduledWithinRange(
44+
string $job,
45+
\DateTime $rangeFrom,
46+
\DateTime $rangeTo,
47+
?array $arguments
48+
): bool {
49+
$qb = $this->getEntityRepository()
50+
->createQueryBuilder('j')
51+
->select('COUNT(1)')
52+
->where('j.job = :job')
53+
->andWhere('j.scheduledTime >= :from')
54+
->andWhere('j.scheduledTime <= :to')
55+
->setParameter('job', $job)
56+
->setParameter('from', $rangeFrom)
57+
->setParameter('to', $rangeTo);
58+
59+
if ($arguments) {
60+
$qb
61+
->andWhere('j.arguments = :arguments')
62+
->setParameter('arguments', serialize($arguments));
63+
}
64+
try {
65+
return boolval($qb->getQuery()->getSingleScalarResult());
66+
} catch (NoResultException|NonUniqueResultException $e) {
67+
return false;
68+
}
69+
}
4070

71+
public function hasUnQueuedDuplicate(string $job, ?array $arguments): bool
72+
{
73+
$qb = $this->getEntityRepository()
74+
->createQueryBuilder('j')
75+
->select('COUNT(1)')
76+
->where('j.job = :job')
77+
->andWhere('j.queued = :queued')
78+
->andWhere('j.scheduledTime <= :now')
79+
->setParameter('queued', false)
80+
->setParameter('job', $job)
81+
->setParameter('now', (new \DateTime()));
82+
83+
if ($arguments) {
84+
$qb
85+
->andWhere('j.arguments = :arguments')
86+
->setParameter('arguments', serialize($arguments));
87+
}
88+
89+
try {
90+
return boolval($qb->getQuery()->getSingleScalarResult());
91+
} catch (NoResultException|NonUniqueResultException $e) {
92+
return false;
93+
}
94+
}
95+
96+
public function save(ScheduledJob $scheduledJob, $flush = false): void
97+
{
98+
$this->persist($scheduledJob);
99+
41100
if ($flush) {
42-
$this->_em->flush();
101+
$this->flush($scheduledJob);
43102
}
44103
}
45-
46104
}

Model/ScheduledJobRepositoryInterface.php

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,4 +15,13 @@ public function fetchUnqueuedJobs();
1515
* @param ScheduledJob $scheduledJob
1616
*/
1717
public function save(ScheduledJob $scheduledJob, $flush = false);
18+
19+
public function isJobScheduledWithinRange(
20+
string $job,
21+
\DateTime $rangeFrom,
22+
\DateTime $rangeTo,
23+
?array $arguments
24+
): bool;
25+
26+
public function hasUnQueuedDuplicate(string $job, ?array $arguments): bool;
1827
}

Resources/config/commands.yml

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,10 @@ services:
1919
- { name: console.command, command: 'markup:job_queue:recurring:add' }
2020

2121
markup_job_queue.command.add_schedule_job_to_queue:
22+
autowire: true
2223
class: Markup\JobQueueBundle\Command\AddScheduleJobToQueueCommand
2324
arguments:
2425
$jobManager: '@jobby'
25-
$scheduledJobService: '@markup_job_queue.scheduled'
26-
$logger: '@logger'
2726
tags:
2827
- { name: console.command, command: 'markup:scheduled_job:add' }
2928

Resources/config/doctrine/ScheduledJob.orm.yml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
Markup\JobQueueBundle\Entity\ScheduledJob:
22
type: entity
3-
repositoryClass: Markup\JobQueueBundle\Entity\Repository\ScheduledJobRepository
43
table: scheduled_job
54
id:
65
id:

Resources/config/services.yml

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,10 @@ services:
1313
class: Markup\JobQueueBundle\Consumer\JobConsumer
1414
autowire: true
1515
markup_job_queue.manager:
16+
autowire: true
1617
class: Markup\JobQueueBundle\Service\JobManager
1718
arguments:
1819
- '@markup_job_queue.publisher'
19-
- '@markup_job_queue.scheduled'
2020
jobby:
2121
alias: markup_job_queue.manager
2222
markup_job_queue.reader.recurring_console_command:
@@ -34,11 +34,7 @@ services:
3434
- ~
3535
- ~
3636
- ~
37-
markup_job_queue.scheduled:
38-
class: Markup\JobQueueBundle\Service\ScheduledJobService
39-
arguments:
40-
- '@doctrine'
41-
37+
4238
markup_job_queue.writer.cli_consumer_config_file:
4339
class: Markup\JobQueueBundle\Service\CliConsumerConfigFileWriter
4440
markup_job_queue.rabbit_mq_api.client:
@@ -52,7 +48,9 @@ services:
5248
arguments:
5349
- '@markup_job_queue.rabbit_mq_api'
5450
- ~
55-
51+
Markup\JobQueueBundle\Entity\Repository\ScheduledJobRepository:
52+
autowire: true
53+
public: false
5654

5755
Markup\JobQueueBundle\Repository\JobLogRepository: '@markup_job_queue.repository.job_log'
5856

Service/JobManager.php

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22

33
namespace Markup\JobQueueBundle\Service;
44

5+
use Markup\JobQueueBundle\Entity\Repository\ScheduledJobRepository;
6+
use Markup\JobQueueBundle\Entity\ScheduledJob;
57
use Markup\JobQueueBundle\Job\ConsoleCommandJob;
68
use Markup\JobQueueBundle\Model\Job;
79
use Markup\JobQueueBundle\Publisher\JobPublisher;
@@ -18,16 +20,16 @@ class JobManager
1820
private $publisher;
1921

2022
/**
21-
* @var ScheduledJobService
23+
* @var ScheduledJobRepository
2224
*/
23-
private $scheduledJobService;
24-
25+
private $scheduledJobRepository;
26+
2527
public function __construct(
2628
JobPublisher $publisher,
27-
ScheduledJobService $scheduledJobService
29+
ScheduledJobRepository $scheduledJobRepository
2830
) {
2931
$this->publisher = $publisher;
30-
$this->scheduledJobService = $scheduledJobService;
32+
$this->scheduledJobRepository = $scheduledJobRepository;
3133
}
3234

3335
public function addJob(Job $job, $supressLogging = false)
@@ -90,6 +92,14 @@ public function addScheduledConsoleCommandJob(
9092
$args['idleTimeout'] = $idleTimeout ?? $timeout;
9193
$job = new ConsoleCommandJob($args, $topic);
9294

93-
$this->scheduledJobService->addScheduledJob($job, $dateTime);
95+
$this->addScheduledJob($job, $dateTime);
96+
}
97+
98+
public function addScheduledJob(ConsoleCommandJob $job, $scheduledTime): ScheduledJob
99+
{
100+
$scheduledJob = new ScheduledJob($job->getCommand(), $job->getArguments(), $scheduledTime, $job->getTopic());
101+
$this->scheduledJobRepository->save($scheduledJob, true);
102+
103+
return $scheduledJob;
94104
}
95105
}

0 commit comments

Comments
 (0)