Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion cpp/examples/ExampleProducerWithTransactionalMessage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -109,9 +109,11 @@ int main(int argc, char* argv[]) {
auto transaction = producer.beginTransaction();
std::error_code ec;

producer.send(std::move(message), ec, *transaction);
SendReceipt send_receipt = producer.send(std::move(message), ec, *transaction);

if (!ec) {
std::cout << "Send transactional message to " << FLAGS_topic << " OK. "
<< "Message-ID: " << send_receipt.message_id << std::endl;
if (!transaction->commit()) {
std::cerr << "Failed to commit message" << std::endl;
}
Expand Down
2 changes: 1 addition & 1 deletion cpp/include/rocketmq/Producer.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ class Producer {

std::unique_ptr<Transaction> beginTransaction();

void send(MessageConstPtr message, std::error_code& ec, Transaction& transaction);
SendReceipt send(MessageConstPtr message, std::error_code& ec, Transaction& transaction);

private:
explicit Producer(std::shared_ptr<ProducerImpl> impl) : impl_(std::move(impl)) {
Expand Down
4 changes: 2 additions & 2 deletions cpp/source/rocketmq/Producer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,8 @@ std::unique_ptr<Transaction> Producer::beginTransaction() {
return impl_->beginTransaction();
}

void Producer::send(MessageConstPtr message, std::error_code& ec, Transaction& transaction) {
impl_->send(std::move(message), ec, transaction);
SendReceipt Producer::send(MessageConstPtr message, std::error_code& ec, Transaction& transaction) {
return impl_->send(std::move(message), ec, transaction);
}

ProducerBuilder Producer::newBuilder() {
Expand Down
12 changes: 9 additions & 3 deletions cpp/source/rocketmq/ProducerImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -474,21 +474,25 @@ void ProducerImpl::isolateEndpoint(const std::string& target) {
isolated_endpoints_.insert(target);
}

void ProducerImpl::send(MessageConstPtr message, std::error_code& ec, Transaction& transaction) {
SendReceipt ProducerImpl::send(MessageConstPtr message, std::error_code& ec, Transaction& transaction) {
MiniTransaction mini = {};
mini.topic = message->topic();
mini.trace_context = message->traceContext();

if (!message->group().empty()) {
ec = ErrorCode::MessagePropertyConflictWithType;
SPDLOG_WARN("FIFO message may not be transactional");
return;
SendReceipt send_receipt{};
send_receipt.message = std::move(message);
return send_receipt;
}

if (message->deliveryTimestamp().time_since_epoch().count()) {
ec = ErrorCode::MessagePropertyConflictWithType;
SPDLOG_WARN("Timed message may not be transactional");
return;
SendReceipt send_receipt{};
send_receipt.message = std::move(message);
return send_receipt;
}

Message* msg = const_cast<Message*>(message.get());
Expand All @@ -501,6 +505,8 @@ void ProducerImpl::send(MessageConstPtr message, std::error_code& ec, Transactio
mini.target = send_receipt.target;
auto& impl = dynamic_cast<TransactionImpl&>(transaction);
impl.appendMiniTransaction(mini);

return send_receipt;
}

void ProducerImpl::getPublishInfoAsync(const std::string& topic, const PublishInfoCallback& cb) {
Expand Down
25 changes: 11 additions & 14 deletions cpp/source/rocketmq/include/ProducerImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,20 +49,20 @@ class ProducerImpl : virtual public ClientImpl, public std::enable_shared_from_t
void shutdown() override;

/**
* Note we requrie application to transfer ownership of the message to send to avoid concurrent modification during
* sent.
* Note we require application to transfer ownership of the message
* to send to avoid concurrent modification during sent.
*
* Regardless of the send result, SendReceipt would have the std::unique_ptr<const Message>, facilliating
* application to conduct customized retry policy.
* Regardless of the send result, SendReceipt would have the std::unique_ptr<const Message>,
* facilitating application to conduct customized retry policy.
*/
SendReceipt send(MessageConstPtr message, std::error_code& ec) noexcept;

/**
* Note we requrie application to transfer ownership of the message to send to avoid concurrent modification during
* sent.
* Note we require application to transfer ownership of the message
* to send to avoid concurrent modification during sent.
*
* Regardless of the send result, SendReceipt would have the std::unique_ptr<const Message>, facilliating
* application to conduct customized retry policy.
* Regardless of the send result, SendReceipt would have the std::unique_ptr<const Message>,
* facilitating application to conduct customized retry policy.
*/
void send(MessageConstPtr message, SendCallback callback);

Expand All @@ -74,13 +74,10 @@ class ProducerImpl : virtual public ClientImpl, public std::enable_shared_from_t
}

/**
* Note we requrie application to transfer ownership of the message to send to avoid concurrent modification during
* sent.
*
* TODO: Refine this API. Current API is not good enough as it cannot handle the message back to its caller on publish
* failure.
* Note we require application to transfer ownership of the message
* to send to avoid concurrent modification during sent.
*/
void send(MessageConstPtr message, std::error_code& ec, Transaction& transaction);
SendReceipt send(MessageConstPtr message, std::error_code& ec, Transaction& transaction);

/**
* Check if the RPC client for the target host is isolated or not
Expand Down