|
| 1 | +# PHP Message Queue SDK [](https://travis-ci.org/eexit/php-mq-sdk) |
| 2 | + |
| 3 | +This PHP SDK aims for easy integration of message queues in various developments such as microservices. |
| 4 | +The use of this SDK allows only the publishing and fetching for messages, you cannot create, delete, purge queue or any other action. |
| 5 | + |
| 6 | +### Available adapters |
| 7 | + |
| 8 | + - [Amazon SQS](http://aws.amazon.com/sqs/) v.2 |
| 9 | + - [AMQP](https://github.com/php-amqplib/php-amqplib) v.0.9.1 |
| 10 | + |
| 11 | +#### Adapter constraints |
| 12 | + |
| 13 | +##### SQS |
| 14 | + |
| 15 | + - Batch sending, receiving and deletion not supported |
| 16 | + - Message attribute binary type not supported |
| 17 | + |
| 18 | +##### AMQP |
| 19 | + |
| 20 | + - Publishing to exchange not supported |
| 21 | + |
| 22 | +See the [CHANGE LOG](CHANGELOG.md) for version release information. |
| 23 | + |
| 24 | +## Installation |
| 25 | + |
| 26 | +Then run the command: |
| 27 | + |
| 28 | + $ composer require eexit/php-mq-sdk:~1.0 |
| 29 | + |
| 30 | +## Usage |
| 31 | + |
| 32 | +See the `examples` directory content. |
| 33 | + |
| 34 | +### Logging |
| 35 | + |
| 36 | +Example with a [PSR-3](http://www.php-fig.org/psr/psr-3/) logger such as [Monolog](https://github.com/Seldaek/monolog): |
| 37 | + |
| 38 | +```php |
| 39 | +<?php |
| 40 | +use Monolog\Logger; |
| 41 | +use Monolog\Handler\StreamHandler; |
| 42 | + |
| 43 | +$handler = new StreamHandler(__DIR__ . '/sandbox.log', Logger::INFO); |
| 44 | +$logger = new Logger('Sandbox'); |
| 45 | +$logger->pushHandler($handler); |
| 46 | + |
| 47 | +/** \Eexit\Mq\MessageQueue $mq */ |
| 48 | +$mq->setLogger($logger); |
| 49 | +``` |
| 50 | + |
| 51 | +Example of log with the `INFO` level: |
| 52 | + |
| 53 | +``` |
| 54 | +[2015-07-08 13:33:57] Sandbox.INFO: Open a connection [] [] |
| 55 | +[2015-07-08 13:33:59] Sandbox.INFO: Published message abb12d0a-97c3-4dcd-a45f-8be097bbe6bf in 1.6550381183624 ms [] [] |
| 56 | +[2015-07-08 13:33:59] Sandbox.INFO: Start listening to on incoming messages [] [] |
| 57 | +[2015-07-08 13:33:59] Sandbox.INFO: Fetched message 5c13c13e-86e5-4100-8e50-5168a0bd9608 in 0.15714406967163 ms [] [] |
| 58 | +[2015-07-08 13:33:59] Sandbox.INFO: Acked message 5c13c13e-86e5-4100-8e50-5168a0bd9608 in 0.13068604469299 ms [] [] |
| 59 | +[2015-07-08 13:33:59] Sandbox.INFO: Close the connection [] [] |
| 60 | +``` |
| 61 | + |
| 62 | +If you use the `DEBUG` level, you'll get way more information such as the message content and error stack traces. |
| 63 | + |
| 64 | +### Unix signal handling |
| 65 | + |
| 66 | +The SDK supports Unix signal handling (via [PCNTL extension](http://php.net/manual/en/book.pcntl.php)) in order to gracefully shutdown your processes: |
| 67 | + |
| 68 | +```php |
| 69 | +<?php |
| 70 | +// MQ signal handler: |
| 71 | +$signalHandler = function ($signal) { |
| 72 | + switch ($signal) { |
| 73 | + case SIGINT: |
| 74 | + case SIGQUIT: |
| 75 | + case SIGTERM: |
| 76 | + $this->mq->stop(); |
| 77 | + \pcntl_signal($signal, SIG_DFL); // Restores original signal handler |
| 78 | + break; |
| 79 | + } |
| 80 | +}; |
| 81 | + |
| 82 | +// If the extension is loaded, registers the signal handlers |
| 83 | +if (extension_loaded('pcntl')) { |
| 84 | + \pcntl_signal(SIGINT, $signalHandler); |
| 85 | + \pcntl_signal(SIGQUIT, $signalHandler); |
| 86 | + \pcntl_signal(SIGTERM, $signalHandler); |
| 87 | +} |
| 88 | + |
| 89 | +/* |
| 90 | + MQ bootstrap... |
| 91 | +*/ |
| 92 | + |
| 93 | +$mq->listen($queue, function(EnvelopeInterface $message, MessageQueue $mq) { |
| 94 | + // The process can be stop from inside |
| 95 | + return $mq->stop(); |
| 96 | + |
| 97 | + throw new WillNeverBeThrown(); |
| 98 | +}); |
| 99 | + |
| 100 | +// Closes the connections/gathers log & metrics accordingly! |
| 101 | +$mq->close(); |
| 102 | +``` |
| 103 | + |
| 104 | +There is [a working example](examples/amqp/worker.php) of signal handling for AMQP. |
| 105 | + |
| 106 | +### Metric collection |
| 107 | + |
| 108 | +This library use the [Collector interface](https://github.com/beberlei/metrics/blob/master/src/Beberlei/Metrics/Collector/Collector.php) of [beberlei/metrics](https://github.com/beberlei/metrics) library. This allows you to use any of the supported metric backends. |
| 109 | + |
| 110 | +Here's an example with StatsD: |
| 111 | + |
| 112 | +```php |
| 113 | +<?php |
| 114 | +use Eexit\Mq\Adapter\Sqs\Sqs; |
| 115 | +use Beberlei\Metrics\Collector\StatsD; |
| 116 | + |
| 117 | +$collector = new StatsD(/* backend host */); |
| 118 | + |
| 119 | +// Adds the collector and a prefix to avoid metric naming conflicts |
| 120 | +// You can use the adapter prefix if you want |
| 121 | +/** \Eexit\Mq\MessageQueue $mq */ |
| 122 | +$mq->setMetricCollector($collector, Sqs::METRIC_PREFIX); |
| 123 | + |
| 124 | +// In your worker business code you can add other metrics |
| 125 | +// Note: the metric prefix is only used internally. You may use you own prefix here |
| 126 | +$mq->getMetricCollector()->increment('my_app.my_metric.succeed'); |
| 127 | +``` |
| 128 | + |
| 129 | +#### Internal metrics |
| 130 | + |
| 131 | +| **Description** | **Metric name** | |
| 132 | +|-------------------------------- |---------------------------------- | |
| 133 | +| Connection open success count | `{prefix}.connection.open.succeed` | |
| 134 | +| Connection open duration | `{prefix}.connection.open_time` | |
| 135 | +| Connection open failure count | `{prefix}.connection.open.failed` | |
| 136 | +| Connection stop success count | `{prefix}.connection.stop.succeed` | |
| 137 | +| Connection stop duration | `{prefix}.connection.stop_time` | |
| 138 | +| Connection stop failure count | `{prefix}.connection.stop.failed` | |
| 139 | +| Connection close success count | `{prefix}.connection.close.succeed` | |
| 140 | +| Connection close duration | `{prefix}.connection.close_time` | |
| 141 | +| Connection close failure count | `{prefix}.connection.close.failed` | |
| 142 | +| Message publication success count | `{prefix}.message.publish.succeed` | |
| 143 | +| Message publication duration | `{prefix}.message.publish_time` | |
| 144 | +| Message publication failure count | `{prefix}.message.publish.failed` | |
| 145 | +| Message fetch success count | `{prefix}.message.fetch.succeed` | |
| 146 | +| Message fetch duration | `{prefix}.message.fetch_time` | |
| 147 | +| Message listen failure count | `{prefix}.message.listen.failed` | |
| 148 | +| Message ack success count | `{prefix}.message.ack.succeed` | |
| 149 | +| Message ack duration | `{prefix}.message.ack_time` | |
| 150 | +| Message ack failure count | `{prefix}.message.ack.failed` | |
| 151 | +| Message nack success count | `{prefix}.message.nack.succeed` | |
| 152 | +| Message nack duration | `{prefix}.message.nack_time` | |
| 153 | +| Message nack failure count | `{prefix}.message.nack.failed` | |
| 154 | +| Message processing duration | `{prefix}.message.process_time` | |
| 155 | + |
| 156 | +For example, if you use the SQS adapter and use the `Sqs::METRIC_PREFIX` prefix, your metrics will look like this: |
| 157 | + |
| 158 | + mq.sqs.connection.open_time |
| 159 | + mq.sqs.message.publish.succeed |
| 160 | + mq.sqs.message.publish_time |
| 161 | + |
0 commit comments