1+ import { once } from "events" ;
12import { viewToBuffer } from "@miniflare/shared" ;
23import StandardWebSocket from "ws" ;
34import {
78 kClose ,
89 kClosed ,
910 kCoupled ,
11+ kSend ,
1012} from "./websocket" ;
1113
1214export async function coupleWebSocket (
@@ -24,20 +26,16 @@ export async function coupleWebSocket(
2426 ) ;
2527 }
2628
27- // Forward events from client to worker
29+ // Forward messages from client to worker (register this before `open` to
30+ // ensure messages queued before other pair `accept`s to release)
2831 ws . on ( "message" , ( message : Buffer , isBinary : boolean ) => {
29- if ( isBinary ) {
30- pair . send ( viewToBuffer ( message ) ) ;
31- } else {
32- pair . send ( message . toString ( ) ) ;
32+ // Silently discard messages sent after close
33+ if ( ! pair [ kClosed ] ) {
34+ // Convert binary messages to `ArrayBuffer`s (note `[kSend]` will queue
35+ // messages if other pair hasn't `accept`ed yet)
36+ pair [ kSend ] ( isBinary ? viewToBuffer ( message ) : message . toString ( ) ) ;
3337 }
3438 } ) ;
35- ws . on ( "close" , ( code : number , reason : Buffer ) => {
36- if ( ! pair [ kClosed ] ) pair [ kClose ] ( code , reason . toString ( ) ) ;
37- } ) ;
38- ws . on ( "error" , ( error ) => {
39- pair . dispatchEvent ( new ErrorEvent ( "error" , { error } ) ) ;
40- } ) ;
4139
4240 // Forward events from worker to client
4341 pair . addEventListener ( "message" , ( e ) => {
@@ -55,26 +53,24 @@ export async function coupleWebSocket(
5553
5654 if ( ws . readyState === StandardWebSocket . CONNECTING ) {
5755 // Wait for client to be open before accepting worker pair (which would
58- // release buffered messages)
59- await new Promise < void > ( ( resolve , reject ) => {
60- ws . once ( "open" , ( ) => {
61- pair . accept ( ) ;
62- pair [ kCoupled ] = true ;
63-
64- ws . off ( "close" , reject ) ;
65- ws . off ( "error" , reject ) ;
66- resolve ( ) ;
67- } ) ;
68- ws . once ( "close" , reject ) ;
69- ws . once ( "error" , reject ) ;
70- } ) ;
71- } else {
72- // Accept worker pair immediately
73- pair . accept ( ) ;
74- pair [ kCoupled ] = true ;
75- // Throw error if socket is already closing/closed
76- if ( ws . readyState >= StandardWebSocket . CLOSING ) {
77- throw new TypeError ( "Incoming WebSocket connection already closed." ) ;
78- }
56+ // release buffered messages). Note this will throw if an "error" event is
57+ // dispatched.
58+ await once ( ws , "open" ) ;
59+ } else if ( ws . readyState >= StandardWebSocket . CLOSING ) {
60+ throw new TypeError ( "Incoming WebSocket connection already closed." ) ;
7961 }
62+ pair . accept ( ) ;
63+ pair [ kCoupled ] = true ;
64+
65+ // Forward close/error events from client to worker (register this after
66+ // `once(ws, "open")` to ensure close/error due to connection failure throws
67+ // and can be caught from this function: https://github.com/cloudflare/miniflare/issues/229)
68+ ws . on ( "close" , ( code : number , reason : Buffer ) => {
69+ // `[kClose]` skips code/reason validation, allowing reserved codes
70+ // (e.g. 1005 for "No Status Received")
71+ if ( ! pair [ kClosed ] ) pair [ kClose ] ( code , reason . toString ( ) ) ;
72+ } ) ;
73+ ws . on ( "error" , ( error ) => {
74+ pair . dispatchEvent ( new ErrorEvent ( "error" , { error } ) ) ;
75+ } ) ;
8076}
0 commit comments