Skip to content

Commit c921fcd

Browse files
committed
Merge branch 'master' into breaking-changes
2 parents 81c49fc + 111c4da commit c921fcd

File tree

6 files changed

+121
-25
lines changed

6 files changed

+121
-25
lines changed

README.md

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -195,6 +195,7 @@ await cli.connect();
195195
* [Event: 'error'](#event-error)
196196
* [Event: 'close'](#event-close)
197197
* [Event: 'closing'](#event-closing)
198+
* [Event: 'upgradeAborted'](#event-upgradeaborted)
198199
* [server.auth(callback)](#serverauthcallback)
199200
* [server.handleUpgrade(request)](#serverhandleupgraderequest-socket-head)
200201
* [server.reconfigure(options)](#serverreconfigureoptions)
@@ -276,6 +277,16 @@ Emitted when the server has fully closed and all clients have been disconnected.
276277

277278
Emitted when the server has begun closing. Beyond this point, no more clients will be accepted and the `'client'` event will no longer fire.
278279

280+
#### Event: 'upgradeAborted'
281+
282+
* `event` {Object}
283+
* `error` {Error} - The cause of the abort.
284+
* `socket` {net.Socket} - Network socket between the server and client
285+
* `request` {http.IncomingMessage} - The full HTTP request received by the underlying webserver.
286+
* `identity` {String} - The identity portion of the connection URL, decoded.
287+
288+
Emitted when a websocket upgrade has been aborted. This could be caused by an authentication rejection, socket error or websocket handshake error.
289+
279290
#### server.auth(callback)
280291

281292
* `callback` {Function}
@@ -319,7 +330,7 @@ rpcServer.auth((accept, reject, handshake) => {
319330
#### server.handleUpgrade(request, socket, head)
320331

321332
* `request` {http.IncomingMessage}
322-
* `socket` {stream.Duplex} - Network socket between the server and client
333+
* `socket` {net.Socket} - Network socket between the server and client
323334
* `head` {Buffer} - The first packet of the upgraded stream (may be empty)
324335

325336
Converts an HTTP upgrade request into a WebSocket client to be handled by this RPCServer. This method is bound to the server instance, so it is suitable to pass directly as an `http.Server`'s `'upgrade'` event handler.

lib/errors.js

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,15 @@ class RPCFrameworkError extends RPCError {
6363
rpcErrorCode = 'RpcFrameworkError';
6464
};
6565

66+
class WebsocketUpgradeError extends Error {
67+
constructor(code, message) {
68+
super(message);
69+
this.code = code;
70+
}
71+
}
72+
6673
module.exports = {
74+
WebsocketUpgradeError,
6775
TimeoutError,
6876
UnexpectedHttpResponse,
6977
RPCError,

lib/server.js

Lines changed: 48 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ const RPCServerClient = require('./server-client');
55
const { abortHandshake, parseSubprotocols } = require('./ws-util');
66
const standardValidators = require('./standard-validators');
77
const { getPackageIdent } = require('./util');
8+
const { WebsocketUpgradeError } = require('./errors');
89

910
class RPCServer extends EventEmitter {
1011
constructor(options) {
@@ -78,24 +79,50 @@ class RPCServer extends EventEmitter {
7879

7980
get handleUpgrade() {
8081
return async (request, socket, head) => {
81-
try {
82-
if (this._state !== OPEN) {
83-
throw Error("Server not open");
82+
83+
let resolved = false;
84+
let emittedAbort = false;
85+
86+
const url = new URL(request.url, 'http://localhost');
87+
const pathParts = url.pathname.split('/');
88+
const identity = decodeURIComponent(pathParts.pop());
89+
90+
const abortUpgrade = (error) => {
91+
resolved = true;
92+
93+
if (error && error instanceof WebsocketUpgradeError) {
94+
abortHandshake(socket, error.code, error.message);
95+
} else {
96+
abortHandshake(socket, 500);
8497
}
8598

86-
if (request.headers.upgrade.toLowerCase() !== 'websocket') {
87-
throw Error("Can only upgrade websocket upgrade requests");
99+
if (!emittedAbort) {
100+
emittedAbort = true;
101+
this.emit('upgradeAborted', {
102+
error,
103+
socket,
104+
request,
105+
identity,
106+
});
88107
}
108+
};
89109

90-
const remoteAddress = request.socket.remoteAddress;
110+
try {
111+
if (this._state !== OPEN) {
112+
throw new WebsocketUpgradeError(500, "Server not open");
113+
}
114+
91115
const headers = request.headers;
116+
117+
if (headers.upgrade.toLowerCase() !== 'websocket') {
118+
throw new WebsocketUpgradeError(400, "Can only upgrade websocket upgrade requests");
119+
}
120+
121+
const endpoint = pathParts.join('/');
122+
const remoteAddress = request.socket.remoteAddress;
92123
const protocols = ('sec-websocket-protocol' in request.headers)
93124
? parseSubprotocols(request.headers['sec-websocket-protocol'])
94125
: new Set();
95-
const url = new URL(request.url, 'http://localhost');
96-
const pathParts = url.pathname.split('/');
97-
const identity = decodeURIComponent(pathParts.pop());
98-
const endpoint = pathParts.join('/');
99126

100127
let password;
101128
if (headers.authorization) {
@@ -130,8 +157,6 @@ class RPCServer extends EventEmitter {
130157
password,
131158
};
132159

133-
let resolved = false;
134-
135160
const accept = (session, protocol) => {
136161
if (resolved) return;
137162
resolved = true;
@@ -140,7 +165,8 @@ class RPCServer extends EventEmitter {
140165
// pick first subprotocol (preferred by server) that is also supported by the client
141166
protocol = (this._options.protocols ?? []).find(p => protocols.has(p));
142167
} else if (protocol !== false && !protocols.has(protocol)) {
143-
return abortHandshake(socket, 400, `Client doesn't support expected subprotocol`);
168+
abortUpgrade(new WebsocketUpgradeError(400, `Client doesn't support expected subprotocol`));
169+
return;
144170
}
145171

146172
// cache auth results for connection creation
@@ -155,14 +181,18 @@ class RPCServer extends EventEmitter {
155181
});
156182
};
157183

158-
const reject = (code = 404, message) => {
184+
const reject = (code = 404, message = 'Not found') => {
159185
if (resolved) return;
160186
resolved = true;
161-
abortHandshake(socket, code, message);
187+
abortUpgrade(new WebsocketUpgradeError(code, message));
162188
};
163189

164-
socket.on('error', () => {
165-
reject();
190+
socket.once('close', () => {
191+
reject(400, `Client connection closed before upgrade complete`);
192+
});
193+
194+
socket.on('error', (err) => {
195+
abortUpgrade(err);
166196
});
167197

168198
if (this.authCallback) {
@@ -176,7 +206,7 @@ class RPCServer extends EventEmitter {
176206
}
177207

178208
} catch (err) {
179-
abortHandshake(socket, 500);
209+
abortUpgrade(err);
180210
}
181211
};
182212
}

package-lock.json

Lines changed: 2 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "ocpp-rpc",
3-
"version": "1.7.4",
3+
"version": "1.8.0",
44
"description": "A client & server implementation of the WAMP-like RPC-over-websocket system defined in the OCPP protcols (e.g. OCPP1.6-J and OCPP2.0.1).",
55
"main": "index.js",
66
"scripts": {

test/server.js

Lines changed: 50 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ const assert = require('assert/strict');
22
const http = require('http');
33
const { once } = require('events');
44
const RPCClient = require("../lib/client");
5-
const { TimeoutError, UnexpectedHttpResponse } = require('../lib/errors');
5+
const { TimeoutError, UnexpectedHttpResponse, WebsocketUpgradeError } = require('../lib/errors');
66
const RPCServer = require("../lib/server");
77
const { setTimeout } = require('timers/promises');
88
const { createValidator } = require('../lib/validator');
@@ -823,8 +823,14 @@ describe('RPCServer', function(){
823823

824824
it("should abort handshake if server not open", async () => {
825825

826-
let authed = false;
827826
const server = new RPCServer();
827+
828+
let abortEvent;
829+
server.on('upgradeAborted', event => {
830+
abortEvent = event;
831+
});
832+
833+
let authed = false;
828834
server.auth((accept) => {
829835
// shouldn't get this far
830836
authed = true;
@@ -853,6 +859,7 @@ describe('RPCServer', function(){
853859
await server.close();
854860
assert.doesNotReject(server.handleUpgrade(...upgrade));
855861
assert.equal(authed, false);
862+
assert.equal(abortEvent.error.code, 500);
856863
httpServer.close();
857864

858865
});
@@ -862,6 +869,11 @@ describe('RPCServer', function(){
862869

863870
const {endpoint, close, server} = await createServer();
864871

872+
let abortEvent;
873+
server.on('upgradeAborted', event => {
874+
abortEvent = event;
875+
});
876+
865877
let authed = false;
866878
server.auth((accept) => {
867879
// shouldn't get this far
@@ -875,14 +887,49 @@ describe('RPCServer', function(){
875887
headers: {
876888
connection: 'Upgrade',
877889
upgrade: '_UNKNOWN_',
890+
'user-agent': 'test/0',
878891
}
879892
});
880893
req.end();
881894

882895
const [res] = await once(req, 'response');
883896

884-
assert.equal(res.statusCode, 500);
897+
assert.equal(res.statusCode, 400);
885898
assert.equal(authed, false);
899+
assert.ok(abortEvent.error instanceof WebsocketUpgradeError);
900+
assert.equal(abortEvent.request.headers['user-agent'], 'test/0');
901+
902+
} finally {
903+
close();
904+
}
905+
906+
});
907+
908+
909+
it("should emit upgradeAborted event on auth reject", async () => {
910+
911+
const {endpoint, close, server} = await createServer();
912+
913+
let abortEvent;
914+
server.on('upgradeAborted', event => {
915+
abortEvent = event;
916+
});
917+
918+
server.auth((accept, reject) => {
919+
reject(499);
920+
});
921+
922+
try {
923+
924+
const cli = new RPCClient({
925+
endpoint,
926+
identity: 'X'
927+
});
928+
929+
await assert.rejects(cli.connect());
930+
931+
assert.ok(abortEvent.error instanceof WebsocketUpgradeError);
932+
assert.equal(abortEvent.error.code, 499);
886933

887934
} finally {
888935
close();

0 commit comments

Comments
 (0)