Skip to content

Commit c2810d6

Browse files
committed
Producer configurable Keepalive Connection
1 parent 3e1171d commit c2810d6

File tree

11 files changed

+207
-9
lines changed

11 files changed

+207
-9
lines changed

src/Client.php

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -165,9 +165,12 @@ protected function initialization()
165165
}
166166

167167
$this->topicManage->setConnection($topic, $this->connections[ $connectionKey ]);
168+
}
168169

169-
// Add to EventLoop
170-
$this->eventloop->addRead($this->topicManage->getConnection($topic));
170+
// Add to Eventloop
171+
foreach ($this->connections as $connection) {
172+
$connection->setKeepalive($this->options->getKeepalive());
173+
$this->eventloop->addRead($connection);
171174
}
172175

173176
$this->isHandshake = true;

src/ConsumerOptions.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
*
1717
* @package Pulsar
1818
*/
19-
class ConsumerOptions extends Options
19+
final class ConsumerOptions extends Options
2020
{
2121
/**
2222
* consumer name

src/IO/AbstractIO.php

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,16 @@ abstract class AbstractIO
7272
protected $lastSendPingCommandTime = 0;
7373

7474

75+
/**
76+
* @var int
77+
*/
78+
protected $fd = 0;
79+
80+
/**
81+
* @var bool
82+
*/
83+
protected $keepalive = false;
84+
7585
/**
7686
* @return void
7787
*/
@@ -123,6 +133,33 @@ public function getMaxMessageSize()
123133
}
124134

125135

136+
137+
/**
138+
* @param bool $status
139+
* @return void
140+
*/
141+
public function setKeepalive(bool $status)
142+
{
143+
$this->keepalive = $status;
144+
}
145+
146+
147+
/**
148+
* @return int
149+
*/
150+
public function fd(): int
151+
{
152+
return $this->fd;
153+
}
154+
155+
/**
156+
* Handling readable events
157+
*
158+
* @return mixed
159+
*/
160+
abstract public function handleRead();
161+
162+
126163
/**
127164
* @param string $host
128165
* @param int $port

src/IO/ChannelManager.php

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
<?php
2+
/**
3+
* Created by PhpStorm
4+
* User: Sunny、
5+
* Date: 2023/5/30
6+
* Time: 16:54
7+
*/
8+
9+
namespace Pulsar\IO;
10+
11+
12+
use Swoole\Coroutine\Channel;
13+
14+
/**
15+
* Class ChannelManager
16+
*
17+
* @package Pulsar\IO
18+
*/
19+
final class ChannelManager
20+
{
21+
22+
/**
23+
* @var array
24+
*/
25+
private static $inner = [];
26+
27+
28+
/**
29+
* @param int $fd
30+
* @param int $size
31+
* @return void
32+
*/
33+
public static function init(int $fd, int $size = 10)
34+
{
35+
self::$inner[ $fd ] = new Channel($size);
36+
}
37+
38+
39+
/**
40+
* @param int $fd
41+
* @return Channel
42+
*/
43+
public static function get(int $fd): Channel
44+
{
45+
return self::$inner[ $fd ];
46+
}
47+
}

src/IO/StreamIO.php

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ public function connect(string $host, int $port, $timeout = null)
5757

5858
$this->host = $host;
5959
$this->port = $port;
60+
$this->fd = (int)$this->socket;
6061
}
6162

6263

@@ -173,6 +174,11 @@ public function close()
173174
*/
174175
public function wait($seconds = null)
175176
{
177+
// Receive from Channel
178+
if ($this->keepalive) {
179+
return ChannelManager::get($this->fd)->pop($seconds);
180+
}
181+
176182
$reads = [$this->socket];
177183
$n = stream_select($reads, $writes, $excepts, $seconds);
178184
if ($n <= 0) {
@@ -255,6 +261,6 @@ public function handleRead()
255261
} while ($remainingSize > 0);
256262
}
257263

258-
return new Response($buffer, $baseCommand);
264+
return new Response($buffer, $baseCommand, $this->fd);
259265
}
260266
}

src/Options.php

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,18 @@ public function getTopics(): array
129129
}
130130

131131

132+
133+
/**
134+
* This connection pool is not enabled by default
135+
*
136+
* @return bool
137+
*/
138+
public function getKeepalive(): bool
139+
{
140+
return false;
141+
}
142+
143+
132144
/**
133145
* @param ISchema $schema
134146
* @return void

src/Producer.php

Lines changed: 48 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
use Protobuf\AbstractMessage;
1313
use Pulsar\Exception\OptionsException;
1414
use Pulsar\Exception\RuntimeException;
15+
use Pulsar\IO\ChannelManager;
1516
use Pulsar\Proto\CommandSendReceipt;
1617
use Pulsar\Proto\KeyValue;
1718
use Pulsar\Proto\MessageMetadata;
@@ -20,6 +21,7 @@
2021
use Pulsar\Traits\ProducerKeepAlive;
2122
use Pulsar\Util\Buffer;
2223
use Pulsar\Util\Helper;
24+
use Swoole\Coroutine;
2325

2426
/**
2527
* Class Producer
@@ -53,9 +55,22 @@ class Producer extends Client
5355
* @param string $url
5456
* @param ProducerOptions $options
5557
* @throws Exception\OptionsException
58+
* @throws RuntimeException
5659
*/
5760
public function __construct(string $url, ProducerOptions $options)
5861
{
62+
// validate keepalive condition
63+
if ($options->getKeepalive()) {
64+
65+
if (!extension_loaded('swoole')) {
66+
throw new RuntimeException('Keepalive require swoole extension');
67+
}
68+
69+
if (Coroutine::getCid() === -1) {
70+
throw new RuntimeException('Keepalive Must be in a coroutine environment');
71+
}
72+
}
73+
5974
parent::__construct($url, $options);
6075
}
6176

@@ -66,12 +81,41 @@ public function __construct(string $url, ProducerOptions $options)
6681
*/
6782
public function connect()
6883
{
84+
// Establish tcp connection And complete the pulsar server handshake
6985
parent::initialization();
7086

87+
// Enable Keepalive
88+
if ($this->options->getKeepalive()) {
89+
Coroutine::create(function () {
90+
while ($this->keepalive) {
91+
92+
/**
93+
* @var $response Response
94+
*/
95+
$response = $this->eventloop->wait(3);
96+
if (is_null($response)) {
97+
continue;
98+
}
99+
100+
$fd = $response->fd();
101+
if ($fd <= 0) {
102+
continue;
103+
}
104+
105+
// Push data to Channel
106+
ChannelManager::get($fd)->push($response);
107+
}
108+
});
109+
}
110+
71111
// Send CreateProducer Command
72112
foreach ($this->topicManage->all() as $id => $topic) {
73-
$io = $this->topicManage->getConnection($topic);
74-
$this->producers[] = new PartitionProducer($id, $topic, $io, $this->options);
113+
$connection = $this->topicManage->getConnection($topic);
114+
if ($this->options->getKeepalive()) {
115+
ChannelManager::init($connection->fd());
116+
}
117+
118+
$this->producers[] = new PartitionProducer($id, $topic, $connection, $this->options);
75119
}
76120
}
77121

@@ -123,6 +167,7 @@ public function send($payload, array $options = []): string
123167
* @return void
124168
* @throws RuntimeException|OptionsException
125169
* @throws \Exception
170+
* @deprecated 1.3.0
126171
*/
127172
public function sendAsync(string $payload, callable $callable, array $options = [])
128173
{
@@ -140,6 +185,7 @@ public function sendAsync(string $payload, callable $callable, array $options =
140185
* @return void
141186
* @throws Exception\IOException
142187
* @throws RuntimeException
188+
* @deprecated 1.3.0
143189
*/
144190
public function wait()
145191
{

src/ProducerOptions.php

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
*
1818
* @package Pulsar
1919
*/
20-
class ProducerOptions extends Options
20+
final class ProducerOptions extends Options
2121
{
2222
/**
2323
* @var string
@@ -35,6 +35,13 @@ class ProducerOptions extends Options
3535
*/
3636
const INITIAL_SUBSCRIPTION_NAME = 'initial_subscription_name';
3737

38+
39+
/**
40+
* @var string
41+
*/
42+
const KEEPALIVE = 'keepalive';
43+
44+
3845
/**
3946
* @param string $name
4047
* @return void
@@ -56,6 +63,17 @@ public function setInitialSubscriptionName(string $name)
5663
$this->data[ self::INITIAL_SUBSCRIPTION_NAME ] = $name;
5764
}
5865

66+
/**
67+
* Whether to enable keep-alive
68+
*
69+
* @param bool $status
70+
* @return void
71+
*/
72+
public function setKeepalive(bool $status)
73+
{
74+
$this->data[ self::KEEPALIVE ] = $status;
75+
}
76+
5977

6078
/**
6179
* @param int $type
@@ -99,4 +117,13 @@ public function getInitialSubscriptionName()
99117
{
100118
return $this->data[ self::INITIAL_SUBSCRIPTION_NAME ] ?? '';
101119
}
120+
121+
122+
/**
123+
* @return bool
124+
*/
125+
public function getKeepalive(): bool
126+
{
127+
return $this->data[ self::KEEPALIVE ] ?? false;
128+
}
102129
}

src/Response.php

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,16 +40,23 @@ class Response
4040
public $subCommand;
4141

4242

43+
/**
44+
* @var int
45+
*/
46+
public $fd = 0;
47+
4348
/**
4449
* @param Buffer $buffer
4550
* @param BaseCommand $message
51+
* @param int $fd
4652
* @throws RuntimeException
4753
*/
48-
public function __construct(Buffer $buffer, BaseCommand $message)
54+
public function __construct(Buffer $buffer, BaseCommand $message, int $fd)
4955
{
5056
$this->buffer = $buffer;
5157
$this->baseCommand = $message;
5258
$this->subCommand = $this->getSubCommand();
59+
$this->fd = $fd;
5360
$this->checkError();
5461
}
5562

@@ -71,6 +78,14 @@ public function getBuffer(): Buffer
7178
return $this->buffer;
7279
}
7380

81+
/**
82+
* @return int
83+
*/
84+
public function fd(): int
85+
{
86+
return $this->fd;
87+
}
88+
7489

7590
/**
7691
* @return AbstractMessage

src/TopicManage.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ public function countPartitions(): int
7575
/**
7676
* @return int
7777
*/
78-
public function countTopics(): int
78+
public function size(): int
7979
{
8080
return count($this->data);
8181
}

0 commit comments

Comments
 (0)