Skip to content

Commit c2f38e9

Browse files
committed
Deliver rates example
1 parent e2927fe commit c2f38e9

File tree

3 files changed

+40
-9
lines changed

3 files changed

+40
-9
lines changed
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
<?php declare(strict_types=1);
2+
3+
use ApiClients\Client\RabbitMQ\Management\AsyncClient;
4+
use function ApiClients\Foundation\resource_pretty_print;
5+
use ApiClients\Client\RabbitMQ\Management\Resource\QueueInterface;
6+
use React\EventLoop\Factory;
7+
use Rx\Observer\CallbackObserver;
8+
9+
require dirname(__DIR__) . DIRECTORY_SEPARATOR . 'vendor/autoload.php';
10+
11+
$loop = Factory::create();
12+
$config = require 'resolve_config.php';
13+
14+
$client = AsyncClient::create($loop, $config['baseUrl'], $config['username'], $config['password']);
15+
$queues = $client->queues(1)->filter(function (QueueInterface $queue) {
16+
return strpos($queue->name(), 'amq.gen') !== 0;
17+
})->subscribe(new CallbackObserver(function (QueueInterface $queue) {
18+
echo $queue->name(), ': ', $queue->messageStats()->deliverDetails()->rate(), PHP_EOL;
19+
}));
20+
21+
$loop->run();

src/AsyncClient.php

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
use Rx\ObservableInterface;
1515
use Rx\React\Promise;
1616
use function React\Promise\resolve;
17+
use Rx\Scheduler\EventLoopScheduler;
1718

1819
final class AsyncClient implements AsyncClientInterface
1920
{
@@ -79,18 +80,26 @@ public function overview(): PromiseInterface
7980
}
8081

8182
/**
83+
* @param int|null $interval
8284
* @return ObservableInterface
8385
*/
84-
public function queues(): ObservableInterface
86+
public function queues(int $interval = null): ObservableInterface
8587
{
86-
return Promise::toObservable($this->client->handle(
87-
new SimpleRequestCommand('queues')
88-
))->flatMap(function (ResponseInterface $response) {
89-
return Observable::fromArray($response->getBody()->getJson());
90-
})->flatMap(function ($queue) {
88+
if ($interval === null) {
9189
return Promise::toObservable($this->client->handle(
92-
new HydrateCommand('Queue', $queue)
93-
));
90+
new SimpleRequestCommand('queues')
91+
))->flatMap(function (ResponseInterface $response) {
92+
return Observable::fromArray($response->getBody()->getJson());
93+
})->flatMap(function ($queue) {
94+
return Promise::toObservable($this->client->handle(
95+
new HydrateCommand('Queue', $queue)
96+
));
97+
});
98+
}
99+
100+
$scheduler = new EventLoopScheduler($this->client->getFromContainer(LoopInterface::class));
101+
return Observable::interval($interval * 1000, $scheduler)->flatMap(function () {
102+
return $this->queues();
94103
});
95104
}
96105

src/AsyncClientInterface.php

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,10 @@ interface AsyncClientInterface
1616
public function overview(): PromiseInterface;
1717

1818
/**
19+
* @param int|null $interval
1920
* @return ObservableInterface
2021
*/
21-
public function queues(): ObservableInterface;
22+
public function queues(int $interval = null): ObservableInterface;
2223

2324
/**
2425
* @return ObservableInterface

0 commit comments

Comments
 (0)