File tree Expand file tree Collapse file tree 1 file changed +43
-0
lines changed
Expand file tree Collapse file tree 1 file changed +43
-0
lines changed Original file line number Diff line number Diff line change 1+ <?php
2+
3+
4+ use Pulsar \Message ;
5+ use Pulsar \Reader ;
6+ use Pulsar \ReaderOptions ;
7+
8+ require_once __DIR__ . '/../vendor/autoload.php ' ;
9+
10+
11+ $ options = new ReaderOptions ();
12+
13+ // If permission authentication is available
14+ // Only JWT authentication is currently supported
15+ // $options->setAuthentication(new Jwt('token'));
16+
17+ $ options ->setConnectTimeout (3 );
18+ $ options ->setTopic ('persistent://public/default/demo ' ); // support partition topic
19+
20+ // Read the latest message
21+ $ options ->setStartMessageID (Message::latestMessageIdData ());
22+
23+ // From the earliest message
24+ // $options->setStartMessageID(Message::earliestMessageIdData());
25+
26+ // Start reading from a message
27+ // $options->setStartMessageID(Message::deserialize('621:103:0'));
28+
29+ $ reader = new Reader ('pulsar://localhost:6650 ' , $ options );
30+ $ reader ->connect ();
31+
32+ while (true ) {
33+ $ message = $ reader ->next ();
34+ echo sprintf ('Got message 【%s】messageID[%s] topic[%s] publishTime[%s] ' ,
35+ $ message ->getPayload (),
36+ $ message ->getMessageId (),
37+ $ message ->getTopic (),
38+ $ message ->getPublishTime ()
39+ ) . "\n" ;
40+
41+ }
42+
43+ $ reader ->close ();
You can’t perform that action at this time.
0 commit comments