Skip to content

Commit 9fa7ce8

Browse files
committed
add examples
1 parent ea871bf commit 9fa7ce8

File tree

2 files changed

+88
-0
lines changed

2 files changed

+88
-0
lines changed

examples/consumer.php

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
<?php
2+
3+
4+
use Pulsar\Consumer;
5+
use Pulsar\ConsumerOptions;
6+
use Pulsar\SubscriptionType;
7+
8+
require_once __DIR__ . '/../vendor/autoload.php';
9+
10+
11+
$options = new ConsumerOptions();
12+
13+
// If permission authentication is available
14+
// Only JWT authentication is currently supported
15+
// $options->setAuthentication(new Jwt('token'));
16+
17+
$options->setConnectTimeout(3);
18+
$options->setTopic('persistent://public/default/demo');
19+
$options->setSubscription('logic');
20+
$options->setSubscriptionType(SubscriptionType::Shared);
21+
$options->setNackRedeliveryDelay(20);
22+
$consumer = new Consumer('pulsar://localhost:6650', $options);
23+
$consumer->connect();
24+
25+
while (true) {
26+
$message = $consumer->receive();
27+
echo sprintf('Got message 【%s】messageID[%s] topic[%s] publishTime[%s]',
28+
$message->getPayload(),
29+
$message->getMessageId(),
30+
$message->getTopic(),
31+
$message->getPublishTime()
32+
) . "\n";
33+
34+
// ...
35+
36+
// Remember to confirm that the message is complete after processing
37+
$consumer->ack($message);
38+
39+
// When processing fails, you can also execute the Nack
40+
// The message will be re-delivered after the specified time
41+
// $consumer->nack($message);
42+
}
43+
44+
$consumer->close();

examples/producer.php

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
<?php
2+
3+
use Pulsar\Compression\Compression;
4+
use Pulsar\MessageOptions;
5+
use Pulsar\Producer;
6+
use Pulsar\ProducerOptions;
7+
8+
require_once __DIR__ . '/../vendor/autoload.php';
9+
10+
$options = new ProducerOptions();
11+
12+
// If permission authentication is available
13+
// Only JWT authentication is currently supported
14+
// $options->setAuthentication(new Jwt('token'));
15+
16+
$options->setConnectTimeout(3);
17+
$options->setTopic('persistent://public/default/demo');
18+
$options->setCompression(Compression::ZLIB);
19+
$producer = new Producer('pulsar://localhost:6650', $options);
20+
$producer->connect();
21+
22+
for ($i = 0; $i < 10; $i++) {
23+
$messageID = $producer->send(sprintf('hello %d', $i));
24+
echo 'messageID ' . $messageID . "\n";
25+
}
26+
27+
// Sending messages asynchronously
28+
for ($i = 0; $i < 10; $i++) {
29+
$producer->sendAsync(sprintf('hello-async %d', $i), function (string $messageID) {
30+
echo 'messageID ' . $messageID . "\n";
31+
});
32+
}
33+
// Add this line when sending asynchronously
34+
$producer->wait();
35+
36+
// Sending delayed messages
37+
for ($i = 0; $i < 10; $i++) {
38+
$producer->send(sprintf('hello-delay %d', $i), [
39+
MessageOptions::DELAY_SECONDS => $i * 5, // Seconds
40+
]);
41+
}
42+
43+
// close
44+
$producer->close();

0 commit comments

Comments
 (0)