1+ /* *
2+ * Licensed to the Apache Software Foundation (ASF) under one
3+ * or more contributor license agreements. See the NOTICE file
4+ * distributed with this work for additional information
5+ * regarding copyright ownership. The ASF licenses this file
6+ * to you under the Apache License, Version 2.0 (the
7+ * "License"); you may not use this file except in compliance
8+ * with the License. You may obtain a copy of the License at
9+ *
10+ * http://www.apache.org/licenses/LICENSE-2.0
11+ *
12+ * Unless required by applicable law or agreed to in writing,
13+ * software distributed under the License is distributed on an
14+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+ * KIND, either express or implied. See the License for the
16+ * specific language governing permissions and limitations
17+ * under the License.
18+ */
19+ #include " NodesSupplier.h"
20+ #include " Session.h"
21+ #include < algorithm>
22+ #include < iostream>
23+ #include < utility>
24+
25+ const std::string NodesSupplier::SHOW_DATA_NODES_COMMAND = " SHOW DATANODES" ;
26+ const std::string NodesSupplier::STATUS_COLUMN_NAME = " Status" ;
27+ const std::string NodesSupplier::IP_COLUMN_NAME = " RpcAddress" ;
28+ const std::string NodesSupplier::PORT_COLUMN_NAME = " RpcPort" ;
29+ const std::string NodesSupplier::REMOVING_STATUS = " Removing" ;
30+
31+ const int64_t NodesSupplier::TIMEOUT_IN_MS = 60000 ;
32+ const int NodesSupplier::FETCH_SIZE = 10000 ;
33+ const int NodesSupplier::THRIFT_DEFAULT_BUFFER_SIZE = 4096 ;
34+ const int NodesSupplier::THRIFT_MAX_FRAME_SIZE = 1048576 ;
35+ const int NodesSupplier::CONNECTION_TIMEOUT_IN_MS = 1000 ;
36+
37+ TEndPoint RoundRobinPolicy::select (const std::vector<TEndPoint>& nodes) {
38+ static std::atomic_uint index{0 };
39+
40+ if (nodes.empty ()) {
41+ throw IoTDBException (" No available nodes" );
42+ }
43+
44+ return nodes[index++ % nodes.size ()];
45+ }
46+
47+ StaticNodesSupplier::StaticNodesSupplier (const std::vector<TEndPoint>& nodes,
48+ NodeSelectionPolicy policy)
49+ : availableNodes_(nodes), policy_(std::move(policy)) {}
50+
51+ boost::optional<TEndPoint> StaticNodesSupplier::getQueryEndPoint () {
52+ try {
53+ if (availableNodes_.empty ()) {
54+ return boost::none;
55+ }
56+ return policy_ (availableNodes_);
57+ } catch (const IoTDBException& e) {
58+ return boost::none;
59+ }
60+ }
61+
62+ std::vector<TEndPoint> StaticNodesSupplier::getEndPointList () {
63+ return availableNodes_;
64+ }
65+
66+ StaticNodesSupplier::~StaticNodesSupplier () = default ;
67+
68+ std::shared_ptr<NodesSupplier> NodesSupplier::create (
69+ std::vector<TEndPoint> endpoints,
70+ std::string userName, std::string password, std::string zoneId,
71+ int32_t thriftDefaultBufferSize, int32_t thriftMaxFrameSize,
72+ int32_t connectionTimeoutInMs, bool useSSL, bool enableRPCCompression,
73+ std::string version, std::chrono::milliseconds refreshInterval,
74+ NodeSelectionPolicy policy) {
75+ if (endpoints.empty ()) {
76+ return nullptr ;
77+ }
78+ auto supplier = std::make_shared<NodesSupplier>(
79+ userName, password, zoneId, thriftDefaultBufferSize,
80+ thriftMaxFrameSize, connectionTimeoutInMs, useSSL,
81+ enableRPCCompression, version, std::move (endpoints), std::move (policy)
82+ );
83+ supplier->startBackgroundRefresh (refreshInterval);
84+ return supplier;
85+ }
86+
87+ NodesSupplier::NodesSupplier (
88+ std::string userName, std::string password, const std::string& zoneId,
89+ int32_t thriftDefaultBufferSize, int32_t thriftMaxFrameSize,
90+ int32_t connectionTimeoutInMs, bool useSSL, bool enableRPCCompression,
91+ std::string version, std::vector<TEndPoint> endpoints, NodeSelectionPolicy policy) : userName(std::move(userName)), password(std::move(password)), zoneId(zoneId),
92+ thriftDefaultBufferSize(thriftDefaultBufferSize), thriftMaxFrameSize(thriftMaxFrameSize),
93+ connectionTimeoutInMs(connectionTimeoutInMs), useSSL(useSSL), enableRPCCompression(enableRPCCompression), version(version), endpoints(std::move(endpoints)),
94+ selectionPolicy(std::move(policy)) {
95+ deduplicateEndpoints ();
96+ }
97+
98+ std::vector<TEndPoint> NodesSupplier::getEndPointList () {
99+ std::lock_guard<std::mutex> lock (mutex);
100+ return endpoints;
101+ }
102+
103+ TEndPoint NodesSupplier::selectQueryEndpoint () {
104+ std::lock_guard<std::mutex> lock (mutex);
105+ try {
106+ return selectionPolicy (endpoints);
107+ } catch (const std::exception& e) {
108+ log_error (" NodesSupplier::selectQueryEndpoint exception: %s" , e.what ());
109+ throw IoTDBException (" NodesSupplier::selectQueryEndpoint exception, " + std::string (e.what ()));
110+ }
111+ }
112+
113+ boost::optional<TEndPoint> NodesSupplier::getQueryEndPoint () {
114+ try {
115+ return selectQueryEndpoint ();
116+ } catch (const IoTDBException& e) {
117+ return boost::none;
118+ }
119+ }
120+
121+ NodesSupplier::~NodesSupplier () {
122+ stopBackgroundRefresh ();
123+ client->close ();
124+ }
125+
126+ void NodesSupplier::deduplicateEndpoints () {
127+ std::vector<TEndPoint> uniqueEndpoints;
128+ uniqueEndpoints.reserve (endpoints.size ());
129+ for (const auto & endpoint : endpoints) {
130+ if (std::find (uniqueEndpoints.begin (), uniqueEndpoints.end (), endpoint) == uniqueEndpoints.end ()) {
131+ uniqueEndpoints.push_back (endpoint);
132+ }
133+ }
134+ endpoints = std::move (uniqueEndpoints);
135+ }
136+
137+ void NodesSupplier::startBackgroundRefresh (std::chrono::milliseconds interval) {
138+ isRunning = true ;
139+ refreshThread = std::thread ([this , interval] {
140+ while (isRunning) {
141+ refreshEndpointList ();
142+ std::unique_lock<std::mutex> cvLock (this ->mutex );
143+ refreshCondition.wait_for (cvLock, interval, [this ]() {
144+ return !isRunning.load ();
145+ });
146+ }
147+ });
148+ }
149+
150+ std::vector<TEndPoint> NodesSupplier::fetchLatestEndpoints () {
151+ try {
152+ if (client == nullptr ) {
153+ client = std::make_shared<ThriftConnection>(selectionPolicy (endpoints));
154+ client->init (userName, password, enableRPCCompression, zoneId, version);
155+ }
156+
157+ auto sessionDataSet = client->executeQueryStatement (SHOW_DATA_NODES_COMMAND);
158+
159+ uint32_t columnAddrIdx = -1 , columnPortIdx = -1 , columnStatusIdx = -1 ;
160+ auto columnNames = sessionDataSet->getColumnNames ();
161+ for (uint32_t i = 0 ; i < columnNames.size (); i++) {
162+ if (columnNames[i] == IP_COLUMN_NAME) {
163+ columnAddrIdx = i;
164+ } else if (columnNames[i] == PORT_COLUMN_NAME) {
165+ columnPortIdx = i;
166+ } else if (columnNames[i] == STATUS_COLUMN_NAME) {
167+ columnStatusIdx = i;
168+ }
169+ }
170+
171+ if (columnAddrIdx == -1 || columnPortIdx == -1 || columnStatusIdx == -1 ) {
172+ throw IoTDBException (" Required columns not found in query result." );
173+ }
174+
175+ std::vector<TEndPoint> ret;
176+ while (sessionDataSet->hasNext ()) {
177+ RowRecord* record = sessionDataSet->next ();
178+ std::string ip = record->fields .at (columnAddrIdx).stringV ;
179+ int32_t port = record->fields .at (columnPortIdx).intV ;
180+ std::string status = record->fields .at (columnStatusIdx).stringV ;
181+
182+ if (ip == " 0.0.0.0" || status == REMOVING_STATUS) {
183+ log_warn (" Skipping invalid node: " + ip + " :" + to_string (port));
184+ continue ;
185+ }
186+ TEndPoint endpoint;
187+ endpoint.ip = ip;
188+ endpoint.port = port;
189+ ret.emplace_back (endpoint);
190+ }
191+
192+ return ret;
193+ } catch (const IoTDBException& e) {
194+ client.reset ();
195+ throw IoTDBException (std::string (" NodesSupplier::fetchLatestEndpoints failed: " ) + e.what ());
196+ }
197+ }
198+
199+ void NodesSupplier::refreshEndpointList () {
200+ try {
201+ auto newEndpoints = fetchLatestEndpoints ();
202+ if (newEndpoints.empty ()) {
203+ return ;
204+ }
205+
206+ std::lock_guard<std::mutex> lock (mutex);
207+ endpoints.swap (newEndpoints);
208+ deduplicateEndpoints ();
209+ } catch (const IoTDBException& e) {
210+ log_error (std::string (" NodesSupplier::refreshEndpointList failed: " ) + e.what ());
211+ }
212+ }
213+
214+ void NodesSupplier::stopBackgroundRefresh () noexcept {
215+ if (isRunning.exchange (false )) {
216+ refreshCondition.notify_all ();
217+ if (refreshThread.joinable ()) {
218+ refreshThread.join ();
219+ }
220+ }
221+ }
0 commit comments