Skip to content

Commit 992955b

Browse files
feat: [DSM-91] Metric for misrouted messages in streams (#8338)
During a subnet split, export a metric to track misrouted (according to the current routing table, presumably correctly routed according to an earlier routing table) messages in streams. We only scan streams to/from subnets involved in canister migrations, which should make this a no-op most of the time. This metric should make it trivial to ascertain when it is safe to remove a given `canister_migrations` entry (when the registry version has moved past the routing table update and there are no more misrouted messages to or from the involved subnets; for a while, to smooth over any race conditions).
1 parent ed1bed8 commit 992955b

File tree

3 files changed

+317
-4
lines changed

3 files changed

+317
-4
lines changed

rs/messaging/src/routing/stream_builder.rs

Lines changed: 99 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ use ic_types::{CountBytes, Cycles, SubnetId};
1717
#[cfg(test)]
1818
use mockall::automock;
1919
use prometheus::{Histogram, IntCounter, IntCounterVec, IntGaugeVec};
20-
use std::collections::{BTreeMap, btree_map};
20+
use std::collections::{BTreeMap, BTreeSet, btree_map};
2121
use std::sync::{Arc, Mutex};
2222

2323
#[cfg(test)]
@@ -36,6 +36,8 @@ struct StreamBuilderMetrics {
3636
pub routed_messages: IntCounterVec,
3737
/// Successfully routed XNet messages' total payload size.
3838
pub routed_payload_sizes: Histogram,
39+
/// Misrouted messages currently in streams, by remote subnet.
40+
pub stream_misrouted_messages: IntGaugeVec,
3941
/// Critical error counter for detected infinite loops while routing.
4042
pub critical_error_infinite_loops: IntCounter,
4143
/// Critical error for payloads above the maximum supported size.
@@ -53,6 +55,7 @@ const METRIC_STREAM_BEGIN: &str = "mr_stream_begin";
5355
const METRIC_SIGNALS_END: &str = "mr_signals_end";
5456
const METRIC_ROUTED_MESSAGES: &str = "mr_routed_message_count";
5557
const METRIC_ROUTED_PAYLOAD_SIZES: &str = "mr_routed_payload_size_bytes";
58+
const METRIC_STREAM_MISROUTED_MESSAGES: &str = "mr_stream_misrouted_messages";
5659

5760
const LABEL_TYPE: &str = "type";
5861
const LABEL_STATUS: &str = "status";
@@ -106,6 +109,11 @@ impl StreamBuilderMetrics {
106109
// 10 B - 5 MB
107110
decimal_buckets(1, 6),
108111
);
112+
let stream_misrouted_messages= metrics_registry.int_gauge_vec(
113+
METRIC_STREAM_MISROUTED_MESSAGES,
114+
"Count of misrouted messages in streams, by remote subnet. Only populated for subnets currently involved in a canister migration.",
115+
&[LABEL_REMOTE],
116+
);
109117
let critical_error_infinite_loops =
110118
metrics_registry.error_counter(CRITICAL_ERROR_INFINITE_LOOP);
111119
let critical_error_payload_too_large =
@@ -144,6 +152,7 @@ impl StreamBuilderMetrics {
144152
signals_end,
145153
routed_messages,
146154
routed_payload_sizes,
155+
stream_misrouted_messages,
147156
critical_error_infinite_loops,
148157
critical_error_payload_too_large,
149158
critical_error_response_destination_not_found,
@@ -266,6 +275,94 @@ impl StreamBuilderImpl {
266275
.observe(msg.payload_size_bytes().get() as f64);
267276
}
268277

278+
/// Iterates over all messages in potentially relevant streams and counts how
279+
/// many are misrouted (mismatched source or destination subnet according to the
280+
/// current routing table).
281+
///
282+
/// Only streams to or from subnets involved in migrations may enqueue misrouted
283+
/// messages. If this subnet is involved in a migration, we scan all its
284+
/// streams. Otherwise, we only scan streams to subnets involved in migrations.
285+
fn observe_misrouted_messages(&self, state: &ReplicatedState) {
286+
// Reset all gauges to zero before recounting.
287+
//
288+
// This may lead to a race condition where some keys are temporarily missing,
289+
// but we already have a race condition between this metric and the registry
290+
// version metric. We work around both by (1) aggregating over all replicas on
291+
// the subnet and (2) requiring the condition (no misrouted messages) to hold
292+
// for a while before acting on it.
293+
self.metrics.stream_misrouted_messages.reset();
294+
295+
let canister_migrations = state.metadata.network_topology.canister_migrations.as_ref();
296+
if canister_migrations.is_empty() {
297+
return;
298+
}
299+
300+
// Collect all subnets involved in migrations (source or destination).
301+
//
302+
// It may be sufficient to only look at "source" subnets of migrations because
303+
// we are looking for messages stuck in streams to OLD host subnets. But, just
304+
// to be safe, we collect all subnets appearing in migration traces.
305+
let mut subnets_with_canister_migrations = BTreeSet::new();
306+
for (_, trace) in canister_migrations.iter() {
307+
for subnet in trace {
308+
subnets_with_canister_migrations.insert(*subnet);
309+
}
310+
}
311+
let relevant_subnets = if subnets_with_canister_migrations.contains(&self.subnet_id) {
312+
// This subnet is the source or target of a migration, scan all its streams.
313+
state
314+
.metadata
315+
.streams()
316+
.keys()
317+
.cloned()
318+
.collect::<BTreeSet<_>>()
319+
} else {
320+
// This is a third-party subnet, only scan streams to subnets involved in
321+
// canister migrations.
322+
subnets_with_canister_migrations
323+
};
324+
325+
for remote_subnet in &relevant_subnets {
326+
let Some(stream) = state.metadata.streams().get(remote_subnet) else {
327+
continue;
328+
};
329+
330+
let mut misrouted_messages = 0;
331+
// Iterate over all messages in the stream
332+
for (_, msg) in stream.messages().iter() {
333+
// Check for receiver subnet mismatch.
334+
let receiver_host_subnet =
335+
state.metadata.network_topology.route(msg.receiver().get());
336+
if receiver_host_subnet != Some(*remote_subnet) {
337+
misrouted_messages += 1;
338+
continue;
339+
}
340+
341+
// Check for sender subnet mismatch.
342+
let sender_host_subnet = match msg {
343+
StreamMessage::Request(req) => {
344+
state.metadata.network_topology.route(req.sender.get())
345+
}
346+
StreamMessage::Response(resp) => {
347+
state.metadata.network_topology.route(resp.originator.get())
348+
}
349+
StreamMessage::Refund(_) => {
350+
// Refunds don't have explicit senders. Always assume they are local.
351+
Some(self.subnet_id)
352+
}
353+
};
354+
if sender_host_subnet != Some(self.subnet_id) {
355+
misrouted_messages += 1;
356+
}
357+
}
358+
359+
self.metrics
360+
.stream_misrouted_messages
361+
.with_label_values(&[&remote_subnet.to_string()])
362+
.set(misrouted_messages);
363+
}
364+
}
365+
269366
/// Implementation of `StreamBuilder::build_streams()`.
270367
fn build_streams_impl(&self, mut state: ReplicatedState) -> ReplicatedState {
271368
/// Pops the previously peeked message.
@@ -535,6 +632,7 @@ impl StreamBuilderImpl {
535632
.with_label_values(&[&subnet])
536633
.set(signals_end.get() as i64);
537634
});
635+
self.observe_misrouted_messages(&state);
538636

539637
{
540638
// Record the enqueuing time of any messages newly enqueued into `streams`.

rs/messaging/src/routing/stream_builder/tests.rs

Lines changed: 209 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use ic_config::message_routing::{MAX_STREAM_MESSAGES, TARGET_STREAM_SIZE_BYTES};
66
use ic_error_types::RejectCode;
77
use ic_limits::SYSTEM_SUBNET_STREAM_MSG_LIMIT;
88
use ic_management_canister_types_private::Method;
9-
use ic_registry_routing_table::{CanisterIdRange, RoutingTable};
9+
use ic_registry_routing_table::{CanisterIdRange, CanisterMigrations, RoutingTable};
1010
use ic_registry_subnet_type::SubnetType;
1111
use ic_replicated_state::testing::{
1212
CanisterQueuesTesting, ReplicatedStateTesting, SystemStateTesting,
@@ -18,7 +18,9 @@ use ic_test_utilities_metrics::{
1818
nonzero_values,
1919
};
2020
use ic_test_utilities_state::{new_canister_state, register_callback};
21-
use ic_test_utilities_types::ids::{SUBNET_27, SUBNET_42, canister_test_id, user_test_id};
21+
use ic_test_utilities_types::ids::{
22+
SUBNET_3, SUBNET_4, SUBNET_5, SUBNET_27, SUBNET_42, canister_test_id, user_test_id,
23+
};
2224
use ic_test_utilities_types::messages::RequestBuilder;
2325
use ic_types::messages::{
2426
CallbackId, MAX_INTER_CANISTER_PAYLOAD_IN_BYTES_U64, NO_DEADLINE, Payload, Refund,
@@ -1216,6 +1218,211 @@ fn build_streams_with_oversized_payloads() {
12161218
});
12171219
}
12181220

1221+
/// Local subnet is splitting: a canister is migrating from local subnet to
1222+
/// subnet B.
1223+
#[test]
1224+
fn test_observe_misrouted_messages_on_splitting_subnet() {
1225+
with_test_replica_logger(|log| {
1226+
let (stream_builder, mut state, metrics_registry) = new_fixture(&log);
1227+
1228+
// Subnets and canisters.
1229+
const REMOTE_SUBNET_B: SubnetId = SUBNET_4; // Destination of migration.
1230+
const REMOTE_SUBNET_Z: SubnetId = SUBNET_5; // Other subnet, no migration.
1231+
1232+
let local_canister = canister_test_id(100);
1233+
// Migrating but not yet migrated canister.
1234+
let migrating_canister = canister_test_id(200);
1235+
// Already migrated canister.
1236+
let migrated_canister = canister_test_id(300);
1237+
let canister_on_b = canister_test_id(400);
1238+
let canister_on_z = canister_test_id(500);
1239+
1240+
// Routing table: `migrating_canister` is still hosted by the local subnet;
1241+
// `migrated_canister` has already migrated from the local subnet to B.
1242+
state.metadata.network_topology.routing_table = Arc::new(
1243+
RoutingTable::try_from(btreemap! {
1244+
CanisterIdRange{ start: local_canister, end: local_canister } => LOCAL_SUBNET,
1245+
CanisterIdRange{ start: migrating_canister, end: migrating_canister } => LOCAL_SUBNET,
1246+
CanisterIdRange{ start: migrated_canister, end: migrated_canister } => REMOTE_SUBNET_B,
1247+
CanisterIdRange{ start: canister_on_b, end: canister_on_b } => REMOTE_SUBNET_B,
1248+
CanisterIdRange{ start: canister_on_z, end: canister_on_z } => REMOTE_SUBNET_Z,
1249+
})
1250+
.unwrap(),
1251+
);
1252+
1253+
// Canister migrations: both `migrating_canister` and `migrated_canister` are
1254+
// migrating from the local subnet to B.
1255+
state.metadata.network_topology.canister_migrations = Arc::new(
1256+
CanisterMigrations::try_from(btreemap! {
1257+
CanisterIdRange{ start: migrating_canister, end: migrating_canister } => vec![LOCAL_SUBNET, REMOTE_SUBNET_B],
1258+
CanisterIdRange{ start: migrated_canister, end: migrated_canister } => vec![LOCAL_SUBNET, REMOTE_SUBNET_B],
1259+
})
1260+
.unwrap(),
1261+
);
1262+
1263+
let message_to = |receiver: CanisterId| {
1264+
RequestBuilder::default()
1265+
.sender(local_canister)
1266+
.receiver(receiver)
1267+
.build()
1268+
.into()
1269+
};
1270+
let message = |sender: CanisterId, receiver: CanisterId| {
1271+
RequestBuilder::default()
1272+
.sender(sender)
1273+
.receiver(receiver)
1274+
.build()
1275+
.into()
1276+
};
1277+
1278+
// Loopback stream, with messages to and from all canisters hosted by subnet A
1279+
// at any given time. The 3 messages to/from `migrated_canister` are misrouted.
1280+
let mut loopback_stream = Stream::default();
1281+
loopback_stream.push(message_to(local_canister));
1282+
loopback_stream.push(message_to(migrated_canister));
1283+
loopback_stream.push(message_to(migrating_canister));
1284+
loopback_stream.push(message(migrated_canister, local_canister));
1285+
loopback_stream.push(message(migrated_canister, migrated_canister));
1286+
loopback_stream.push(message(migrating_canister, local_canister));
1287+
loopback_stream.push(message(migrating_canister, migrating_canister));
1288+
1289+
// Stream to subnet B, with messages to all canisters potentially hosted by
1290+
// subnet B at any given time.
1291+
//
1292+
// The 2 messages from the migrated canister and the 2 messages to the migrating
1293+
// canister are misrouted.
1294+
let mut stream_to_subnet_b = Stream::default();
1295+
stream_to_subnet_b.push(message_to(canister_on_b));
1296+
stream_to_subnet_b.push(message_to(migrated_canister));
1297+
stream_to_subnet_b.push(message_to(migrating_canister));
1298+
stream_to_subnet_b.push(message(migrated_canister, canister_on_b));
1299+
stream_to_subnet_b.push(message(migrated_canister, migrated_canister));
1300+
stream_to_subnet_b.push(message(migrating_canister, canister_on_b));
1301+
stream_to_subnet_b.push(message(migrating_canister, migrating_canister));
1302+
1303+
// Stream to subnet Z: one message from each currently or previously hosted
1304+
// canister. The message from `migrated_canister` is misrouted.
1305+
let mut stream_to_subnet_z = Stream::default();
1306+
stream_to_subnet_z.push(message_to(canister_on_z));
1307+
stream_to_subnet_z.push(message(migrated_canister, canister_on_z));
1308+
stream_to_subnet_z.push(message(migrating_canister, canister_on_z));
1309+
1310+
state.modify_streams(|streams| {
1311+
*streams = btreemap! {
1312+
LOCAL_SUBNET => loopback_stream,
1313+
REMOTE_SUBNET_B => stream_to_subnet_b,
1314+
REMOTE_SUBNET_Z => stream_to_subnet_z,
1315+
}
1316+
});
1317+
1318+
// Act.
1319+
stream_builder.observe_misrouted_messages(&state);
1320+
1321+
// Assert.
1322+
assert_eq!(
1323+
metric_vec(&[
1324+
(&[(LABEL_REMOTE, &LOCAL_SUBNET.to_string())], 3),
1325+
(&[(LABEL_REMOTE, &REMOTE_SUBNET_B.to_string())], 4),
1326+
(&[(LABEL_REMOTE, &REMOTE_SUBNET_Z.to_string())], 1)
1327+
]),
1328+
fetch_int_gauge_vec(&metrics_registry, METRIC_STREAM_MISROUTED_MESSAGES)
1329+
);
1330+
});
1331+
}
1332+
1333+
/// A canister is migrating between remote subnets A and B.
1334+
#[test]
1335+
fn test_observe_misrouted_messages_on_third_party_subnet() {
1336+
with_test_replica_logger(|log| {
1337+
let (stream_builder, mut state, metrics_registry) = new_fixture(&log);
1338+
1339+
// Subnets and canisters.
1340+
const REMOTE_SUBNET_A: SubnetId = SUBNET_3; // Source of migration.
1341+
const REMOTE_SUBNET_B: SubnetId = SUBNET_4; // Destination of migration.
1342+
const REMOTE_SUBNET_Z: SubnetId = SUBNET_5; // Other subnet, no migration.
1343+
1344+
let local_canister = canister_test_id(100);
1345+
let canister_on_a = canister_test_id(200);
1346+
let migrated_canister = canister_test_id(300);
1347+
let migrating_canister = canister_test_id(400);
1348+
let canister_on_b = canister_test_id(500);
1349+
let canister_on_z = canister_test_id(600);
1350+
1351+
// Routing table: `migrating_canister` is still hosted by subnet A;
1352+
// `migrated_canister` has already migrated from A to B.
1353+
state.metadata.network_topology.routing_table = Arc::new(
1354+
RoutingTable::try_from(btreemap! {
1355+
CanisterIdRange{ start: local_canister, end: local_canister } => LOCAL_SUBNET,
1356+
CanisterIdRange{ start: canister_on_a, end: canister_on_a } => REMOTE_SUBNET_A,
1357+
CanisterIdRange{ start: migrating_canister, end: migrating_canister } => REMOTE_SUBNET_A,
1358+
CanisterIdRange{ start: migrated_canister, end: migrated_canister } => REMOTE_SUBNET_B,
1359+
CanisterIdRange{ start: canister_on_b, end: canister_on_b } => REMOTE_SUBNET_B,
1360+
CanisterIdRange{ start: canister_on_z, end: canister_on_z } => REMOTE_SUBNET_Z,
1361+
})
1362+
.unwrap(),
1363+
);
1364+
1365+
// Canister migrations: both `migrating_canister` and `migrated_canister` are
1366+
// migrating from A to B.
1367+
state.metadata.network_topology.canister_migrations = Arc::new(
1368+
CanisterMigrations::try_from(btreemap! {
1369+
CanisterIdRange{ start: migrated_canister, end: migrated_canister } => vec![REMOTE_SUBNET_A, REMOTE_SUBNET_B],
1370+
CanisterIdRange{ start: migrating_canister, end: migrating_canister } => vec![REMOTE_SUBNET_A, REMOTE_SUBNET_B],
1371+
})
1372+
.unwrap(),
1373+
);
1374+
1375+
let message_to = |receiver: CanisterId| {
1376+
RequestBuilder::default()
1377+
.sender(local_canister)
1378+
.receiver(receiver)
1379+
.build()
1380+
.into()
1381+
};
1382+
1383+
// Stream to subnet A with messages to all the canisters it hosted at one time
1384+
// or another. Only the message to `migrated_canister` is misrouted.
1385+
let mut stream_to_subnet_a = Stream::default();
1386+
stream_to_subnet_a.push(message_to(canister_on_a));
1387+
stream_to_subnet_a.push(message_to(migrated_canister));
1388+
stream_to_subnet_a.push(message_to(migrating_canister));
1389+
1390+
// Stream to subnet B with messages to all the canisters it could potentially
1391+
// have hosted. Only the message to `migrating_canister` is misrouted.
1392+
let mut stream_to_subnet_b = Stream::default();
1393+
stream_to_subnet_b.push(message_to(canister_on_b));
1394+
stream_to_subnet_b.push(message_to(migrated_canister));
1395+
stream_to_subnet_b.push(message_to(migrating_canister));
1396+
1397+
// Contents of both the loopback stream and the stream to subnet Z. It enqueues
1398+
// one misrouted message (e.g. due to a manual canister migration). Not counted
1399+
// because neither the local subnet nor Z are on any migration trace.
1400+
let mut other_stream = Stream::default();
1401+
other_stream.push(message_to(canister_on_a));
1402+
1403+
state.modify_streams(|streams| {
1404+
*streams = btreemap! {
1405+
REMOTE_SUBNET_A => stream_to_subnet_a,
1406+
REMOTE_SUBNET_B => stream_to_subnet_b,
1407+
REMOTE_SUBNET_Z => other_stream.clone(),
1408+
LOCAL_SUBNET => other_stream,
1409+
}
1410+
});
1411+
1412+
// Act.
1413+
stream_builder.observe_misrouted_messages(&state);
1414+
1415+
// Assert.
1416+
assert_eq!(
1417+
metric_vec(&[
1418+
(&[(LABEL_REMOTE, &REMOTE_SUBNET_A.to_string())], 1),
1419+
(&[(LABEL_REMOTE, &REMOTE_SUBNET_B.to_string())], 1)
1420+
]),
1421+
fetch_int_gauge_vec(&metrics_registry, METRIC_STREAM_MISROUTED_MESSAGES)
1422+
);
1423+
});
1424+
}
1425+
12191426
/// Sets up the `StreamHandlerImpl`, `ReplicatedState` and `MetricsRegistry` to
12201427
/// be used by a test using specific stream limits.
12211428
fn new_fixture_with_limits(

0 commit comments

Comments
 (0)