Skip to content
3 changes: 2 additions & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@
"iron-io/iron_mq": "~4.0",
"league/container": "~2.3",
"queue-interop/queue-interop": "^0.6",
"queue-interop/amqp-interop": "^0.6"
"queue-interop/amqp-interop": "^0.6",
"ackintosh/snidel": "^0.10.2"
},
"suggest": {
"php-amqplib/php-amqplib": "Allow sending messages to an AMQP server using php-amqplib",
Expand Down
25 changes: 21 additions & 4 deletions src/Driver/FlatFileDriver.php
Original file line number Diff line number Diff line change
Expand Up @@ -94,17 +94,17 @@ public function popMessage($queueName, $duration = 5)
$runtime = microtime(true) + $duration;
$queueDir = $this->getQueueDirectory($queueName);

$it = new \GlobIterator($queueDir.DIRECTORY_SEPARATOR.'*.job', \FilesystemIterator::KEY_AS_FILENAME);
$files = array_keys(iterator_to_array($it));

natsort($files);
$files = $this->getJobFiles($queueName);

while (microtime(true) < $runtime) {
if ($files) {
$id = array_pop($files);
if (@rename($queueDir.DIRECTORY_SEPARATOR.$id, $queueDir.DIRECTORY_SEPARATOR.$id.'.proceed')) {
return array(file_get_contents($queueDir.DIRECTORY_SEPARATOR.$id.'.proceed'), $id);
}
} else {
// In order to notice that a new message received, update the list.
$files = $this->getJobFiles($queueName);
}

usleep(1000);
Expand Down Expand Up @@ -225,4 +225,21 @@ private function getJobFilename($queueName)

return $filename;
}

/**
* @param string $queueName
*
* @return string[]
*/
private function getJobFiles($queueName)
{
$it = new \GlobIterator(
$this->getQueueDirectory($queueName) . DIRECTORY_SEPARATOR . '*.job',
\FilesystemIterator::KEY_AS_FILENAME
);
$files = array_keys(iterator_to_array($it));
natsort($files);

return $files;
}
}
38 changes: 38 additions & 0 deletions tests/Driver/FlatFileDriverTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

namespace Bernard\Tests\Driver;

use Ackintosh\Snidel;
use Bernard\Driver\FlatFileDriver;

/**
Expand Down Expand Up @@ -92,6 +93,43 @@ public function testPopMessage()
}
}

public function testPopMessageWhichPushedAfterTheInitialCollect()
{
if (defined('HHVM_VERSION')) {
$this->markTestSkipped('Snidel is only supported on PHP.');
}

$this->driver->createQueue('send-newsletter');
$snidel = new Snidel();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A test case for this behavior requires multiprocess(pop, push). Snidel (depends bernard!) provides the easiest way to achieve that.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we use the php internal function pcntl_fork for this instead of snidel?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No real big issue, just checking if we can avoid an extra dependency 😄

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. If that's what you want, I will rewrite it. 💡

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This library looks really interesting, thanks for sharing. Nevertheless, I second @acrobat 's opinion: let's keep the dependencies at the absolute minimum if possible.


// Fork a process which pops message
$snidel->process(
function () {
list($message, ) = $this->driver->popMessage('send-newsletter', 10);
return $message;
},
[],
'popMessage'
);

// Fork another process which pushes message
$snidel->process(
function () {
// Push a message after the initial collect
sleep(5);
$this->driver->pushMessage('send-newsletter', 'test');
},
[],
'pushMessage'
);

foreach ($snidel->results() as $result) {
if ($result->getTask()->getTag() === 'popMessage') {
$this->assertSame('test', $result->getReturn());
}
}
}

public function testAcknowledgeMessage()
{
$this->driver->createQueue('send-newsletter');
Expand Down