1
+ <?php
2
+ /**
3
+ * ResqueScheduler core class to handle scheduling of jobs in the future.
4
+ *
5
+ * @package ResqueScheduler
6
+ * @author Chris Boulton <[email protected] >
7
+ * @copyright (c) 2012 Chris Boulton
8
+ * @license http://www.opensource.org/licenses/mit-license.php
9
+ */
10
+ class ResqueScheduler
11
+ {
12
+ /**
13
+ * Enqueue a job in a given number of seconds from now.
14
+ *
15
+ * Identical to Resque::enqueue, however the first argument is the number
16
+ * of seconds before the job should be executed.
17
+ *
18
+ * @param int $in Number of seconds from now when the job should be executed.
19
+ * @param string $queue The name of the queue to place the job in.
20
+ * @param string $class The name of the class that contains the code to execute the job.
21
+ * @param array $args Any optional arguments that should be passed when the job is executed.
22
+ */
23
+ public static function enqueueIn ($ in , $ queue , $ class , array $ args = array ())
24
+ {
25
+ self ::enqueueAt (time () + $ in , $ queue , $ class , $ args );
26
+ }
27
+
28
+ /**
29
+ * Enqueue a job for execution at a given timestamp.
30
+ *
31
+ * Identical to Resque::enqueue, however the first argument is a timestamp
32
+ * (either UNIX timestamp in integer format or an instance of the DateTime
33
+ * class in PHP).
34
+ *
35
+ * @param DateTime|int $at Instance of PHP DateTime object or int of UNIX timestamp.
36
+ * @param string $queue The name of the queue to place the job in.
37
+ * @param string $class The name of the class that contains the code to execute the job.
38
+ * @param array $args Any optional arguments that should be passed when the job is executed.
39
+ */
40
+ public static function enqueueAt ($ at , $ queue , $ class , $ args = array ())
41
+ {
42
+ self ::validateJob ($ class , $ queue );
43
+
44
+ $ job = self ::jobToHash ($ queue , $ class , $ args );
45
+ self ::delayedPush ($ at , $ job );
46
+
47
+ Resque_Event::trigger ('afterSchedule ' , array (
48
+ 'at ' => $ at ,
49
+ 'queue ' => $ queue ,
50
+ 'class ' => $ class ,
51
+ 'args ' => $ args ,
52
+ ));
53
+ }
54
+
55
+ /**
56
+ * Directly append an item to the delayed queue schedule.
57
+ *
58
+ * @param DateTime|int $timestamp Timestamp job is scheduled to be run at.
59
+ * @param array $item Hash of item to be pushed to schedule.
60
+ */
61
+ public static function delayedPush ($ timestamp , $ item )
62
+ {
63
+ $ timestamp = self ::getTimestamp ($ timestamp );
64
+ $ redis = Resque::redis ();
65
+ $ redis ->rpush ('delayed: ' . $ timestamp , json_encode ($ item ));
66
+
67
+ $ redis ->zadd ('delayed_queue_schedule ' , $ timestamp , $ timestamp );
68
+ }
69
+
70
+ /**
71
+ * Get the total number of jobs in the delayed schedule.
72
+ *
73
+ * @return int Number of scheduled jobs.
74
+ */
75
+ public static function getDelayedQueueScheduleSize ()
76
+ {
77
+ return (int )Resque::redis ()->zcard ('delayed_queue_schedule ' );
78
+ }
79
+
80
+ /**
81
+ * Get the number of jobs for a given timestamp in the delayed schedule.
82
+ *
83
+ * @param DateTime|int $timestamp Timestamp
84
+ * @return int Number of scheduled jobs.
85
+ */
86
+ public static function getDelayedTimestampSize ($ timestamp )
87
+ {
88
+ $ timestamp = self ::toTimestamp ($ timestamp );
89
+ return Resque::redis ()->llen ('delayed: ' . $ timestamp , $ timestamp );
90
+ }
91
+
92
+ /**
93
+ * Generate hash of all job properties to be saved in the scheduled queue.
94
+ *
95
+ * @param string $queue Name of the queue the job will be placed on.
96
+ * @param string $class Name of the job class.
97
+ * @param array $args Array of job arguments.
98
+ */
99
+
100
+ private static function jobToHash ($ queue , $ class , $ args )
101
+ {
102
+ return array (
103
+ 'class ' => $ class ,
104
+ 'args ' => $ args ,
105
+ 'queue ' => $ queue ,
106
+ );
107
+ }
108
+
109
+ /**
110
+ * If there are no jobs for a given key/timestamp, delete references to it.
111
+ *
112
+ * Used internally to remove empty delayed: items in Redis when there are
113
+ * no more jobs left to run at that timestamp.
114
+ *
115
+ * @param string $key Key to count number of items at.
116
+ * @param int $timestamp Matching timestamp for $key.
117
+ */
118
+ private static function cleanupTimestamp ($ key , $ timestamp )
119
+ {
120
+ $ timestamp = self ::getTimestamp ($ timestamp );
121
+ $ redis = Resque::redis ();
122
+
123
+ if ($ redis ->llen ($ key ) == 0 ) {
124
+ $ redis ->del ($ key );
125
+ $ redis ->zrem ('delayed_queue_schedule ' , $ timestamp );
126
+ }
127
+ }
128
+
129
+ /**
130
+ * Convert a timestamp in some format in to a unix timestamp as an integer.
131
+ *
132
+ * @param DateTime|int $timestamp Instance of DateTime or UNIX timestamp.
133
+ * @return int Timestamp
134
+ * @throws ResqueScheduler_InvalidTimestampException
135
+ */
136
+ private static function getTimestamp ($ timestamp )
137
+ {
138
+ if ($ timestamp instanceof DateTime) {
139
+ $ timestamp = $ timestamp ->getTimestamp ();
140
+ }
141
+
142
+ if ((int )$ timestamp != $ timestamp ) {
143
+ throw new ResqueScheduler_InvalidTimestampExeption (
144
+ 'The supplied timestamp value could not be converted to an integer. '
145
+ );
146
+ }
147
+
148
+ return (int )$ timestamp ;
149
+ }
150
+
151
+ /**
152
+ * Find the first timestamp in the delayed schedule before/including the timestamp.
153
+ *
154
+ * Will find and return the first timestamp upto and including the given
155
+ * timestamp. This is the heart of the ResqueScheduler that will make sure
156
+ * that any jobs scheduled for the past when the worker wasn't running are
157
+ * also queued up.
158
+ *
159
+ * @param DateTime|int $timestamp Instance of DateTime or UNIX timestamp.
160
+ * Defaults to now.
161
+ * @return int|false UNIX timestamp, or false if nothing to run.
162
+ */
163
+ public function nextDelayedTimestamp ($ at = null )
164
+ {
165
+ if ($ at === null ) {
166
+ $ at = time ();
167
+ }
168
+ else {
169
+ $ at = self ::getTimestamp ($ at );
170
+ }
171
+
172
+ $ items = Resque::redis ()->zrangebyscore ('delayed_queue_schedule ' , '-inf ' , $ at , 'LIMIT ' , 0 , 1 );
173
+ if (!empty ($ items )) {
174
+ return $ items [0 ];
175
+ }
176
+
177
+ return false ;
178
+ }
179
+
180
+ /**
181
+ * Pop a job off the delayed queue for a given timestamp.
182
+ *
183
+ * @param DateTime|int $timestamp Instance of DateTime or UNIX timestamp.
184
+ * @return array Matching job at timestamp.
185
+ */
186
+ public function nextItemForTimestamp ($ timestamp )
187
+ {
188
+ $ timestamp = self ::getTimestamp ($ timestamp );
189
+ $ key = 'delayed: ' . $ timestamp ;
190
+
191
+ $ item = json_decode (Resque::redis ()->lpop ($ key ), true );
192
+
193
+ self ::cleanupTimestamp ($ key , $ timestamp );
194
+ return $ item ;
195
+ }
196
+
197
+ /**
198
+ * Ensure that supplied job class/queue is valid.
199
+ *
200
+ * @param string $class Name of job class.
201
+ * @param string $queue Name of queue.
202
+ * @throws Resque_Exception
203
+ */
204
+ private static function validateJob ($ class , $ queue )
205
+ {
206
+ if (empty ($ class )) {
207
+ throw new Resque_Exception ('Jobs must be given a class. ' );
208
+ }
209
+ else if (empty ($ queue )) {
210
+ throw new Resque_Exception ('Jobs must be put in a queue. ' );
211
+ }
212
+
213
+ return true ;
214
+ }
215
+ }
0 commit comments