Skip to content

Commit 51f3c89

Browse files
committed
Wrap phoenix.js transports
Phoenix expected a transport to be API compatible with the JS WebSocket API. This is a little bit unflexible and configuring the authToken that is handled differently depending on the transport was a bit awkward. Thus, we introduce a new Transport class that should be implemented by the given transport, falling back to a a WebSocket compatible wrapper, keeping API compatibility. The websocket specific handling (readyState, onopen, etc.) could be further abstracted in the future, to make integrating new transports like WebTransport easier. Then, it would also make sense to document the Transport API for others to implement their own transports.
1 parent cbb832f commit 51f3c89

File tree

5 files changed

+148
-45
lines changed

5 files changed

+148
-45
lines changed

assets/js/phoenix/longpoll.js

Lines changed: 16 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
import {
22
SOCKET_STATES,
3-
TRANSPORTS,
4-
AUTH_TOKEN_PREFIX
3+
TRANSPORTS
54
} from "./constants"
65

6+
import Transport from "./transport"
77
import Ajax from "./ajax"
88

99
let arrayBufferToBase64 = (buffer) => {
@@ -14,14 +14,9 @@ let arrayBufferToBase64 = (buffer) => {
1414
return btoa(binary)
1515
}
1616

17-
export default class LongPoll {
18-
19-
constructor(endPoint, protocols){
20-
// we only support subprotocols for authToken
21-
// ["phoenix", "base64url.bearer.phx.BASE64_ENCODED_TOKEN"]
22-
if (protocols.length === 2 && protocols[1].startsWith(AUTH_TOKEN_PREFIX)) {
23-
this.authToken = atob(protocols[1].slice(AUTH_TOKEN_PREFIX.length))
24-
}
17+
export default class LongPoll extends Transport {
18+
constructor(endPoint, options){
19+
super(endPoint, options)
2520
this.endPoint = null
2621
this.token = null
2722
this.skipHeartbeat = true
@@ -30,12 +25,9 @@ export default class LongPoll {
3025
this.currentBatch = null
3126
this.currentBatchTimer = null
3227
this.batchBuffer = []
33-
this.onopen = function (){ } // noop
34-
this.onerror = function (){ } // noop
35-
this.onmessage = function (){ } // noop
36-
this.onclose = function (){ } // noop
3728
this.pollEndpoint = this.normalizeEndpoint(endPoint)
38-
this.readyState = SOCKET_STATES.connecting
29+
this.authToken = options.authToken
30+
this.timeout = options.timeout
3931
// we must wait for the caller to finish setting up our callbacks and timeout properties
4032
setTimeout(() => this.poll(), 0)
4133
}
@@ -57,7 +49,7 @@ export default class LongPoll {
5749
}
5850

5951
ontimeout(){
60-
this.onerror("timeout")
52+
this.triggerError("timeout")
6153
this.closeAndRetry(1005, "timeout", false)
6254
}
6355

@@ -97,25 +89,24 @@ export default class LongPoll {
9789
//
9890
// In order to emulate this behaviour, we need to make sure each
9991
// onmessage handler is run within its own macrotask.
100-
setTimeout(() => this.onmessage({data: msg}), 0)
92+
setTimeout(() => this.triggerMessage({data: msg}), 0)
10193
})
10294
this.poll()
10395
break
10496
case 204:
10597
this.poll()
10698
break
10799
case 410:
108-
this.readyState = SOCKET_STATES.open
109-
this.onopen({})
100+
this.triggerOpen({})
110101
this.poll()
111102
break
112103
case 403:
113-
this.onerror(403)
104+
this.triggerError(403)
114105
this.close(1008, "forbidden", false)
115106
break
116107
case 0:
117108
case 500:
118-
this.onerror(500)
109+
this.triggerError(500)
119110
this.closeAndRetry(1011, "internal server error", 500)
120111
break
121112
default: throw new Error(`unhandled poll status ${status}`)
@@ -144,10 +135,10 @@ export default class LongPoll {
144135

145136
batchSend(messages){
146137
this.awaitingBatchAck = true
147-
this.ajax("POST", "application/x-ndjson", messages.join("\n"), () => this.onerror("timeout"), resp => {
138+
this.ajax("POST", "application/x-ndjson", messages.join("\n"), () => this.triggerError("timeout"), resp => {
148139
this.awaitingBatchAck = false
149140
if(!resp || resp.status !== 200){
150-
this.onerror(resp && resp.status)
141+
this.triggerError(resp && resp.status)
151142
this.closeAndRetry(1011, "internal server error", false)
152143
} else if(this.batchBuffer.length > 0){
153144
this.batchSend(this.batchBuffer)
@@ -158,15 +149,14 @@ export default class LongPoll {
158149

159150
close(code, reason, wasClean){
160151
for(let req of this.reqs){ req.abort() }
161-
this.readyState = SOCKET_STATES.closed
162152
let opts = Object.assign({code: 1000, reason: undefined, wasClean: true}, {code, reason, wasClean})
163153
this.batchBuffer = []
164154
clearTimeout(this.currentBatchTimer)
165155
this.currentBatchTimer = null
166156
if(typeof(CloseEvent) !== "undefined"){
167-
this.onclose(new CloseEvent("close", opts))
157+
this.triggerClose(new CloseEvent("close", opts))
168158
} else {
169-
this.onclose(opts)
159+
this.triggerClose(opts)
170160
}
171161
}
172162

assets/js/phoenix/socket.js

Lines changed: 31 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,7 @@ import {
66
DEFAULT_VSN,
77
SOCKET_STATES,
88
TRANSPORTS,
9-
WS_CLOSE_NORMAL,
10-
AUTH_TOKEN_PREFIX
9+
WS_CLOSE_NORMAL
1110
} from "./constants"
1211

1312
import {
@@ -17,6 +16,7 @@ import {
1716
import Ajax from "./ajax"
1817
import Channel from "./channel"
1918
import LongPoll from "./longpoll"
19+
import Transport, { WebSocketTransport, WrapperTransport } from "./transport"
2020
import Serializer from "./serializer"
2121
import Timer from "./timer"
2222

@@ -28,11 +28,12 @@ import Timer from "./timer"
2828
* `"wss://example.com"`
2929
* `"/socket"` (inherited host & protocol)
3030
* @param {Object} [opts] - Optional configuration
31-
* @param {Function} [opts.transport] - The Websocket Transport, for example WebSocket or Phoenix.LongPoll.
31+
* @param {Function} [opts.transport] - The Transport, for example WebSocket or Phoenix.LongPoll.
3232
*
3333
* Defaults to WebSocket with automatic LongPoll fallback if WebSocket is not defined.
3434
* To fallback to LongPoll when WebSocket attempts fail, use `longPollFallbackMs: 2500`.
3535
*
36+
* @param {Object} [opts.transportOpts] - Extra options to pass to the transport. Useful for custom transport implementations.
3637
* @param {number} [opts.longPollFallbackMs] - The millisecond time to attempt the primary transport
3738
* before falling back to the LongPoll transport. Disabled by default.
3839
*
@@ -117,7 +118,8 @@ export default class Socket {
117118
this.sendBuffer = []
118119
this.ref = 0
119120
this.timeout = opts.timeout || DEFAULT_TIMEOUT
120-
this.transport = opts.transport || global.WebSocket || LongPoll
121+
this.transport = this.initializeTransport(opts)
122+
this.transportOpts = opts.transportOpts || {}
121123
this.primaryPassedHealthCheck = false
122124
this.longPollFallbackMs = opts.longPollFallbackMs
123125
this.fallbackTimer = null
@@ -346,18 +348,35 @@ export default class Socket {
346348
* @private
347349
*/
348350

351+
initializeTransport(opts) {
352+
if (opts.transport && Transport.isTransport(opts.transport)) {
353+
return opts.transport
354+
} else if (opts.transport) {
355+
// legacy transport (WebSocket or WebSocket compatible class)
356+
return new WrapperTransport(opts.transport)
357+
} else {
358+
// no transport specified, use WebSocket if available, otherwise LongPoll
359+
return global.WebSocket ? WebSocketTransport : LongPoll
360+
}
361+
}
362+
349363
transportConnect(){
350364
this.connectClock++
351365
this.closeWasClean = false
352-
let protocols = ["phoenix"]
353-
// Sec-WebSocket-Protocol based token
354-
// (longpoll uses Authorization header instead)
355-
if (this.authToken) {
356-
protocols.push(`${AUTH_TOKEN_PREFIX}${btoa(this.authToken).replace(/=/g, "")}`)
366+
367+
const options = {
368+
authToken: this.authToken,
369+
...this.transportOpts
370+
}
371+
372+
if (this.transport === LongPoll) {
373+
// special options for longpoll
374+
options.timeout = this.longpollerTimeout
375+
} else {
376+
options.binaryType = this.binaryType
357377
}
358-
this.conn = new this.transport(this.endPointURL(), protocols)
359-
this.conn.binaryType = this.binaryType
360-
this.conn.timeout = this.longpollerTimeout
378+
379+
this.conn = new this.transport(this.endPointURL(), options)
361380
this.conn.onopen = () => this.onConnOpen()
362381
this.conn.onerror = error => this.onConnError(error)
363382
this.conn.onmessage = event => this.onConnMessage(event)

assets/js/phoenix/transport.js

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
import { global, SOCKET_STATES, AUTH_TOKEN_PREFIX } from "./constants"
2+
3+
export default class Transport {
4+
constructor(url, options = {}) {
5+
this.url = url
6+
this.options = options
7+
this.readyState = SOCKET_STATES.connecting
8+
this.onopen = null
9+
this.onerror = null
10+
this.onmessage = null
11+
this.onclose = null
12+
this.binaryType = options.binaryType || "arraybuffer"
13+
this.authToken = options.authToken
14+
}
15+
16+
static isTransport(transport) {
17+
return transport.prototype instanceof Transport
18+
}
19+
20+
send(_data) {
21+
throw new Error("send() must be implemented by subclass")
22+
}
23+
24+
close(_code, _reason) {
25+
throw new Error("close() must be implemented by subclass")
26+
}
27+
28+
// Helper methods for subclasses to trigger events
29+
triggerOpen() {
30+
this.readyState = SOCKET_STATES.open
31+
if (this.onopen) this.onopen()
32+
}
33+
34+
triggerError(error) {
35+
if (this.onerror) this.onerror(error)
36+
}
37+
38+
triggerMessage(message) {
39+
if (this.onmessage) this.onmessage(message)
40+
}
41+
42+
triggerClose(event) {
43+
this.readyState = SOCKET_STATES.closed
44+
if (this.onclose) this.onclose(event)
45+
}
46+
}
47+
48+
export class WebSocketTransport extends Transport {
49+
constructor(url, options = {}) {
50+
super(url, options)
51+
52+
// Handle WebSocket-specific protocol setup
53+
const subprotocols = ["phoenix"]
54+
if (this.authToken) {
55+
subprotocols.push(`${AUTH_TOKEN_PREFIX}${btoa(this.authToken).replace(/=/g, "")}`)
56+
}
57+
58+
const WebSocket = options.WebSocket || global.WebSocket
59+
this.ws = new WebSocket(url, subprotocols)
60+
this.ws.binaryType = this.binaryType
61+
62+
this.ws.onopen = () => this.triggerOpen()
63+
this.ws.onerror = (error) => this.triggerError(error)
64+
this.ws.onmessage = (event) => this.triggerMessage(event)
65+
this.ws.onclose = (event) => this.triggerClose(event)
66+
}
67+
68+
send(data) {
69+
this.ws.send(data)
70+
}
71+
72+
close(code, reason) {
73+
this.ws.close(code, reason)
74+
}
75+
}
76+
77+
export class WrapperTransport {
78+
constructor(transport) {
79+
return class extends WebSocketTransport {
80+
constructor(url, options) {
81+
options.WebSocket = transport
82+
super(url, options)
83+
}
84+
}
85+
}
86+
}

assets/test/channel_test.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ describe("with transport", function (){
6767
const socket = new Socket("/socket", {authToken})
6868

6969
socket.connect()
70-
expect(socket.conn.protocols).toEqual(["phoenix", "base64url.bearer.phx.MTIzNA"])
70+
expect(socket.conn.ws.protocols).toEqual(["phoenix", "base64url.bearer.phx.MTIzNA"])
7171
})
7272
})
7373

0 commit comments

Comments
 (0)