Skip to content

Commit a884aca

Browse files
committed
add CommandSendBuilder
1 parent 152a623 commit a884aca

File tree

2 files changed

+139
-104
lines changed

2 files changed

+139
-104
lines changed

src/Producer.php

Lines changed: 7 additions & 104 deletions
Original file line numberDiff line numberDiff line change
@@ -12,13 +12,11 @@
1212
use Protobuf\AbstractMessage;
1313
use Pulsar\Exception\OptionsException;
1414
use Pulsar\Exception\RuntimeException;
15-
use Pulsar\Proto\BaseCommand;
16-
use Pulsar\Proto\BaseCommand\Type;
17-
use Pulsar\Proto\CommandSend;
1815
use Pulsar\Proto\CommandSendReceipt;
1916
use Pulsar\Proto\KeyValue;
2017
use Pulsar\Proto\MessageMetadata;
2118
use Pulsar\Proto\SingleMessageMetadata;
19+
use Pulsar\Traits\CommandSendBuilder;
2220
use Pulsar\Traits\ProducerKeepAlive;
2321
use Pulsar\Util\Buffer;
2422
use Pulsar\Util\Helper;
@@ -31,6 +29,7 @@
3129
class Producer extends Client
3230
{
3331
use ProducerKeepAlive;
32+
use CommandSendBuilder;
3433

3534
/**
3635
* @var ProducerOptions
@@ -83,13 +82,14 @@ public function connect()
8382
* @param string $payload
8483
* @param array $options
8584
* @return string
86-
* @throws RuntimeException|OptionsException
85+
* @throws RuntimeException
86+
* @throws \Exception
8787
*/
8888
public function send(string $payload, array $options = []): string
8989
{
9090
$producer = $this->getPartitionProducer();
9191
$messageOptions = new MessageOptions($options);
92-
$buffer = $this->buildBuffer(
92+
$buffer = $this->buildSendBuffer(
9393
$producer,
9494
$payload,
9595
$messageOptions,
@@ -117,14 +117,15 @@ public function send(string $payload, array $options = []): string
117117
* @param array $options
118118
* @return void
119119
* @throws RuntimeException|OptionsException
120+
* @throws \Exception
120121
*/
121122
public function sendAsync(string $payload, callable $callable, array $options = [])
122123
{
123124
$messageOptions = new MessageOptions($options);
124125
$sequenceID = $messageOptions->getSequenceID();
125126

126127
$producer = $this->getPartitionProducer();
127-
$buffer = $this->buildBuffer($producer, $payload, $messageOptions, $sequenceID);
128+
$buffer = $this->buildSendBuffer($producer, $payload, $messageOptions, $sequenceID);
128129
$producer->sendAsync($buffer);
129130
$this->callbacks[ $sequenceID ] = [$producer->getID(), $callable];
130131
}
@@ -162,104 +163,6 @@ public function wait()
162163
} while (count($this->callbacks));
163164
}
164165

165-
/**
166-
* @param PartitionProducer $producer
167-
* @param string $payload
168-
* @param MessageOptions $messageOptions
169-
* @param int $sequenceID
170-
* @return Buffer
171-
* @throws RuntimeException|OptionsException
172-
*/
173-
protected function buildBuffer(PartitionProducer $producer, string $payload, MessageOptions $messageOptions, int $sequenceID): Buffer
174-
{
175-
// [totalSize] [commandSize] [command] [magicNumber] [checksum] [metadataSize] [metadata] [payload]
176-
177-
$buffer = new Buffer();
178-
179-
// BaseCommand
180-
$baseCommand = new BaseCommand();
181-
$baseCommand->setType(Type::SEND());
182-
183-
// CommandSend
184-
$commandSend = new CommandSend();
185-
$commandSend->setProducerId($producer->getID());
186-
$commandSend->setSequenceId($sequenceID);
187-
$commandSend->setNumMessages(1);
188-
$commandSend->setTxnidLeastBits(null);
189-
$commandSend->setTxnidMostBits(null);
190-
191-
$baseCommand->setSend($commandSend);
192-
193-
// serialize BaseCommand
194-
$baseCommandBytes = $baseCommand->toStream()->getContents();
195-
196-
// [commandSize]
197-
$buffer->writeUint32(strlen($baseCommandBytes));
198-
199-
// [command]
200-
$buffer->write($baseCommandBytes);
201-
202-
// [magicNumber]
203-
$buffer->writeUint16(0x0e01);
204-
205-
// only support zlib、none
206-
$compressionProvider = $this->options->getCompression();
207-
208-
// metadata
209-
$msgMetadata = new MessageMetadata();
210-
$msgMetadata->setProducerName($producer->getName());
211-
$msgMetadata->setSequenceId(0);
212-
$msgMetadata->setPublishTime(time() * 1000);
213-
$msgMetadata->setNumMessagesInBatch(1);
214-
$msgMetadata->setCompression($compressionProvider->getType());
215-
$msgMetadata->setPartitionKey($messageOptions->getKey());
216-
$msgMetadata->setDeliverAtTime($messageOptions->getDeliverAtTime());
217-
218-
// singleMessageMetadata
219-
$singleMsgMetadata = new SingleMessageMetadata();
220-
$singleMsgMetadata->setPayloadSize(strlen($payload));
221-
$singleMsgMetadata->setEventTime(time() * 1000);
222-
$singleMsgMetadata->setPartitionKey($messageOptions->getKey());
223-
$this->appendProperties($singleMsgMetadata, $messageOptions);
224-
$singleMsgMetadataBytes = $singleMsgMetadata->toStream()->getContents();
225-
226-
// [metadataSize] [metadata] [payload]
227-
$packet = '';
228-
$packet .= Helper::writeUint32(strlen($singleMsgMetadataBytes)); // [metadataSize]
229-
$packet .= $singleMsgMetadataBytes; // [metadata]
230-
$packet .= $payload; // [payload]
231-
232-
$msgMetadata->setUncompressedSize(strlen($packet));
233-
$msgMetadataBytes = $msgMetadata->toStream()->getContents();
234-
235-
$msgMetadataSize = strlen($msgMetadataBytes);
236-
237-
238-
// make checksum bytes
239-
$compressionPacket = $compressionProvider->encode($packet);
240-
$checksumBuffer = new Buffer();
241-
$checksumBuffer->writeUint32($msgMetadataSize); // [metadataSize]
242-
$checksumBuffer->write($msgMetadataBytes); // [metadata]
243-
$checksumBuffer->write($compressionPacket); // [payload]
244-
245-
// [checksum] === [metadataSize] [metadata] [payload]
246-
$buffer->writeUint32($this->getChecksum($checksumBuffer));
247-
248-
// [metadataSize]
249-
$buffer->writeUint32($msgMetadataSize);
250-
251-
// [metadata]
252-
$buffer->write($msgMetadataBytes);
253-
254-
// [payload]
255-
$buffer->write($compressionPacket);
256-
257-
// [totalSize]
258-
$buffer->put(Helper::writeUint32($buffer->length()), 0);
259-
260-
return $buffer;
261-
}
262-
263166

264167
/**
265168
* @return void

src/Traits/CommandSendBuilder.php

Lines changed: 132 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,132 @@
1+
<?php
2+
/**
3+
* Created by PhpStorm
4+
* User: Sunny、
5+
* Date: 2023/1/3
6+
* Time: 17:34
7+
*/
8+
9+
namespace Pulsar\Traits;
10+
11+
12+
use Pulsar\MessageOptions;
13+
use Pulsar\PartitionProducer;
14+
use Pulsar\Proto\BaseCommand;
15+
use Pulsar\Proto\BaseCommand\Type;
16+
use Pulsar\Proto\CommandSend;
17+
use Pulsar\Proto\MessageMetadata;
18+
use Pulsar\Proto\SingleMessageMetadata;
19+
use Pulsar\Util\Buffer;
20+
use Pulsar\Util\Helper;
21+
22+
/**
23+
* Class MessageBuilder
24+
*
25+
* @package Pulsar\Traits
26+
*/
27+
trait CommandSendBuilder
28+
{
29+
30+
/**
31+
* @param PartitionProducer $producer
32+
* @param string $payload
33+
* @param MessageOptions $messageOptions
34+
* @param int $sequenceID
35+
* @return Buffer
36+
* @throws \Exception
37+
*/
38+
protected function buildSendBuffer(
39+
PartitionProducer $producer,
40+
string $payload,
41+
MessageOptions $messageOptions,
42+
int $sequenceID
43+
): Buffer
44+
{
45+
// [totalSize] [commandSize] [command] [magicNumber] [checksum] [metadataSize] [metadata] [payload]
46+
47+
$buffer = new Buffer();
48+
49+
// BaseCommand
50+
$baseCommand = new BaseCommand();
51+
$baseCommand->setType(Type::SEND());
52+
53+
// CommandSend
54+
$commandSend = new CommandSend();
55+
$commandSend->setProducerId($producer->getID());
56+
$commandSend->setSequenceId($sequenceID);
57+
$commandSend->setNumMessages(1);
58+
$commandSend->setTxnidLeastBits(null);
59+
$commandSend->setTxnidMostBits(null);
60+
61+
$baseCommand->setSend($commandSend);
62+
63+
// serialize BaseCommand
64+
$baseCommandBytes = $baseCommand->toStream()->getContents();
65+
66+
// [commandSize]
67+
$buffer->writeUint32(strlen($baseCommandBytes));
68+
69+
// [command]
70+
$buffer->write($baseCommandBytes);
71+
72+
// [magicNumber]
73+
$buffer->writeUint16(0x0e01);
74+
75+
// only support zlib、none
76+
$compressionProvider = $this->options->getCompression();
77+
78+
// metadata
79+
$msgMetadata = new MessageMetadata();
80+
$msgMetadata->setProducerName($producer->getName());
81+
$msgMetadata->setSequenceId(0);
82+
$msgMetadata->setPublishTime(time() * 1000);
83+
$msgMetadata->setNumMessagesInBatch(1);
84+
$msgMetadata->setCompression($compressionProvider->getType());
85+
$msgMetadata->setPartitionKey($messageOptions->getKey());
86+
$msgMetadata->setDeliverAtTime($messageOptions->getDeliverAtTime());
87+
88+
// singleMessageMetadata
89+
$singleMsgMetadata = new SingleMessageMetadata();
90+
$singleMsgMetadata->setPayloadSize(strlen($payload));
91+
$singleMsgMetadata->setEventTime(time() * 1000);
92+
$singleMsgMetadata->setPartitionKey($messageOptions->getKey());
93+
$this->appendProperties($singleMsgMetadata, $messageOptions);
94+
$singleMsgMetadataBytes = $singleMsgMetadata->toStream()->getContents();
95+
96+
// [metadataSize] [metadata] [payload]
97+
$packet = '';
98+
$packet .= Helper::writeUint32(strlen($singleMsgMetadataBytes)); // [metadataSize]
99+
$packet .= $singleMsgMetadataBytes; // [metadata]
100+
$packet .= $payload; // [payload]
101+
102+
$msgMetadata->setUncompressedSize(strlen($packet));
103+
$msgMetadataBytes = $msgMetadata->toStream()->getContents();
104+
105+
$msgMetadataSize = strlen($msgMetadataBytes);
106+
107+
108+
// make checksum bytes
109+
$compressionPacket = $compressionProvider->encode($packet);
110+
$checksumBuffer = new Buffer();
111+
$checksumBuffer->writeUint32($msgMetadataSize); // [metadataSize]
112+
$checksumBuffer->write($msgMetadataBytes); // [metadata]
113+
$checksumBuffer->write($compressionPacket); // [payload]
114+
115+
// [checksum] === [metadataSize] [metadata] [payload]
116+
$buffer->writeUint32($this->getChecksum($checksumBuffer));
117+
118+
// [metadataSize]
119+
$buffer->writeUint32($msgMetadataSize);
120+
121+
// [metadata]
122+
$buffer->write($msgMetadataBytes);
123+
124+
// [payload]
125+
$buffer->write($compressionPacket);
126+
127+
// [totalSize]
128+
$buffer->put(Helper::writeUint32($buffer->length()), 0);
129+
130+
return $buffer;
131+
}
132+
}

0 commit comments

Comments
 (0)