8
8
9
9
#include " server.hh"
10
10
11
+ #include " cql3/prepared_statements_cache.hh"
11
12
#include " cql3/statements/batch_statement.hh"
12
13
#include " cql3/statements/modification_statement.hh"
13
14
#include < seastar/core/scheduling.hh>
27
28
#include < seastar/core/coroutine.hh>
28
29
#include < seastar/core/future-util.hh>
29
30
#include < seastar/core/seastar.hh>
31
+ #include < seastar/core/shared_ptr.hh>
30
32
#include < seastar/net/byteorder.hh>
31
33
#include < seastar/core/metrics.hh>
32
34
#include < seastar/net/byteorder.hh>
@@ -174,6 +176,12 @@ sstring to_string(const event::schema_change::target_type t) {
174
176
SCYLLA_ASSERT (false && " unreachable" );
175
177
}
176
178
179
+ bool is_metadata_id_supported (const service::client_state& client_state) {
180
+ // TODO: metadata_id is mandatory in CQLv5, so extend the check below
181
+ // when CQLv5 support is implemented
182
+ return client_state.is_protocol_extension_set (cql_transport::cql_protocol_extension::USE_METADATA_ID);
183
+ }
184
+
177
185
event::event_type parse_event_type (const sstring& value)
178
186
{
179
187
if (value == " TOPOLOGY_CHANGE" ) {
@@ -982,7 +990,7 @@ future<std::unique_ptr<cql_server::response>> cql_server::connection::process_op
982
990
983
991
std::unique_ptr<cql_server::response>
984
992
make_result (int16_t stream, messages::result_message& msg, const tracing::trace_state_ptr& tr_state,
985
- cql_protocol_version_type version, bool skip_metadata = false );
993
+ cql_protocol_version_type version, cql_metadata_id_wrapper&& metadata_id, bool skip_metadata = false );
986
994
987
995
template <typename Process>
988
996
requires std::is_invocable_r_v<future<cql_server::process_fn_return_type>,
@@ -1078,7 +1086,9 @@ process_query_internal(service::client_state& client_state, distributed<cql3::qu
1078
1086
return cql_server::process_fn_return_type (convert_error_message_to_coordinator_result (msg.get ()));
1079
1087
} else {
1080
1088
tracing::trace (q_state->query_state .get_trace_state (), " Done processing - preparing a result" );
1081
- return cql_server::process_fn_return_type (make_foreign (make_result (stream, *msg, q_state->query_state .get_trace_state (), version, skip_metadata)));
1089
+ cql_metadata_id_wrapper metadata_id{is_metadata_id_supported (q_state->query_state .get_client_state ())};
1090
+
1091
+ return cql_server::process_fn_return_type (make_foreign (make_result (stream, *msg, q_state->query_state .get_trace_state (), version, std::move (metadata_id), skip_metadata)));
1082
1092
}
1083
1093
});
1084
1094
}
@@ -1101,11 +1111,13 @@ future<std::unique_ptr<cql_server::response>> cql_server::connection::process_pr
1101
1111
return qp.prepare (std::move (query), client_state, dialect).discard_result ();
1102
1112
}).then ([this , query, stream, &client_state, trace_state, dialect] () mutable {
1103
1113
tracing::trace (trace_state, " Done preparing on remote shards" );
1104
- return _server._query_processor .local ().prepare (std::move (query), client_state, dialect).then ([this , stream, trace_state] (auto msg) {
1114
+ cql_metadata_id_wrapper metadata_id{is_metadata_id_supported (client_state)};
1115
+
1116
+ return _server._query_processor .local ().prepare (std::move (query), client_state, dialect).then ([this , stream, trace_state, metadata_id = std::move (metadata_id)] (auto msg) mutable {
1105
1117
tracing::trace (trace_state, " Done preparing on a local shard - preparing a result. ID is [{}]" , seastar::value_of ([&msg] {
1106
1118
return messages::result_message::prepared::cql::get_id (msg);
1107
1119
}));
1108
- return make_result (stream, *msg, trace_state, _version);
1120
+ return make_result (stream, *msg, trace_state, _version, std::move (metadata_id) );
1109
1121
});
1110
1122
});
1111
1123
}
@@ -1116,6 +1128,11 @@ process_execute_internal(service::client_state& client_state, distributed<cql3::
1116
1128
service_permit permit, tracing::trace_state_ptr trace_state, bool init_trace, cql3::computed_function_values cached_pk_fn_calls,
1117
1129
cql3::dialect dialect) {
1118
1130
cql3::prepared_cache_key_type cache_key (in.read_short_bytes (), dialect);
1131
+
1132
+ cql_metadata_id_wrapper metadata_id = is_metadata_id_supported (client_state)
1133
+ ? cql_metadata_id_wrapper (cql3::cql_metadata_id_type (in.read_short_bytes ()))
1134
+ : cql_metadata_id_wrapper (false );
1135
+
1119
1136
auto & id = cql3::prepared_cache_key_type::cql_id (cache_key);
1120
1137
bool needs_authorization = false ;
1121
1138
@@ -1169,14 +1186,14 @@ process_execute_internal(service::client_state& client_state, distributed<cql3::
1169
1186
1170
1187
tracing::trace (trace_state, " Processing a statement" );
1171
1188
return qp.local ().execute_prepared_without_checking_exception_message (query_state, std::move (stmt), options, std::move (prepared), std::move (cache_key), needs_authorization)
1172
- .then ([trace_state = query_state.get_trace_state (), skip_metadata, q_state = std::move (q_state), stream, version] (auto msg) {
1189
+ .then ([trace_state = query_state.get_trace_state (), skip_metadata, q_state = std::move (q_state), stream, version, metadata_id = std::move (metadata_id) ] (auto msg) mutable {
1173
1190
if (msg->move_to_shard ()) {
1174
1191
return cql_server::process_fn_return_type (make_foreign (dynamic_pointer_cast<messages::result_message::bounce_to_shard>(msg)));
1175
1192
} else if (msg->is_exception ()) {
1176
1193
return cql_server::process_fn_return_type (convert_error_message_to_coordinator_result (msg.get ()));
1177
1194
} else {
1178
1195
tracing::trace (q_state->query_state .get_trace_state (), " Done processing - preparing a result" );
1179
- return cql_server::process_fn_return_type (make_foreign (make_result (stream, *msg, q_state->query_state .get_trace_state (), version, skip_metadata)));
1196
+ return cql_server::process_fn_return_type (make_foreign (make_result (stream, *msg, q_state->query_state .get_trace_state (), version, std::move (metadata_id), skip_metadata)));
1180
1197
}
1181
1198
});
1182
1199
}
@@ -1296,7 +1313,9 @@ process_batch_internal(service::client_state& client_state, distributed<cql3::qu
1296
1313
return cql_server::process_fn_return_type (convert_error_message_to_coordinator_result (msg.get ()));
1297
1314
} else {
1298
1315
tracing::trace (q_state->query_state .get_trace_state (), " Done processing - preparing a result" );
1299
- return cql_server::process_fn_return_type (make_foreign (make_result (stream, *msg, trace_state, version)));
1316
+ cql_metadata_id_wrapper metadata_id{is_metadata_id_supported (q_state->query_state .get_client_state ())};
1317
+
1318
+ return cql_server::process_fn_return_type (make_foreign (make_result (stream, *msg, trace_state, version, std::move (metadata_id))));
1300
1319
}
1301
1320
});
1302
1321
}
@@ -1510,11 +1529,13 @@ class cql_server::fmt_visitor : public messages::result_message::visitor_base {
1510
1529
uint8_t _version;
1511
1530
cql_server::response& _response;
1512
1531
bool _skip_metadata;
1532
+ cql_metadata_id_wrapper _metadata_id;
1513
1533
public:
1514
- fmt_visitor (uint8_t version, cql_server::response& response, bool skip_metadata)
1534
+ fmt_visitor (uint8_t version, cql_server::response& response, bool skip_metadata, cql_metadata_id_wrapper&& metadata_id )
1515
1535
: _version{version}
1516
1536
, _response{response}
1517
1537
, _skip_metadata{skip_metadata}
1538
+ , _metadata_id(std::move(metadata_id))
1518
1539
{ }
1519
1540
1520
1541
virtual void visit (const messages::result_message::void_message&) override {
@@ -1529,8 +1550,11 @@ class cql_server::fmt_visitor : public messages::result_message::visitor_base {
1529
1550
virtual void visit (const messages::result_message::prepared::cql& m) override {
1530
1551
_response.write_int (0x0004 );
1531
1552
_response.write_short_bytes (m.get_id ());
1553
+ if (_metadata_id.is_supported_by_protocol ()) {
1554
+ _response.write_short_bytes (m.result_metadata ()->calculate_metadata_id ()._metadata_id );
1555
+ }
1532
1556
_response.write (m.metadata (), _version);
1533
- _response.write (*m.result_metadata ());
1557
+ _response.write (*m.result_metadata (), _metadata_id );
1534
1558
}
1535
1559
1536
1560
virtual void visit (const messages::result_message::schema_change& m) override {
@@ -1550,7 +1574,7 @@ class cql_server::fmt_visitor : public messages::result_message::visitor_base {
1550
1574
virtual void visit (const messages::result_message::rows& m) override {
1551
1575
_response.write_int (0x0002 );
1552
1576
auto & rs = m.rs ();
1553
- _response.write (rs.get_metadata (), _skip_metadata);
1577
+ _response.write (rs.get_metadata (), _metadata_id, _skip_metadata);
1554
1578
auto row_count_plhldr = _response.write_int_placeholder ();
1555
1579
1556
1580
class visitor {
@@ -1578,7 +1602,7 @@ class cql_server::fmt_visitor : public messages::result_message::visitor_base {
1578
1602
1579
1603
std::unique_ptr<cql_server::response>
1580
1604
make_result (int16_t stream, messages::result_message& msg, const tracing::trace_state_ptr& tr_state,
1581
- cql_protocol_version_type version, bool skip_metadata) {
1605
+ cql_protocol_version_type version, cql_metadata_id_wrapper&& metadata_id, bool skip_metadata) {
1582
1606
auto response = std::make_unique<cql_server::response>(stream, cql_binary_opcode::RESULT, tr_state);
1583
1607
if (__builtin_expect (!msg.warnings ().empty () && version > 3 , false )) {
1584
1608
response->set_frame_flag (cql_frame_flags::warning);
@@ -1588,7 +1612,7 @@ make_result(int16_t stream, messages::result_message& msg, const tracing::trace_
1588
1612
response->set_frame_flag (cql_frame_flags::custom_payload);
1589
1613
response->write_string_bytes_map (msg.custom_payload ().value ());
1590
1614
}
1591
- cql_server::fmt_visitor fmt{version, *response, skip_metadata};
1615
+ cql_server::fmt_visitor fmt{version, *response, skip_metadata, std::move (metadata_id) };
1592
1616
msg.accept (fmt);
1593
1617
return response;
1594
1618
}
@@ -1996,7 +2020,7 @@ thread_local const type_codec::type_id_to_type_type type_codec::type_id_to_type
1996
2020
{ inet_addr_type, type_id::INET },
1997
2021
};
1998
2022
1999
- void cql_server::response::write (const cql3::metadata& m, bool no_metadata) {
2023
+ void cql_server::response::write (const cql3::metadata& m, const cql_metadata_id_wrapper& metadata_id, bool no_metadata) {
2000
2024
auto flags = m.flags ();
2001
2025
bool global_tables_spec = m.flags ().contains <cql3::metadata::flag::GLOBAL_TABLES_SPEC>();
2002
2026
bool has_more_pages = m.flags ().contains <cql3::metadata::flag::HAS_MORE_PAGES>();
@@ -2005,6 +2029,18 @@ void cql_server::response::write(const cql3::metadata& m, bool no_metadata) {
2005
2029
flags.set <cql3::metadata::flag::NO_METADATA>();
2006
2030
}
2007
2031
2032
+ cql3::cql_metadata_id_type calculated_metadata_id{bytes{}};
2033
+ if (metadata_id.is_non_empty ())
2034
+ {
2035
+ calculated_metadata_id = m.calculate_metadata_id ();
2036
+ if (calculated_metadata_id != metadata_id.get_metadata_id ())
2037
+ {
2038
+ flags.remove <cql3::metadata::flag::NO_METADATA>();
2039
+ flags.set <cql3::metadata::flag::METADATA_CHANGED>();
2040
+ no_metadata = false ;
2041
+ }
2042
+ }
2043
+
2008
2044
write_int (flags.mask ());
2009
2045
write_int (m.column_count ());
2010
2046
@@ -2016,6 +2052,10 @@ void cql_server::response::write(const cql3::metadata& m, bool no_metadata) {
2016
2052
return ;
2017
2053
}
2018
2054
2055
+ if (flags.contains <cql3::metadata::flag::METADATA_CHANGED>()) {
2056
+ write_bytes_as_string (calculated_metadata_id._metadata_id );
2057
+ }
2058
+
2019
2059
auto names_i = m.get_names ().begin ();
2020
2060
2021
2061
if (global_tables_spec) {
@@ -2068,6 +2108,13 @@ void cql_server::response::write(const cql3::prepared_metadata& m, uint8_t versi
2068
2108
}
2069
2109
}
2070
2110
2111
+ const cql3::cql_metadata_id_type& cql_metadata_id_wrapper::get_metadata_id () const {
2112
+ if (_state != state::SUPPORTED_BY_PROTOCOL_NON_EMPTY) {
2113
+ on_internal_error (clogger, " Metadata id is unsupported or empty" );
2114
+ }
2115
+ return _metadata_id;
2116
+ }
2117
+
2071
2118
future<utils::chunked_vector<client_data>> cql_server::get_client_data () {
2072
2119
utils::chunked_vector<client_data> ret;
2073
2120
co_await for_each_gently ([&ret] (const generic_server::connection& c) {
0 commit comments