Skip to content

Commit 86e9103

Browse files
Compact fetch schema on read is implemented. [Api 1327] (#1082)
* Schema fetch on read is implemented. * a_type -> sample_compact_type --------- Co-authored-by: ihsan demir <[email protected]>
1 parent 5267a74 commit 86e9103

36 files changed

+613
-83
lines changed

hazelcast/generated-sources/src/hazelcast/client/protocol/codec/codecs.cpp

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5821,6 +5821,22 @@ client_sendallschemas_encode(
58215821
return msg;
58225822
}
58235823

5824+
ClientMessage
5825+
client_fetchschema_encode(int64_t schema_id)
5826+
{
5827+
size_t initial_frame_size =
5828+
ClientMessage::REQUEST_HEADER_LEN + ClientMessage::INT64_SIZE;
5829+
ClientMessage msg(initial_frame_size, true);
5830+
msg.set_retryable(true);
5831+
msg.set_operation_name("client.fetchschema");
5832+
5833+
msg.set_message_type(static_cast<int32_t>(5120));
5834+
msg.set_partition_id(-1);
5835+
5836+
msg.set(schema_id);
5837+
return msg;
5838+
}
5839+
58245840
} // namespace codec
58255841
} // namespace protocol
58265842
} // namespace client

hazelcast/generated-sources/src/hazelcast/client/protocol/codec/codecs.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3092,6 +3092,12 @@ ClientMessage HAZELCAST_API
30923092
client_sendallschemas_encode(
30933093
const std::vector<serialization::pimpl::schema>& schemas);
30943094

3095+
/**
3096+
* Fetches a schema from the cluster with the given schemaId
3097+
*/
3098+
ClientMessage HAZELCAST_API
3099+
client_fetchschema_encode(int64_t schema_id);
3100+
30953101
} // namespace codec
30963102
} // namespace protocol
30973103
} // namespace client

hazelcast/include/hazelcast/client/protocol/ClientMessage.h

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -917,6 +917,44 @@ class HAZELCAST_API ClientMessage
917917
return h;
918918
}
919919

920+
template<typename T>
921+
typename std::
922+
enable_if<std::is_same<T, serialization::pimpl::schema>::value, T>::type
923+
get()
924+
{
925+
using namespace serialization;
926+
using namespace serialization::pimpl;
927+
928+
// skip begin frame
929+
skip_frame();
930+
931+
auto type_name = get<std::string>();
932+
933+
std::unordered_map<std::string, field_descriptor> fields;
934+
{
935+
skip_frame();
936+
937+
while (!next_frame_is_data_structure_end_frame()) {
938+
skip_frame();
939+
940+
// skip bytes in initial frame
941+
(void)rd_ptr(SIZE_OF_FRAME_LENGTH_AND_FLAGS);
942+
943+
auto key = get<int>();
944+
auto field_name = get<std::string>();
945+
946+
fast_forward_to_end_frame();
947+
948+
fields.insert(std::make_pair(
949+
field_name, field_descriptor{ field_kind(key) }));
950+
}
951+
}
952+
953+
fast_forward_to_end_frame();
954+
955+
return schema{ type_name, move(fields) };
956+
}
957+
920958
/**
921959
* Reads the header of the current frame.
922960
* The cursor must be at a frame's beginning.

hazelcast/include/hazelcast/client/serialization/field_kind.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,4 +73,4 @@ operator<<(std::ostream&, field_kind);
7373

7474
} // namespace serialization
7575
} // namespace client
76-
} // namespace hazelcast
76+
} // namespace hazelcast

hazelcast/include/hazelcast/client/serialization/pimpl/compact/compact.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2008-2021, Hazelcast, Inc. All Rights Reserved.
2+
* Copyright (c) 2008-2023, Hazelcast, Inc. All Rights Reserved.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.

hazelcast/include/hazelcast/client/serialization/pimpl/compact/compact_impl.h

Lines changed: 28 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2008-2021, Hazelcast, Inc. All Rights Reserved.
2+
* Copyright (c) 2008-2023, Hazelcast, Inc. All Rights Reserved.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -557,30 +557,18 @@ default_compact_writer::write(const T& value)
557557
pimpl::serialization_util::write(object_data_output_, value);
558558
}
559559

560-
template<typename T>
561-
struct schema_of
562-
{
563-
static schema build_schema()
564-
{
565-
T t;
566-
schema_writer schema_writer(hz_serializer<T>::type_name());
567-
serialization::compact::compact_writer writer =
568-
create_compact_writer(&schema_writer);
569-
serialization::hz_serializer<T>::write(t, writer);
570-
return std::move(schema_writer).build();
571-
}
572-
573-
const static schema schema_v;
574-
};
575-
576-
template<typename T>
577-
const schema schema_of<T>::schema_v = schema_of<T>::build_schema();
578-
579560
template<typename T>
580561
class class_to_schema
581562
{
582563
public:
583-
static const schema& get() { return value_; }
564+
static const schema* get()
565+
{
566+
if (is_initialized_) {
567+
return &value_;
568+
} else {
569+
return nullptr;
570+
}
571+
}
584572

585573
static void set(const T& object)
586574
{
@@ -613,26 +601,37 @@ template<typename T>
613601
T inline compact_stream_serializer::read(object_data_input& in)
614602
{
615603
int64_t schema_id = in.read<int64_t>();
616-
const auto& local_schema = schema_of<T>::schema_v;
604+
const schema* local_schema = class_to_schema<T>::get();
617605
// optimization to avoid hitting shared map in the schema_service,
618606
// in the case incoming data's schema is same as the local schema
619-
if (schema_id == local_schema.schema_id()) {
607+
if (local_schema && schema_id == local_schema->schema_id()) {
620608
compact::compact_reader reader =
621-
create_compact_reader(*this, in, local_schema);
609+
create_compact_reader(*this, in, *local_schema);
622610
return hz_serializer<T>::read(reader);
623611
}
624-
// This path will run only in schema evolution case
612+
625613
auto schema = schema_service.get(schema_id);
626-
if (schema.type_name() != hz_serializer<T>::type_name()) {
614+
615+
if (!schema) {
616+
throw exception::hazelcast_serialization{
617+
"compact_stream_serializer::read",
618+
boost::str(
619+
boost::format(
620+
"The schema can not be found with id %1% for '%2%' type") %
621+
schema_id % hz_serializer<T>::type_name())
622+
};
623+
}
624+
625+
if (schema->type_name() != hz_serializer<T>::type_name()) {
627626
auto exception = exception::hazelcast_serialization{
628627
"compact_stream_serializer",
629628
(boost::format("Unexpected typename. expected %1%, received %2%") %
630-
hz_serializer<T>::type_name() % schema.type_name())
629+
hz_serializer<T>::type_name() % schema->type_name())
631630
.str()
632631
};
633632
BOOST_THROW_EXCEPTION(exception);
634633
}
635-
compact::compact_reader reader = create_compact_reader(*this, in, schema);
634+
compact::compact_reader reader = create_compact_reader(*this, in, *schema);
636635
return hz_serializer<T>::read(reader);
637636
}
638637

@@ -642,7 +641,7 @@ void inline compact_stream_serializer::write(const T& object,
642641
{
643642
class_to_schema<T>::set(object);
644643

645-
const schema& schema_v = class_to_schema<T>::get();
644+
const schema& schema_v = *class_to_schema<T>::get();
646645

647646
if (!schema_service.is_schema_replicated(schema_v)) {
648647
out.schemas_will_be_replicated_.push_back(schema_v);

hazelcast/include/hazelcast/client/serialization/pimpl/compact/default_schema_service.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2008-2021, Hazelcast, Inc. All Rights Reserved.
2+
* Copyright (c) 2008-2023, Hazelcast, Inc. All Rights Reserved.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -61,7 +61,7 @@ class HAZELCAST_API default_schema_service
6161
* <li>searching the cluster.</li>
6262
* </ul>
6363
*/
64-
schema get(int64_t schemaId);
64+
std::shared_ptr<schema> get(int64_t schemaId);
6565

6666
/**
6767
* Replicates schema on the cluster

hazelcast/include/hazelcast/client/serialization/pimpl/compact/schema.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,4 +62,4 @@ operator<<(std::ostream& os, const schema& schema);
6262
} // namespace pimpl
6363
} // namespace serialization
6464
} // namespace client
65-
} // namespace hazelcast
65+
} // namespace hazelcast

hazelcast/src/hazelcast/client/compact.cpp

Lines changed: 33 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2008-2021, Hazelcast, Inc. All Rights Reserved.
2+
* Copyright (c) 2008-2023, Hazelcast, Inc. All Rights Reserved.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -99,7 +99,7 @@ operator<<(std::ostream& os, field_kind kind)
9999
os << "INT64";
100100
break;
101101
case field_kind::ARRAY_OF_INT64:
102-
os << "ARRAY_OF_INT16";
102+
os << "ARRAY_OF_INT64";
103103
break;
104104
case field_kind::FLOAT32:
105105
os << "FLOAT32";
@@ -1882,17 +1882,43 @@ default_schema_service::default_schema_service(spi::ClientContext& context)
18821882
{
18831883
}
18841884

1885-
schema
1885+
std::shared_ptr<schema>
18861886
default_schema_service::get(int64_t schemaId)
18871887
{
18881888
auto ptr = replicateds_.get(schemaId);
18891889

1890-
if (!ptr) {
1891-
throw exception::illegal_state{ "default_schema_service::get",
1892-
"Schema doesn't exist for this type" };
1890+
if (ptr) {
1891+
return ptr;
1892+
}
1893+
1894+
auto logger = context_.get_logger();
1895+
if (logger.enabled(logger::level::finest)) {
1896+
logger.log(
1897+
logger::level::finest,
1898+
boost::str(boost::format("Could not find schema id %1% locally, will "
1899+
"search on the cluster %1%") %
1900+
schemaId));
1901+
}
1902+
1903+
using namespace protocol::codec;
1904+
1905+
auto request_message = client_fetchschema_encode(schemaId);
1906+
1907+
auto invocation = spi::impl::ClientInvocation::create(
1908+
context_, request_message, SERVICE_NAME);
1909+
auto message = invocation->invoke().get();
1910+
1911+
message.skip_frame();
1912+
auto sch = message.get_nullable<schema>();
1913+
1914+
std::shared_ptr<schema> schema_ptr;
1915+
1916+
if (sch) {
1917+
schema_ptr = std::make_shared<schema>(std::move(*sch));
1918+
replicateds_.put_if_absent(schemaId, schema_ptr);
18931919
}
18941920

1895-
return *ptr;
1921+
return schema_ptr;
18961922
}
18971923

18981924
void

hazelcast/src/hazelcast/client/network.cpp

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -714,13 +714,11 @@ ClientConnectionManagerImpl::initialize_client_on_cluster(
714714
std::lock_guard<std::recursive_mutex> guard(client_state_mutex_);
715715

716716
if (target_cluster_id == cluster_id_) {
717-
if (logger_.enabled(hazelcast::logger::level::fine)) {
718-
logger_.log(
719-
hazelcast::logger::level::fine,
720-
(boost::format("Client state is sent to cluster: %1%") %
721-
target_cluster_id)
722-
.str());
723-
}
717+
HZ_LOG(logger_,
718+
fine,
719+
(boost::format("Client state is sent to cluster: %1%") %
720+
target_cluster_id)
721+
.str());
724722

725723
client_state_ = client_state::INITIALIZED_ON_CLUSTER;
726724
fire_life_cycle_event(lifecycle_event::CLIENT_CONNECTED);

0 commit comments

Comments
 (0)