File tree Expand file tree Collapse file tree 2 files changed +42
-4
lines changed
Expand file tree Collapse file tree 2 files changed +42
-4
lines changed Original file line number Diff line number Diff line change @@ -94,10 +94,7 @@ public function popMessage($queueName, $duration = 5)
9494 $ runtime = microtime (true ) + $ duration ;
9595 $ queueDir = $ this ->getQueueDirectory ($ queueName );
9696
97- $ it = new \GlobIterator ($ queueDir .DIRECTORY_SEPARATOR .'*.job ' , \FilesystemIterator::KEY_AS_FILENAME );
98- $ files = array_keys (iterator_to_array ($ it ));
99-
100- natsort ($ files );
97+ $ files = $ this ->getJobFiles ($ queueName );
10198
10299 while (microtime (true ) < $ runtime ) {
103100 if ($ files ) {
@@ -107,6 +104,9 @@ public function popMessage($queueName, $duration = 5)
107104 }
108105
109106 return $ this ->processFileOrFail ($ queueDir , $ id );
107+ } else {
108+ // In order to notice that a new message received, update the list.
109+ $ files = $ this ->getJobFiles ($ queueName );
110110 }
111111
112112 usleep (1000 );
@@ -244,4 +244,21 @@ private function getJobFilename($queueName)
244244
245245 return $ filename ;
246246 }
247+
248+ /**
249+ * @param string $queueName
250+ *
251+ * @return string[]
252+ */
253+ private function getJobFiles ($ queueName )
254+ {
255+ $ it = new \GlobIterator (
256+ $ this ->getQueueDirectory ($ queueName ) . DIRECTORY_SEPARATOR . '*.job ' ,
257+ \FilesystemIterator::KEY_AS_FILENAME
258+ );
259+ $ files = array_keys (iterator_to_array ($ it ));
260+ natsort ($ files );
261+
262+ return $ files ;
263+ }
247264}
Original file line number Diff line number Diff line change @@ -92,6 +92,27 @@ public function testPopMessage()
9292 }
9393 }
9494
95+ public function testPopMessageWhichPushedAfterTheInitialCollect ()
96+ {
97+ $ this ->driver ->createQueue ('send-newsletter ' );
98+
99+ $ pid = pcntl_fork ();
100+
101+ if ($ pid === -1 ) {
102+ $ this ->fail ('Failed to fork the currently running process: ' . pcntl_strerror (pcntl_get_last_error ()));
103+ } elseif ($ pid === 0 ) {
104+ // Child process pushes a message after the initial collect
105+ sleep (5 );
106+ $ this ->driver ->pushMessage ('send-newsletter ' , 'test ' );
107+ exit ;
108+ }
109+
110+ list ($ message , ) = $ this ->driver ->popMessage ('send-newsletter ' , 10 );
111+ $ this ->assertSame ('test ' , $ message );
112+
113+ pcntl_waitpid ($ pid , $ status );
114+ }
115+
95116 public function testAcknowledgeMessage ()
96117 {
97118 $ this ->driver ->createQueue ('send-newsletter ' );
You can’t perform that action at this time.
0 commit comments