Skip to content

Commit 25e6493

Browse files
committed
feat(eventsub): Implement WebSocket reconnection logic
1 parent db48eef commit 25e6493

File tree

1 file changed

+62
-28
lines changed

1 file changed

+62
-28
lines changed

src/eventsub.rs

Lines changed: 62 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -43,32 +43,48 @@ pub async fn listen_for_offline(
4343
let session_id = Arc::new(tokio::sync::OnceCell::new());
4444
let notify = Arc::new(tokio::sync::Notify::new());
4545

46-
info!("Connecting to Twitch EventSub via WebSocket");
47-
let upgrade = client
48-
.clone()
49-
.get(TWITCH_EVENTSUB_WS_URL)
50-
.upgrade()
51-
.send()
52-
.await
53-
.context("Connecting to Twitch's EventSub Endpoint")?;
54-
55-
let ws = upgrade
56-
.into_websocket()
57-
.await
58-
.context("Upgrading HTTP request into a WebSocket")?;
59-
let (_sink, stream) = ws.split();
60-
6146
// Handle websocket connection
6247
{
6348
let session_id = session_id.clone();
6449
let notify = notify.clone();
50+
let ws_client = client.clone();
51+
6552
tokio::spawn(async move {
66-
tokio::pin!(stream);
67-
loop {
68-
tokio::select! {
69-
() = ct.cancelled() => break,
70-
() = tokio::time::sleep(Duration::from_secs(40)) => panic!("Didn't get any message for 40s. Connection is effectively poisoned!"),
71-
Some(Ok(message)) = stream.next() => handle_ws_message(message, &session_id, &notify, &tx).await
53+
let mut connection_url: Option<String> = None;
54+
55+
'all: loop {
56+
info!("(re-)Connecting to Twitch EventSub via WebSocket");
57+
let upgrade = ws_client
58+
.clone()
59+
.get(connection_url.unwrap_or(TWITCH_EVENTSUB_WS_URL.to_string()))
60+
.upgrade()
61+
.send()
62+
.await
63+
.context("Connecting to Twitch's EventSub Endpoint")
64+
.unwrap();
65+
66+
let ws = upgrade
67+
.into_websocket()
68+
.await
69+
.context("Upgrading HTTP request into a WebSocket")
70+
.unwrap();
71+
let (_sink, stream) = ws.split();
72+
73+
tokio::pin!(stream);
74+
'session: loop {
75+
let action = tokio::select! {
76+
() = ct.cancelled() => break 'all,
77+
() = tokio::time::sleep(Duration::from_secs(40)) => {
78+
panic!("Didn't get any message for 40s. Connection is effectively poisoned!")
79+
// TODO: Handle re-registration (resend EventSub subscriptions)
80+
},
81+
Some(Ok(message)) = stream.next() => handle_ws_message(message, &session_id, &notify, &tx).await
82+
};
83+
84+
if let SocketAction::Reconnect(url) = action {
85+
connection_url = Some(url);
86+
break 'session;
87+
}
7288
}
7389
}
7490
});
@@ -114,12 +130,16 @@ pub async fn listen_for_offline(
114130
Ok(rx)
115131
}
116132

133+
enum SocketAction {
134+
None,
135+
Reconnect(String),
136+
}
117137
async fn handle_ws_message(
118138
message: reqwest_websocket::Message,
119139
session_id: &OnceCell<Value>,
120140
notify: &Arc<Notify>,
121141
tx: &mpsc::Sender<u64>,
122-
) {
142+
) -> SocketAction {
123143
match message {
124144
Message::Text(m) => {
125145
let m: Value = serde_json::from_str(&m).unwrap();
@@ -129,31 +149,43 @@ async fn handle_ws_message(
129149
.set(m["payload"]["session"]["id"].clone())
130150
.unwrap();
131151
notify.notify_one();
132-
return;
152+
return SocketAction::None;
133153
}
134154

135155
let JsonString(message_type) = &m["metadata"]["message_type"] else {
136156
error!("Twitch message is missing message_type\n{m}\nSkipping...");
137-
return;
157+
return SocketAction::None;
138158
};
139159

140160
match message_type.as_str() {
141-
"session_keepalive" => {} // noop
161+
"session_keepalive" => {
162+
// Noop
163+
return SocketAction::None;
164+
}
142165
"notification" => {
143166
info!("Received notification message!");
144167
let channel_id = &m["payload"]["event"]["broadcaster_user_id"]
145168
.as_str()
146-
.as_u64()
147169
.context("Stream offline notification message does not contain broadcaster_user_id!")
148170
.unwrap()
149171
.parse::<u64>()
150172
.unwrap();
151173
tx.send(*channel_id)
152174
.await
153175
.expect("Unable to announce offline");
176+
177+
return SocketAction::None;
178+
}
179+
"session_reconnect" => {
180+
info!("Need to reconnect!");
181+
let reconnect_url = m["payload"]["session"]["reconnect_url"].as_str().unwrap();
182+
return SocketAction::Reconnect(reconnect_url.to_string());
154183
}
155184

156-
other => warn!("Unhandled message type: {other}"),
185+
other => {
186+
warn!("Unhandled message type: {other}");
187+
return SocketAction::None;
188+
}
157189
}
158190
}
159191

@@ -163,6 +195,8 @@ async fn handle_ws_message(
163195
panic!("Twitch closed WS connection!");
164196
}
165197

166-
_ => {}
198+
_ => {
199+
return SocketAction::None;
200+
}
167201
}
168202
}

0 commit comments

Comments
 (0)