Skip to content

Commit 664ab66

Browse files
committed
Async client
1 parent f3acd04 commit 664ab66

File tree

1 file changed

+91
-0
lines changed

1 file changed

+91
-0
lines changed

src/AsyncClient.php

Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
<?php
2+
declare(strict_types=1);
3+
4+
namespace ApiClients\Client\RabbitMQ\Management;
5+
6+
use ApiClients\Foundation\Hydrator\CommandBus\Command\HydrateCommand;
7+
use ApiClients\Foundation\Transport\CommandBus\Command\SimpleRequestCommand;
8+
use Psr\Http\Message\ResponseInterface;
9+
use React\EventLoop\LoopInterface;
10+
use React\Promise\PromiseInterface;
11+
use Rx\Observable;
12+
use Rx\ObservableInterface;
13+
use Rx\React\Promise;
14+
use ApiClients\Foundation\Client;
15+
use ApiClients\Foundation\Factory;
16+
use function React\Promise\resolve;
17+
18+
final class AsyncClient
19+
{
20+
/**
21+
* @var Client
22+
*/
23+
protected $client;
24+
25+
/**
26+
* @param LoopInterface $loop
27+
* @param string $baseUrl
28+
* @param string $username
29+
* @param string $password
30+
* @param Client|null $client
31+
*/
32+
public function __construct(
33+
LoopInterface $loop,
34+
string $baseUrl,
35+
string $username,
36+
string $password,
37+
Client $client = null
38+
) {
39+
if (!($client instanceof Client)) {
40+
$options = ApiSettings::getOptions($baseUrl, $username, $password, 'Async');
41+
$client = Factory::create($loop, $options);
42+
}
43+
$this->client = $client;
44+
}
45+
46+
/**
47+
* @return PromiseInterface
48+
*/
49+
public function overview(): PromiseInterface
50+
{
51+
return $this->client->handle(
52+
new SimpleRequestCommand('overview')
53+
)->then(function (ResponseInterface $response) {
54+
return resolve($this->client->handle(
55+
new HydrateCommand('Overview', $response->getBody()->getJson())
56+
));
57+
});
58+
}
59+
60+
/**
61+
* @return ObservableInterface
62+
*/
63+
public function queues(): ObservableInterface
64+
{
65+
return Promise::toObservable($this->client->handle(
66+
new SimpleRequestCommand('queues')
67+
))->flatMap(function (ResponseInterface $response) {
68+
return Observable::fromArray($response->getBody()->getJson());
69+
})->flatMap(function ($queue) {
70+
return Promise::toObservable($this->client->handle(
71+
new HydrateCommand('Queue', $queue)
72+
));
73+
});
74+
}
75+
76+
/**
77+
* @return ObservableInterface
78+
*/
79+
public function connections(): ObservableInterface
80+
{
81+
return Promise::toObservable($this->client->handle(
82+
new SimpleRequestCommand('connections')
83+
))->flatMap(function (ResponseInterface $response) {
84+
return Observable::fromArray($response->getBody()->getJson());
85+
})->flatMap(function ($connection) {
86+
return Promise::toObservable($this->client->handle(
87+
new HydrateCommand('Connection', $connection)
88+
));
89+
});
90+
}
91+
}

0 commit comments

Comments
 (0)