Skip to content

Commit df09cec

Browse files
authored
Merge pull request #8 from asiries335/mvp-6
Mvp 6
2 parents 0517dde + 600a3a1 commit df09cec

File tree

6 files changed

+214
-32
lines changed

6 files changed

+214
-32
lines changed

README.md

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,8 @@ $collection = $client->stream('test')->get();
135135

136136
_Listen to a stream_
137137

138+
functional works on a package basis https://github.com/reactphp/event-loop
139+
138140
```php
139141
$client->stream('test')->listen(
140142
function (\Asiries335\redisSteamPhp\Data\Message $message) {
@@ -143,4 +145,34 @@ $client->stream('test')->listen(
143145
);
144146
```
145147

146-
functional works on a package basis https://github.com/reactphp/event-loop
148+
149+
_Create a new consumer group_
150+
151+
```php
152+
$streamName = 'test';
153+
$groupName = 'demo-group-1';
154+
$isShowFullHistoryStream = false;
155+
156+
// return bool or ErrorException.
157+
$client->streamGroupConsumer($streamName)->create($groupName, $isShowFullHistoryStream);
158+
```
159+
160+
_Destroy a consumer group_
161+
162+
```php
163+
$streamName = 'test';
164+
$groupName = 'demo-group-1';
165+
166+
// return bool or ErrorException.
167+
$client->streamGroupConsumer($streamName)->destroy($groupName);
168+
```
169+
170+
_Delete a consumer from group_
171+
```php
172+
$streamName = 'test';
173+
$groupName = 'demo-group-1';
174+
$consumerName = 'consumer-name';
175+
176+
// return bool or ErrorException.
177+
$client->streamGroupConsumer($streamName)->deleteConsumer($groupName, $consumerName);
178+
```

src/Client.php

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -33,12 +33,24 @@ public function __construct(ClientRedisStreamPhpInterface $redisClient)
3333
/**
3434
* Work with stream
3535
*
36-
* @param string $nameStream Name stream
36+
* @param string $streamName Name stream
3737
*
38-
* @return mixed
38+
* @return Stream
3939
*/
40-
public function stream(string $nameStream)
40+
public function stream(string $streamName) : Stream
4141
{
42-
return new Stream($this->_client, $nameStream);
42+
return new Stream($this->_client, $streamName);
43+
}
44+
45+
/**
46+
* Work with stream group
47+
*
48+
* @param string $streamName Name stream
49+
*
50+
* @return StreamGroupConsumer
51+
*/
52+
public function streamGroupConsumer(string $streamName) : StreamGroupConsumer
53+
{
54+
return new StreamGroupConsumer($this->_client, $streamName);
4355
}
4456
}

src/Data/Constants.php

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,41 @@ final class Constants
2020
*/
2121
public const COMMAND_XREAD = 'xread';
2222

23+
/**
24+
* COMMAND XGROUP
25+
*
26+
* @var string
27+
*/
28+
public const COMMAND_XGROUP = 'xgroup';
29+
2330
/**
2431
* COMMAND XRANGE
2532
*
2633
* @var string
2734
*/
2835
public const COMMAND_XRANGE = 'xrange';
2936

37+
/**
38+
* COMMAND OPTION CREATE
39+
*
40+
* @var string
41+
*/
42+
public const COMMAND_OPTION_XGROUP_CREATE = 'CREATE';
43+
44+
/**
45+
* COMMAND OPTION DESTROY
46+
*
47+
* @var string
48+
*/
49+
public const COMMAND_OPTION_XGROUP_DESTROY = 'DESTROY';
50+
51+
/**
52+
* COMMAND OPTION DELCONSUMER
53+
*
54+
* @var string
55+
*/
56+
public const COMMAND_OPTION_XGROUP_DELCONSUMER = 'DELCONSUMER';
57+
3058
/**
3159
* TIME TICK INTERVAL
3260
*

src/RedisStream.php

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Asiries335\redisSteamPhp;
6+
7+
abstract class RedisStream
8+
{
9+
/**
10+
* Client
11+
*
12+
* @var ClientRedisStreamPhpInterface
13+
*/
14+
protected $_client;
15+
16+
/**
17+
* Name stream
18+
*
19+
* @var string
20+
*/
21+
protected $_streamName;
22+
23+
/**
24+
* Stream constructor.
25+
*
26+
* @param ClientRedisStreamPhpInterface $client ClientRedisInterface
27+
* @param string $nameStream Name stream
28+
*/
29+
public function __construct(ClientRedisStreamPhpInterface $client, string $nameStream)
30+
{
31+
$this->_client = $client;
32+
$this->_streamName = $nameStream;
33+
}
34+
}

src/Stream.php

Lines changed: 1 addition & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -18,34 +18,8 @@
1818
use Asiries335\redisSteamPhp\Hydrator\MessageHydrator;
1919

2020

21-
final class Stream
21+
final class Stream extends RedisStream
2222
{
23-
/**
24-
* Client
25-
*
26-
* @var ClientRedisStreamPhpInterface
27-
*/
28-
private $_client;
29-
30-
/**
31-
* Name stream
32-
*
33-
* @var string
34-
*/
35-
private $_streamName;
36-
37-
/**
38-
* Stream constructor.
39-
*
40-
* @param ClientRedisStreamPhpInterface $client ClientRedisInterface
41-
* @param string $nameStream Name stream
42-
*/
43-
public function __construct(ClientRedisStreamPhpInterface $client, string $nameStream)
44-
{
45-
$this->_client = $client;
46-
$this->_streamName = $nameStream;
47-
}
48-
4923
/**
5024
* Appends the specified stream entry to the stream at the specified key
5125
*

src/StreamGroupConsumer.php

Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
<?php
2+
declare(strict_types=1);
3+
4+
namespace Asiries335\redisSteamPhp;
5+
6+
7+
use Asiries335\redisSteamPhp\Data\Constants;
8+
use Asiries335\redisSteamPhp\Dto\StreamCommandCallTransporter;
9+
10+
final class StreamGroupConsumer extends RedisStream
11+
{
12+
/**
13+
* Create group Consumer
14+
*
15+
* @param string $groupName
16+
* @param bool $isShowFullHistoryStream
17+
*
18+
* @return bool
19+
*
20+
* @throws \Exception
21+
*/
22+
public function create(string $groupName, bool $isShowFullHistoryStream = true) : bool
23+
{
24+
$transporter = new StreamCommandCallTransporter(
25+
[
26+
'command' => Constants::COMMAND_XGROUP,
27+
'args' => [
28+
Constants::COMMAND_OPTION_XGROUP_CREATE,
29+
$this->_streamName,
30+
$groupName,
31+
(int) $isShowFullHistoryStream
32+
]
33+
]
34+
);
35+
36+
try {
37+
return (bool) $this->_client->call($transporter);
38+
} catch (\Exception $exception) {
39+
throw $exception;
40+
}
41+
}
42+
43+
/**
44+
* Delete group Consumer
45+
*
46+
* @param string $groupName
47+
*
48+
* @return bool
49+
*
50+
* @throws \Exception
51+
*/
52+
public function destroy(string $groupName) : bool
53+
{
54+
$transporter = new StreamCommandCallTransporter(
55+
[
56+
'command' => Constants::COMMAND_XGROUP,
57+
'args' => [
58+
Constants::COMMAND_OPTION_XGROUP_DESTROY,
59+
$this->_streamName,
60+
$groupName,
61+
]
62+
]
63+
);
64+
65+
try {
66+
return (bool) $this->_client->call($transporter);
67+
} catch (\Exception $exception) {
68+
throw $exception;
69+
}
70+
}
71+
72+
/**
73+
* Delete consumer from group
74+
*
75+
* @param string $groupName
76+
* @param string $consumerName
77+
*
78+
* @return bool
79+
*
80+
* @throws \Exception
81+
*/
82+
public function deleteConsumer(string $groupName, string $consumerName) : bool
83+
{
84+
$transporter = new StreamCommandCallTransporter(
85+
[
86+
'command' => Constants::COMMAND_XGROUP,
87+
'args' => [
88+
Constants::COMMAND_OPTION_XGROUP_DELCONSUMER,
89+
$this->_streamName,
90+
$groupName,
91+
$consumerName
92+
]
93+
]
94+
);
95+
96+
try {
97+
return (bool) $this->_client->call($transporter);
98+
} catch (\Exception $exception) {
99+
throw $exception;
100+
}
101+
}
102+
}

0 commit comments

Comments
 (0)