3
3
namespace Markup \JobQueueBundle \Entity \Repository ;
4
4
5
5
use Doctrine \ORM \EntityRepository ;
6
+ use Doctrine \ORM \NonUniqueResultException ;
7
+ use Doctrine \ORM \NoResultException ;
8
+ use Doctrine \Persistence \ManagerRegistry ;
6
9
use Markup \JobQueueBundle \Entity \ScheduledJob ;
7
10
use Markup \JobQueueBundle \Model \ScheduledJobRepositoryInterface ;
8
11
use Symfony \Component \DependencyInjection \ContainerAwareTrait ;
9
12
10
- class ScheduledJobRepository extends EntityRepository implements ScheduledJobRepositoryInterface
13
+ class ScheduledJobRepository implements ScheduledJobRepositoryInterface
11
14
{
12
- use ContainerAwareTrait;
15
+ use DoctrineOrmAwareRepositoryTrait;
16
+
17
+ public function __construct (ManagerRegistry $ doctrine )
18
+ {
19
+ $ this ->doctrine = $ doctrine ;
20
+ $ this ->entity = ScheduledJob::class;
21
+ }
13
22
14
23
/**
15
- * @return array|null
24
+ * @return ?iterable<ScheduledJob>
16
25
*/
17
26
public function fetchUnqueuedJobs ()
18
27
{
19
- $ qb = $ this ->createQueryBuilder ('job ' );
28
+ $ qb = $ this ->getEntityRepository ()
29
+ ->createQueryBuilder ('job ' );
20
30
$ qb ->andWhere ($ qb ->expr ()->eq ('job.queued ' , ':queued ' ));
21
31
$ qb ->andWhere ($ qb ->expr ()->lt ('job.scheduledTime ' , ':now ' ));
22
32
$ qb ->setParameter (':queued ' , false );
@@ -30,17 +40,65 @@ public function fetchUnqueuedJobs()
30
40
return null ;
31
41
}
32
42
33
- /**
34
- * @param ScheduledJob $scheduledJob
35
- * @param bool $flush
36
- */
37
- public function save (ScheduledJob $ scheduledJob , $ flush = false )
38
- {
39
- $ this ->_em ->persist ($ scheduledJob );
43
+ public function isJobScheduledWithinRange (
44
+ string $ job ,
45
+ \DateTime $ rangeFrom ,
46
+ \DateTime $ rangeTo ,
47
+ ?array $ arguments
48
+ ): bool {
49
+ $ qb = $ this ->getEntityRepository ()
50
+ ->createQueryBuilder ('j ' )
51
+ ->select ('COUNT(1) ' )
52
+ ->where ('j.job = :job ' )
53
+ ->andWhere ('j.scheduledTime >= :from ' )
54
+ ->andWhere ('j.scheduledTime <= :to ' )
55
+ ->setParameter ('job ' , $ job )
56
+ ->setParameter ('from ' , $ rangeFrom )
57
+ ->setParameter ('to ' , $ rangeTo );
58
+
59
+ if ($ arguments ) {
60
+ $ qb
61
+ ->andWhere ('j.arguments = :arguments ' )
62
+ ->setParameter ('arguments ' , serialize ($ arguments ));
63
+ }
64
+ try {
65
+ return boolval ($ qb ->getQuery ()->getSingleScalarResult ());
66
+ } catch (NoResultException |NonUniqueResultException $ e ) {
67
+ return false ;
68
+ }
69
+ }
40
70
71
+ public function hasUnQueuedDuplicate (string $ job , ?array $ arguments ): bool
72
+ {
73
+ $ qb = $ this ->getEntityRepository ()
74
+ ->createQueryBuilder ('j ' )
75
+ ->select ('COUNT(1) ' )
76
+ ->where ('j.job = :job ' )
77
+ ->andWhere ('j.queued = :queued ' )
78
+ ->andWhere ('j.scheduledTime <= :now ' )
79
+ ->setParameter ('queued ' , false )
80
+ ->setParameter ('job ' , $ job )
81
+ ->setParameter ('now ' , (new \DateTime ()));
82
+
83
+ if ($ arguments ) {
84
+ $ qb
85
+ ->andWhere ('j.arguments = :arguments ' )
86
+ ->setParameter ('arguments ' , serialize ($ arguments ));
87
+ }
88
+
89
+ try {
90
+ return boolval ($ qb ->getQuery ()->getSingleScalarResult ());
91
+ } catch (NoResultException |NonUniqueResultException $ e ) {
92
+ return false ;
93
+ }
94
+ }
95
+
96
+ public function save (ScheduledJob $ scheduledJob , $ flush = false ): void
97
+ {
98
+ $ this ->persist ($ scheduledJob );
99
+
41
100
if ($ flush ) {
42
- $ this ->_em -> flush ();
101
+ $ this ->flush ($ scheduledJob );
43
102
}
44
103
}
45
-
46
104
}
0 commit comments