@@ -174,6 +174,12 @@ sstring to_string(const event::schema_change::target_type t) {
174
174
SCYLLA_ASSERT (false && " unreachable" );
175
175
}
176
176
177
+ bool is_metadata_id_supported (const service::client_state& client_state) {
178
+ // TODO: metadata_id is mandatory in CQLv5, so extend the check below
179
+ // when CQLv5 support is implemented
180
+ return client_state.is_protocol_extension_set (cql_transport::cql_protocol_extension::USE_METADATA_ID);
181
+ }
182
+
177
183
event::event_type parse_event_type (const sstring& value)
178
184
{
179
185
if (value == " TOPOLOGY_CHANGE" ) {
@@ -982,7 +988,7 @@ future<std::unique_ptr<cql_server::response>> cql_server::connection::process_op
982
988
983
989
std::unique_ptr<cql_server::response>
984
990
make_result (int16_t stream, messages::result_message& msg, const tracing::trace_state_ptr& tr_state,
985
- cql_protocol_version_type version, bool skip_metadata = false );
991
+ cql_protocol_version_type version, cql_metadata_id_wrapper&& metadata_id, bool skip_metadata = false );
986
992
987
993
template <typename Process>
988
994
requires std::is_invocable_r_v<future<cql_server::process_fn_return_type>,
@@ -1078,7 +1084,9 @@ process_query_internal(service::client_state& client_state, distributed<cql3::qu
1078
1084
return cql_server::process_fn_return_type (convert_error_message_to_coordinator_result (msg.get ()));
1079
1085
} else {
1080
1086
tracing::trace (q_state->query_state .get_trace_state (), " Done processing - preparing a result" );
1081
- return cql_server::process_fn_return_type (make_foreign (make_result (stream, *msg, q_state->query_state .get_trace_state (), version, skip_metadata)));
1087
+ cql_metadata_id_wrapper metadata_id{is_metadata_id_supported (q_state->query_state .get_client_state ())};
1088
+
1089
+ return cql_server::process_fn_return_type (make_foreign (make_result (stream, *msg, q_state->query_state .get_trace_state (), version, std::move (metadata_id), skip_metadata)));
1082
1090
}
1083
1091
});
1084
1092
}
@@ -1101,11 +1109,14 @@ future<std::unique_ptr<cql_server::response>> cql_server::connection::process_pr
1101
1109
return qp.prepare (std::move (query), client_state, dialect).discard_result ();
1102
1110
}).then ([this , query, stream, &client_state, trace_state, dialect] () mutable {
1103
1111
tracing::trace (trace_state, " Done preparing on remote shards" );
1104
- return _server._query_processor .local ().prepare (std::move (query), client_state, dialect).then ([this , stream, trace_state] (auto msg) {
1112
+ return _server._query_processor .local ().prepare (std::move (query), client_state, dialect).then ([this , stream, &client_state, trace_state] (auto msg) {
1105
1113
tracing::trace (trace_state, " Done preparing on a local shard - preparing a result. ID is [{}]" , seastar::value_of ([&msg] {
1106
1114
return messages::result_message::prepared::cql::get_id (msg);
1107
1115
}));
1108
- return make_result (stream, *msg, trace_state, _version);
1116
+ cql_metadata_id_wrapper metadata_id = is_metadata_id_supported (client_state)
1117
+ ? cql_metadata_id_wrapper (msg->calculate_or_get_metadata_id ())
1118
+ : cql_metadata_id_wrapper (false );
1119
+ return make_result (stream, *msg, trace_state, _version, std::move (metadata_id));
1109
1120
});
1110
1121
});
1111
1122
}
@@ -1131,6 +1142,10 @@ process_execute_internal(service::client_state& client_state, distributed<cql3::
1131
1142
throw exceptions::prepared_query_not_found_exception (id);
1132
1143
}
1133
1144
1145
+ cql_metadata_id_wrapper metadata_id = is_metadata_id_supported (client_state)
1146
+ ? cql_metadata_id_wrapper (cql3::cql_metadata_id_type (in.read_short_bytes ()), prepared->calculate_or_get_metadata_id ())
1147
+ : cql_metadata_id_wrapper (false );
1148
+
1134
1149
auto q_state = std::make_unique<cql_query_state>(client_state, trace_state, std::move (permit));
1135
1150
auto & query_state = q_state->query_state ;
1136
1151
q_state->options = in.read_options (version, qp.local ().get_cql_config ());
@@ -1169,14 +1184,14 @@ process_execute_internal(service::client_state& client_state, distributed<cql3::
1169
1184
1170
1185
tracing::trace (trace_state, " Processing a statement" );
1171
1186
return qp.local ().execute_prepared_without_checking_exception_message (query_state, std::move (stmt), options, std::move (prepared), std::move (cache_key), needs_authorization)
1172
- .then ([trace_state = query_state.get_trace_state (), skip_metadata, q_state = std::move (q_state), stream, version] (auto msg) {
1187
+ .then ([trace_state = query_state.get_trace_state (), skip_metadata, q_state = std::move (q_state), stream, version, metadata_id = std::move (metadata_id) ] (auto msg) mutable {
1173
1188
if (msg->move_to_shard ()) {
1174
1189
return cql_server::process_fn_return_type (make_foreign (dynamic_pointer_cast<messages::result_message::bounce_to_shard>(msg)));
1175
1190
} else if (msg->is_exception ()) {
1176
1191
return cql_server::process_fn_return_type (convert_error_message_to_coordinator_result (msg.get ()));
1177
1192
} else {
1178
1193
tracing::trace (q_state->query_state .get_trace_state (), " Done processing - preparing a result" );
1179
- return cql_server::process_fn_return_type (make_foreign (make_result (stream, *msg, q_state->query_state .get_trace_state (), version, skip_metadata)));
1194
+ return cql_server::process_fn_return_type (make_foreign (make_result (stream, *msg, q_state->query_state .get_trace_state (), version, std::move (metadata_id), skip_metadata)));
1180
1195
}
1181
1196
});
1182
1197
}
@@ -1296,7 +1311,9 @@ process_batch_internal(service::client_state& client_state, distributed<cql3::qu
1296
1311
return cql_server::process_fn_return_type (convert_error_message_to_coordinator_result (msg.get ()));
1297
1312
} else {
1298
1313
tracing::trace (q_state->query_state .get_trace_state (), " Done processing - preparing a result" );
1299
- return cql_server::process_fn_return_type (make_foreign (make_result (stream, *msg, trace_state, version)));
1314
+ cql_metadata_id_wrapper metadata_id{is_metadata_id_supported (q_state->query_state .get_client_state ())};
1315
+
1316
+ return cql_server::process_fn_return_type (make_foreign (make_result (stream, *msg, trace_state, version, std::move (metadata_id))));
1300
1317
}
1301
1318
});
1302
1319
}
@@ -1510,11 +1527,13 @@ class cql_server::fmt_visitor : public messages::result_message::visitor_base {
1510
1527
uint8_t _version;
1511
1528
cql_server::response& _response;
1512
1529
bool _skip_metadata;
1530
+ cql_metadata_id_wrapper _metadata_id;
1513
1531
public:
1514
- fmt_visitor (uint8_t version, cql_server::response& response, bool skip_metadata)
1532
+ fmt_visitor (uint8_t version, cql_server::response& response, bool skip_metadata, cql_metadata_id_wrapper&& metadata_id )
1515
1533
: _version{version}
1516
1534
, _response{response}
1517
1535
, _skip_metadata{skip_metadata}
1536
+ , _metadata_id(std::move(metadata_id))
1518
1537
{ }
1519
1538
1520
1539
virtual void visit (const messages::result_message::void_message&) override {
@@ -1529,8 +1548,11 @@ class cql_server::fmt_visitor : public messages::result_message::visitor_base {
1529
1548
virtual void visit (const messages::result_message::prepared::cql& m) override {
1530
1549
_response.write_int (0x0004 );
1531
1550
_response.write_short_bytes (m.get_id ());
1551
+ if (_metadata_id.is_supported_by_protocol ()) {
1552
+ _response.write_short_bytes (_metadata_id.get_response_metadata_id ()._metadata_id );
1553
+ }
1532
1554
_response.write (m.metadata (), _version);
1533
- _response.write (*m.result_metadata ());
1555
+ _response.write (*m.result_metadata (), _metadata_id );
1534
1556
}
1535
1557
1536
1558
virtual void visit (const messages::result_message::schema_change& m) override {
@@ -1550,7 +1572,7 @@ class cql_server::fmt_visitor : public messages::result_message::visitor_base {
1550
1572
virtual void visit (const messages::result_message::rows& m) override {
1551
1573
_response.write_int (0x0002 );
1552
1574
auto & rs = m.rs ();
1553
- _response.write (rs.get_metadata (), _skip_metadata);
1575
+ _response.write (rs.get_metadata (), _metadata_id, _skip_metadata);
1554
1576
auto row_count_plhldr = _response.write_int_placeholder ();
1555
1577
1556
1578
class visitor {
@@ -1578,7 +1600,7 @@ class cql_server::fmt_visitor : public messages::result_message::visitor_base {
1578
1600
1579
1601
std::unique_ptr<cql_server::response>
1580
1602
make_result (int16_t stream, messages::result_message& msg, const tracing::trace_state_ptr& tr_state,
1581
- cql_protocol_version_type version, bool skip_metadata) {
1603
+ cql_protocol_version_type version, cql_metadata_id_wrapper&& metadata_id, bool skip_metadata) {
1582
1604
auto response = std::make_unique<cql_server::response>(stream, cql_binary_opcode::RESULT, tr_state);
1583
1605
if (__builtin_expect (!msg.warnings ().empty () && version > 3 , false )) {
1584
1606
response->set_frame_flag (cql_frame_flags::warning);
@@ -1588,7 +1610,7 @@ make_result(int16_t stream, messages::result_message& msg, const tracing::trace_
1588
1610
response->set_frame_flag (cql_frame_flags::custom_payload);
1589
1611
response->write_string_bytes_map (msg.custom_payload ().value ());
1590
1612
}
1591
- cql_server::fmt_visitor fmt{version, *response, skip_metadata};
1613
+ cql_server::fmt_visitor fmt{version, *response, skip_metadata, std::move (metadata_id) };
1592
1614
msg.accept (fmt);
1593
1615
return response;
1594
1616
}
@@ -1996,7 +2018,7 @@ thread_local const type_codec::type_id_to_type_type type_codec::type_id_to_type
1996
2018
{ inet_addr_type, type_id::INET },
1997
2019
};
1998
2020
1999
- void cql_server::response::write (const cql3::metadata& m, bool no_metadata) {
2021
+ void cql_server::response::write (const cql3::metadata& m, const cql_metadata_id_wrapper& metadata_id, bool no_metadata) {
2000
2022
auto flags = m.flags ();
2001
2023
bool global_tables_spec = m.flags ().contains <cql3::metadata::flag::GLOBAL_TABLES_SPEC>();
2002
2024
bool has_more_pages = m.flags ().contains <cql3::metadata::flag::HAS_MORE_PAGES>();
@@ -2005,6 +2027,15 @@ void cql_server::response::write(const cql3::metadata& m, bool no_metadata) {
2005
2027
flags.set <cql3::metadata::flag::NO_METADATA>();
2006
2028
}
2007
2029
2030
+ cql3::cql_metadata_id_type calculated_metadata_id{bytes{}};
2031
+ if (metadata_id.is_supported_by_protocol () && metadata_id.has_request_metadata_id () && metadata_id.has_response_metadata_id ()) {
2032
+ if (metadata_id.get_request_metadata_id () != metadata_id.get_response_metadata_id ()) {
2033
+ flags.remove <cql3::metadata::flag::NO_METADATA>();
2034
+ flags.set <cql3::metadata::flag::METADATA_CHANGED>();
2035
+ no_metadata = false ;
2036
+ }
2037
+ }
2038
+
2008
2039
write_int (flags.mask ());
2009
2040
write_int (m.column_count ());
2010
2041
@@ -2016,6 +2047,10 @@ void cql_server::response::write(const cql3::metadata& m, bool no_metadata) {
2016
2047
return ;
2017
2048
}
2018
2049
2050
+ if (flags.contains <cql3::metadata::flag::METADATA_CHANGED>()) {
2051
+ write_short_bytes (metadata_id.get_response_metadata_id ()._metadata_id );
2052
+ }
2053
+
2019
2054
auto names_i = m.get_names ().begin ();
2020
2055
2021
2056
if (global_tables_spec) {
@@ -2068,6 +2103,34 @@ void cql_server::response::write(const cql3::prepared_metadata& m, uint8_t versi
2068
2103
}
2069
2104
}
2070
2105
2106
+ bool cql_metadata_id_wrapper::has_request_metadata_id () const {
2107
+ if (!_supported_by_protocol) {
2108
+ on_internal_error (clogger, " metadata_id is unsupported" );
2109
+ }
2110
+ return _request_metadata_id.has_value ();
2111
+ }
2112
+
2113
+ bool cql_metadata_id_wrapper::has_response_metadata_id () const {
2114
+ if (!_supported_by_protocol) {
2115
+ on_internal_error (clogger, " metadata_id is unsupported" );
2116
+ }
2117
+ return _response_metadata_id.has_value ();
2118
+ }
2119
+
2120
+ const cql3::cql_metadata_id_type& cql_metadata_id_wrapper::get_request_metadata_id () const {
2121
+ if (!has_request_metadata_id ()) {
2122
+ on_internal_error (clogger, " request metadata_id is empty" );
2123
+ }
2124
+ return _request_metadata_id.value ();
2125
+ }
2126
+
2127
+ const cql3::cql_metadata_id_type& cql_metadata_id_wrapper::get_response_metadata_id () const {
2128
+ if (!has_response_metadata_id ()) {
2129
+ on_internal_error (clogger, " response metadata_id is empty" );
2130
+ }
2131
+ return _response_metadata_id.value ();
2132
+ }
2133
+
2071
2134
future<utils::chunked_vector<client_data>> cql_server::get_client_data () {
2072
2135
utils::chunked_vector<client_data> ret;
2073
2136
co_await for_each_gently ([&ret] (const generic_server::connection& c) {
0 commit comments