File tree Expand file tree Collapse file tree 1 file changed +9
-2
lines changed
pulsar-broker/src/main/java/org/apache/pulsar/broker/rest Expand file tree Collapse file tree 1 file changed +9
-2
lines changed Original file line number Diff line number Diff line change 61
61
import org .apache .pulsar .broker .lookup .LookupResult ;
62
62
import org .apache .pulsar .broker .namespace .LookupOptions ;
63
63
import org .apache .pulsar .broker .service .BrokerServiceException ;
64
+ import org .apache .pulsar .broker .service .Topic ;
64
65
import org .apache .pulsar .broker .service .schema .SchemaRegistry ;
65
66
import org .apache .pulsar .broker .service .schema .exceptions .SchemaException ;
66
67
import org .apache .pulsar .broker .web .RestException ;
@@ -280,8 +281,14 @@ private CompletableFuture<PositionImpl> publishSingleMessageToPartition(String t
280
281
.remove (topicName .getPartitionIndex ());
281
282
} else {
282
283
try {
283
- t .get ().publishMessage (messageToByteBuf (message ),
284
- RestMessagePublishContext .get (publishResult , t .get (), System .nanoTime ()));
284
+ ByteBuf headersAndPayload = messageToByteBuf (message );
285
+ try {
286
+ Topic topicObj = t .get ();
287
+ topicObj .publishMessage (headersAndPayload ,
288
+ RestMessagePublishContext .get (publishResult , topicObj , System .nanoTime ()));
289
+ } finally {
290
+ headersAndPayload .release ();
291
+ }
285
292
} catch (Exception e ) {
286
293
if (log .isDebugEnabled ()) {
287
294
log .debug ("Fail to publish single messages to topic {}: {} " ,
You can’t perform that action at this time.
0 commit comments