Skip to content

Commit a9bb6ff

Browse files
authored
[ISSUE #1035] [C++] Supports configuring the number of client threads (#1155)
1 parent 576e61b commit a9bb6ff

File tree

13 files changed

+38
-16
lines changed

13 files changed

+38
-16
lines changed

cpp/include/rocketmq/Configuration.h

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,10 @@ class Configuration {
5151
return tls_;
5252
}
5353

54+
std::uint32_t callbackThreads() const {
55+
return callback_threads_;
56+
}
57+
5458
protected:
5559
friend class ConfigurationBuilder;
5660

@@ -62,6 +66,7 @@ class Configuration {
6266
CredentialsProviderPtr credentials_provider_;
6367
std::chrono::milliseconds request_timeout_{ConfigurationDefaults::RequestTimeout};
6468
bool tls_ = true;
69+
std::uint32_t callback_threads_{2};
6570
};
6671

6772
class ConfigurationBuilder {
@@ -76,6 +81,8 @@ class ConfigurationBuilder {
7681

7782
ConfigurationBuilder& withSsl(bool with_ssl);
7883

84+
ConfigurationBuilder& withCallbackThreads(std::uint32_t callback_threads);
85+
7986
Configuration build();
8087

8188
private:

cpp/include/rocketmq/Producer.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ class ProducerBuilder {
9494
public:
9595
ProducerBuilder();
9696

97-
ProducerBuilder& withConfiguration(Configuration configuration);
97+
ProducerBuilder& withConfiguration(const Configuration& configuration);
9898

9999
ProducerBuilder& withTopics(const std::vector<std::string>& topics);
100100

cpp/source/base/Configuration.cpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,11 @@ ConfigurationBuilder& ConfigurationBuilder::withSsl(bool with_ssl) {
4848
return *this;
4949
}
5050

51+
ConfigurationBuilder& ConfigurationBuilder::withCallbackThreads(std::uint32_t callback_threads){
52+
configuration_.callback_threads_ = callback_threads;
53+
return *this;
54+
}
55+
5156
Configuration ConfigurationBuilder::build() {
5257
return std::move(configuration_);
5358
}

cpp/source/base/ThreadPoolImpl.cpp

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,6 @@
2222
#include "rocketmq/RocketMQ.h"
2323
#include "rocketmq/State.h"
2424
#include "spdlog/spdlog.h"
25-
#include <atomic>
26-
#include <cstdint>
2725
#include <exception>
2826
#include <system_error>
2927

@@ -33,6 +31,7 @@ ThreadPoolImpl::ThreadPoolImpl(std::uint16_t workers)
3331
: work_guard_(
3432
absl::make_unique<asio::executor_work_guard<asio::io_context::executor_type>>(context_.get_executor())),
3533
workers_(workers) {
34+
SPDLOG_INFO("ThreadPoolImpl created worker threads {}", workers);
3635
}
3736

3837
void ThreadPoolImpl::start() {

cpp/source/client/ClientManagerImpl.cpp

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@
3232
#include "ReceiveMessageContext.h"
3333
#include "RpcClient.h"
3434
#include "RpcClientImpl.h"
35-
#include "Scheduler.h"
3635
#include "SchedulerImpl.h"
3736
#include "UtilAll.h"
3837
#include "google/protobuf/util/time_util.h"
@@ -42,11 +41,11 @@
4241

4342
ROCKETMQ_NAMESPACE_BEGIN
4443

45-
ClientManagerImpl::ClientManagerImpl(std::string resource_namespace, bool with_ssl)
46-
: scheduler_(std::make_shared<SchedulerImpl>()),
44+
ClientManagerImpl::ClientManagerImpl(std::string resource_namespace, bool with_ssl, int thread_count)
45+
: scheduler_(std::make_shared<SchedulerImpl>(2)),
4746
resource_namespace_(std::move(resource_namespace)),
4847
state_(State::CREATED),
49-
callback_thread_pool_(absl::make_unique<ThreadPoolImpl>(std::thread::hardware_concurrency())),
48+
callback_thread_pool_(absl::make_unique<ThreadPoolImpl>(thread_count)),
5049
with_ssl_(with_ssl) {
5150

5251
certificate_verifier_ = grpc::experimental::ExternalCertificateVerifier::Create<InsecureCertificateVerifier>();

cpp/source/client/include/ClientConfig.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ struct ClientConfig {
6262
SubscriberConfig subscriber;
6363
Metric metric;
6464
bool withSsl;
65+
std::uint32_t callback_threads{2};
6566
std::unique_ptr<opencensus::trace::Sampler> sampler_;
6667
};
6768

cpp/source/client/include/ClientManagerImpl.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ class ClientManagerImpl : virtual public ClientManager, public std::enable_share
4848
* effectively.
4949
* @param resource_namespace Abstract resource namespace, in which this client manager lives.
5050
*/
51-
explicit ClientManagerImpl(std::string resource_namespace, bool with_ssl = true);
51+
explicit ClientManagerImpl(std::string resource_namespace, bool with_ssl = true, int thread_count = 1);
5252

5353
~ClientManagerImpl() override;
5454

cpp/source/rocketmq/ClientImpl.cpp

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,9 @@ void ClientImpl::start() {
108108
client_config_.client_id = clientId();
109109
if (!client_manager_) {
110110
client_manager_ = std::make_shared<ClientManagerImpl>(
111-
client_config_.resource_namespace, client_config_.withSsl);
111+
client_config_.resource_namespace,
112+
client_config_.withSsl,
113+
client_config_.callback_threads);
112114
client_manager_->start();
113115
}
114116

@@ -118,6 +120,11 @@ void ClientImpl::start() {
118120
return;
119121
}
120122

123+
// A gRPC I/O thread pool is created upon establishing a connection.
124+
// - https://github.com/grpc/grpc/issues/28642
125+
// - https://github.com/grpc/grpc/pull/31662
126+
// The source code initializes the number of I/O threads as follows:
127+
// int num_io_threads = grpc_core::Clamp(gpr_cpu_num_cores() / 2, 2u, 16u);
121128
while (true) {
122129
createSession(endpoint, false);
123130
{

cpp/source/rocketmq/Producer.cpp

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -76,14 +76,15 @@ ProducerBuilder Producer::newBuilder() {
7676
return {};
7777
}
7878

79-
ProducerBuilder::ProducerBuilder() : impl_(std::make_shared<ProducerImpl>()){};
79+
ProducerBuilder::ProducerBuilder() : impl_(std::make_shared<ProducerImpl>()){}
8080

81-
ProducerBuilder& ProducerBuilder::withConfiguration(Configuration configuration) {
81+
ProducerBuilder& ProducerBuilder::withConfiguration(const Configuration& configuration) {
8282
auto name_server_resolver = std::make_shared<StaticNameServerResolver>(configuration.endpoints());
8383
impl_->withNameServerResolver(std::move(name_server_resolver));
8484
impl_->withResourceNamespace(configuration.resourceNamespace());
8585
impl_->withCredentialsProvider(configuration.credentialsProvider());
8686
impl_->withRequestTimeout(configuration.requestTimeout());
87+
impl_->withCallbackThreads(configuration.callbackThreads());
8788
impl_->withSsl(configuration.withSsl());
8889
return *this;
8990
}

cpp/source/rocketmq/PushConsumer.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,10 +44,11 @@ PushConsumer PushConsumerBuilder::build() {
4444
impl->consumeThreadPoolSize(consume_thread_);
4545
impl->withNameServerResolver(std::make_shared<StaticNameServerResolver>(configuration_.endpoints()));
4646
impl->withResourceNamespace(configuration_.resourceNamespace());
47-
impl->withSsl(configuration_.withSsl());
4847
impl->withCredentialsProvider(configuration_.credentialsProvider());
4948
impl->withRequestTimeout(configuration_.requestTimeout());
5049
impl->withFifoConsumeAccelerator(fifo_consume_accelerator_);
50+
impl->withCallbackThreads(configuration_.callbackThreads());
51+
impl->withSsl(configuration_.withSsl());
5152
impl->start();
5253
return PushConsumer(impl);
5354
}

0 commit comments

Comments
 (0)