Skip to content

Commit c68bfa0

Browse files
committed
keepalive example
1 parent 8289aea commit c68bfa0

File tree

3 files changed

+110
-28
lines changed

3 files changed

+110
-28
lines changed

README-CN.md

Lines changed: 12 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -73,16 +73,6 @@ for ($i = 0; $i < 10; $i++) {
7373
echo 'messageID ' . $messageID . "\n";
7474
}
7575

76-
// Sending messages asynchronously
77-
//for ($i = 0; $i < 10; $i++) {
78-
// $producer->sendAsync(sprintf('hello-async %d',$i),function(string $messageID){
79-
// echo 'messageID ' . $messageID . "\n";
80-
// });
81-
//}
82-
//
83-
//// Add this line when sending asynchronously
84-
//$producer->wait();
85-
8676
// Sending delayed messages
8777
for ($i = 0; $i < 10; $i++) {
8878
$producer->send(sprintf('hello-delay %d',$i),[
@@ -93,10 +83,18 @@ for ($i = 0; $i < 10; $i++) {
9383
// close
9484
$producer->close();
9585

96-
// or
97-
// keepalive connection This method allows you to wrap the connection pool yourself
98-
// When the call is close() Will close the hold connection
99-
$producer->keepalive();
86+
```
87+
88+
> 保持连接
89+
90+
* 依赖 `Swoole` 扩展,必须开启协程
91+
* 如果是常驻内存应用,建议开启
92+
* 会保持连接,无需反复建立连接
93+
* 可以通过 `$producer->close()` 关闭连接,但不应该去调用
94+
* 请看[示例](./examples/producer-keepalive.php)
95+
96+
```php
97+
$options->setKeepalive(true);
10098
```
10199

102100
> 消息去重

README.md

Lines changed: 12 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -74,16 +74,6 @@ for ($i = 0; $i < 10; $i++) {
7474
echo 'messageID ' . $messageID . "\n";
7575
}
7676

77-
// Sending messages asynchronously
78-
//for ($i = 0; $i < 10; $i++) {
79-
// $producer->sendAsync(sprintf('hello-async %d',$i),function(string $messageID){
80-
// echo 'messageID ' . $messageID . "\n";
81-
// });
82-
//}
83-
//
84-
//// Add this line when sending asynchronously
85-
//$producer->wait();
86-
8777
// Sending delayed messages
8878
for ($i = 0; $i < 10; $i++) {
8979
$producer->send(sprintf('hello-delay %d',$i),[
@@ -93,11 +83,18 @@ for ($i = 0; $i < 10; $i++) {
9383

9484
// close
9585
$producer->close();
86+
```
87+
88+
> Keepalive Connection (Recommended)
9689
97-
// or
98-
// keepalive connection This method allows you to wrap the connection pool yourself
99-
// When the call is close() Will close the hold connection
100-
$producer->keepalive();
90+
* require `Swoole` extension
91+
* If it is a resident memory application, it is recommended to open it.
92+
* Will keep connected, no need to repeatedly establish a connection
93+
* Calling the `close` method closes the connection
94+
* Please see [example](./examples/producer-keepalive.php)
95+
96+
```php
97+
$options->setKeepalive(true);
10198
```
10299

103100
> Message deduplication
@@ -378,6 +375,7 @@ $reader->close();
378375
* setProducerName()
379376
* setCompression()
380377
* setSchema()
378+
* setKeepalive()
381379
* ConsumerOptions
382380
* setTopic()
383381
* setTopics()

examples/producer-keepalive.php

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
<?php
2+
3+
use Pulsar\Compression\Compression;
4+
use Pulsar\Exception\IOException;
5+
use Pulsar\Exception\OptionsException;
6+
use Pulsar\Producer;
7+
use Pulsar\ProducerOptions;
8+
use Swoole\Http\Response;
9+
use Swoole\Http\Server;
10+
11+
require_once __DIR__ . '/../vendor/autoload.php';
12+
13+
14+
15+
/**
16+
* Class ProducerStore
17+
*/
18+
class ProducerStore
19+
{
20+
21+
/**
22+
* @var array<string,Producer>
23+
*/
24+
protected static $inner = [];
25+
26+
27+
/**
28+
* @param string $topic
29+
* @return Producer
30+
* @throws IOException
31+
* @throws OptionsException
32+
* @throws \Pulsar\Exception\RuntimeException
33+
*/
34+
public static function get(string $topic): Producer
35+
{
36+
if (!isset(self::$inner[ $topic ])) {
37+
self::create($topic);
38+
}
39+
40+
return self::$inner[ $topic ];
41+
}
42+
43+
44+
/**
45+
* @param string $topic
46+
* @return void
47+
* @throws IOException
48+
* @throws OptionsException
49+
* @throws \Pulsar\Exception\RuntimeException
50+
*/
51+
private static function create(string $topic)
52+
{
53+
$options = new ProducerOptions();
54+
55+
// If permission authentication is available
56+
// Only JWT authentication is currently supported
57+
// $options->setAuthentication(new Jwt('token'));
58+
59+
$options->setConnectTimeout(3);
60+
$options->setTopic($topic);
61+
$options->setCompression(Compression::ZLIB);
62+
$options->setKeepalive(true);
63+
$producer = new Producer('pulsar://localhost:6650', $options);
64+
$producer->connect();
65+
66+
self::$inner[ $topic ] = $producer;
67+
}
68+
}
69+
70+
71+
$server = new Server('0.0.0.0', 1234);
72+
$server->set([
73+
'enable_coroutine' => true,
74+
'hook_flags' => SWOOLE_HOOK_ALL,
75+
]);
76+
77+
$server->on('request', function ($req, Response $resp) {
78+
79+
// Should be taken from here to keep this connection from being closed
80+
$producer = ProducerStore::get('persistent://public/default/demo');
81+
82+
$id = $producer->send('hello');
83+
$resp->end(json_encode(['id' => $id]));
84+
});
85+
86+
$server->start();

0 commit comments

Comments
 (0)