Skip to content

Commit 00eaec9

Browse files
committed
[ISSUE #934] C++ trans producer should return a SendReceipt object in the send call
1 parent 9b9089e commit 00eaec9

File tree

5 files changed

+26
-21
lines changed

5 files changed

+26
-21
lines changed

cpp/examples/ExampleProducerWithTransactionalMessage.cpp

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -109,9 +109,11 @@ int main(int argc, char* argv[]) {
109109
auto transaction = producer.beginTransaction();
110110
std::error_code ec;
111111

112-
producer.send(std::move(message), ec, *transaction);
112+
SendReceipt send_receipt = producer.send(std::move(message), ec, *transaction);
113113

114114
if (!ec) {
115+
std::cout << "Send transactional message to " << FLAGS_topic << " OK. "
116+
<< "Message-ID: " << send_receipt.message_id << std::endl;
115117
if (!transaction->commit()) {
116118
std::cerr << "Failed to commit message" << std::endl;
117119
}

cpp/include/rocketmq/Producer.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ class Producer {
6565

6666
std::unique_ptr<Transaction> beginTransaction();
6767

68-
void send(MessageConstPtr message, std::error_code& ec, Transaction& transaction);
68+
SendReceipt send(MessageConstPtr message, std::error_code& ec, Transaction& transaction);
6969

7070
private:
7171
explicit Producer(std::shared_ptr<ProducerImpl> impl) : impl_(std::move(impl)) {

cpp/source/rocketmq/Producer.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -64,8 +64,8 @@ std::unique_ptr<Transaction> Producer::beginTransaction() {
6464
return impl_->beginTransaction();
6565
}
6666

67-
void Producer::send(MessageConstPtr message, std::error_code& ec, Transaction& transaction) {
68-
impl_->send(std::move(message), ec, transaction);
67+
SendReceipt Producer::send(MessageConstPtr message, std::error_code& ec, Transaction& transaction) {
68+
return impl_->send(std::move(message), ec, transaction);
6969
}
7070

7171
ProducerBuilder Producer::newBuilder() {

cpp/source/rocketmq/ProducerImpl.cpp

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -474,21 +474,25 @@ void ProducerImpl::isolateEndpoint(const std::string& target) {
474474
isolated_endpoints_.insert(target);
475475
}
476476

477-
void ProducerImpl::send(MessageConstPtr message, std::error_code& ec, Transaction& transaction) {
477+
SendReceipt ProducerImpl::send(MessageConstPtr message, std::error_code& ec, Transaction& transaction) {
478478
MiniTransaction mini = {};
479479
mini.topic = message->topic();
480480
mini.trace_context = message->traceContext();
481481

482482
if (!message->group().empty()) {
483483
ec = ErrorCode::MessagePropertyConflictWithType;
484484
SPDLOG_WARN("FIFO message may not be transactional");
485-
return;
485+
SendReceipt send_receipt{};
486+
send_receipt.message = std::move(message);
487+
return send_receipt;
486488
}
487489

488490
if (message->deliveryTimestamp().time_since_epoch().count()) {
489491
ec = ErrorCode::MessagePropertyConflictWithType;
490492
SPDLOG_WARN("Timed message may not be transactional");
491-
return;
493+
SendReceipt send_receipt{};
494+
send_receipt.message = std::move(message);
495+
return send_receipt;
492496
}
493497

494498
Message* msg = const_cast<Message*>(message.get());
@@ -501,6 +505,8 @@ void ProducerImpl::send(MessageConstPtr message, std::error_code& ec, Transactio
501505
mini.target = send_receipt.target;
502506
auto& impl = dynamic_cast<TransactionImpl&>(transaction);
503507
impl.appendMiniTransaction(mini);
508+
509+
return send_receipt;
504510
}
505511

506512
void ProducerImpl::getPublishInfoAsync(const std::string& topic, const PublishInfoCallback& cb) {

cpp/source/rocketmq/include/ProducerImpl.h

Lines changed: 11 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -49,20 +49,20 @@ class ProducerImpl : virtual public ClientImpl, public std::enable_shared_from_t
4949
void shutdown() override;
5050

5151
/**
52-
* Note we requrie application to transfer ownership of the message to send to avoid concurrent modification during
53-
* sent.
52+
* Note we require application to transfer ownership of the message
53+
* to send to avoid concurrent modification during sent.
5454
*
55-
* Regardless of the send result, SendReceipt would have the std::unique_ptr<const Message>, facilliating
56-
* application to conduct customized retry policy.
55+
* Regardless of the send result, SendReceipt would have the std::unique_ptr<const Message>,
56+
* facilitating application to conduct customized retry policy.
5757
*/
5858
SendReceipt send(MessageConstPtr message, std::error_code& ec) noexcept;
5959

6060
/**
61-
* Note we requrie application to transfer ownership of the message to send to avoid concurrent modification during
62-
* sent.
61+
* Note we require application to transfer ownership of the message
62+
* to send to avoid concurrent modification during sent.
6363
*
64-
* Regardless of the send result, SendReceipt would have the std::unique_ptr<const Message>, facilliating
65-
* application to conduct customized retry policy.
64+
* Regardless of the send result, SendReceipt would have the std::unique_ptr<const Message>,
65+
* facilitating application to conduct customized retry policy.
6666
*/
6767
void send(MessageConstPtr message, SendCallback callback);
6868

@@ -74,13 +74,10 @@ class ProducerImpl : virtual public ClientImpl, public std::enable_shared_from_t
7474
}
7575

7676
/**
77-
* Note we requrie application to transfer ownership of the message to send to avoid concurrent modification during
78-
* sent.
79-
*
80-
* TODO: Refine this API. Current API is not good enough as it cannot handle the message back to its caller on publish
81-
* failure.
77+
* Note we require application to transfer ownership of the message
78+
* to send to avoid concurrent modification during sent.
8279
*/
83-
void send(MessageConstPtr message, std::error_code& ec, Transaction& transaction);
80+
SendReceipt send(MessageConstPtr message, std::error_code& ec, Transaction& transaction);
8481

8582
/**
8683
* Check if the RPC client for the target host is isolated or not

0 commit comments

Comments
 (0)