@@ -175,6 +175,12 @@ sstring to_string(const event::schema_change::target_type t) {
175
175
SCYLLA_ASSERT (false && " unreachable" );
176
176
}
177
177
178
+ bool is_metadata_id_supported (const service::client_state& client_state) {
179
+ // TODO: metadata_id is mandatory in CQLv5, so extend the check below
180
+ // when CQLv5 support is implemented
181
+ return client_state.is_protocol_extension_set (cql_transport::cql_protocol_extension::USE_METADATA_ID);
182
+ }
183
+
178
184
event::event_type parse_event_type (const sstring& value)
179
185
{
180
186
if (value == " TOPOLOGY_CHANGE" ) {
@@ -1009,7 +1015,7 @@ future<std::unique_ptr<cql_server::response>> cql_server::connection::process_op
1009
1015
1010
1016
std::unique_ptr<cql_server::response>
1011
1017
make_result (int16_t stream, messages::result_message& msg, const tracing::trace_state_ptr& tr_state,
1012
- cql_protocol_version_type version, bool skip_metadata = false );
1018
+ cql_protocol_version_type version, cql_metadata_id_wrapper&& metadata_id, bool skip_metadata = false );
1013
1019
1014
1020
template <typename Process>
1015
1021
requires std::is_invocable_r_v<future<cql_server::process_fn_return_type>,
@@ -1104,7 +1110,8 @@ process_query_internal(service::client_state& client_state, distributed<cql3::qu
1104
1110
return cql_server::process_fn_return_type (convert_error_message_to_coordinator_result (msg.get ()));
1105
1111
} else {
1106
1112
tracing::trace (q_state->query_state .get_trace_state (), " Done processing - preparing a result" );
1107
- return cql_server::process_fn_return_type (make_foreign (make_result (stream, *msg, q_state->query_state .get_trace_state (), version, skip_metadata)));
1113
+
1114
+ return cql_server::process_fn_return_type (make_foreign (make_result (stream, *msg, q_state->query_state .get_trace_state (), version, cql_metadata_id_wrapper{}, skip_metadata)));
1108
1115
}
1109
1116
});
1110
1117
}
@@ -1127,11 +1134,14 @@ future<std::unique_ptr<cql_server::response>> cql_server::connection::process_pr
1127
1134
return qp.prepare (std::move (query), client_state, dialect).discard_result ();
1128
1135
}).then ([this , query, stream, &client_state, trace_state, dialect] () mutable {
1129
1136
tracing::trace (trace_state, " Done preparing on remote shards" );
1130
- return _server._query_processor .local ().prepare (std::move (query), client_state, dialect).then ([this , stream, trace_state] (auto msg) {
1137
+ return _server._query_processor .local ().prepare (std::move (query), client_state, dialect).then ([this , stream, &client_state, trace_state] (auto msg) {
1131
1138
tracing::trace (trace_state, " Done preparing on a local shard - preparing a result. ID is [{}]" , seastar::value_of ([&msg] {
1132
1139
return messages::result_message::prepared::cql::get_id (msg);
1133
1140
}));
1134
- return make_result (stream, *msg, trace_state, _version);
1141
+ cql_metadata_id_wrapper metadata_id = is_metadata_id_supported (client_state)
1142
+ ? cql_metadata_id_wrapper (msg->get_metadata_id ())
1143
+ : cql_metadata_id_wrapper ();
1144
+ return make_result (stream, *msg, trace_state, _version, std::move (metadata_id));
1135
1145
});
1136
1146
});
1137
1147
}
@@ -1157,6 +1167,10 @@ process_execute_internal(service::client_state& client_state, distributed<cql3::
1157
1167
throw exceptions::prepared_query_not_found_exception (id);
1158
1168
}
1159
1169
1170
+ cql_metadata_id_wrapper metadata_id = is_metadata_id_supported (client_state)
1171
+ ? cql_metadata_id_wrapper (cql3::cql_metadata_id_type (in.read_short_bytes ()), prepared->get_metadata_id ())
1172
+ : cql_metadata_id_wrapper ();
1173
+
1160
1174
auto q_state = std::make_unique<cql_query_state>(client_state, trace_state, std::move (permit));
1161
1175
auto & query_state = q_state->query_state ;
1162
1176
q_state->options = in.read_options (version, qp.local ().get_cql_config ());
@@ -1195,14 +1209,14 @@ process_execute_internal(service::client_state& client_state, distributed<cql3::
1195
1209
1196
1210
tracing::trace (trace_state, " Processing a statement" );
1197
1211
return qp.local ().execute_prepared_without_checking_exception_message (query_state, std::move (stmt), options, std::move (prepared), std::move (cache_key), needs_authorization)
1198
- .then ([trace_state = query_state.get_trace_state (), skip_metadata, q_state = std::move (q_state), stream, version] (auto msg) {
1212
+ .then ([trace_state = query_state.get_trace_state (), skip_metadata, q_state = std::move (q_state), stream, version, metadata_id = std::move (metadata_id) ] (auto msg) mutable {
1199
1213
if (msg->move_to_shard ()) {
1200
1214
return cql_server::process_fn_return_type (make_foreign (dynamic_pointer_cast<messages::result_message::bounce_to_shard>(msg)));
1201
1215
} else if (msg->is_exception ()) {
1202
1216
return cql_server::process_fn_return_type (convert_error_message_to_coordinator_result (msg.get ()));
1203
1217
} else {
1204
1218
tracing::trace (q_state->query_state .get_trace_state (), " Done processing - preparing a result" );
1205
- return cql_server::process_fn_return_type (make_foreign (make_result (stream, *msg, q_state->query_state .get_trace_state (), version, skip_metadata)));
1219
+ return cql_server::process_fn_return_type (make_foreign (make_result (stream, *msg, q_state->query_state .get_trace_state (), version, std::move (metadata_id), skip_metadata)));
1206
1220
}
1207
1221
});
1208
1222
}
@@ -1323,7 +1337,8 @@ process_batch_internal(service::client_state& client_state, distributed<cql3::qu
1323
1337
return cql_server::process_fn_return_type (convert_error_message_to_coordinator_result (msg.get ()));
1324
1338
} else {
1325
1339
tracing::trace (q_state->query_state .get_trace_state (), " Done processing - preparing a result" );
1326
- return cql_server::process_fn_return_type (make_foreign (make_result (stream, *msg, trace_state, version)));
1340
+
1341
+ return cql_server::process_fn_return_type (make_foreign (make_result (stream, *msg, trace_state, version, cql_metadata_id_wrapper{})));
1327
1342
}
1328
1343
});
1329
1344
}
@@ -1538,11 +1553,13 @@ class cql_server::fmt_visitor : public messages::result_message::visitor_base {
1538
1553
uint8_t _version;
1539
1554
cql_server::response& _response;
1540
1555
bool _skip_metadata;
1556
+ cql_metadata_id_wrapper _metadata_id;
1541
1557
public:
1542
- fmt_visitor (uint8_t version, cql_server::response& response, bool skip_metadata)
1558
+ fmt_visitor (uint8_t version, cql_server::response& response, bool skip_metadata, cql_metadata_id_wrapper&& metadata_id )
1543
1559
: _version{version}
1544
1560
, _response{response}
1545
1561
, _skip_metadata{skip_metadata}
1562
+ , _metadata_id(std::move(metadata_id))
1546
1563
{ }
1547
1564
1548
1565
virtual void visit (const messages::result_message::void_message&) override {
@@ -1557,8 +1574,11 @@ class cql_server::fmt_visitor : public messages::result_message::visitor_base {
1557
1574
virtual void visit (const messages::result_message::prepared::cql& m) override {
1558
1575
_response.write_int (0x0004 );
1559
1576
_response.write_short_bytes (m.get_id ());
1577
+ if (_metadata_id.has_response_metadata_id ()) {
1578
+ _response.write_short_bytes (_metadata_id.get_response_metadata_id ()._metadata_id );
1579
+ }
1560
1580
_response.write (m.metadata (), _version);
1561
- _response.write (*m.result_metadata ());
1581
+ _response.write (*m.result_metadata (), _metadata_id );
1562
1582
}
1563
1583
1564
1584
virtual void visit (const messages::result_message::schema_change& m) override {
@@ -1578,7 +1598,7 @@ class cql_server::fmt_visitor : public messages::result_message::visitor_base {
1578
1598
virtual void visit (const messages::result_message::rows& m) override {
1579
1599
_response.write_int (0x0002 );
1580
1600
auto & rs = m.rs ();
1581
- _response.write (rs.get_metadata (), _skip_metadata);
1601
+ _response.write (rs.get_metadata (), _metadata_id, _skip_metadata);
1582
1602
auto row_count_plhldr = _response.write_int_placeholder ();
1583
1603
1584
1604
class visitor {
@@ -1606,7 +1626,7 @@ class cql_server::fmt_visitor : public messages::result_message::visitor_base {
1606
1626
1607
1627
std::unique_ptr<cql_server::response>
1608
1628
make_result (int16_t stream, messages::result_message& msg, const tracing::trace_state_ptr& tr_state,
1609
- cql_protocol_version_type version, bool skip_metadata) {
1629
+ cql_protocol_version_type version, cql_metadata_id_wrapper&& metadata_id, bool skip_metadata) {
1610
1630
auto response = std::make_unique<cql_server::response>(stream, cql_binary_opcode::RESULT, tr_state);
1611
1631
if (__builtin_expect (!msg.warnings ().empty () && version > 3 , false )) {
1612
1632
response->set_frame_flag (cql_frame_flags::warning);
@@ -1616,7 +1636,7 @@ make_result(int16_t stream, messages::result_message& msg, const tracing::trace_
1616
1636
response->set_frame_flag (cql_frame_flags::custom_payload);
1617
1637
response->write_string_bytes_map (msg.custom_payload ().value ());
1618
1638
}
1619
- cql_server::fmt_visitor fmt{version, *response, skip_metadata};
1639
+ cql_server::fmt_visitor fmt{version, *response, skip_metadata, std::move (metadata_id) };
1620
1640
msg.accept (fmt);
1621
1641
return response;
1622
1642
}
@@ -2024,7 +2044,7 @@ thread_local const type_codec::type_id_to_type_type type_codec::type_id_to_type
2024
2044
{ inet_addr_type, type_id::INET },
2025
2045
};
2026
2046
2027
- void cql_server::response::write (const cql3::metadata& m, bool no_metadata) {
2047
+ void cql_server::response::write (const cql3::metadata& m, const cql_metadata_id_wrapper& metadata_id, bool no_metadata) {
2028
2048
auto flags = m.flags ();
2029
2049
bool global_tables_spec = m.flags ().contains <cql3::metadata::flag::GLOBAL_TABLES_SPEC>();
2030
2050
bool has_more_pages = m.flags ().contains <cql3::metadata::flag::HAS_MORE_PAGES>();
@@ -2033,6 +2053,15 @@ void cql_server::response::write(const cql3::metadata& m, bool no_metadata) {
2033
2053
flags.set <cql3::metadata::flag::NO_METADATA>();
2034
2054
}
2035
2055
2056
+ cql3::cql_metadata_id_type calculated_metadata_id{bytes{}};
2057
+ if (metadata_id.has_request_metadata_id () && metadata_id.has_response_metadata_id ()) {
2058
+ if (metadata_id.get_request_metadata_id () != metadata_id.get_response_metadata_id ()) {
2059
+ flags.remove <cql3::metadata::flag::NO_METADATA>();
2060
+ flags.set <cql3::metadata::flag::METADATA_CHANGED>();
2061
+ no_metadata = false ;
2062
+ }
2063
+ }
2064
+
2036
2065
write_int (flags.mask ());
2037
2066
write_int (m.column_count ());
2038
2067
@@ -2044,6 +2073,10 @@ void cql_server::response::write(const cql3::metadata& m, bool no_metadata) {
2044
2073
return ;
2045
2074
}
2046
2075
2076
+ if (flags.contains <cql3::metadata::flag::METADATA_CHANGED>()) {
2077
+ write_short_bytes (metadata_id.get_response_metadata_id ()._metadata_id );
2078
+ }
2079
+
2047
2080
auto names_i = m.get_names ().begin ();
2048
2081
2049
2082
if (global_tables_spec) {
@@ -2096,6 +2129,28 @@ void cql_server::response::write(const cql3::prepared_metadata& m, uint8_t versi
2096
2129
}
2097
2130
}
2098
2131
2132
+ bool cql_metadata_id_wrapper::has_request_metadata_id () const {
2133
+ return _request_metadata_id.has_value ();
2134
+ }
2135
+
2136
+ bool cql_metadata_id_wrapper::has_response_metadata_id () const {
2137
+ return _response_metadata_id.has_value ();
2138
+ }
2139
+
2140
+ const cql3::cql_metadata_id_type& cql_metadata_id_wrapper::get_request_metadata_id () const {
2141
+ if (!has_request_metadata_id ()) {
2142
+ on_internal_error (clogger, " request metadata_id is empty" );
2143
+ }
2144
+ return _request_metadata_id.value ();
2145
+ }
2146
+
2147
+ const cql3::cql_metadata_id_type& cql_metadata_id_wrapper::get_response_metadata_id () const {
2148
+ if (!has_response_metadata_id ()) {
2149
+ on_internal_error (clogger, " response metadata_id is empty" );
2150
+ }
2151
+ return _response_metadata_id.value ();
2152
+ }
2153
+
2099
2154
future<utils::chunked_vector<client_data>> cql_server::get_client_data () {
2100
2155
utils::chunked_vector<client_data> ret;
2101
2156
co_await for_each_gently ([&ret] (const generic_server::connection& c) {
0 commit comments