Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/cpp_build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ jobs:
# Disable VS 2022 before https://github.com/bazelbuild/bazel/issues/18592 issue is solved
# Remove macos-11 since there is no such runner available
# os: [ubuntu-20.04, ubuntu-22.04, macos-11, macos-12, windows-2019, windows-2022]
os: [ubuntu-20.04, ubuntu-22.04, macos-12, windows-2019]
os: [ubuntu-20.04, ubuntu-22.04, windows-2019]
steps:
- uses: actions/checkout@v2
- name: Compile On Linux
Expand Down
1 change: 1 addition & 0 deletions cpp/.gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,4 @@ bazel-rocketmq-client-cpp
/compile_commands.json
/.cache/
.clangd
build
5 changes: 2 additions & 3 deletions cpp/examples/ExampleFifoProducer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
#include "rocketmq/FifoProducer.h"
#include "rocketmq/Logger.h"
#include "rocketmq/Message.h"
#include "rocketmq/Producer.h"
#include "rocketmq/SendReceipt.h"

using namespace ROCKETMQ_NAMESPACE;
Expand Down Expand Up @@ -93,8 +92,8 @@ std::string randomString(std::string::size_type len) {
return result;
}

DEFINE_string(topic, "standard_topic_sample", "Topic to which messages are published");
DEFINE_string(access_point, "121.196.167.124:8081", "Service access URL, provided by your service provider");
DEFINE_string(topic, "FifoTopic", "Topic to which messages are published");
DEFINE_string(access_point, "127.0.0.1:8081", "Service access URL, provided by your service provider");
DEFINE_int32(message_body_size, 4096, "Message body size");
DEFINE_uint32(total, 256, "Number of sample messages to publish");
DEFINE_string(access_key, "", "Your access key ID");
Expand Down
4 changes: 2 additions & 2 deletions cpp/examples/ExampleProducer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@ std::string randomString(std::string::size_type len) {
return result;
}

DEFINE_string(topic, "standard_topic_sample", "Topic to which messages are published");
DEFINE_string(access_point, "121.196.167.124:8081", "Service access URL, provided by your service provider");
DEFINE_string(topic, "NormalTopic", "Topic to which messages are published");
DEFINE_string(access_point, "127.0.0.1:8081", "Service access URL, provided by your service provider");
DEFINE_int32(message_body_size, 4096, "Message body size");
DEFINE_uint32(total, 256, "Number of sample messages to publish");
DEFINE_string(access_key, "", "Your access key ID");
Expand Down
4 changes: 2 additions & 2 deletions cpp/examples/ExampleProducerWithAsync.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,8 @@ std::string randomString(std::string::size_type len) {
return result;
}

DEFINE_string(topic, "standard_topic_sample", "Topic to which messages are published");
DEFINE_string(access_point, "121.196.167.124:8081", "Service access URL, provided by your service provider");
DEFINE_string(topic, "NormalTopic", "Topic to which messages are published");
DEFINE_string(access_point, "127.0.0.1:8081", "Service access URL, provided by your service provider");
DEFINE_int32(message_body_size, 4096, "Message body size");
DEFINE_uint32(total, 256, "Number of sample messages to publish");
DEFINE_uint32(concurrency, 128, "Concurrency of async send");
Expand Down
4 changes: 2 additions & 2 deletions cpp/examples/ExampleProducerWithFifoMessage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,8 @@ std::string randomString(std::string::size_type len) {
return result;
}

DEFINE_string(topic, "fifo_topic_sample", "Topic to which messages are published");
DEFINE_string(access_point, "121.196.167.124:8081", "Service access URL, provided by your service provider");
DEFINE_string(topic, "FifoTopic", "Topic to which messages are published");
DEFINE_string(access_point, "127.0.0.1:8081", "Service access URL, provided by your service provider");
DEFINE_int32(message_body_size, 4096, "Message body size");
DEFINE_uint32(total, 256, "Number of sample messages to publish");
DEFINE_string(access_key, "", "Your access key ID");
Expand Down
5 changes: 2 additions & 3 deletions cpp/examples/ExampleProducerWithTimedMessage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
#include <algorithm>
#include <atomic>
#include <chrono>
#include <cstddef>
#include <iostream>
#include <random>
#include <string>
Expand Down Expand Up @@ -50,8 +49,8 @@ std::string randomString(std::string::size_type len) {
return result;
}

DEFINE_string(topic, "standard_topic_sample", "Topic to which messages are published");
DEFINE_string(access_point, "121.196.167.124:8081", "Service access URL, provided by your service provider");
DEFINE_string(topic, "TimerTopic", "Topic to which messages are published");
DEFINE_string(access_point, "127.0.0.1:8081", "Service access URL, provided by your service provider");
DEFINE_int32(message_body_size, 4096, "Message body size");
DEFINE_uint32(total, 256, "Number of sample messages to publish");
DEFINE_string(access_key, "", "Your access key ID");
Expand Down
4 changes: 2 additions & 2 deletions cpp/examples/ExampleProducerWithTransactionalMessage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,8 @@ std::string randomString(std::string::size_type len) {
return result;
}

DEFINE_string(topic, "tx_topic_sample", "Topic to which messages are published");
DEFINE_string(access_point, "121.196.167.124:8081", "Service access URL, provided by your service provider");
DEFINE_string(topic, "TransTopic", "Topic to which messages are published");
DEFINE_string(access_point, "127.0.0.1:8081", "Service access URL, provided by your service provider");
DEFINE_int32(message_body_size, 4096, "Message body size");
DEFINE_uint32(total, 256, "Number of sample messages to publish");
DEFINE_string(access_key, "", "Your access key ID");
Expand Down
6 changes: 3 additions & 3 deletions cpp/examples/ExamplePushConsumer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@

using namespace ROCKETMQ_NAMESPACE;

DEFINE_string(topic, "standard_topic_sample", "Topic to which messages are published");
DEFINE_string(access_point, "121.196.167.124:8081", "Service access URL, provided by your service provider");
DEFINE_string(group, "CID_standard_topic_sample", "GroupId, created through your instance management console");
DEFINE_string(topic, "NormalTopic", "Topic to which messages are published");
DEFINE_string(access_point, "127.0.0.1:8081", "Service access URL, provided by your service provider");
DEFINE_string(group, "PushConsumer", "GroupId, created through your instance management console");
DEFINE_string(access_key, "", "Your access key ID");
DEFINE_string(access_secret, "", "Your access secret");
DEFINE_bool(tls, false, "Use HTTP2 with TLS/SSL");
Expand Down
6 changes: 3 additions & 3 deletions cpp/examples/ExampleSimpleConsumer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@

using namespace ROCKETMQ_NAMESPACE;

DEFINE_string(topic, "standard_topic_sample", "Topic to which messages are published");
DEFINE_string(access_point, "121.196.167.124:8081", "Service access URL, provided by your service provider");
DEFINE_string(group, "CID_standard_topic_sample", "GroupId, created through your instance management console");
DEFINE_string(topic, "NormalTopic", "Topic to which messages are published");
DEFINE_string(access_point, "127.0.0.1:8081", "Service access URL, provided by your service provider");
DEFINE_string(group, "SimpleConsumer", "GroupId, created through your instance management console");
DEFINE_string(access_key, "", "Your access key ID");
DEFINE_string(access_secret, "", "Your access secret");
DEFINE_bool(tls, false, "Use HTTP2 with TLS/SSL");
Expand Down
4 changes: 2 additions & 2 deletions cpp/source/base/include/InvocationContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,8 @@ struct InvocationContext : public BaseInvocationContext {

if (!status.ok() && grpc::StatusCode::DEADLINE_EXCEEDED == status.error_code()) {
auto diff =
std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now() - context.deadline())
.count();
std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::system_clock::now() - context.deadline()).count();
SPDLOG_WARN("Asynchronous RPC[{}.{}] timed out, elapsing {}ms, deadline-over-due: {}ms",
absl::FormatTime(created_time, absl::UTCTimeZone()), elapsed, diff);
}
Expand Down
11 changes: 6 additions & 5 deletions cpp/source/client/ClientManagerImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ ClientManagerImpl::ClientManagerImpl(std::string resource_namespace, bool with_s
state_(State::CREATED),
callback_thread_pool_(absl::make_unique<ThreadPoolImpl>(std::thread::hardware_concurrency())),
with_ssl_(with_ssl) {

certificate_verifier_ = grpc::experimental::ExternalCertificateVerifier::Create<InsecureCertificateVerifier>();
tls_channel_credential_options_.set_verify_server_certs(false);
tls_channel_credential_options_.set_check_call_host(false);
Expand Down Expand Up @@ -78,7 +79,7 @@ ClientManagerImpl::ClientManagerImpl(std::string resource_namespace, bool with_s
*/
channel_arguments_.SetInt(GRPC_ARG_ENABLE_RETRIES, 0);

channel_arguments_.SetSslTargetNameOverride("localhost");
// channel_arguments_.SetSslTargetNameOverride("localhost");

SPDLOG_INFO("ClientManager[ResourceNamespace={}] created", resource_namespace_);
}
Expand Down Expand Up @@ -282,7 +283,7 @@ bool ClientManagerImpl::send(const std::string& target_host,
SendMessageRequest& request,
SendResultCallback cb) {
assert(cb);
SPDLOG_DEBUG("Prepare to send message to {} asynchronously. Request: {}", target_host, request.DebugString());
SPDLOG_DEBUG("Prepare to send message to {} asynchronously. Request: {}", target_host, request.ShortDebugString());
RpcClientSharedPtr client = getRpcClient(target_host);
// Invocation context will be deleted in its onComplete() method.
auto invocation_context = new InvocationContext<SendMessageResponse>();
Expand Down Expand Up @@ -440,7 +441,7 @@ bool ClientManagerImpl::send(const std::string& target_host,

case rmq::Code::MESSAGE_PROPERTY_CONFLICT_WITH_TYPE: {
SPDLOG_WARN("Message-property-conflict-with-type: Host={}, Response={}", invocation_context->remote_address,
invocation_context->response.DebugString());
invocation_context->response.ShortDebugString());
send_result.ec = ErrorCode::MessagePropertyConflictWithType;
break;
}
Expand Down Expand Up @@ -482,7 +483,7 @@ RpcClientSharedPtr ClientManagerImpl::getRpcClient(const std::string& target_hos
auto search = rpc_clients_.find(target_host);
if (search == rpc_clients_.end() || !search->second->ok()) {
if (search == rpc_clients_.end()) {
SPDLOG_INFO("Create a RPC client to {}", target_host.data());
SPDLOG_INFO("Create a RPC client to [{}]", target_host.data());
} else if (!search->second->ok()) {
SPDLOG_INFO("Prior RPC client to {} is not OK. Re-create one", target_host);
}
Expand Down Expand Up @@ -549,7 +550,7 @@ void ClientManagerImpl::resolveRoute(const std::string& target_host,
std::chrono::milliseconds timeout,
const std::function<void(const std::error_code&, const TopicRouteDataPtr&)>& cb) {
SPDLOG_DEBUG("Name server connection URL: {}", target_host);
SPDLOG_DEBUG("Query route request: {}", request.DebugString());
SPDLOG_DEBUG("Query route request: {}", request.ShortDebugString());
RpcClientSharedPtr client = getRpcClient(target_host, false);
if (!client) {
SPDLOG_WARN("Failed to create RPC client for name server[host={}]", target_host);
Expand Down
14 changes: 7 additions & 7 deletions cpp/source/client/LogInterceptor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@ void LogInterceptor::Intercept(grpc::experimental::InterceptorBatchMethods* meth
if (methods->QueryInterceptionHookPoint(grpc::experimental::InterceptionHookPoints::PRE_SEND_INITIAL_METADATA)) {
std::multimap<std::string, std::string>* metadata = methods->GetSendInitialMetadata();
if (metadata) {
SPDLOG_DEBUG("[Outbound]Headers of {}: \n{}", client_rpc_info_->method(),
absl::StrJoin(*metadata, "\n", absl::PairFormatter(" --> ")));
SPDLOG_DEBUG("[Outbound]Headers of {}: {}", client_rpc_info_->method(),
absl::StrJoin(*metadata, " ", absl::PairFormatter(" --> ")));
}
}

Expand All @@ -73,8 +73,8 @@ void LogInterceptor::Intercept(grpc::experimental::InterceptorBatchMethods* meth
absl::string_view(it.second.data(), it.second.length())});
}
if (!response_headers.empty()) {
SPDLOG_DEBUG("[Inbound]Response Headers of {}:\n{}", client_rpc_info_->method(),
absl::StrJoin(response_headers, "\n", absl::PairFormatter(" --> ")));
SPDLOG_DEBUG("[Inbound]Response Headers of {}: {}", client_rpc_info_->method(),
absl::StrJoin(response_headers, " ", absl::PairFormatter(" --> ")));
} else {
SPDLOG_DEBUG("[Inbound]Response metadata of {} is empty", client_rpc_info_->method());
}
Expand All @@ -85,12 +85,12 @@ void LogInterceptor::Intercept(grpc::experimental::InterceptorBatchMethods* meth
void* message = methods->GetRecvMessage();
if (message) {
auto* response = reinterpret_cast<google::protobuf::Message*>(message);
std::string&& response_text = response->DebugString();
std::string&& response_text = response->ShortDebugString();
std::size_t limit = 1024;
if (response_text.size() <= limit) {
SPDLOG_DEBUG("[Inbound] {}\n{}", client_rpc_info_->method(), response_text);
SPDLOG_DEBUG("[Inbound] {} {}", client_rpc_info_->method(), response_text);
} else {
SPDLOG_DEBUG("[Inbound] {}\n{}...", client_rpc_info_->method(), response_text.substr(0, limit));
SPDLOG_DEBUG("[Inbound] {} {}...", client_rpc_info_->method(), response_text.substr(0, limit));
}
}
}
Expand Down
2 changes: 0 additions & 2 deletions cpp/source/client/RpcClientImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
*/
#include "RpcClientImpl.h"

#include <chrono>
#include <functional>
#include <sstream>
#include <thread>
Expand All @@ -26,7 +25,6 @@
#include "RpcClient.h"
#include "TelemetryBidiReactor.h"
#include "TlsHelper.h"
#include "absl/time/time.h"

ROCKETMQ_NAMESPACE_BEGIN

Expand Down
2 changes: 1 addition & 1 deletion cpp/source/client/SessionImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ bool SessionImpl::await() {

void SessionImpl::syncSettings() {
auto ptr = client_.lock();
SPDLOG_INFO("Sync client settings to {}", rpc_client_->remoteAddress());
SPDLOG_INFO("Request client settings to {}", rpc_client_->remoteAddress());
TelemetryCommand command;
command.mutable_settings()->CopyFrom(ptr->clientSettings());
telemetry_->write(command);
Expand Down
22 changes: 10 additions & 12 deletions cpp/source/client/TelemetryBidiReactor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,12 @@
*/
#include "TelemetryBidiReactor.h"

#include <atomic>
#include <cstdint>
#include <memory>
#include <utility>

#include "ClientManager.h"
#include "MessageExt.h"
#include "Metadata.h"
#include "RpcClient.h"
#include "Signature.h"
#include "google/protobuf/util/time_util.h"
#include "rocketmq/Logger.h"
Expand Down Expand Up @@ -70,7 +67,7 @@ void TelemetryBidiReactor::OnWriteDone(bool ok) {
RemoveHold();

if (!ok) {
SPDLOG_WARN("Failed to write telemetry command {} to {}", writes_.front().DebugString(), peer_address_);
SPDLOG_WARN("Failed to write telemetry command {} to {}", writes_.front().ShortDebugString(), peer_address_);
signalClose();
return;
}
Expand All @@ -91,7 +88,7 @@ void TelemetryBidiReactor::OnReadDone(bool ok) {
if (!ok) {
// for read stream
RemoveHold();
SPDLOG_WARN("Failed to read from telemetry stream from {}", peer_address_);
// SPDLOG_WARN("Failed to read from telemetry stream from {}", peer_address_);
signalClose();
return;
}
Expand All @@ -103,7 +100,7 @@ void TelemetryBidiReactor::OnReadDone(bool ok) {
}
}

SPDLOG_DEBUG("Read a telemetry command from {}: {}", peer_address_, read_.DebugString());
SPDLOG_DEBUG("Read a telemetry command from {}: {}", peer_address_, read_.ShortDebugString());
auto client = client_.lock();
if (!client) {
SPDLOG_INFO("Client for {} has destructed", peer_address_);
Expand All @@ -114,19 +111,20 @@ void TelemetryBidiReactor::OnReadDone(bool ok) {
switch (read_.command_case()) {
case rmq::TelemetryCommand::kSettings: {
auto settings = read_.settings();
SPDLOG_INFO("Received settings from {}: {}", peer_address_, settings.DebugString());
SPDLOG_INFO("Receive settings from {}: {}", peer_address_, settings.ShortDebugString());
applySettings(settings);
sync_settings_promise_.set_value(true);
break;
}

case rmq::TelemetryCommand::kRecoverOrphanedTransactionCommand: {
SPDLOG_DEBUG("Receive orphan transaction command: {}", read_.DebugString());
auto message = client->manager()->wrapMessage(read_.release_verify_message_command()->message());
SPDLOG_INFO("Receive orphan transaction command: {}", read_.ShortDebugString());
auto message = client->manager()->wrapMessage(
read_.recover_orphaned_transaction_command().message());
auto raw = const_cast<Message*>(message.get());
raw->mutableExtension().target_endpoint = peer_address_;
raw->mutableExtension().transaction_id = read_.recover_orphaned_transaction_command().transaction_id();
client->recoverOrphanedTransaction(message);

break;
}

Expand Down Expand Up @@ -156,7 +154,7 @@ void TelemetryBidiReactor::OnReadDone(bool ok) {
}

default: {
SPDLOG_WARN("Unsupported command");
SPDLOG_WARN("Telemetry command receive unsupported command");
break;
}
}
Expand Down Expand Up @@ -291,7 +289,7 @@ void TelemetryBidiReactor::tryWriteNext() {
}

if (!writes_.empty()) {
SPDLOG_DEBUG("Writing telemetry command to {}: {}", peer_address_, writes_.front().DebugString());
SPDLOG_DEBUG("Writing telemetry command to {}: {}", peer_address_, writes_.front().ShortDebugString());
AddHold();
StartWrite(&(writes_.front()));
}
Expand Down
2 changes: 1 addition & 1 deletion cpp/source/client/include/TopicRouteData.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ class TopicRouteData {

std::string debugString() const {
return absl::StrJoin(message_queues_.begin(), message_queues_.end(), ",",
[](std::string* out, const rmq::MessageQueue& m) { out->append(m.DebugString()); });
[](std::string* out, const rmq::MessageQueue& m) { out->append(m.ShortDebugString()); });
};

private:
Expand Down
3 changes: 2 additions & 1 deletion cpp/source/log/LoggerImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ Logger& getLogger() {
const std::size_t LoggerImpl::DEFAULT_MAX_LOG_FILE_QUANTITY = 16;
const std::size_t LoggerImpl::DEFAULT_FILE_SIZE = 1048576 * 256;
const char* LoggerImpl::USER_HOME_ENV = "HOME";
const char* LoggerImpl::DEFAULT_PATTERN = "[%Y/%m/%d-%H:%M:%S.%e %z] [%n] [%^---%L---%$] [thread %t] %v %@";
const char* LoggerImpl::DEFAULT_PATTERN = "%Y-%m-%d %H:%M:%S.%e [%^--%L--%$] [%7t] %v %@";
// const char* LoggerImpl::DEFAULT_PATTERN = "[%Y/%m/%d-%H:%M:%S.%e %z] [%n] [%^---%L---%$] [thread %t] %v %@";

ROCKETMQ_NAMESPACE_END
13 changes: 5 additions & 8 deletions cpp/source/rocketmq/ClientImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,9 @@
#include <algorithm>
#include <atomic>
#include <chrono>
#include <cstdint>
#include <cstdlib>
#include <exception>
#include <functional>
#include <iterator>
#include <memory>
#include <string>
#include <system_error>
Expand All @@ -43,9 +41,6 @@
#include "absl/strings/str_split.h"
#include "fmt/format.h"
#include "opencensus/stats/stats.h"
#include "rocketmq/Logger.h"
#include "rocketmq/Message.h"
#include "rocketmq/MessageListener.h"
#include "spdlog/spdlog.h"

ROCKETMQ_NAMESPACE_BEGIN
Expand Down Expand Up @@ -175,12 +170,14 @@ void ClientImpl::start() {
auto telemetry_functor = [ptr]() {
std::shared_ptr<ClientImpl> base = ptr.lock();
if (base) {
SPDLOG_INFO("Sync client settings to servers");
SPDLOG_DEBUG("Sync client settings to servers");
base->syncClientSettings();
}
};
telemetry_handle_ = client_manager_->getScheduler()->schedule(telemetry_functor, TELEMETRY_TASK_NAME,
std::chrono::minutes(5), std::chrono::minutes(5));

telemetry_handle_ = client_manager_->getScheduler()->schedule(
telemetry_functor, TELEMETRY_TASK_NAME,
std::chrono::minutes(5), std::chrono::minutes(5));

auto&& metric_service_endpoint = metricServiceEndpoint();
if (!metric_service_endpoint.empty()) {
Expand Down
Loading
Loading