Skip to content

Commit 6a08517

Browse files
authored
Schema distribution on cluster-restart is implemented [API-1328] (#1067)
* Schema distribution on cluster-restart is implemented.
1 parent 4c56ae3 commit 6a08517

File tree

17 files changed

+531
-36
lines changed

17 files changed

+531
-36
lines changed

hazelcast/generated-sources/src/hazelcast/client/protocol/codec/codecs.cpp

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5804,6 +5804,23 @@ client_sendschema_encode(const serialization::pimpl::schema& schema)
58045804
return msg;
58055805
}
58065806

5807+
ClientMessage
5808+
client_sendallschemas_encode(
5809+
const std::vector<serialization::pimpl::schema>& schemas)
5810+
{
5811+
size_t initial_frame_size = ClientMessage::REQUEST_HEADER_LEN;
5812+
ClientMessage msg(initial_frame_size);
5813+
msg.set_retryable(true);
5814+
msg.set_operation_name("client.sendallschemas");
5815+
5816+
msg.set_message_type(static_cast<int32_t>(5376));
5817+
msg.set_partition_id(-1);
5818+
5819+
msg.set(schemas, true);
5820+
5821+
return msg;
5822+
}
5823+
58075824
} // namespace codec
58085825
} // namespace protocol
58095826
} // namespace client

hazelcast/generated-sources/src/hazelcast/client/protocol/codec/codecs.h

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3085,6 +3085,13 @@ sql_fetch_encode(const sql::impl::query_id& query_id,
30853085
ClientMessage HAZELCAST_API
30863086
client_sendschema_encode(const serialization::pimpl::schema& schema);
30873087

3088+
/**
3089+
* Sends all the schemas to the cluster
3090+
*/
3091+
ClientMessage HAZELCAST_API
3092+
client_sendallschemas_encode(
3093+
const std::vector<serialization::pimpl::schema>& schemas);
3094+
30883095
} // namespace codec
30893096
} // namespace protocol
30903097
} // namespace client

hazelcast/include/hazelcast/client/connection/ClientConnectionManagerImpl.h

Lines changed: 60 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
#include <thread>
2222
#include <future>
2323
#include <vector>
24+
#include <mutex>
2425
#include <boost/asio.hpp>
2526
#include <boost/smart_ptr/atomic_shared_ptr.hpp>
2627

@@ -137,8 +138,24 @@ class HAZELCAST_API ClientConnectionManagerImpl
137138

138139
boost::uuids::uuid get_client_uuid() const;
139140

141+
/**
142+
* Check the connected state and user connection strategy configuration to
143+
* see if an invocation is allowed at the moment returns without throwing
144+
* exception only when is the client is Connected to cluster
145+
*
146+
* @throws io_exception if client is disconnected and
147+
* ReconnectMode is ON or if client is starting and async start is false
148+
* @throws hazelcast_client_offline if client is disconnected and
149+
* ReconnectMode is ASYNC or if client is starting and async start is true
150+
*/
140151
void check_invocation_allowed();
141152

153+
/**
154+
* Returns {@code true} if the client is initialized on the cluster, by
155+
* sending its local state, if necessary.
156+
*/
157+
bool client_initialized_on_cluster() const;
158+
142159
void connect_to_all_cluster_members();
143160

144161
void notify_backup(int64_t call_id);
@@ -160,6 +177,42 @@ class HAZELCAST_API ClientConnectionManagerImpl
160177
std::string server_version;
161178
};
162179

180+
enum class client_state
181+
{
182+
/**
183+
* Clients start with this state. Once a client connects to a cluster,
184+
* it directly switches to {@link #INITIALIZED_ON_CLUSTER} instead of
185+
* {@link #CONNECTED_TO_CLUSTER} because on startup a client has no
186+
* local state to send to the cluster.
187+
*/
188+
INITIAL,
189+
190+
/**
191+
* When a client switches to a new cluster, it moves to this state.
192+
* It means that the client has connected to a new cluster but not sent
193+
* its local state to the new cluster yet.
194+
*/
195+
CONNECTED_TO_CLUSTER,
196+
197+
/**
198+
* When a client sends its local state to the cluster it has connected,
199+
* it switches to this state.
200+
* <p>
201+
* Invocations are allowed in this state.
202+
*/
203+
INITIALIZED_ON_CLUSTER,
204+
205+
/**
206+
* When the client closes the last connection to the cluster it
207+
* currently connected to, it switches to this state.
208+
* <p>
209+
*/
210+
DISCONNECTED_FROM_CLUSTER,
211+
};
212+
213+
friend std::ostream HAZELCAST_API& operator<<(std::ostream& os,
214+
client_state);
215+
163216
auth_response authenticate_on_cluster(
164217
std::shared_ptr<Connection>& connection);
165218

@@ -178,7 +231,7 @@ class HAZELCAST_API ClientConnectionManagerImpl
178231
static void shutdown_with_external_thread(
179232
std::weak_ptr<client::impl::hazelcast_client_instance_impl> client_impl);
180233

181-
bool do_connect_to_cluster();
234+
void do_connect_to_cluster();
182235

183236
std::vector<address> get_possible_member_addresses();
184237

@@ -197,6 +250,8 @@ class HAZELCAST_API ClientConnectionManagerImpl
197250

198251
void check_client_active();
199252

253+
void initialize_client_on_cluster(boost::uuids::uuid);
254+
200255
template<typename T>
201256
std::shared_ptr<Connection> try_connect(const T& target)
202257
{
@@ -255,7 +310,7 @@ class HAZELCAST_API ClientConnectionManagerImpl
255310
wait_strategy wait_strategy_;
256311

257312
// following fields are updated inside synchronized(clientStateMutex)
258-
std::recursive_mutex client_state_mutex_;
313+
mutable std::recursive_mutex client_state_mutex_;
259314
util::SynchronizedMap<boost::uuids::uuid,
260315
Connection,
261316
boost::hash<boost::uuids::uuid>>
@@ -267,7 +322,9 @@ class HAZELCAST_API ClientConnectionManagerImpl
267322
#else
268323
std::atomic<boost::uuids::uuid> cluster_id_;
269324
#endif
325+
client_state client_state_;
270326
std::atomic_bool connect_to_cluster_task_submitted_;
327+
bool established_initial_cluster_connection;
271328

272329
bool use_public_address_{ false };
273330

@@ -283,6 +340,7 @@ class HAZELCAST_API ClientConnectionManagerImpl
283340

284341
address translate(const member& m);
285342
};
343+
286344
} // namespace connection
287345
} // namespace client
288346
} // namespace hazelcast

hazelcast/include/hazelcast/client/exception/protocol_exceptions.h

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,11 @@
2828

2929
namespace hazelcast {
3030
namespace client {
31+
namespace spi {
32+
namespace impl {
33+
class ClientInvocation;
34+
}
35+
} // namespace spi
3136
namespace exception {
3237
#define DEFINE_EXCEPTION_CLASS(ClassName, errorNo, isRuntime) \
3338
class HAZELCAST_API ClassName : public iexception \
@@ -351,6 +356,14 @@ class HAZELCAST_API query : public hazelcast_
351356
boost::uuids::uuid originating_member_uuid_;
352357
};
353358

359+
class HAZELCAST_API invocation_might_contain_compact_data : public hazelcast_
360+
{
361+
public:
362+
explicit invocation_might_contain_compact_data(
363+
std::string source,
364+
const spi::impl::ClientInvocation& invocation);
365+
};
366+
354367
class HAZELCAST_API member_left : public execution
355368
{
356369
public:

hazelcast/include/hazelcast/client/impl/hazelcast_client_instance_impl.h

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -222,6 +222,14 @@ class HAZELCAST_API hazelcast_client_instance_impl
222222

223223
sql::sql_service& get_sql();
224224

225+
void send_state_to_cluster();
226+
227+
/**
228+
* Returns {@code true} if we need to check the urgent invocations, by
229+
* examining the local registry of the schema service.
230+
*/
231+
bool should_check_urgent_invocations() const;
232+
225233
private:
226234
client_config client_config_;
227235
client_properties client_properties_;

hazelcast/include/hazelcast/client/protocol/ClientMessage.h

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1122,6 +1122,10 @@ class HAZELCAST_API ClientMessage
11221122
if (is_final) {
11231123
h->flags |= IS_FINAL_FLAG;
11241124
}
1125+
1126+
if (std::is_same<T, serialization::pimpl::data>::value) {
1127+
contains_serialized_data_in_request_ = true;
1128+
}
11251129
} else {
11261130
set(*value, is_final);
11271131
}
@@ -1241,6 +1245,8 @@ class HAZELCAST_API ClientMessage
12411245
copy(begin(replicated_schemas),
12421246
end(replicated_schemas),
12431247
back_inserter(schemas_will_be_replicated_));
1248+
1249+
contains_serialized_data_in_request_ = true;
12441250
}
12451251

12461252
inline void set(const serialization::pimpl::data* value,
@@ -1423,6 +1429,8 @@ class HAZELCAST_API ClientMessage
14231429

14241430
void drop_fragmentation_frame();
14251431

1432+
bool contains_serialized_data_in_request() const;
1433+
14261434
friend std::ostream HAZELCAST_API& operator<<(std::ostream& os,
14271435
const ClientMessage& message);
14281436

@@ -1520,6 +1528,7 @@ class HAZELCAST_API ClientMessage
15201528
std::vector<std::vector<byte>> data_buffer_;
15211529
size_t buffer_index_{ 0 };
15221530
size_t offset_{ 0 };
1531+
bool contains_serialized_data_in_request_;
15231532
std::vector<serialization::pimpl::schema> schemas_will_be_replicated_;
15241533
};
15251534

hazelcast/include/hazelcast/client/serialization/pimpl/compact/default_schema_service.h

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
#include "hazelcast/util/export.h"
2222
#include "hazelcast/util/SynchronizedMap.h"
2323
#include "hazelcast/client/serialization/pimpl/compact/schema.h"
24+
#include "hazelcast/logger.h"
2425

2526
namespace hazelcast {
2627
namespace client {
@@ -32,7 +33,6 @@ class ClientMessage;
3233
namespace spi {
3334
class ClientContext;
3435
}
35-
3636
namespace serialization {
3737
namespace pimpl {
3838

@@ -51,6 +51,8 @@ class HAZELCAST_API default_schema_service
5151
static constexpr const char* MAX_PUT_RETRY_COUNT_DEFAULT = "100";
5252

5353
default_schema_service(spi::ClientContext&);
54+
default_schema_service(const default_schema_service&) = delete;
55+
default_schema_service& operator=(const default_schema_service&) = delete;
5456

5557
/**
5658
* Gets the schema with the given id either by
@@ -68,6 +70,16 @@ class HAZELCAST_API default_schema_service
6870

6971
bool is_schema_replicated(const schema&);
7072

73+
/**
74+
* Replicates all schemas on cluster
75+
*/
76+
void replicate_all_schemas();
77+
78+
/**
79+
* Check whether any schemas exist in cache
80+
*/
81+
bool has_any_schemas() const;
82+
7183
private:
7284
void put_if_absent(schema);
7385

hazelcast/include/hazelcast/client/spi/impl/ClientInvocationServiceImpl.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,8 @@ class HAZELCAST_API ClientInvocationServiceImpl
5959

6060
void check_invocation_allowed();
6161

62+
void check_urgent_invocation_allowed(const ClientInvocation&);
63+
6264
bool is_smart_routing() const;
6365

6466
std::chrono::milliseconds get_invocation_timeout() const;

hazelcast/src/hazelcast/client/client_impl.cpp

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -477,6 +477,18 @@ hazelcast_client_instance_impl::get_sql()
477477
return sql_service_;
478478
}
479479

480+
void
481+
hazelcast_client_instance_impl::send_state_to_cluster()
482+
{
483+
schema_service_.replicate_all_schemas();
484+
}
485+
486+
bool
487+
hazelcast_client_instance_impl::should_check_urgent_invocations() const
488+
{
489+
return schema_service_.has_any_schemas();
490+
}
491+
480492
void
481493
hazelcast_client_instance_impl::check_discovery_configuration_consistency(
482494
bool address_list_provided,
@@ -1363,6 +1375,21 @@ query::originating_member_uuid() const
13631375
return originating_member_uuid_;
13641376
}
13651377

1378+
invocation_might_contain_compact_data::invocation_might_contain_compact_data(
1379+
std::string source,
1380+
const spi::impl::ClientInvocation& invocation)
1381+
: hazelcast_{
1382+
move(source),
1383+
boost::str(
1384+
boost::format(
1385+
"The invocation %1% might contain Compact serialized "
1386+
"data and it is not safe to invoke it when the client is not "
1387+
"yet initialized on the cluster") %
1388+
invocation)
1389+
}
1390+
{
1391+
}
1392+
13661393
} // namespace exception
13671394
} // namespace client
13681395

hazelcast/src/hazelcast/client/compact.cpp

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1995,6 +1995,66 @@ default_schema_service::send_schema_response_decode(
19951995
boost::hash<boost::uuids::uuid>>>();
19961996
}
19971997

1998+
bool
1999+
default_schema_service::has_any_schemas() const
2000+
{
2001+
return replicateds_.size();
2002+
}
2003+
2004+
std::ostream&
2005+
operator<<(std::ostream& os, const std::vector<schema>& schemas)
2006+
{
2007+
os << "Schemas {";
2008+
2009+
for (const auto& s : schemas)
2010+
os << s << " , ";
2011+
2012+
os << "}";
2013+
2014+
return os;
2015+
}
2016+
2017+
void
2018+
default_schema_service::replicate_all_schemas()
2019+
{
2020+
using level = hazelcast::logger::level;
2021+
using namespace protocol::codec;
2022+
2023+
auto logger = context_.get_logger();
2024+
if (replicateds_.empty()) {
2025+
if (logger.enabled(level::finest)) {
2026+
logger.log(level::finest,
2027+
"There is no schema to send to the cluster.");
2028+
}
2029+
2030+
return;
2031+
}
2032+
2033+
std::vector<std::shared_ptr<schema>> schemas_sptr = replicateds_.values();
2034+
std::vector<schema> all_schemas;
2035+
2036+
all_schemas.reserve(schemas_sptr.size());
2037+
2038+
transform(begin(schemas_sptr),
2039+
end(schemas_sptr),
2040+
back_inserter(all_schemas),
2041+
[](const std::shared_ptr<schema>& s) { return *s; });
2042+
2043+
if (logger.enabled(level::finest)) {
2044+
logger.log(
2045+
level::finest,
2046+
(boost::format("Sending schemas to the cluster %1%") % all_schemas)
2047+
.str());
2048+
}
2049+
2050+
auto message = client_sendallschemas_encode(all_schemas);
2051+
2052+
auto invocation =
2053+
spi::impl::ClientInvocation::create(context_, message, SERVICE_NAME);
2054+
2055+
invocation->invoke_urgent().get();
2056+
}
2057+
19982058
compact_stream_serializer::compact_stream_serializer(
19992059
default_schema_service& service)
20002060
: schema_service{ service }

0 commit comments

Comments
 (0)