Skip to content

Commit 56b78a9

Browse files
committed
simplify timer and fix incorrect flush order
1 parent e3af98d commit 56b78a9

File tree

1 file changed

+7
-20
lines changed

1 file changed

+7
-20
lines changed

pulsebeam/src/participant/actor.rs

Lines changed: 7 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -83,22 +83,9 @@ impl actor::Actor<ParticipantMessageSet> for ParticipantActor {
8383
))
8484
.await;
8585
let mut stats_interval = tokio::time::interval(Duration::from_millis(200));
86-
let mut current_deadline = Instant::now() + Duration::from_secs(1);
87-
let mut new_deadline = Some(current_deadline);
88-
89-
let rtc_timer = tokio::time::sleep_until(current_deadline);
90-
tokio::pin!(rtc_timer);
91-
92-
loop {
93-
match new_deadline {
94-
Some(new_deadline) if new_deadline != current_deadline => {
95-
rtc_timer.as_mut().reset(new_deadline);
96-
current_deadline = new_deadline;
97-
}
98-
None => break,
99-
_ => {}
100-
}
86+
let mut maybe_deadline = self.core.poll_rtc();
10187

88+
while let Some(deadline) = maybe_deadline {
10289
let events: Vec<_> = self.core.drain_events().collect();
10390
for event in events {
10491
self.handle_core_event(event).await;
@@ -127,18 +114,18 @@ impl actor::Actor<ParticipantMessageSet> for ParticipantActor {
127114
}
128115
Some((meta, pkt)) = self.core.downstream.next() => {
129116
self.core.handle_forward_rtp(meta, pkt);
117+
118+
maybe_deadline = self.core.poll_rtc();
130119
// this indicates the first batch is filled.
131120
if self.core.udp_batcher.len() >= 2 {
132121
self.core.udp_batcher.flush(&self.udp_egress);
133122
}
134123
if self.core.tcp_batcher.len() >= 2 {
135124
self.core.tcp_batcher.flush(&self.tcp_egress);
136125
}
137-
138-
new_deadline = self.core.poll_rtc();
139126
},
140127
Some(batch) = gateway_rx.recv() => {
141-
new_deadline = self.core.handle_udp_packet_batch(batch);
128+
maybe_deadline = self.core.handle_udp_packet_batch(batch);
142129
},
143130

144131
// Priority 3: Flush to network
@@ -153,8 +140,8 @@ impl actor::Actor<ParticipantMessageSet> for ParticipantActor {
153140
now = stats_interval.tick() => {
154141
self.core.poll_slow(now);
155142
}
156-
_ = &mut rtc_timer => {
157-
new_deadline = self.core.handle_timeout();
143+
_ = tokio::time::sleep_until(deadline) => {
144+
maybe_deadline = self.core.handle_timeout();
158145
},
159146
}
160147
}

0 commit comments

Comments
 (0)