From af6e44dd1c18c51bc8608fe937d0b2d376b589d6 Mon Sep 17 00:00:00 2001 From: cschuet Date: Sat, 2 Feb 2019 18:09:29 +0100 Subject: [PATCH 1/2] Remove Mutex in two places --- async_grpc/event_queue_thread.cc | 4 ++++ async_grpc/event_queue_thread.h | 1 + async_grpc/rpc.cc | 8 ++------ async_grpc/rpc.h | 2 +- async_grpc/server.cc | 12 +++++------- async_grpc/server.h | 4 +--- async_grpc/service.cc | 29 ++++++++++++++++++++++------- async_grpc/service.h | 7 +++++-- 8 files changed, 41 insertions(+), 26 deletions(-) diff --git a/async_grpc/event_queue_thread.cc b/async_grpc/event_queue_thread.cc index d13f117..492113c 100644 --- a/async_grpc/event_queue_thread.cc +++ b/async_grpc/event_queue_thread.cc @@ -27,6 +27,10 @@ EventQueueThread::EventQueueThread() { EventQueue* EventQueueThread::event_queue() { return event_queue_.get(); } +const EventQueue* EventQueueThread::event_queue() const { + return event_queue_.get(); +} + void EventQueueThread::Start(EventQueueRunner runner) { CHECK(!thread_); EventQueue* event_queue = event_queue_.get(); diff --git a/async_grpc/event_queue_thread.h b/async_grpc/event_queue_thread.h index a9bc598..b8b0d71 100644 --- a/async_grpc/event_queue_thread.h +++ b/async_grpc/event_queue_thread.h @@ -32,6 +32,7 @@ class EventQueueThread { EventQueueThread(); EventQueue* event_queue(); + const EventQueue* event_queue() const; void Start(EventQueueRunner runner); void Shutdown(); diff --git a/async_grpc/rpc.cc b/async_grpc/rpc.cc index f29e84b..eb1ee95 100644 --- a/async_grpc/rpc.cc +++ b/async_grpc/rpc.cc @@ -320,8 +320,6 @@ bool Rpc::IsAnyEventPending() { std::weak_ptr Rpc::GetWeakPtr() { return weak_ptr_factory_(this); } -ActiveRpcs::ActiveRpcs() : lock_() {} - void Rpc::InitializeReadersAndWriters( ::grpc::internal::RpcMethod::RpcType rpc_type) { switch (rpc_type) { @@ -349,15 +347,15 @@ void Rpc::InitializeReadersAndWriters( } } +ActiveRpcs::ActiveRpcs() {} + ActiveRpcs::~ActiveRpcs() { - common::MutexLocker locker(&lock_); if (!rpcs_.empty()) { LOG(FATAL) << "RPCs still in flight!"; } } std::shared_ptr ActiveRpcs::Add(std::unique_ptr rpc) { - common::MutexLocker locker(&lock_); std::shared_ptr shared_ptr_rpc = std::move(rpc); const auto result = rpcs_.emplace(shared_ptr_rpc.get(), shared_ptr_rpc); CHECK(result.second) << "RPC already active."; @@ -365,7 +363,6 @@ std::shared_ptr ActiveRpcs::Add(std::unique_ptr rpc) { } bool ActiveRpcs::Remove(Rpc* rpc) { - common::MutexLocker locker(&lock_); auto it = rpcs_.find(rpc); if (it != rpcs_.end()) { rpcs_.erase(it); @@ -379,7 +376,6 @@ Rpc::WeakPtrFactory ActiveRpcs::GetWeakPtrFactory() { } std::weak_ptr ActiveRpcs::GetWeakPtr(Rpc* rpc) { - common::MutexLocker locker(&lock_); auto it = rpcs_.find(rpc); CHECK(it != rpcs_.end()); return it->second; diff --git a/async_grpc/rpc.h b/async_grpc/rpc.h index 6f01d08..f518f20 100644 --- a/async_grpc/rpc.h +++ b/async_grpc/rpc.h @@ -120,6 +120,7 @@ class Rpc { bool IsRpcEventPending(Event event); bool IsAnyEventPending(); void SetEventQueue(EventQueue* event_queue) { event_queue_ = event_queue; } + const EventQueue* event_queue() const { return event_queue_; } EventQueue* event_queue() { return event_queue_; } std::weak_ptr GetWeakPtr(); RpcHandlerInterface* handler() { return handler_.get(); } @@ -202,7 +203,6 @@ class ActiveRpcs { private: std::weak_ptr GetWeakPtr(Rpc* rpc); - common::Mutex lock_; std::map> rpcs_; }; diff --git a/async_grpc/server.cc b/async_grpc/server.cc index 8b58d92..a628116 100644 --- a/async_grpc/server.cc +++ b/async_grpc/server.cc @@ -125,7 +125,7 @@ void Server::AddService( const auto result = services_.emplace( std::piecewise_construct, std::make_tuple(service_name), std::make_tuple(service_name, rpc_handler_infos, - [this]() { return SelectNextEventQueueRoundRobin(); })); + [this]() { return SelectNextEventQueue(); })); CHECK(result.second) << "A service named " << service_name << " already exists."; server_builder_.RegisterService(&result.first->second); @@ -142,11 +142,9 @@ void Server::RunCompletionQueue( } } -EventQueue* Server::SelectNextEventQueueRoundRobin() { - common::MutexLocker locker(¤t_event_queue_id_lock_); - current_event_queue_id_ = - (current_event_queue_id_ + 1) % options_.num_event_threads; - return event_queue_threads_.at(current_event_queue_id_).event_queue(); +EventQueue* Server::SelectNextEventQueue() { + return event_queue_threads_.at(rand() % event_queue_threads_.size()) + .event_queue(); } void Server::RunEventQueue(EventQueue* event_queue) { @@ -184,7 +182,7 @@ void Server::Start() { // Start serving all services on all completion queues. for (auto& service : services_) { - service.second.StartServing(completion_queue_threads_, + service.second.StartServing(event_queue_threads_, completion_queue_threads_, execution_context_.get()); } diff --git a/async_grpc/server.h b/async_grpc/server.h index e396ed4..893b760 100644 --- a/async_grpc/server.h +++ b/async_grpc/server.h @@ -194,7 +194,7 @@ class Server { Server& operator=(const Server&) = delete; void RunCompletionQueue(::grpc::ServerCompletionQueue* completion_queue); void RunEventQueue(Rpc::EventQueue* event_queue); - Rpc::EventQueue* SelectNextEventQueueRoundRobin(); + Rpc::EventQueue* SelectNextEventQueue(); Options options_; @@ -209,8 +209,6 @@ class Server { // Threads processing RPC events. std::vector event_queue_threads_; - common::Mutex current_event_queue_id_lock_; - int current_event_queue_id_ = 0; // Map of service names to services. std::map services_; diff --git a/async_grpc/service.cc b/async_grpc/service.cc index d99529d..19b96b7 100644 --- a/async_grpc/service.cc +++ b/async_grpc/service.cc @@ -38,15 +38,27 @@ Service::Service(const std::string& service_name, } void Service::StartServing( + const std::vector& event_queue_threads, std::vector& completion_queue_threads, ExecutionContext* execution_context) { + CHECK(active_rpcs_.empty()); int i = 0; + + for (const auto& event_queue_thread : event_queue_threads) { + const auto* event_queue = event_queue_thread.event_queue(); + // TODO(cschuet): Prettify. + active_rpcs_[event_queue]; + LOG(INFO) << "Creating ActiveRpcs"; + } + for (const auto& rpc_handler_info : rpc_handler_infos_) { for (auto& completion_queue_thread : completion_queue_threads) { - std::shared_ptr rpc = active_rpcs_.Add(common::make_unique( + EventQueue* event_queue = event_queue_selector_(); + auto& active_rpcs = active_rpcs_.at(event_queue); + std::shared_ptr rpc = active_rpcs.Add(common::make_unique( i, completion_queue_thread.completion_queue(), event_queue_selector_(), execution_context, rpc_handler_info.second, - this, active_rpcs_.GetWeakPtrFactory())); + this, active_rpcs.GetWeakPtrFactory())); rpc->RequestNextMethodInvocation(); } ++i; @@ -81,13 +93,13 @@ void Service::HandleNewConnection(Rpc* rpc, bool ok) { if (ok) { LOG(WARNING) << "Server shutting down. Refusing to handle new RPCs."; } - active_rpcs_.Remove(rpc); + active_rpcs_.at(rpc->event_queue()).Remove(rpc); return; } if (!ok) { LOG(ERROR) << "Failed to establish connection for unknown reason."; - active_rpcs_.Remove(rpc); + active_rpcs_.at(rpc->event_queue()).Remove(rpc); } if (ok) { @@ -97,8 +109,11 @@ void Service::HandleNewConnection(Rpc* rpc, bool ok) { // Create new active rpc to handle next connection and register it for the // incoming connection. Assign event queue in a round-robin fashion. std::unique_ptr new_rpc = rpc->Clone(); - new_rpc->SetEventQueue(event_queue_selector_()); - active_rpcs_.Add(std::move(new_rpc))->RequestNextMethodInvocation(); + auto* next_event_queue = event_queue_selector_(); + new_rpc->SetEventQueue(next_event_queue); + active_rpcs_.at(next_event_queue) + .Add(std::move(new_rpc)) + ->RequestNextMethodInvocation(); } void Service::HandleRead(Rpc* rpc, bool ok) { @@ -139,7 +154,7 @@ void Service::HandleDone(Rpc* rpc, bool ok) { RemoveIfNotPending(rpc); } void Service::RemoveIfNotPending(Rpc* rpc) { if (!rpc->IsAnyEventPending()) { - active_rpcs_.Remove(rpc); + active_rpcs_.at(rpc->event_queue()).Remove(rpc); } } diff --git a/async_grpc/service.h b/async_grpc/service.h index d5f0ed3..df27aa3 100644 --- a/async_grpc/service.h +++ b/async_grpc/service.h @@ -17,6 +17,8 @@ #ifndef CPP_GRPC_SERVICE_H #define CPP_GRPC_SERVICE_H +#include + #include "async_grpc/completion_queue_thread.h" #include "async_grpc/event_queue_thread.h" #include "async_grpc/execution_context.h" @@ -38,7 +40,8 @@ class Service : public ::grpc::Service { Service(const std::string& service_name, const std::map& rpc_handlers, EventQueueSelector event_queue_selector); - void StartServing(std::vector& completion_queues, + void StartServing(const std::vector& event_queue_threads, + std::vector& completion_queues, ExecutionContext* execution_context); void HandleEvent(Rpc::Event event, Rpc* rpc, bool ok); void StopServing(); @@ -54,7 +57,7 @@ class Service : public ::grpc::Service { std::map rpc_handler_infos_; EventQueueSelector event_queue_selector_; - ActiveRpcs active_rpcs_; + std::unordered_map active_rpcs_; bool shutting_down_ = false; }; From 5eb57edc4ea494e424a81282c3497179ba1ab5c7 Mon Sep 17 00:00:00 2001 From: Andre Gaschler Date: Tue, 23 Jul 2019 17:28:58 +0200 Subject: [PATCH 2/2] Ran clang-format. --- async_grpc/client.h | 4 +--- async_grpc/completion_queue_pool.cc | 6 +++--- async_grpc/completion_queue_thread.h | 1 + async_grpc/opencensus_span.cc | 3 ++- async_grpc/retry.cc | 5 +++-- async_grpc/rpc.cc | 2 +- async_grpc/server.cc | 17 +++++++++-------- async_grpc/server.h | 3 +-- async_grpc/service.cc | 3 +-- 9 files changed, 22 insertions(+), 22 deletions(-) diff --git a/async_grpc/client.h b/async_grpc/client.h index c4ba45b..7cbca47 100644 --- a/async_grpc/client.h +++ b/async_grpc/client.h @@ -21,14 +21,12 @@ #include "async_grpc/retry.h" #include "async_grpc/rpc_handler_interface.h" #include "async_grpc/rpc_service_method_traits.h" - +#include "glog/logging.h" #include "grpc++/grpc++.h" #include "grpc++/impl/codegen/client_unary_call.h" #include "grpc++/impl/codegen/proto_utils.h" #include "grpc++/impl/codegen/sync_stream.h" -#include "glog/logging.h" - namespace async_grpc { // Wraps a method invocation for all rpc types, unary, client streaming, diff --git a/async_grpc/completion_queue_pool.cc b/async_grpc/completion_queue_pool.cc index a32d0c0..cd5ce04 100644 --- a/async_grpc/completion_queue_pool.cc +++ b/async_grpc/completion_queue_pool.cc @@ -14,10 +14,11 @@ * limitations under the License. */ +#include "async_grpc/completion_queue_pool.h" + #include #include "async_grpc/async_client.h" -#include "async_grpc/completion_queue_pool.h" #include "common/make_unique.h" #include "glog/logging.h" @@ -89,8 +90,7 @@ void CompletionQueuePool::Shutdown() { } CompletionQueuePool::CompletionQueuePool() - : number_completion_queues_(kDefaultNumberCompletionQueues) { -} + : number_completion_queues_(kDefaultNumberCompletionQueues) {} CompletionQueuePool::~CompletionQueuePool() { LOG(INFO) << "~CompletionQueuePool"; diff --git a/async_grpc/completion_queue_thread.h b/async_grpc/completion_queue_thread.h index 9c2c217..b132014 100644 --- a/async_grpc/completion_queue_thread.h +++ b/async_grpc/completion_queue_thread.h @@ -18,6 +18,7 @@ #define CPP_GRPC_COMMON_COMPLETION_QUEUE_THREAD_H_ #include + #include #include diff --git a/async_grpc/opencensus_span.cc b/async_grpc/opencensus_span.cc index 283e062..2cdec0c 100644 --- a/async_grpc/opencensus_span.cc +++ b/async_grpc/opencensus_span.cc @@ -37,7 +37,8 @@ void OpencensusSpan::End() { span_.End(); } OpencensusSpan::OpencensusSpan(const std::string& name, const OpencensusSpan* parent) - : span_(opencensus::trace::Span::StartSpan(name, parent ? &parent->span_: nullptr)) {} + : span_(opencensus::trace::Span::StartSpan( + name, parent ? &parent->span_ : nullptr)) {} } // namespace async_grpc diff --git a/async_grpc/retry.cc b/async_grpc/retry.cc index 6684b65..da0f60c 100644 --- a/async_grpc/retry.cc +++ b/async_grpc/retry.cc @@ -14,11 +14,12 @@ * limitations under the License. */ +#include "async_grpc/retry.h" + #include #include #include -#include "async_grpc/retry.h" #include "glog/logging.h" namespace async_grpc { @@ -26,7 +27,7 @@ namespace async_grpc { RetryStrategy CreateRetryStrategy(RetryIndicator retry_indicator, RetryDelayCalculator retry_delay_calculator) { return [retry_indicator, retry_delay_calculator]( - int failed_attempts, const ::grpc::Status &status) { + int failed_attempts, const ::grpc::Status &status) { if (!retry_indicator(failed_attempts, status)) { return optional(); } diff --git a/async_grpc/rpc.cc b/async_grpc/rpc.cc index eb1ee95..6c26dbf 100644 --- a/async_grpc/rpc.cc +++ b/async_grpc/rpc.cc @@ -15,9 +15,9 @@ */ #include "async_grpc/rpc.h" -#include "async_grpc/service.h" #include "async_grpc/common/make_unique.h" +#include "async_grpc/service.h" #include "glog/logging.h" namespace async_grpc { diff --git a/async_grpc/server.cc b/async_grpc/server.cc index a628116..339cb31 100644 --- a/async_grpc/server.cc +++ b/async_grpc/server.cc @@ -46,12 +46,14 @@ void Server::Builder::SetServerAddress(const std::string& server_address) { } void Server::Builder::SetMaxReceiveMessageSize(int max_receive_message_size) { - CHECK_GT(max_receive_message_size, 0) << "max_receive_message_size must be larger than 0."; + CHECK_GT(max_receive_message_size, 0) + << "max_receive_message_size must be larger than 0."; options_.max_receive_message_size = max_receive_message_size; } void Server::Builder::SetMaxSendMessageSize(int max_send_message_size) { - CHECK_GT(max_send_message_size, 0) << "max_send_message_size must be larger than 0."; + CHECK_GT(max_send_message_size, 0) + << "max_send_message_size must be larger than 0."; options_.max_send_message_size = max_send_message_size; } @@ -63,11 +65,10 @@ void Server::Builder::EnableTracing() { #endif } -void Server::Builder::DisableTracing() { - options_.enable_tracing = false; -} +void Server::Builder::DisableTracing() { options_.enable_tracing = false; } -void Server::Builder::SetTracingSamplerProbability(double tracing_sampler_probability) { +void Server::Builder::SetTracingSamplerProbability( + double tracing_sampler_probability) { options_.tracing_sampler_probability = tracing_sampler_probability; } @@ -75,7 +76,8 @@ void Server::Builder::SetTracingTaskName(const std::string& tracing_task_name) { options_.tracing_task_name = tracing_task_name; } -void Server::Builder::SetTracingGcpProjectId(const std::string& tracing_gcp_project_id) { +void Server::Builder::SetTracingGcpProjectId( + const std::string& tracing_gcp_project_id) { options_.tracing_gcp_project_id = tracing_gcp_project_id; } @@ -176,7 +178,6 @@ void Server::Start() { } #endif - // Start the gRPC server process. server_ = server_builder_.BuildAndStart(); diff --git a/async_grpc/server.h b/async_grpc/server.h index 893b760..e9343a0 100644 --- a/async_grpc/server.h +++ b/async_grpc/server.h @@ -30,13 +30,12 @@ #include "async_grpc/rpc_handler.h" #include "async_grpc/rpc_service_method_traits.h" #include "async_grpc/service.h" - #include "grpc++/grpc++.h" namespace async_grpc { namespace { -constexpr int kDefaultMaxMessageSize = 10 * 1024 * 1024; // 10 MB +constexpr int kDefaultMaxMessageSize = 10 * 1024 * 1024; // 10 MB constexpr double kDefaultTracingSamplerProbability = 0.01; // 1 Percent } // namespace diff --git a/async_grpc/service.cc b/async_grpc/service.cc index 19b96b7..2c0e32a 100644 --- a/async_grpc/service.cc +++ b/async_grpc/service.cc @@ -14,10 +14,9 @@ * limitations under the License. */ -#include "async_grpc/server.h" - #include +#include "async_grpc/server.h" #include "glog/logging.h" #include "grpc++/impl/codegen/proto_utils.h"