Skip to content

Commit 7de6a86

Browse files
authored
Sql partition aware routing[API-1862] (#1167)
* sql aware draft * sql_result update * LRU Cache Interface * std::priority_queue used * sql service implementation * revert unused file * clang format * LRU Test added * LRU Test added * basic type tests * unit tests for complex types * test case update * test added for routing * clang format * 3 members for sql test * cluster version check added * test case failure fix * member3_ reset * additional test cases * statement partition arg index set * tdd review fix * clang format * comments added * clang format * jenkins fix * compiler error fix for jenkins * jenkins linux compiler error * read_optimized_lru_cache removed from public API * code review fixes * initial value of lock * rename variable * clang format * compiler error fixed * windows bat file is updated
1 parent 12092b2 commit 7de6a86

File tree

10 files changed

+1076
-35
lines changed

10 files changed

+1076
-35
lines changed

hazelcast/include/hazelcast/client/client_properties.h

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,8 @@ class HAZELCAST_API client_properties
9999

100100
const client_property& cloud_base_url() const;
101101

102+
const client_property& partition_arg_cache_size() const;
103+
102104
/**
103105
* Client will be sending heartbeat messages to members and this is the
104106
* timeout. If there is no any message passing between client and member
@@ -264,6 +266,18 @@ class HAZELCAST_API client_properties
264266
static constexpr const char* CLOUD_URL_BASE_DEFAULT =
265267
"api.viridian.hazelcast.com";
266268

269+
/**
270+
* Parametrized SQL queries touching only a single partition benefit from
271+
* using the partition owner as the query coordinator, if the partition
272+
* owner can be determined from one of the query parameters. When such a
273+
* query is executed, the cluster sends the index of such argument to the
274+
* client. This parameter configures the size of the cache the client uses
275+
* for storing this information.
276+
*/
277+
static constexpr const char* PARTITION_ARGUMENT_CACHE_SIZE =
278+
"hazelcast.client.sql.partition.argument.cache.size";
279+
static constexpr const char* PARTITION_ARGUMENT_CACHE_SIZE_DEFAULT = "1024";
280+
267281
/**
268282
* Returns the configured boolean value of a {@link ClientProperty}.
269283
*
@@ -314,6 +328,7 @@ class HAZELCAST_API client_properties
314328
client_property backup_timeout_millis_;
315329
client_property fail_on_indeterminate_state_;
316330
client_property cloud_base_url_;
331+
client_property partition_arg_cache_size_;
317332

318333
std::unordered_map<std::string, std::string> properties_map_;
319334
};
Lines changed: 203 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,203 @@
1+
/*
2+
* Copyright (c) 2008-2023, Hazelcast, Inc. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
#pragma once
17+
18+
#include <queue>
19+
#include "hazelcast/util/SynchronizedMap.h"
20+
#include "hazelcast/util/export.h"
21+
22+
namespace hazelcast {
23+
namespace client {
24+
namespace sql {
25+
namespace impl {
26+
27+
/**
28+
* Implementation of an LRU cache optimized for read-heavy use cases.
29+
* <p>
30+
* It stores the entries in a {SynchronizedMap}, along with the last
31+
* access time. It allows the size to grow beyond the capacity, up to
32+
* `cleanup_threshold_`, at which point the inserting thread will remove a batch
33+
* of the eldest items in two passes.
34+
* <p>
35+
* The cleanup process isn't synchronized to guarantee that the capacity is not
36+
* exceeded. The cache is available during the cleanup for reads and writes. If
37+
* there's a large number of writes by many threads, the one thread doing the
38+
* cleanup might not be quick enough and there's no upper bound on the actual
39+
* size of the cache. This is done to optimize the happy path when the keys fit
40+
* into the cache.
41+
*/
42+
template<typename K, typename V>
43+
class read_optimized_lru_cache
44+
{
45+
public:
46+
/**
47+
* @param capacity Capacity of the cache
48+
* @param cleanup_threshold The size at which the cache will clean up oldest
49+
* entries in batch. `cleanup_threshold - capacity` entries will be
50+
* removed
51+
* @throws exception::illegal_argument if capacity is smaller or equal to 0,
52+
* or if the cleanup_threshold is smaller than capacity
53+
*/
54+
explicit read_optimized_lru_cache(const uint32_t capacity,
55+
const uint32_t cleanup_threshold)
56+
{
57+
if (capacity == 0) {
58+
BOOST_THROW_EXCEPTION(
59+
client::exception::illegal_argument("capacity == 0"));
60+
}
61+
if (cleanup_threshold <= capacity) {
62+
BOOST_THROW_EXCEPTION(client::exception::illegal_argument(
63+
"cleanupThreshold <= capacity"));
64+
}
65+
66+
capacity_ = capacity;
67+
cleanup_threshold_ = cleanup_threshold;
68+
}
69+
70+
/**
71+
* @param key the key of the cache entry
72+
* @param default_value the default value if the key is not cached.
73+
* @returns Returns the value to which the specified key is cached,
74+
* or default value if this cache contains no mapping for the key.
75+
*/
76+
std::shared_ptr<V> get_or_default(const K& key,
77+
const std::shared_ptr<V>& default_value)
78+
{
79+
const auto existing_value = get(key);
80+
return (existing_value != nullptr) ? existing_value : default_value;
81+
}
82+
83+
/**
84+
* @param key the key of the cache entry
85+
* Returns the value to which the specified key is cached,
86+
* or {@code null} if this cache contains no mapping for the key.
87+
* @returns Returns the value to which the specified key is cached
88+
*/
89+
std::shared_ptr<V> get(const K& key)
90+
{
91+
auto value_from_cache = cache_.get(key);
92+
if (value_from_cache == nullptr) {
93+
return nullptr;
94+
}
95+
value_from_cache->touch();
96+
return std::make_shared<int32_t>(value_from_cache->value_);
97+
}
98+
99+
/**
100+
* @param key the key of the cache entry
101+
* @param value the value of the cache entry
102+
* @throws exception::illegal_argument if the value equals to nullptr
103+
*/
104+
void put(const K& key, const std::shared_ptr<V>& value)
105+
{
106+
if (value == nullptr) {
107+
BOOST_THROW_EXCEPTION(client::exception::illegal_argument(
108+
"Null values are disallowed"));
109+
}
110+
111+
auto old_value =
112+
cache_.put(key, std::make_shared<value_and_timestamp<V>>(*value));
113+
if (old_value == nullptr && cache_.size() > cleanup_threshold_) {
114+
do_cleanup();
115+
}
116+
}
117+
118+
/**
119+
* @param key the key of the cache entry
120+
* Removes the cached value for the given key
121+
*/
122+
void remove(const K& key) { cache_.remove(key); }
123+
124+
protected:
125+
/**
126+
* Helper class to hold the value with timestamp.
127+
*/
128+
template<typename T>
129+
class value_and_timestamp
130+
{
131+
public:
132+
const T value_;
133+
int64_t timestamp_;
134+
135+
value_and_timestamp(T value)
136+
: value_(value)
137+
{
138+
touch();
139+
}
140+
141+
void touch() { timestamp_ = util::current_time_nanos(); }
142+
};
143+
144+
util::SynchronizedMap<K, value_and_timestamp<V>> cache_;
145+
146+
private:
147+
/**
148+
* Cleans the cache
149+
*/
150+
void do_cleanup()
151+
{
152+
bool expected = false;
153+
// if no thread is cleaning up, we'll do it
154+
if (!cleanup_lock_.compare_exchange_strong(expected, true)) {
155+
return;
156+
}
157+
158+
util::finally release_lock(
159+
[this]() { this->cleanup_lock_.store(false); });
160+
161+
if (capacity_ >= cache_.size()) {
162+
// this can happen if the cache is concurrently modified
163+
return;
164+
}
165+
auto entries_to_remove = cache_.size() - capacity_;
166+
167+
/*max heap*/
168+
std::priority_queue<int64_t> oldest_timestamps;
169+
170+
// 1st pass
171+
const auto values = cache_.values();
172+
for (const auto& value_and_timestamp : values) {
173+
oldest_timestamps.push(value_and_timestamp->timestamp_);
174+
if (oldest_timestamps.size() > entries_to_remove) {
175+
oldest_timestamps.pop();
176+
}
177+
}
178+
179+
// find out the highest value in the queue - the value, below which
180+
// entries will be removed
181+
if (oldest_timestamps.empty()) {
182+
// this can happen if the cache is concurrently modified
183+
return;
184+
}
185+
int64_t remove_threshold = oldest_timestamps.top();
186+
oldest_timestamps.pop();
187+
188+
// 2nd pass
189+
cache_.remove_values_if(
190+
[remove_threshold](const value_and_timestamp<V>& v) -> bool {
191+
return (v.timestamp_ <= remove_threshold);
192+
});
193+
}
194+
195+
std::atomic<bool> cleanup_lock_{ false };
196+
uint32_t capacity_;
197+
uint32_t cleanup_threshold_;
198+
};
199+
200+
} // namespace impl
201+
} // namespace sql
202+
} // namespace client
203+
} // namespace hazelcast

hazelcast/include/hazelcast/client/sql/sql_service.h

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
#include "hazelcast/client/sql/sql_result.h"
2020
#include "hazelcast/client/sql/sql_statement.h"
2121
#include "hazelcast/client/sql/hazelcast_sql_exception.h"
22+
#include "hazelcast/client/sql/impl/read_optimized_lru_cache.h"
2223

2324
#if defined(WIN32) || defined(_WIN32) || defined(WIN64) || defined(_WIN64)
2425
#pragma warning(push)
@@ -127,12 +128,17 @@ class HAZELCAST_API sql_service
127128
boost::future<std::shared_ptr<sql_result>> execute(
128129
const sql_statement& statement);
129130

131+
std::shared_ptr<impl::read_optimized_lru_cache<std::string, int32_t>>
132+
partition_argument_index_cache_;
133+
130134
private:
131135
friend client::impl::hazelcast_client_instance_impl;
132136
friend sql_result;
133137

134138
client::spi::ClientContext& client_context_;
135139

140+
bool is_smart_routing_;
141+
136142
struct sql_execute_response_parameters
137143
{
138144
int64_t update_count;
@@ -141,6 +147,8 @@ class HAZELCAST_API sql_service
141147
boost::optional<impl::sql_error> error;
142148
bool is_infinite_rows = false;
143149
bool is_infinite_rows_exist = false;
150+
int32_t partition_argument_index = -1;
151+
bool is_partition_argument_index_exists = false;
144152
};
145153

146154
struct sql_fetch_response_parameters
@@ -152,6 +160,12 @@ class HAZELCAST_API sql_service
152160
explicit sql_service(client::spi::ClientContext& context);
153161

154162
std::shared_ptr<connection::Connection> query_connection();
163+
std::shared_ptr<connection::Connection> query_connection(
164+
int32_t partition_id);
165+
166+
boost::optional<int32_t> extract_partition_id(
167+
const sql_statement& statement,
168+
const int32_t arg_index) const;
155169

156170
void rethrow(const std::exception& exc_ptr);
157171
void rethrow(const std::exception& cause_ptr,
@@ -160,10 +174,13 @@ class HAZELCAST_API sql_service
160174
boost::uuids::uuid client_id();
161175

162176
std::shared_ptr<sql_result> handle_execute_response(
177+
const std::string& sql_query,
178+
const int32_t original_partition_argument_index,
163179
protocol::ClientMessage& msg,
164180
std::shared_ptr<connection::Connection> connection,
165181
impl::query_id id,
166-
int32_t cursor_buffer_size);
182+
int32_t cursor_buffer_size,
183+
std::weak_ptr<std::atomic<int32_t>> statement_par_arg_index_ptr);
167184

168185
static sql_execute_response_parameters decode_execute_response(
169186
protocol::ClientMessage& msg);

hazelcast/include/hazelcast/client/sql/sql_statement.h

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -194,6 +194,13 @@ class HAZELCAST_API sql_statement
194194
*/
195195
sql_statement& expected_result_type(sql_expected_result_type type);
196196

197+
/**
198+
* Get the partition argument index value
199+
*
200+
* @return partition argument index, -1 if not set.
201+
*/
202+
std::shared_ptr<std::atomic<int32_t>> partition_argument_index() const;
203+
197204
/**
198205
* Gets the schema name.
199206
*
@@ -224,12 +231,26 @@ class HAZELCAST_API sql_statement
224231

225232
sql_statement(spi::ClientContext& client_context, std::string query);
226233

234+
/**
235+
* Set the partition argument index. If there's no such argument, use -1.
236+
* <p>
237+
* Setting a wrong argument index will not cause incorrect query results,
238+
* but might cause performance degradation due to more network
239+
* communication. Setting a value higher than the actual number of arguments
240+
* will have no effect.
241+
*
242+
* @param partition_argument_index index of the partition-determining
243+
* argument of the statement
244+
*/
245+
sql_statement& partition_argument_index(int32_t partition_argument_index);
246+
227247
std::string sql_;
228248
std::vector<data> serialized_parameters_;
229249
int32_t cursor_buffer_size_;
230250
std::chrono::milliseconds timeout_;
231251
sql::sql_expected_result_type expected_result_type_;
232252
boost::optional<std::string> schema_;
253+
std::shared_ptr<std::atomic<int32_t>> partition_argument_index_;
233254

234255
serialization_service& serialization_service_;
235256

hazelcast/include/hazelcast/util/SynchronizedMap.h

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -242,6 +242,24 @@ class SynchronizedMap
242242
return internal_map_.empty();
243243
}
244244

245+
/**
246+
* @param comp Map is iterated from beginning to the end and removed if the
247+
* lambda comp return true.
248+
*/
249+
template<typename Comparator>
250+
void remove_values_if(Comparator comp)
251+
{
252+
std::lock_guard<std::mutex> lg(map_lock_);
253+
254+
for (auto iter = internal_map_.begin(); iter != internal_map_.end();) {
255+
if (iter->second != nullptr && comp(*(iter->second))) {
256+
iter = internal_map_.erase(iter);
257+
} else {
258+
++iter;
259+
}
260+
}
261+
}
262+
245263
private:
246264
std::unordered_map<K, std::shared_ptr<V>, Hash> internal_map_;
247265
mutable std::mutex map_lock_;

hazelcast/src/hazelcast/client/client_impl.cpp

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1074,6 +1074,8 @@ client_properties::client_properties(
10741074
, fail_on_indeterminate_state_(FAIL_ON_INDETERMINATE_OPERATION_STATE,
10751075
FAIL_ON_INDETERMINATE_OPERATION_STATE_DEFAULT)
10761076
, cloud_base_url_(CLOUD_URL_BASE, CLOUD_URL_BASE_DEFAULT)
1077+
, partition_arg_cache_size_(PARTITION_ARGUMENT_CACHE_SIZE,
1078+
PARTITION_ARGUMENT_CACHE_SIZE_DEFAULT)
10771079
, properties_map_(properties)
10781080
{}
10791081

@@ -1202,6 +1204,12 @@ client_properties::cloud_base_url() const
12021204
return cloud_base_url_;
12031205
}
12041206

1207+
const client_property&
1208+
client_properties::partition_arg_cache_size() const
1209+
{
1210+
return partition_arg_cache_size_;
1211+
}
1212+
12051213
namespace exception {
12061214
iexception::iexception(std::string exception_name,
12071215
std::string source,

0 commit comments

Comments
 (0)