Skip to content

Commit c246dff

Browse files
authored
[ISSUE #1001] C++ client should block on build when sync settings failed (#1002)
1 parent 4afe70e commit c246dff

File tree

9 files changed

+69
-39
lines changed

9 files changed

+69
-39
lines changed

.github/workflows/cpp_build.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ jobs:
1111
# Disable VS 2022 before https://github.com/bazelbuild/bazel/issues/18592 issue is solved
1212
# Remove macos-11 since there is no such runner available
1313
# os: [ubuntu-20.04, ubuntu-22.04, macos-11, macos-12, windows-2019, windows-2022]
14-
os: [ubuntu-20.04, ubuntu-22.04, windows-2019]
14+
os: [ubuntu-22.04, windows-2019]
1515
steps:
1616
- uses: actions/checkout@v2
1717
- name: Compile On Linux

cpp/source/client/ClientManagerImpl.cpp

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -94,22 +94,21 @@ void ClientManagerImpl::start() {
9494
SPDLOG_WARN("Unexpected client instance state: {}", state_.load(std::memory_order_relaxed));
9595
return;
9696
}
97+
9798
state_.store(State::STARTING, std::memory_order_relaxed);
9899

99100
callback_thread_pool_->start();
100-
101101
scheduler_->start();
102102

103103
std::weak_ptr<ClientManagerImpl> client_instance_weak_ptr = shared_from_this();
104-
105104
auto heartbeat_functor = [client_instance_weak_ptr]() {
106105
auto client_instance = client_instance_weak_ptr.lock();
107106
if (client_instance) {
108107
client_instance->doHeartbeat();
109108
}
110109
};
111-
heartbeat_task_id_ =
112-
scheduler_->schedule(heartbeat_functor, HEARTBEAT_TASK_NAME, std::chrono::seconds(1), std::chrono::seconds(10));
110+
heartbeat_task_id_ = scheduler_->schedule(
111+
heartbeat_functor, HEARTBEAT_TASK_NAME, std::chrono::seconds(1), std::chrono::seconds(10));
113112
SPDLOG_DEBUG("Heartbeat task-id={}", heartbeat_task_id_);
114113

115114
state_.store(State::STARTED, std::memory_order_relaxed);

cpp/source/client/TelemetryBidiReactor.cpp

Lines changed: 27 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,6 @@ TelemetryBidiReactor::TelemetryBidiReactor(std::weak_ptr<Client> client,
3838
auto ptr = client_.lock();
3939
auto deadline = std::chrono::system_clock::now() + std::chrono::hours(1);
4040
context_.set_deadline(deadline);
41-
sync_settings_future_ = sync_settings_promise_.get_future();
4241
Metadata metadata;
4342
Signature::sign(ptr->config(), metadata);
4443
for (const auto& entry : metadata) {
@@ -56,8 +55,19 @@ TelemetryBidiReactor::~TelemetryBidiReactor() {
5655
}
5756

5857
bool TelemetryBidiReactor::awaitApplyingSettings() {
59-
sync_settings_future_.get();
60-
return true;
58+
auto settings_future = sync_settings_promise_.get_future();
59+
std::future_status status = settings_future.wait_for(std::chrono::seconds(3));
60+
if (status == std::future_status::ready) {
61+
if (settings_future.get()) {
62+
return true;
63+
}
64+
}
65+
{
66+
absl::MutexLock lk(&state_mtx_);
67+
state_ = StreamState::Closed;
68+
state_cv_.SignalAll();
69+
}
70+
return false;
6171
}
6272

6373
void TelemetryBidiReactor::OnWriteDone(bool ok) {
@@ -283,21 +293,29 @@ void TelemetryBidiReactor::tryWriteNext() {
283293
static_cast<std::uint8_t>(state_));
284294
return;
285295
}
296+
286297
if (writes_.empty()) {
287-
SPDLOG_DEBUG("No TelemetryCommand to write. Peer={}", peer_address_);
298+
SPDLOG_DEBUG("No pending TelemetryCommand to write. Peer={}", peer_address_);
288299
return;
289300
}
290301

291302
if (!writes_.empty()) {
292-
SPDLOG_DEBUG("Writing telemetry command to {}: {}", peer_address_, writes_.front().ShortDebugString());
293-
AddHold();
294-
StartWrite(&(writes_.front()));
303+
SPDLOG_DEBUG("Writing TelemetryCommand to {}: {}", peer_address_, writes_.front().ShortDebugString());
304+
if (StreamState::Ready == state_) {
305+
AddHold();
306+
StartWrite(&(writes_.front()));
307+
} else {
308+
SPDLOG_WARN("Writing TelemetryCommand error due to unexpected state. State={}, Peer={}",
309+
static_cast<uint8_t>(state_), peer_address_);
310+
}
295311
}
296312
}
297313

298314
void TelemetryBidiReactor::signalClose() {
299315
absl::MutexLock lk(&state_mtx_);
300-
state_ = StreamState::Closing;
316+
if (state_ == StreamState::Ready) {
317+
state_ = StreamState::Closing;
318+
}
301319
}
302320

303321
void TelemetryBidiReactor::close() {
@@ -361,7 +379,7 @@ void TelemetryBidiReactor::OnReadInitialMetadataDone(bool ok) {
361379
if (!ok) {
362380
// for read stream
363381
// Remove the hold corresponding to AddHold in TelemetryBidiReactor::TelemetryBidiReactor.
364-
RemoveHold();
382+
// RemoveHold();
365383

366384
SPDLOG_DEBUG("Change state {} --> {}", static_cast<std::uint8_t>(state_),
367385
static_cast<std::uint8_t>(StreamState::Closing));

cpp/source/client/include/TelemetryBidiReactor.h

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -40,19 +40,25 @@ enum class StreamState : std::uint8_t
4040
Closed = 2,
4141
};
4242

43-
/// stream-state: ready --> closing --> closed
43+
/// TelemetryBidiReactor: Manages a bidirectional gRPC stream for telemetry data
4444
///
45-
/// requirement:
46-
/// 1, close --> blocking wait till bidireactor is closed;
47-
/// 2, when session is closed and client is still active, recreate a new session to accept incoming commands from
48-
/// server
45+
/// Stream State Transitions:
46+
/// Ready --> Closing --> Closed
4947
///
48+
/// Key Features:
49+
/// 1. Close Operation: Performs a blocking wait until the bidirectional reactor is fully closed.
50+
/// 2. Session Management: If the session closes while the client is still active,
51+
/// it automatically initiates the creation of a new session to maintain
52+
/// communication with the server.
53+
///
54+
/// The reactor handles reading from and writing to the stream, manages stream state,
55+
/// and applies settings received from the server.
5056
class TelemetryBidiReactor : public grpc::ClientBidiReactor<TelemetryCommand, TelemetryCommand>,
5157
public std::enable_shared_from_this<TelemetryBidiReactor> {
5258
public:
5359
TelemetryBidiReactor(std::weak_ptr<Client> client, rmq::MessagingService::Stub* stub, std::string peer_address);
5460

55-
~TelemetryBidiReactor();
61+
~TelemetryBidiReactor() override;
5662

5763
/// Notifies the application that all operations associated with this RPC
5864
/// have completed and all Holds have been removed. OnDone provides the RPC
@@ -125,7 +131,6 @@ class TelemetryBidiReactor : public grpc::ClientBidiReactor<TelemetryCommand, Te
125131
absl::CondVar state_cv_;
126132

127133
std::promise<bool> sync_settings_promise_;
128-
std::future<bool> sync_settings_future_;
129134

130135
void applySettings(const rmq::Settings& settings);
131136

@@ -137,6 +142,7 @@ class TelemetryBidiReactor : public grpc::ClientBidiReactor<TelemetryCommand, Te
137142

138143
/// Attempt to write pending telemetry command to server.
139144
void tryWriteNext() LOCKS_EXCLUDED(state_mtx_, writes_mtx_);
145+
140146
void signalClose();
141147
};
142148

cpp/source/rocketmq/ClientImpl.cpp

Lines changed: 21 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -106,26 +106,30 @@ void ClientImpl::start() {
106106
name_server_resolver_->start();
107107

108108
client_config_.client_id = clientId();
109-
110109
if (!client_manager_) {
111-
client_manager_ = std::make_shared<ClientManagerImpl>(client_config_.resource_namespace, client_config_.withSsl);
110+
client_manager_ = std::make_shared<ClientManagerImpl>(
111+
client_config_.resource_namespace, client_config_.withSsl);
112+
client_manager_->start();
112113
}
113-
client_manager_->start();
114114

115115
const auto& endpoint = name_server_resolver_->resolve();
116116
if (endpoint.empty()) {
117117
SPDLOG_ERROR("Failed to resolve name server address");
118-
abort();
118+
return;
119119
}
120120

121-
createSession(endpoint, false);
122-
{
123-
absl::MutexLock lk(&session_map_mtx_);
124-
session_map_[endpoint]->await();
121+
while (true) {
122+
createSession(endpoint, false);
123+
{
124+
absl::MutexLock lk(&session_map_mtx_);
125+
if (session_map_.contains(endpoint) && session_map_[endpoint]->await()) {
126+
break;
127+
}
128+
session_map_.erase(endpoint);
129+
}
125130
}
126131

127132
std::weak_ptr<ClientImpl> ptr(self());
128-
129133
{
130134
// Query routes for topics of interest in synchronous
131135
std::vector<std::string> topics;
@@ -164,8 +168,9 @@ void ClientImpl::start() {
164168
}
165169
};
166170

167-
route_update_handle_ = client_manager_->getScheduler()->schedule(route_update_functor, UPDATE_ROUTE_TASK_NAME,
168-
std::chrono::seconds(10), std::chrono::seconds(30));
171+
route_update_handle_ = client_manager_->getScheduler()->schedule(
172+
route_update_functor, UPDATE_ROUTE_TASK_NAME,
173+
std::chrono::seconds(10), std::chrono::seconds(30));
169174

170175
auto telemetry_functor = [ptr]() {
171176
std::shared_ptr<ClientImpl> base = ptr.lock();
@@ -597,8 +602,11 @@ void ClientImpl::notifyClientTermination(const NotifyClientTerminationRequest& r
597602
Signature::sign(client_config_, metadata);
598603

599604
for (const auto& endpoint : endpoints) {
600-
client_manager_->notifyClientTermination(endpoint, metadata, request,
601-
absl::ToChronoMilliseconds(client_config_.request_timeout));
605+
std::error_code ec = client_manager_->notifyClientTermination(
606+
endpoint, metadata, request,absl::ToChronoMilliseconds(client_config_.request_timeout));
607+
if (ec) {
608+
SPDLOG_WARN("Notify client termination error, ErrorCode={}, Endpoint={}", ec.message(), endpoint);
609+
}
602610
}
603611
}
604612

cpp/source/rocketmq/ConsumeMessageServiceImpl.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ void ConsumeMessageServiceImpl::dispatch(std::shared_ptr<ProcessQueue> process_q
7272
return;
7373
}
7474

75-
for (auto message : messages) {
75+
for (const auto& message : messages) {
7676
auto consume_task = std::make_shared<ConsumeTask>(shared_from_this(), process_queue, message);
7777
pool_->submit([consume_task]() { consume_task->process(); });
7878
}

cpp/source/rocketmq/ProducerImpl.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ void ProducerImpl::start() {
5656

5757
State expecting = State::STARTING;
5858
if (!state_.compare_exchange_strong(expecting, State::STARTED)) {
59-
SPDLOG_ERROR("Start with unexpected state. Expecting: {}, Actual: {}", State::STARTING,
59+
SPDLOG_ERROR("Producer started with an unexpected state. Expecting: {}, Actual: {}", State::STARTING,
6060
state_.load(std::memory_order_relaxed));
6161
return;
6262
}

cpp/source/rocketmq/PushConsumerImpl.cpp

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,6 @@ void PushConsumerImpl::start() {
6565
if (!message_listener_) {
6666
SPDLOG_ERROR("Required message listener is missing");
6767
abort();
68-
return;
6968
}
7069

7170
client_config_.subscriber.group.set_resource_namespace(resourceNamespace());

cpp/source/rocketmq/include/ConsumeMessageServiceImpl.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,8 +43,8 @@ class ConsumeMessageServiceImpl
4343
* Make it noncopyable.
4444
*/
4545
ConsumeMessageServiceImpl(const ConsumeMessageServiceImpl &other) = delete;
46-
ConsumeMessageServiceImpl &
47-
operator=(const ConsumeMessageServiceImpl &other) = delete;
46+
47+
ConsumeMessageServiceImpl &operator=(const ConsumeMessageServiceImpl &other) = delete;
4848

4949
void start() override;
5050

0 commit comments

Comments
 (0)