Skip to content

Commit 8e27cd6

Browse files
authored
chore: ensure that rpc messages get processed sequentially and avoid phantom and repeated key presses (#744)
1 parent bb87fb5 commit 8e27cd6

File tree

1 file changed

+14
-1
lines changed

1 file changed

+14
-1
lines changed

webrtc.go

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ type Session struct {
2323
HidChannel *webrtc.DataChannel
2424
DiskChannel *webrtc.DataChannel
2525
shouldUmountVirtualMedia bool
26+
rpcQueue chan webrtc.DataChannelMessage
2627
}
2728

2829
type SessionConfig struct {
@@ -105,14 +106,21 @@ func newSession(config SessionConfig) (*Session, error) {
105106
return nil, err
106107
}
107108
session := &Session{peerConnection: peerConnection}
109+
session.rpcQueue = make(chan webrtc.DataChannelMessage, 256)
110+
go func() {
111+
for msg := range session.rpcQueue {
112+
onRPCMessage(msg, session)
113+
}
114+
}()
108115

109116
peerConnection.OnDataChannel(func(d *webrtc.DataChannel) {
110117
scopedLogger.Info().Str("label", d.Label()).Uint16("id", *d.ID()).Msg("New DataChannel")
111118
switch d.Label() {
112119
case "rpc":
113120
session.RPCChannel = d
114121
d.OnMessage(func(msg webrtc.DataChannelMessage) {
115-
go onRPCMessage(msg, session)
122+
// Enqueue to ensure ordered processing
123+
session.rpcQueue <- msg
116124
})
117125
triggerOTAStateUpdate()
118126
triggerVideoStateUpdate()
@@ -186,6 +194,11 @@ func newSession(config SessionConfig) (*Session, error) {
186194
if session == currentSession {
187195
currentSession = nil
188196
}
197+
// Stop RPC processor
198+
if session.rpcQueue != nil {
199+
close(session.rpcQueue)
200+
session.rpcQueue = nil
201+
}
189202
if session.shouldUmountVirtualMedia {
190203
err := rpcUnmountImage()
191204
scopedLogger.Warn().Err(err).Msg("unmount image failed on connection close")

0 commit comments

Comments
 (0)