Skip to content

Commit 5e1d718

Browse files
authored
Rust: fix PipeConsumer get_stats() (#1511)
* Rust: fix PipeConsumer get_stats() Fixes #1510 Consider PipeConsumer which stats contains multiple send streams.
1 parent acdd49b commit 5e1d718

File tree

3 files changed

+33
-9
lines changed

3 files changed

+33
-9
lines changed

rust/CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,10 @@
22

33
# NEXT
44

5+
# 0.17.2
6+
7+
- Fix `PipeConsumer::get_stats()` (PR #1511).
8+
59
# 0.17.1
610

711
- Update Rust toolchain channel to version 1.79.0 (PR #1409).

rust/src/router/consumer.rs

Lines changed: 25 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -434,6 +434,8 @@ pub enum ConsumerStats {
434434
JustConsumer((ConsumerStat,)),
435435
/// RTC statistics with producer
436436
WithProducer((ConsumerStat, ProducerStat)),
437+
/// RTC statistics with multiple consumers (pipe).
438+
MultipleConsumers(Vec<ConsumerStat>),
437439
}
438440

439441
impl ConsumerStats {
@@ -442,6 +444,7 @@ impl ConsumerStats {
442444
match self {
443445
ConsumerStats::JustConsumer((consumer_stat,)) => consumer_stat,
444446
ConsumerStats::WithProducer((consumer_stat, _)) => consumer_stat,
447+
ConsumerStats::MultipleConsumers(consumer_stats) => &consumer_stats[0],
445448
}
446449
}
447450
}
@@ -1030,18 +1033,31 @@ impl Consumer {
10301033
.await?;
10311034

10321035
if let response::Body::ConsumerGetStatsResponse(data) = response {
1033-
match data.stats.len() {
1034-
0 => panic!("Empty stats response from worker"),
1035-
1 => {
1036-
let consumer_stat = ConsumerStat::from_fbs(&data.stats[0]);
1036+
match self.r#type() {
1037+
ConsumerType::Simple | ConsumerType::Simulcast | ConsumerType::Svc => {
1038+
match data.stats.len() {
1039+
0 => panic!("Empty stats response from worker"),
1040+
1 => {
1041+
let consumer_stat = ConsumerStat::from_fbs(&data.stats[0]);
1042+
1043+
Ok(ConsumerStats::JustConsumer((consumer_stat,)))
1044+
}
1045+
2 => {
1046+
let consumer_stat = ConsumerStat::from_fbs(&data.stats[0]);
1047+
let producer_stat = ProducerStat::from_fbs(&data.stats[1]);
10371048

1038-
Ok(ConsumerStats::JustConsumer((consumer_stat,)))
1049+
Ok(ConsumerStats::WithProducer((consumer_stat, producer_stat)))
1050+
}
1051+
_ => panic!("More than two stats response from worker"),
1052+
}
10391053
}
1040-
_ => {
1041-
let consumer_stat = ConsumerStat::from_fbs(&data.stats[0]);
1042-
let producer_stat = ProducerStat::from_fbs(&data.stats[1]);
1054+
ConsumerType::Pipe => {
1055+
let mut stats = Vec::<ConsumerStat>::with_capacity(data.stats.len());
1056+
for stat in data.stats {
1057+
stats.push(ConsumerStat::from_fbs(&stat));
1058+
}
10431059

1044-
Ok(ConsumerStats::WithProducer((consumer_stat, producer_stat)))
1060+
Ok(ConsumerStats::MultipleConsumers(stats))
10451061
}
10461062
}
10471063
} else {

rust/tests/integration/consumer.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -679,6 +679,10 @@ fn consume_succeeds() {
679679
video_pipe_consumer.app_data().downcast_ref::<()>().unwrap(),
680680
&(),
681681
);
682+
video_pipe_consumer
683+
.get_stats()
684+
.await
685+
.expect("Failed to get consumer stats");
682686

683687
let router_dump = router.dump().await.expect("Failed to get router dump");
684688

0 commit comments

Comments
 (0)