diff --git a/.eslintignore b/.eslintignore new file mode 100644 index 0000000..45bbc55 --- /dev/null +++ b/.eslintignore @@ -0,0 +1,5 @@ +/dist/ +/build/ + +*.js +*.mjs diff --git a/.eslintrc.js b/.eslintrc.js new file mode 100644 index 0000000..9ef2045 --- /dev/null +++ b/.eslintrc.js @@ -0,0 +1,51 @@ +module.exports = { + parser: '@typescript-eslint/parser', + plugins: ['@typescript-eslint/eslint-plugin'], + env: { + browser: true, + commonjs: true, + es2021: true, + node: true, + worker: true, + }, + extends: [ + 'airbnb-base', + 'airbnb-typescript/base', + 'plugin:prettier/recommended', + 'plugin:@typescript-eslint/recommended' + ], + parserOptions: { + project: 'tsconfig.json', + sourceType: 'module', + tsconfigRootDir: __dirname + }, + rules: { + 'global-require': 'off', + 'no-console': process.env.NODE_ENV === 'production' ? 'warn' : 'off', + 'no-unused-vars': 'off', + 'no-underscore-dangle': 'off', + 'no-param-reassign': 'off', + 'no-restricted-syntax': 'off', + camelcase: 'off', + 'default-case': 'off', + 'consistent-return': 'off', + 'import/order': 'off', + 'max-classes-per-file': 'off', + 'no-plusplus': 'off', + 'guard-for-in': 'off', + 'no-bitwise': 'off', + 'class-methods-use-this': 'off', + 'no-continue': 'off', + 'prefer-destructuring': 'off', + 'no-use-before-define': 'off', + // Typescript rules + '@typescript-eslint/interface-name-prefix': 'off', + '@typescript-eslint/explicit-function-return-type': 'off', + '@typescript-eslint/explicit-module-boundary-types': 'off', + '@typescript-eslint/no-explicit-any': 'off', + '@typescript-eslint/no-unused-vars': 'off', + '@typescript-eslint/naming-convention': 'off', + '@typescript-eslint/dot-notation': 'off', + '@typescript-eslint/no-use-before-define': 'off', + } +} diff --git a/.gitignore b/.gitignore index da23d0d..48f8a07 100644 --- a/.gitignore +++ b/.gitignore @@ -23,3 +23,6 @@ build/Release # Deployed apps should consider commenting this line out: # see https://npmjs.org/doc/faq.html#Should-I-check-my-node_modules-folder-into-git node_modules + +/build/ +/dist/ diff --git a/.prettierignore b/.prettierignore new file mode 100644 index 0000000..08ab71b --- /dev/null +++ b/.prettierignore @@ -0,0 +1,4 @@ +*.md +README.md +/dist/ +/build/ diff --git a/.prettierrc.js b/.prettierrc.js new file mode 100644 index 0000000..3d8d73d --- /dev/null +++ b/.prettierrc.js @@ -0,0 +1,7 @@ +module.exports = { + semi: false, + singleQuote: true, + useTabs: true, + tabWidth: 4, + endOfLine: "lf", +}; diff --git a/.vscode/settings.json b/.vscode/settings.json new file mode 100644 index 0000000..a50deb7 --- /dev/null +++ b/.vscode/settings.json @@ -0,0 +1,9 @@ +{ + "eslint.format.enable": true, + "eslint.lintTask.enable": true, + "editor.defaultFormatter": "esbenp.prettier-vscode", + "editor.codeActionsOnSave": { + "source.fixAll.eslint": "explicit", + "source.fixAll.markdownlint": "explicit" + }, +} \ No newline at end of file diff --git a/constants.js b/constants.js deleted file mode 100644 index 6ade1a3..0000000 --- a/constants.js +++ /dev/null @@ -1,291 +0,0 @@ -/* Protocol - protocol constants */ -const protocol = module.exports -const { Buffer } = require('buffer') - -/* Command code => mnemonic */ -protocol.types = { - 0: 'reserved', - 1: 'connect', - 2: 'connack', - 3: 'publish', - 4: 'puback', - 5: 'pubrec', - 6: 'pubrel', - 7: 'pubcomp', - 8: 'subscribe', - 9: 'suback', - 10: 'unsubscribe', - 11: 'unsuback', - 12: 'pingreq', - 13: 'pingresp', - 14: 'disconnect', - 15: 'auth' -} - -protocol.requiredHeaderFlags = { - 1: 0, // 'connect' - 2: 0, // 'connack' - 4: 0, // 'puback' - 5: 0, // 'pubrec' - 6: 2, // 'pubrel' - 7: 0, // 'pubcomp' - 8: 2, // 'subscribe' - 9: 0, // 'suback' - 10: 2, // 'unsubscribe' - 11: 0, // 'unsuback' - 12: 0, // 'pingreq' - 13: 0, // 'pingresp' - 14: 0, // 'disconnect' - 15: 0 // 'auth' -} - -protocol.requiredHeaderFlagsErrors = {} -for (const k in protocol.requiredHeaderFlags) { - const v = protocol.requiredHeaderFlags[k] - protocol.requiredHeaderFlagsErrors[k] = 'Invalid header flag bits, must be 0x' + v.toString(16) + ' for ' + protocol.types[k] + ' packet' -} - -/* Mnemonic => Command code */ -protocol.codes = {} -for (const k in protocol.types) { - const v = protocol.types[k] - protocol.codes[v] = k -} - -/* Header */ -protocol.CMD_SHIFT = 4 -protocol.CMD_MASK = 0xF0 -protocol.DUP_MASK = 0x08 -protocol.QOS_MASK = 0x03 -protocol.QOS_SHIFT = 1 -protocol.RETAIN_MASK = 0x01 - -/* Length */ -protocol.VARBYTEINT_MASK = 0x7F -protocol.VARBYTEINT_FIN_MASK = 0x80 -protocol.VARBYTEINT_MAX = 268435455 - -/* Connack */ -protocol.SESSIONPRESENT_MASK = 0x01 -protocol.SESSIONPRESENT_HEADER = Buffer.from([protocol.SESSIONPRESENT_MASK]) -protocol.CONNACK_HEADER = Buffer.from([protocol.codes.connack << protocol.CMD_SHIFT]) - -/* Connect */ -protocol.USERNAME_MASK = 0x80 -protocol.PASSWORD_MASK = 0x40 -protocol.WILL_RETAIN_MASK = 0x20 -protocol.WILL_QOS_MASK = 0x18 -protocol.WILL_QOS_SHIFT = 3 -protocol.WILL_FLAG_MASK = 0x04 -protocol.CLEAN_SESSION_MASK = 0x02 -protocol.CONNECT_HEADER = Buffer.from([protocol.codes.connect << protocol.CMD_SHIFT]) - -/* Properties */ -protocol.properties = { - sessionExpiryInterval: 17, - willDelayInterval: 24, - receiveMaximum: 33, - maximumPacketSize: 39, - topicAliasMaximum: 34, - requestResponseInformation: 25, - requestProblemInformation: 23, - userProperties: 38, - authenticationMethod: 21, - authenticationData: 22, - payloadFormatIndicator: 1, - messageExpiryInterval: 2, - contentType: 3, - responseTopic: 8, - correlationData: 9, - maximumQoS: 36, - retainAvailable: 37, - assignedClientIdentifier: 18, - reasonString: 31, - wildcardSubscriptionAvailable: 40, - subscriptionIdentifiersAvailable: 41, - sharedSubscriptionAvailable: 42, - serverKeepAlive: 19, - responseInformation: 26, - serverReference: 28, - topicAlias: 35, - subscriptionIdentifier: 11 -} -protocol.propertiesCodes = {} -for (const prop in protocol.properties) { - const id = protocol.properties[prop] - protocol.propertiesCodes[id] = prop -} -protocol.propertiesTypes = { - sessionExpiryInterval: 'int32', - willDelayInterval: 'int32', - receiveMaximum: 'int16', - maximumPacketSize: 'int32', - topicAliasMaximum: 'int16', - requestResponseInformation: 'byte', - requestProblemInformation: 'byte', - userProperties: 'pair', - authenticationMethod: 'string', - authenticationData: 'binary', - payloadFormatIndicator: 'byte', - messageExpiryInterval: 'int32', - contentType: 'string', - responseTopic: 'string', - correlationData: 'binary', - maximumQoS: 'int8', - retainAvailable: 'byte', - assignedClientIdentifier: 'string', - reasonString: 'string', - wildcardSubscriptionAvailable: 'byte', - subscriptionIdentifiersAvailable: 'byte', - sharedSubscriptionAvailable: 'byte', - serverKeepAlive: 'int16', - responseInformation: 'string', - serverReference: 'string', - topicAlias: 'int16', - subscriptionIdentifier: 'var' -} - -function genHeader (type) { - return [0, 1, 2].map(qos => { - return [0, 1].map(dup => { - return [0, 1].map(retain => { - const buf = Buffer.alloc(1) - buf.writeUInt8( - protocol.codes[type] << protocol.CMD_SHIFT | - (dup ? protocol.DUP_MASK : 0) | - qos << protocol.QOS_SHIFT | retain, 0, true) - return buf - }) - }) - }) -} - -/* Publish */ -protocol.PUBLISH_HEADER = genHeader('publish') - -/* Subscribe */ -protocol.SUBSCRIBE_HEADER = genHeader('subscribe') -protocol.SUBSCRIBE_OPTIONS_QOS_MASK = 0x03 -protocol.SUBSCRIBE_OPTIONS_NL_MASK = 0x01 -protocol.SUBSCRIBE_OPTIONS_NL_SHIFT = 2 -protocol.SUBSCRIBE_OPTIONS_RAP_MASK = 0x01 -protocol.SUBSCRIBE_OPTIONS_RAP_SHIFT = 3 -protocol.SUBSCRIBE_OPTIONS_RH_MASK = 0x03 -protocol.SUBSCRIBE_OPTIONS_RH_SHIFT = 4 -protocol.SUBSCRIBE_OPTIONS_RH = [0x00, 0x10, 0x20] -protocol.SUBSCRIBE_OPTIONS_NL = 0x04 -protocol.SUBSCRIBE_OPTIONS_RAP = 0x08 -protocol.SUBSCRIBE_OPTIONS_QOS = [0x00, 0x01, 0x02] - -/* Unsubscribe */ -protocol.UNSUBSCRIBE_HEADER = genHeader('unsubscribe') - -/* Confirmations */ -protocol.ACKS = { - unsuback: genHeader('unsuback'), - puback: genHeader('puback'), - pubcomp: genHeader('pubcomp'), - pubrel: genHeader('pubrel'), - pubrec: genHeader('pubrec') -} - -protocol.SUBACK_HEADER = Buffer.from([protocol.codes.suback << protocol.CMD_SHIFT]) - -/* Protocol versions */ -protocol.VERSION3 = Buffer.from([3]) -protocol.VERSION4 = Buffer.from([4]) -protocol.VERSION5 = Buffer.from([5]) -protocol.VERSION131 = Buffer.from([131]) -protocol.VERSION132 = Buffer.from([132]) - -/* QoS */ -protocol.QOS = [0, 1, 2].map(qos => { - return Buffer.from([qos]) -}) - -/* Empty packets */ -protocol.EMPTY = { - pingreq: Buffer.from([protocol.codes.pingreq << 4, 0]), - pingresp: Buffer.from([protocol.codes.pingresp << 4, 0]), - disconnect: Buffer.from([protocol.codes.disconnect << 4, 0]) -} - -protocol.MQTT5_PUBACK_PUBREC_CODES = { - 0x00: 'Success', - 0x10: 'No matching subscribers', - 0x80: 'Unspecified error', - 0x83: 'Implementation specific error', - 0x87: 'Not authorized', - 0x90: 'Topic Name invalid', - 0x91: 'Packet identifier in use', - 0x97: 'Quota exceeded', - 0x99: 'Payload format invalid' -} - -protocol.MQTT5_PUBREL_PUBCOMP_CODES = { - 0x00: 'Success', - 0x92: 'Packet Identifier not found' -} - -protocol.MQTT5_SUBACK_CODES = { - 0x00: 'Granted QoS 0', - 0x01: 'Granted QoS 1', - 0x02: 'Granted QoS 2', - 0x80: 'Unspecified error', - 0x83: 'Implementation specific error', - 0x87: 'Not authorized', - 0x8F: 'Topic Filter invalid', - 0x91: 'Packet Identifier in use', - 0x97: 'Quota exceeded', - 0x9E: 'Shared Subscriptions not supported', - 0xA1: 'Subscription Identifiers not supported', - 0xA2: 'Wildcard Subscriptions not supported' -} - -protocol.MQTT5_UNSUBACK_CODES = { - 0x00: 'Success', - 0x11: 'No subscription existed', - 0x80: 'Unspecified error', - 0x83: 'Implementation specific error', - 0x87: 'Not authorized', - 0x8F: 'Topic Filter invalid', - 0x91: 'Packet Identifier in use' -} - -protocol.MQTT5_DISCONNECT_CODES = { - 0x00: 'Normal disconnection', - 0x04: 'Disconnect with Will Message', - 0x80: 'Unspecified error', - 0x81: 'Malformed Packet', - 0x82: 'Protocol Error', - 0x83: 'Implementation specific error', - 0x87: 'Not authorized', - 0x89: 'Server busy', - 0x8B: 'Server shutting down', - 0x8D: 'Keep Alive timeout', - 0x8E: 'Session taken over', - 0x8F: 'Topic Filter invalid', - 0x90: 'Topic Name invalid', - 0x93: 'Receive Maximum exceeded', - 0x94: 'Topic Alias invalid', - 0x95: 'Packet too large', - 0x96: 'Message rate too high', - 0x97: 'Quota exceeded', - 0x98: 'Administrative action', - 0x99: 'Payload format invalid', - 0x9A: 'Retain not supported', - 0x9B: 'QoS not supported', - 0x9C: 'Use another server', - 0x9D: 'Server moved', - 0x9E: 'Shared Subscriptions not supported', - 0x9F: 'Connection rate exceeded', - 0xA0: 'Maximum connect time', - 0xA1: 'Subscription Identifiers not supported', - 0xA2: 'Wildcard Subscriptions not supported' -} - -protocol.MQTT5_AUTH_CODES = { - 0x00: 'Success', - 0x18: 'Continue authentication', - 0x19: 'Re-authenticate' -} diff --git a/generate.js b/generate.js deleted file mode 100644 index 2abf91d..0000000 --- a/generate.js +++ /dev/null @@ -1,57 +0,0 @@ -const writeToStream = require('./writeToStream') -const { EventEmitter } = require('events') -const { Buffer } = require('buffer') - -function generate (packet, opts) { - const stream = new Accumulator() - writeToStream(packet, stream, opts) - return stream.concat() -} - -class Accumulator extends EventEmitter { - constructor () { - super() - this._array = new Array(20) - this._i = 0 - } - - write (chunk) { - this._array[this._i++] = chunk - return true - } - - concat () { - let length = 0 - const lengths = new Array(this._array.length) - const list = this._array - let pos = 0 - let i - - for (i = 0; i < list.length && list[i] !== undefined; i++) { - if (typeof list[i] !== 'string') lengths[i] = list[i].length - else lengths[i] = Buffer.byteLength(list[i]) - - length += lengths[i] - } - - const result = Buffer.allocUnsafe(length) - - for (i = 0; i < list.length && list[i] !== undefined; i++) { - if (typeof list[i] !== 'string') { - list[i].copy(result, pos) - pos += lengths[i] - } else { - result.write(list[i], pos) - pos += lengths[i] - } - } - - return result - } - - destroy (err) { - if (err) this.emit('error', err) - } -} - -module.exports = generate diff --git a/mqtt.js b/mqtt.js deleted file mode 100644 index f23c8d7..0000000 --- a/mqtt.js +++ /dev/null @@ -1,3 +0,0 @@ -exports.parser = require('./parser').parser -exports.generate = require('./generate') -exports.writeToStream = require('./writeToStream') diff --git a/numbers.js b/numbers.js deleted file mode 100644 index 443321b..0000000 --- a/numbers.js +++ /dev/null @@ -1,59 +0,0 @@ -const { Buffer } = require('buffer') -const max = 65536 -const cache = {} - -// in node 6 Buffer.subarray returns a Uint8Array instead of a Buffer -// later versions return a Buffer -// alternative is Buffer.slice but that creates a new buffer -// creating new buffers takes time -// SubOk is only false on node < 8 -const SubOk = Buffer.isBuffer(Buffer.from([1, 2]).subarray(0, 1)) - -function generateBuffer (i) { - const buffer = Buffer.allocUnsafe(2) - buffer.writeUInt8(i >> 8, 0) - buffer.writeUInt8(i & 0x00FF, 0 + 1) - - return buffer -} - -function generateCache () { - for (let i = 0; i < max; i++) { - cache[i] = generateBuffer(i) - } -} - -function genBufVariableByteInt (num) { - const maxLength = 4 // max 4 bytes - let digit = 0 - let pos = 0 - const buffer = Buffer.allocUnsafe(maxLength) - - do { - digit = num % 128 | 0 - num = num / 128 | 0 - if (num > 0) digit = digit | 0x80 - - buffer.writeUInt8(digit, pos++) - } while (num > 0 && pos < maxLength) - - if (num > 0) { - pos = 0 - } - - return SubOk ? buffer.subarray(0, pos) : buffer.slice(0, pos) -} - -function generate4ByteBuffer (num) { - const buffer = Buffer.allocUnsafe(4) - buffer.writeUInt32BE(num, 0) - return buffer -} - -module.exports = { - cache, - generateCache, - generateNumber: generateBuffer, - genBufVariableByteInt, - generate4ByteBuffer -} diff --git a/package.json b/package.json index 8be80a0..510a139 100644 --- a/package.json +++ b/package.json @@ -2,8 +2,28 @@ "name": "mqtt-packet", "version": "9.0.1", "description": "Parse and generate MQTT packets like a breeze", - "main": "mqtt.js", - "types": "types/index.d.ts", + "main": "./build/index.js", + "exports": { + ".": { + "browser": { + "import": "./dist/mqtt-packet.esm.js", + "default": "./dist/mqtt-packet.min.js" + }, + "default": "./build/index.js" + }, + "./package.json": "./package.json", + "./*.map": "./build/*.js.map", + "./dist/*": "./dist/*.js", + "./*": "./build/*.js" + }, + "types": "build/index.d.ts", + "typesVersions": { + "*": { + "*": [ + "./build/index.d.ts" + ] + } + }, "contributors": [ "Matteo Collina (https://github.com/mcollina)", "Adam Rudd ", @@ -13,7 +33,13 @@ ], "scripts": { "test": "tape test.js | tap-spec && standard", - "ci": "tape test.js && node testRandom && standard" + "ci": "tape test.js && node testRandom && standard", + "lint": "eslint --ext .ts .", + "lint-fix": "eslint --fix --ext .ts .", + "build:ts": "rimraf build/ && tsc -p tsconfig.build.json", + "build:browser": "node esbuild.js", + "build": "npm run build:ts && npm run build:browser", + "prepare": "npm run build" }, "pre-commit": "test", "repository": { @@ -34,11 +60,22 @@ }, "homepage": "https://github.com/mqttjs/mqtt-packet", "devDependencies": { + "@types/node": "^22.13.5", + "esbuild": "^0.25.0", + "esbuild-register": "^3.6.0", + "eslint": "^8.57.1", + "eslint-config-airbnb-base": "^15.0.0", + "eslint-config-airbnb-typescript": "^18.0.0", + "eslint-config-prettier": "^10.0.1", + "eslint-plugin-import": "^2.31.0", + "eslint-plugin-prettier": "^5.2.3", "pre-commit": "^1.2.2", + "prettier": "^3.5.2", "readable-stream": "^4.4.2", "standard": "^17.1.0", "tap-spec": "^5.0.0", - "tape": "^5.7.2" + "tape": "^5.7.2", + "typescript": "^5.7.3" }, "dependencies": { "bl": "^6.0.8", diff --git a/packet.js b/packet.js deleted file mode 100644 index 4ef1e78..0000000 --- a/packet.js +++ /dev/null @@ -1,13 +0,0 @@ -class Packet { - constructor () { - this.cmd = null - this.retain = false - this.qos = 0 - this.dup = false - this.length = -1 - this.topic = null - this.payload = null - } -} - -module.exports = Packet diff --git a/parser.js b/parser.js deleted file mode 100644 index db78183..0000000 --- a/parser.js +++ /dev/null @@ -1,809 +0,0 @@ -const bl = require('bl') -const { EventEmitter } = require('events') -const Packet = require('./packet') -const constants = require('./constants') -const debug = require('debug')('mqtt-packet:parser') - -class Parser extends EventEmitter { - constructor () { - super() - this.parser = this.constructor.parser - } - - static parser (opt) { - if (!(this instanceof Parser)) return (new Parser()).parser(opt) - - this.settings = opt || {} - - this._states = [ - '_parseHeader', - '_parseLength', - '_parsePayload', - '_newPacket' - ] - - this._resetState() - return this - } - - _resetState () { - debug('_resetState: resetting packet, error, _list, and _stateCounter') - this.packet = new Packet() - this.error = null - this._list = bl() - this._stateCounter = 0 - } - - parse (buf) { - if (this.error) this._resetState() - - this._list.append(buf) - debug('parse: current state: %s', this._states[this._stateCounter]) - while ((this.packet.length !== -1 || this._list.length > 0) && - this[this._states[this._stateCounter]]() && - !this.error) { - this._stateCounter++ - debug('parse: state complete. _stateCounter is now: %d', this._stateCounter) - debug('parse: packet.length: %d, buffer list length: %d', this.packet.length, this._list.length) - if (this._stateCounter >= this._states.length) this._stateCounter = 0 - } - debug('parse: exited while loop. packet: %d, buffer list length: %d', this.packet.length, this._list.length) - return this._list.length - } - - _parseHeader () { - // There is at least one byte in the buffer - const zero = this._list.readUInt8(0) - const cmdIndex = zero >> constants.CMD_SHIFT - this.packet.cmd = constants.types[cmdIndex] - const headerFlags = zero & 0xf - const requiredHeaderFlags = constants.requiredHeaderFlags[cmdIndex] - if (requiredHeaderFlags != null && headerFlags !== requiredHeaderFlags) { - // Where a flag bit is marked as “Reserved” in Table 2.2 - Flag Bits, it is reserved for future use and MUST be set to the value listed in that table [MQTT-2.2.2-1]. If invalid flags are received, the receiver MUST close the Network Connection [MQTT-2.2.2-2] - return this._emitError(new Error(constants.requiredHeaderFlagsErrors[cmdIndex])) - } - this.packet.retain = (zero & constants.RETAIN_MASK) !== 0 - this.packet.qos = (zero >> constants.QOS_SHIFT) & constants.QOS_MASK - if (this.packet.qos > 2) { - return this._emitError(new Error('Packet must not have both QoS bits set to 1')) - } - this.packet.dup = (zero & constants.DUP_MASK) !== 0 - debug('_parseHeader: packet: %o', this.packet) - - this._list.consume(1) - - return true - } - - _parseLength () { - // There is at least one byte in the list - const result = this._parseVarByteNum(true) - - if (result) { - this.packet.length = result.value - this._list.consume(result.bytes) - } - debug('_parseLength %d', result.value) - return !!result - } - - _parsePayload () { - debug('_parsePayload: payload %O', this._list) - let result = false - - // Do we have a payload? Do we have enough data to complete the payload? - // PINGs have no payload - if (this.packet.length === 0 || this._list.length >= this.packet.length) { - this._pos = 0 - - switch (this.packet.cmd) { - case 'connect': - this._parseConnect() - break - case 'connack': - this._parseConnack() - break - case 'publish': - this._parsePublish() - break - case 'puback': - case 'pubrec': - case 'pubrel': - case 'pubcomp': - this._parseConfirmation() - break - case 'subscribe': - this._parseSubscribe() - break - case 'suback': - this._parseSuback() - break - case 'unsubscribe': - this._parseUnsubscribe() - break - case 'unsuback': - this._parseUnsuback() - break - case 'pingreq': - case 'pingresp': - // These are empty, nothing to do - break - case 'disconnect': - this._parseDisconnect() - break - case 'auth': - this._parseAuth() - break - default: - this._emitError(new Error('Not supported')) - } - - result = true - } - debug('_parsePayload complete result: %s', result) - return result - } - - _parseConnect () { - debug('_parseConnect') - let topic // Will topic - let payload // Will payload - let password // Password - let username // Username - const flags = {} - const packet = this.packet - - // Parse protocolId - const protocolId = this._parseString() - - if (protocolId === null) return this._emitError(new Error('Cannot parse protocolId')) - if (protocolId !== 'MQTT' && protocolId !== 'MQIsdp') { - return this._emitError(new Error('Invalid protocolId')) - } - - packet.protocolId = protocolId - - // Parse constants version number - if (this._pos >= this._list.length) return this._emitError(new Error('Packet too short')) - - packet.protocolVersion = this._list.readUInt8(this._pos) - - if (packet.protocolVersion >= 128) { - packet.bridgeMode = true - packet.protocolVersion = packet.protocolVersion - 128 - } - - if (packet.protocolVersion !== 3 && packet.protocolVersion !== 4 && packet.protocolVersion !== 5) { - return this._emitError(new Error('Invalid protocol version')) - } - - this._pos++ - - if (this._pos >= this._list.length) { - return this._emitError(new Error('Packet too short')) - } - - if (this._list.readUInt8(this._pos) & 0x1) { - // The Server MUST validate that the reserved flag in the CONNECT Control Packet is set to zero and disconnect the Client if it is not zero [MQTT-3.1.2-3] - return this._emitError(new Error('Connect flag bit 0 must be 0, but got 1')) - } - // Parse connect flags - flags.username = (this._list.readUInt8(this._pos) & constants.USERNAME_MASK) - flags.password = (this._list.readUInt8(this._pos) & constants.PASSWORD_MASK) - flags.will = (this._list.readUInt8(this._pos) & constants.WILL_FLAG_MASK) - - const willRetain = !!(this._list.readUInt8(this._pos) & constants.WILL_RETAIN_MASK) - const willQos = (this._list.readUInt8(this._pos) & - constants.WILL_QOS_MASK) >> constants.WILL_QOS_SHIFT - - if (flags.will) { - packet.will = {} - packet.will.retain = willRetain - packet.will.qos = willQos - } else { - if (willRetain) { - return this._emitError(new Error('Will Retain Flag must be set to zero when Will Flag is set to 0')) - } - if (willQos) { - return this._emitError(new Error('Will QoS must be set to zero when Will Flag is set to 0')) - } - } - - packet.clean = (this._list.readUInt8(this._pos) & constants.CLEAN_SESSION_MASK) !== 0 - this._pos++ - - // Parse keepalive - packet.keepalive = this._parseNum() - if (packet.keepalive === -1) return this._emitError(new Error('Packet too short')) - - // parse properties - if (packet.protocolVersion === 5) { - const properties = this._parseProperties() - if (Object.getOwnPropertyNames(properties).length) { - packet.properties = properties - } - } - // Parse clientId - const clientId = this._parseString() - if (clientId === null) return this._emitError(new Error('Packet too short')) - packet.clientId = clientId - debug('_parseConnect: packet.clientId: %s', packet.clientId) - - if (flags.will) { - if (packet.protocolVersion === 5) { - const willProperties = this._parseProperties() - if (Object.getOwnPropertyNames(willProperties).length) { - packet.will.properties = willProperties - } - } - // Parse will topic - topic = this._parseString() - if (topic === null) return this._emitError(new Error('Cannot parse will topic')) - packet.will.topic = topic - debug('_parseConnect: packet.will.topic: %s', packet.will.topic) - - // Parse will payload - payload = this._parseBuffer() - if (payload === null) return this._emitError(new Error('Cannot parse will payload')) - packet.will.payload = payload - debug('_parseConnect: packet.will.paylaod: %s', packet.will.payload) - } - - // Parse username - if (flags.username) { - username = this._parseString() - if (username === null) return this._emitError(new Error('Cannot parse username')) - packet.username = username - debug('_parseConnect: packet.username: %s', packet.username) - } - - // Parse password - if (flags.password) { - password = this._parseBuffer() - if (password === null) return this._emitError(new Error('Cannot parse password')) - packet.password = password - } - // need for right parse auth packet and self set up - this.settings = packet - debug('_parseConnect: complete') - return packet - } - - _parseConnack () { - debug('_parseConnack') - const packet = this.packet - - if (this._list.length < 1) return null - const flags = this._list.readUInt8(this._pos++) - if (flags > 1) { - return this._emitError(new Error('Invalid connack flags, bits 7-1 must be set to 0')) - } - packet.sessionPresent = !!(flags & constants.SESSIONPRESENT_MASK) - - if (this.settings.protocolVersion === 5) { - if (this._list.length >= 2) { - packet.reasonCode = this._list.readUInt8(this._pos++) - } else { - packet.reasonCode = 0 - } - } else { - if (this._list.length < 2) return null - packet.returnCode = this._list.readUInt8(this._pos++) - } - - if (packet.returnCode === -1 || packet.reasonCode === -1) return this._emitError(new Error('Cannot parse return code')) - // mqtt 5 properties - if (this.settings.protocolVersion === 5) { - const properties = this._parseProperties() - if (Object.getOwnPropertyNames(properties).length) { - packet.properties = properties - } - } - debug('_parseConnack: complete') - } - - _parsePublish () { - debug('_parsePublish') - const packet = this.packet - packet.topic = this._parseString() - - if (packet.topic === null) return this._emitError(new Error('Cannot parse topic')) - - // Parse messageId - if (packet.qos > 0) if (!this._parseMessageId()) { return } - - // Properties mqtt 5 - if (this.settings.protocolVersion === 5) { - const properties = this._parseProperties() - if (Object.getOwnPropertyNames(properties).length) { - packet.properties = properties - } - } - - packet.payload = this._list.slice(this._pos, packet.length) - debug('_parsePublish: payload from buffer list: %o', packet.payload) - } - - _parseSubscribe () { - debug('_parseSubscribe') - const packet = this.packet - let topic - let options - let qos - let rh - let rap - let nl - let subscription - - packet.subscriptions = [] - - if (!this._parseMessageId()) { return } - - // Properties mqtt 5 - if (this.settings.protocolVersion === 5) { - const properties = this._parseProperties() - if (Object.getOwnPropertyNames(properties).length) { - packet.properties = properties - } - } - - if (packet.length <= 0) { return this._emitError(new Error('Malformed subscribe, no payload specified')) } - - while (this._pos < packet.length) { - // Parse topic - topic = this._parseString() - if (topic === null) return this._emitError(new Error('Cannot parse topic')) - if (this._pos >= packet.length) return this._emitError(new Error('Malformed Subscribe Payload')) - - options = this._parseByte() - - if (this.settings.protocolVersion === 5) { - if (options & 0xc0) { - return this._emitError(new Error('Invalid subscribe topic flag bits, bits 7-6 must be 0')) - } - } else { - if (options & 0xfc) { - return this._emitError(new Error('Invalid subscribe topic flag bits, bits 7-2 must be 0')) - } - } - - qos = options & constants.SUBSCRIBE_OPTIONS_QOS_MASK - if (qos > 2) { - return this._emitError(new Error('Invalid subscribe QoS, must be <= 2')) - } - nl = ((options >> constants.SUBSCRIBE_OPTIONS_NL_SHIFT) & constants.SUBSCRIBE_OPTIONS_NL_MASK) !== 0 - rap = ((options >> constants.SUBSCRIBE_OPTIONS_RAP_SHIFT) & constants.SUBSCRIBE_OPTIONS_RAP_MASK) !== 0 - rh = (options >> constants.SUBSCRIBE_OPTIONS_RH_SHIFT) & constants.SUBSCRIBE_OPTIONS_RH_MASK - - if (rh > 2) { - return this._emitError(new Error('Invalid retain handling, must be <= 2')) - } - - subscription = { topic, qos } - - // mqtt 5 options - if (this.settings.protocolVersion === 5) { - subscription.nl = nl - subscription.rap = rap - subscription.rh = rh - } else if (this.settings.bridgeMode) { - subscription.rh = 0 - subscription.rap = true - subscription.nl = true - } - - // Push pair to subscriptions - debug('_parseSubscribe: push subscription `%s` to subscription', subscription) - packet.subscriptions.push(subscription) - } - } - - _parseSuback () { - debug('_parseSuback') - const packet = this.packet - this.packet.granted = [] - - if (!this._parseMessageId()) { return } - - // Properties mqtt 5 - if (this.settings.protocolVersion === 5) { - const properties = this._parseProperties() - if (Object.getOwnPropertyNames(properties).length) { - packet.properties = properties - } - } - - if (packet.length <= 0) { return this._emitError(new Error('Malformed suback, no payload specified')) } - - // Parse granted QoSes - while (this._pos < this.packet.length) { - const code = this._list.readUInt8(this._pos++) - if (this.settings.protocolVersion === 5) { - if (!constants.MQTT5_SUBACK_CODES[code]) { - return this._emitError(new Error('Invalid suback code')) - } - } else { - if (code > 2 && code !== 0x80) { - return this._emitError(new Error('Invalid suback QoS, must be 0, 1, 2 or 128')) - } - } - this.packet.granted.push(code) - } - } - - _parseUnsubscribe () { - debug('_parseUnsubscribe') - const packet = this.packet - - packet.unsubscriptions = [] - - // Parse messageId - if (!this._parseMessageId()) { return } - - // Properties mqtt 5 - if (this.settings.protocolVersion === 5) { - const properties = this._parseProperties() - if (Object.getOwnPropertyNames(properties).length) { - packet.properties = properties - } - } - - if (packet.length <= 0) { return this._emitError(new Error('Malformed unsubscribe, no payload specified')) } - - while (this._pos < packet.length) { - // Parse topic - const topic = this._parseString() - if (topic === null) return this._emitError(new Error('Cannot parse topic')) - - // Push topic to unsubscriptions - debug('_parseUnsubscribe: push topic `%s` to unsubscriptions', topic) - packet.unsubscriptions.push(topic) - } - } - - _parseUnsuback () { - debug('_parseUnsuback') - const packet = this.packet - if (!this._parseMessageId()) return this._emitError(new Error('Cannot parse messageId')) - - if ((this.settings.protocolVersion === 3 || - this.settings.protocolVersion === 4) && packet.length !== 2) { - return this._emitError(new Error('Malformed unsuback, payload length must be 2')) - } - if (packet.length <= 0) { return this._emitError(new Error('Malformed unsuback, no payload specified')) } - - // Properties mqtt 5 - if (this.settings.protocolVersion === 5) { - const properties = this._parseProperties() - if (Object.getOwnPropertyNames(properties).length) { - packet.properties = properties - } - // Parse granted QoSes - packet.granted = [] - - while (this._pos < this.packet.length) { - const code = this._list.readUInt8(this._pos++) - if (!constants.MQTT5_UNSUBACK_CODES[code]) { - return this._emitError(new Error('Invalid unsuback code')) - } - this.packet.granted.push(code) - } - } - } - - // parse packets like puback, pubrec, pubrel, pubcomp - _parseConfirmation () { - debug('_parseConfirmation: packet.cmd: `%s`', this.packet.cmd) - const packet = this.packet - - this._parseMessageId() - - if (this.settings.protocolVersion === 5) { - if (packet.length > 2) { - // response code - packet.reasonCode = this._parseByte() - switch (this.packet.cmd) { - case 'puback': - case 'pubrec': - if (!constants.MQTT5_PUBACK_PUBREC_CODES[packet.reasonCode]) { - return this._emitError(new Error('Invalid ' + this.packet.cmd + ' reason code')) - } - break - case 'pubrel': - case 'pubcomp': - if (!constants.MQTT5_PUBREL_PUBCOMP_CODES[packet.reasonCode]) { - return this._emitError(new Error('Invalid ' + this.packet.cmd + ' reason code')) - } - break - } - debug('_parseConfirmation: packet.reasonCode `%d`', packet.reasonCode) - } else { - packet.reasonCode = 0 - } - - if (packet.length > 3) { - // properies mqtt 5 - const properties = this._parseProperties() - if (Object.getOwnPropertyNames(properties).length) { - packet.properties = properties - } - } - } - - return true - } - - // parse disconnect packet - _parseDisconnect () { - const packet = this.packet - debug('_parseDisconnect') - - if (this.settings.protocolVersion === 5) { - // response code - if (this._list.length > 0) { - packet.reasonCode = this._parseByte() - if (!constants.MQTT5_DISCONNECT_CODES[packet.reasonCode]) { - this._emitError(new Error('Invalid disconnect reason code')) - } - } else { - packet.reasonCode = 0 - } - // properies mqtt 5 - const properties = this._parseProperties() - if (Object.getOwnPropertyNames(properties).length) { - packet.properties = properties - } - } - - debug('_parseDisconnect result: true') - return true - } - - // parse auth packet - _parseAuth () { - debug('_parseAuth') - const packet = this.packet - - if (this.settings.protocolVersion !== 5) { - return this._emitError(new Error('Not supported auth packet for this version MQTT')) - } - - // response code - packet.reasonCode = this._parseByte() - if (!constants.MQTT5_AUTH_CODES[packet.reasonCode]) { - return this._emitError(new Error('Invalid auth reason code')) - } - // properies mqtt 5 - const properties = this._parseProperties() - if (Object.getOwnPropertyNames(properties).length) { - packet.properties = properties - } - - debug('_parseAuth: result: true') - return true - } - - _parseMessageId () { - const packet = this.packet - - packet.messageId = this._parseNum() - - if (packet.messageId === null) { - this._emitError(new Error('Cannot parse messageId')) - return false - } - - debug('_parseMessageId: packet.messageId %d', packet.messageId) - return true - } - - _parseString (maybeBuffer) { - const length = this._parseNum() - const end = length + this._pos - - if (length === -1 || end > this._list.length || end > this.packet.length) return null - - const result = this._list.toString('utf8', this._pos, end) - this._pos += length - debug('_parseString: result: %s', result) - return result - } - - _parseStringPair () { - debug('_parseStringPair') - return { - name: this._parseString(), - value: this._parseString() - } - } - - _parseBuffer () { - const length = this._parseNum() - const end = length + this._pos - - if (length === -1 || end > this._list.length || end > this.packet.length) return null - - const result = this._list.slice(this._pos, end) - - this._pos += length - debug('_parseBuffer: result: %o', result) - return result - } - - _parseNum () { - if (this._list.length - this._pos < 2) return -1 - - const result = this._list.readUInt16BE(this._pos) - this._pos += 2 - debug('_parseNum: result: %s', result) - return result - } - - _parse4ByteNum () { - if (this._list.length - this._pos < 4) return -1 - - const result = this._list.readUInt32BE(this._pos) - this._pos += 4 - debug('_parse4ByteNum: result: %s', result) - return result - } - - _parseVarByteNum (fullInfoFlag) { - debug('_parseVarByteNum') - const maxBytes = 4 - let bytes = 0 - let mul = 1 - let value = 0 - let result = false - let current - const padding = this._pos ? this._pos : 0 - - while (bytes < maxBytes && (padding + bytes) < this._list.length) { - current = this._list.readUInt8(padding + bytes++) - value += mul * (current & constants.VARBYTEINT_MASK) - mul *= 0x80 - - if ((current & constants.VARBYTEINT_FIN_MASK) === 0) { - result = true - break - } - if (this._list.length <= bytes) { - break - } - } - - if (!result && bytes === maxBytes && this._list.length >= bytes) { - this._emitError(new Error('Invalid variable byte integer')) - } - - if (padding) { - this._pos += bytes - } - - if (result) { - if (fullInfoFlag) { - result = { bytes, value } - } else { - result = value - } - } else { - result = false - } - - debug('_parseVarByteNum: result: %o', result) - return result - } - - _parseByte () { - let result - if (this._pos < this._list.length) { - result = this._list.readUInt8(this._pos) - this._pos++ - } - debug('_parseByte: result: %o', result) - return result - } - - _parseByType (type) { - debug('_parseByType: type: %s', type) - switch (type) { - case 'byte': { - return this._parseByte() !== 0 - } - case 'int8': { - return this._parseByte() - } - case 'int16': { - return this._parseNum() - } - case 'int32': { - return this._parse4ByteNum() - } - case 'var': { - return this._parseVarByteNum() - } - case 'string': { - return this._parseString() - } - case 'pair': { - return this._parseStringPair() - } - case 'binary': { - return this._parseBuffer() - } - } - } - - _parseProperties () { - debug('_parseProperties') - const length = this._parseVarByteNum() - const start = this._pos - const end = start + length - const result = {} - while (this._pos < end) { - const type = this._parseByte() - if (!type) { - this._emitError(new Error('Cannot parse property code type')) - return false - } - const name = constants.propertiesCodes[type] - if (!name) { - this._emitError(new Error('Unknown property')) - return false - } - // user properties process - if (name === 'userProperties') { - if (!result[name]) { - result[name] = Object.create(null) - } - const currentUserProperty = this._parseByType(constants.propertiesTypes[name]) - if (result[name][currentUserProperty.name]) { - if (Array.isArray(result[name][currentUserProperty.name])) { - result[name][currentUserProperty.name].push(currentUserProperty.value) - } else { - const currentValue = result[name][currentUserProperty.name] - result[name][currentUserProperty.name] = [currentValue] - result[name][currentUserProperty.name].push(currentUserProperty.value) - } - } else { - result[name][currentUserProperty.name] = currentUserProperty.value - } - continue - } - if (result[name]) { - if (Array.isArray(result[name])) { - result[name].push(this._parseByType(constants.propertiesTypes[name])) - } else { - result[name] = [result[name]] - result[name].push(this._parseByType(constants.propertiesTypes[name])) - } - } else { - result[name] = this._parseByType(constants.propertiesTypes[name]) - } - } - return result - } - - _newPacket () { - debug('_newPacket') - if (this.packet) { - this._list.consume(this.packet.length) - debug('_newPacket: parser emit packet: packet.cmd: %s, packet.payload: %s, packet.length: %d', this.packet.cmd, this.packet.payload, this.packet.length) - this.emit('packet', this.packet) - } - debug('_newPacket: new packet') - this.packet = new Packet() - - this._pos = 0 - - return true - } - - _emitError (err) { - debug('_emitError', err) - this.error = err - this.emit('error', err) - } -} - -module.exports = Parser diff --git a/src/constants.ts b/src/constants.ts new file mode 100644 index 0000000..c53dc1a --- /dev/null +++ b/src/constants.ts @@ -0,0 +1,302 @@ +import { Buffer } from 'buffer' + +/* Command code => mnemonic */ +export const types: Record = { + 0: 'reserved', + 1: 'connect', + 2: 'connack', + 3: 'publish', + 4: 'puback', + 5: 'pubrec', + 6: 'pubrel', + 7: 'pubcomp', + 8: 'subscribe', + 9: 'suback', + 10: 'unsubscribe', + 11: 'unsuback', + 12: 'pingreq', + 13: 'pingresp', + 14: 'disconnect', + 15: 'auth', +} + +export const requiredHeaderFlags: Record = { + 1: 0, // 'connect' + 2: 0, // 'connack' + 4: 0, // 'puback' + 5: 0, // 'pubrec' + 6: 2, // 'pubrel' + 7: 0, // 'pubcomp' + 8: 2, // 'subscribe' + 9: 0, // 'suback' + 10: 2, // 'unsubscribe' + 11: 0, // 'unsuback' + 12: 0, // 'pingreq' + 13: 0, // 'pingresp' + 14: 0, // 'disconnect' + 15: 0, // 'auth' +} + +export const requiredHeaderFlagsErrors: Record = + Object.fromEntries( + Object.entries(requiredHeaderFlags).map(([k, v]) => [ + Number(k), + `Invalid header flag bits, must be 0x${v.toString(16)} for ${types[Number(k)]} packet`, + ]), + ) + +/* Mnemonic => Command code */ +export const codes: Record = Object.fromEntries( + Object.entries(types).map(([k, v]) => [v, Number(k)]), +) + +/* Header */ +export const CMD_SHIFT = 4 + +export const CMD_MASK = 0xf0 + +export const DUP_MASK = 0x08 + +export const QOS_MASK = 0x03 + +export const QOS_SHIFT = 1 + +export const RETAIN_MASK = 0x01 + +/* Length */ +export const VARBYTEINT_MASK = 0x7f + +export const VARBYTEINT_FIN_MASK = 0x80 + +export const VARBYTEINT_MAX = 268435455 + +/* Connack */ +export const SESSIONPRESENT_MASK = 0x01 + +export const SESSIONPRESENT_HEADER = Buffer.from([SESSIONPRESENT_MASK]) + +export const CONNACK_HEADER = Buffer.from([codes.connack << CMD_SHIFT]) + +/* Connect */ +export const USERNAME_MASK = 0x80 + +export const PASSWORD_MASK = 0x40 + +export const WILL_RETAIN_MASK = 0x20 + +export const WILL_QOS_MASK = 0x18 + +export const WILL_QOS_SHIFT = 3 + +export const WILL_FLAG_MASK = 0x04 + +export const CLEAN_SESSION_MASK = 0x02 + +export const CONNECT_HEADER = Buffer.from([codes.connect << CMD_SHIFT]) + +/* Properties */ +export const properties: Record = { + sessionExpiryInterval: 17, + willDelayInterval: 24, + receiveMaximum: 33, + maximumPacketSize: 39, + topicAliasMaximum: 34, + requestResponseInformation: 25, + requestProblemInformation: 23, + userProperties: 38, + authenticationMethod: 21, + authenticationData: 22, + payloadFormatIndicator: 1, + messageExpiryInterval: 2, + contentType: 3, + responseTopic: 8, + correlationData: 9, + maximumQoS: 36, + retainAvailable: 37, + assignedClientIdentifier: 18, + reasonString: 31, + wildcardSubscriptionAvailable: 40, + subscriptionIdentifiersAvailable: 41, + sharedSubscriptionAvailable: 42, + serverKeepAlive: 19, + responseInformation: 26, + serverReference: 28, + topicAlias: 35, + subscriptionIdentifier: 11, +} + +export const propertiesCodes: Record = Object.fromEntries( + Object.entries(properties).map(([prop, id]) => [id, prop]), +) + +export const propertiesTypes: Record = { + sessionExpiryInterval: 'int32', + willDelayInterval: 'int32', + receiveMaximum: 'int16', + maximumPacketSize: 'int32', + topicAliasMaximum: 'int16', + requestResponseInformation: 'byte', + requestProblemInformation: 'byte', + userProperties: 'pair', + authenticationMethod: 'string', + authenticationData: 'binary', + payloadFormatIndicator: 'byte', + messageExpiryInterval: 'int32', + contentType: 'string', + responseTopic: 'string', + correlationData: 'binary', + maximumQoS: 'int8', + retainAvailable: 'byte', + assignedClientIdentifier: 'string', + reasonString: 'string', + wildcardSubscriptionAvailable: 'byte', + subscriptionIdentifiersAvailable: 'byte', + sharedSubscriptionAvailable: 'byte', + serverKeepAlive: 'int16', + responseInformation: 'string', + serverReference: 'string', + topicAlias: 'int16', + subscriptionIdentifier: 'var', +} + +export const genHeader = (type: string): Buffer[][][] => { + return [0, 1, 2].map((qos) => + [0, 1].map((dup) => + [0, 1].map((retain) => { + const buf = Buffer.alloc(1) + buf.writeUInt8( + (codes[type] << CMD_SHIFT) | + (dup ? DUP_MASK : 0) | + (qos << QOS_SHIFT) | + retain, + 0, + // true, + ) + return buf + }), + ), + ) +} + +/* Publish */ +export const PUBLISH_HEADER = genHeader('publish') + +/* Subscribe */ +export const SUBSCRIBE_HEADER = genHeader('subscribe') + +/* Unsubscribe */ +export const UNSUBSCRIBE_HEADER = genHeader('unsubscribe') + +/* Confirmations */ +export const ACKS = { + unsuback: genHeader('unsuback'), + puback: genHeader('puback'), + pubcomp: genHeader('pubcomp'), + pubrel: genHeader('pubrel'), + pubrec: genHeader('pubrec'), +} + +export const SUBACK_HEADER = Buffer.from([codes.suback << CMD_SHIFT]) + +/* Protocol versions */ +export const VERSION3 = Buffer.from([3]) + +export const VERSION4 = Buffer.from([4]) + +export const VERSION5 = Buffer.from([5]) + +export const VERSION131 = Buffer.from([131]) + +export const VERSION132 = Buffer.from([132]) + +/* QoS */ +export const QOS = [0, 1, 2].map((qos) => { + return Buffer.from([qos]) +}) + +/* Empty packets */ +export const EMPTY = { + pingreq: Buffer.from([codes.pingreq << 4, 0]), + pingresp: Buffer.from([codes.pingresp << 4, 0]), + disconnect: Buffer.from([codes.disconnect << 4, 0]), +} + +export const MQTT5_CONNACK_CODES = { + 0x00: 'Success', + 0x10: 'No matching subscribers', + 0x80: 'Unspecified error', + 0x83: 'Implementation specific error', + 0x87: 'Not authorized', + 0x90: 'Topic Name invalid', + 0x91: 'Packet identifier in use', + 0x97: 'Quota exceeded', + 0x99: 'Payload format invalid', +} + +export const MQTT5_PUBREL_PUBCOMP_CODES = { + 0x00: 'Success', + 0x92: 'Packet Identifier not found', +} + +export const MQTT5_SUBACK_CODES = { + 0x00: 'Granted QoS 0', + 0x01: 'Granted QoS 1', + 0x02: 'Granted QoS 2', + 0x80: 'Unspecified error', + 0x83: 'Implementation specific error', + 0x87: 'Not authorized', + 0x8f: 'Topic Filter invalid', + 0x91: 'Packet Identifier in use', + 0x97: 'Quota exceeded', + 0x9e: 'Shared Subscriptions not supported', + 0xa1: 'Subscription Identifiers not supported', + 0xa2: 'Wildcard Subscriptions not supported', +} + +export const MQTT5_UNSUBACK_CODES = { + 0x00: 'Success', + 0x11: 'No subscription existed', + 0x80: 'Unspecified error', + 0x83: 'Implementation specific error', + 0x87: 'Not authorized', + 0x8f: 'Topic Filter invalid', + 0x91: 'Packet Identifier in use', +} + +export const MQTT5_DISCONNECT_CODES = { + 0x00: 'Normal disconnection', + 0x04: 'Disconnect with Will Message', + 0x80: 'Unspecified error', + 0x81: 'Malformed Packet', + 0x82: 'Protocol Error', + 0x83: 'Implementation specific error', + 0x87: 'Not authorized', + 0x89: 'Server busy', + 0x8b: 'Server shutting down', + 0x8d: 'Keep Alive timeout', + 0x8e: 'Session taken over', + 0x8f: 'Topic Filter invalid', + 0x90: 'Topic Name invalid', + 0x93: 'Receive Maximum exceeded', + 0x94: 'Topic Alias invalid', + 0x95: 'Packet too large', + 0x96: 'Message rate too high', + 0x97: 'Quota exceeded', + 0x98: 'Administrative action', + 0x99: 'Payload format invalid', + 0x9a: 'Retain not supported', + 0x9b: 'QoS not supported', + 0x9c: 'Use another server', + 0x9d: 'Server moved', + 0x9e: 'Shared Subscriptions not supported', + 0x9f: 'Connection rate exceeded', + 0xa0: 'Maximum connect time', + 0xa1: 'Subscription Identifiers not supported', + 0xa2: 'Wildcard Subscriptions not supported', +} + +export const MQTT5_AUTH_CODES = { + 0x00: 'Success', + 0x18: 'Continue authentication', + 0x19: 'Re-authenticate', +} diff --git a/src/generate.ts b/src/generate.ts new file mode 100644 index 0000000..ec61356 --- /dev/null +++ b/src/generate.ts @@ -0,0 +1,65 @@ +import writeToStream from './writeToStream' +import { EventEmitter } from 'events' +import { Buffer } from 'buffer' + +interface GenerateOptions { + // Define the properties of opts if known +} + +function generate(packet: any, opts?: GenerateOptions): Buffer { + const stream = new Accumulator() + writeToStream(packet, stream, opts) + return stream.concat() +} + +class Accumulator extends EventEmitter { + private _array: (Buffer | string)[] + + private _i: number + + constructor() { + super() + this._array = new Array(20) + this._i = 0 + } + + write(chunk: Buffer | string): boolean { + this._array[this._i++] = chunk + return true + } + + concat(): Buffer { + let length = 0 + const lengths: number[] = new Array(this._array.length) + const list: (Buffer | string)[] = this._array + let pos = 0 + + for (let i = 0; i < list.length && list[i] !== undefined; i++) { + lengths[i] = + typeof list[i] !== 'string' + ? list[i].length + : Buffer.byteLength(list[i]) + length += lengths[i] + } + + const result: Buffer = Buffer.allocUnsafe(length) + + for (let i = 0; i < list.length && list[i] !== undefined; i++) { + if (typeof list[i] !== 'string') { + ;(list[i] as Buffer).copy(result, pos) + pos += lengths[i] + } else { + result.write(list[i] as string, pos) + pos += lengths[i] + } + } + + return result + } + + destroy(err?: Error): void { + if (err) this.emit('error', err) + } +} + +export default generate diff --git a/src/index.ts b/src/index.ts new file mode 100644 index 0000000..d35edad --- /dev/null +++ b/src/index.ts @@ -0,0 +1,5 @@ +import { parser } from './parser' +import generate from './generate' +import writeToStream from './writeToStream' + +export { parser, generate, writeToStream } diff --git a/src/numbers.ts b/src/numbers.ts new file mode 100644 index 0000000..d2799ef --- /dev/null +++ b/src/numbers.ts @@ -0,0 +1,60 @@ +import { Buffer } from 'buffer' + +const max: number = 65536 +const cache: { [key: number]: Buffer } = {} + +// in node 6 Buffer.subarray returns a Uint8Array instead of a Buffer +// later versions return a Buffer +// alternative is Buffer.slice but that creates a new buffer +// creating new buffers takes time +// SubOk is only false on node < 8 +const SubOk: boolean = Buffer.isBuffer(Buffer.from([1, 2]).subarray(0, 1)) + +function generateBuffer(i: number): Buffer { + const buffer: Buffer = Buffer.allocUnsafe(2) + buffer.writeUInt8(i >> 8, 0) + buffer.writeUInt8(i & 0x00ff, 1) + + return buffer +} + +function generateCache(): void { + for (let i: number = 0; i < max; i++) { + cache[i] = generateBuffer(i) + } +} + +function genBufVariableByteInt(num: number): Buffer { + const maxLength: number = 4 // max 4 bytes + let digit: number = 0 + let pos: number = 0 + const buffer: Buffer = Buffer.allocUnsafe(maxLength) + + do { + digit = num % 128 | 0 + num = (num / 128) | 0 + if (num > 0) digit |= 0x80 + + buffer.writeUInt8(digit, pos++) + } while (num > 0 && pos < maxLength) + + if (num > 0) { + pos = 0 + } + + return SubOk ? buffer.subarray(0, pos) : buffer.slice(0, pos) +} + +function generate4ByteBuffer(num: number): Buffer { + const buffer: Buffer = Buffer.allocUnsafe(4) + buffer.writeUInt32BE(num, 0) + return buffer +} + +export default { + cache, + generateCache, + generateNumber: generateBuffer, + genBufVariableByteInt, + generate4ByteBuffer, +} diff --git a/src/packet.ts b/src/packet.ts new file mode 100644 index 0000000..8a6c75c --- /dev/null +++ b/src/packet.ts @@ -0,0 +1,25 @@ +export default class Packet { + cmd: string + + retain: boolean + + qos: number + + dup: boolean + + length: number + + topic: string + + payload: Buffer + + constructor() { + this.cmd = null + this.retain = false + this.qos = 0 + this.dup = false + this.length = -1 + this.topic = null + this.payload = null + } +} diff --git a/src/parser.ts b/src/parser.ts new file mode 100644 index 0000000..698d3ec --- /dev/null +++ b/src/parser.ts @@ -0,0 +1,1056 @@ +import bl from 'bl' +import { EventEmitter } from 'events' +import Packet from './packet' +import * as constants from './constants' +import * as debug from 'debug' +import { BufferList } from 'bl/BufferList' + +const debugLog = debug('mqtt-packet:parser') + +// Type definitions for reused structures +interface ParseVarByteNumResult { + bytes: number + value: number +} + +interface StringPair { + name: string + value: string +} + +// Interface for parser settings +interface ParserSettings { + protocolVersion?: number + bridgeMode?: boolean + [key: string]: any +} + +// Interface for property types +type PropertyTypes = + | 'byte' + | 'int8' + | 'int16' + | 'int32' + | 'var' + | 'string' + | 'pair' + | 'binary' + +class Parser extends EventEmitter { + public packet: Packet + + public error: Error | null + + public settings: ParserSettings + + private _list: BufferList + + private _states: string[] + + private _stateCounter: number + + private _pos: number + + constructor() { + super() + this.packet = new Packet() + this.error = null + this._list = bl() + this._stateCounter = 0 + this._pos = 0 + this.settings = {} + this._states = [ + '_parseHeader', + '_parseLength', + '_parsePayload', + '_newPacket', + ] + } + + // Static factory method + static parser(opt?: ParserSettings): Parser { + if (!(this instanceof Parser)) return new Parser().parser(opt) + + const instance = this as unknown as Parser + instance.settings = opt || {} + instance._resetState() + return instance + } + + // Instance method version of parser + parser(opt?: ParserSettings): Parser { + this.settings = opt || {} + this._resetState() + return this + } + + private _resetState(): void { + debugLog( + '_resetState: resetting packet, error, _list, and _stateCounter', + ) + this.packet = new Packet() + this.error = null + this._list = bl() + this._stateCounter = 0 + } + + parse(buf: Buffer): number { + if (this.error) this._resetState() + + this._list.append(buf) + debugLog('parse: current state: %s', this._states[this._stateCounter]) + + while ( + (this.packet.length !== -1 || this._list.length > 0) && + // Using type assertion because TypeScript doesn't know that this[state] is callable + (this as any)[this._states[this._stateCounter]]() && + !this.error + ) { + this._stateCounter++ + debugLog( + 'parse: state complete. _stateCounter is now: %d', + this._stateCounter, + ) + debugLog( + 'parse: packet.length: %d, buffer list length: %d', + this.packet.length, + this._list.length, + ) + if (this._stateCounter >= this._states.length) + this._stateCounter = 0 + } + + debugLog( + 'parse: exited while loop. packet: %d, buffer list length: %d', + this.packet.length, + this._list.length, + ) + return this._list.length + } + + private _parseHeader(): boolean { + // There is at least one byte in the buffer + const zero = this._list.readUInt8(0) + const cmdIndex = zero >> constants.CMD_SHIFT + this.packet.cmd = constants.types[cmdIndex] + const headerFlags = zero & 0xf + const requiredHeaderFlags = constants.requiredHeaderFlags[cmdIndex] + + if ( + requiredHeaderFlags != null && + headerFlags !== requiredHeaderFlags + ) { + // Where a flag bit is marked as "Reserved" in Table 2.2 - Flag Bits, it is reserved for future use and MUST be set to the value listed in that table [MQTT-2.2.2-1]. If invalid flags are received, the receiver MUST close the Network Connection [MQTT-2.2.2-2] + return this._emitError( + new Error(constants.requiredHeaderFlagsErrors[cmdIndex]), + ) + } + + this.packet.retain = (zero & constants.RETAIN_MASK) !== 0 + this.packet.qos = (zero >> constants.QOS_SHIFT) & constants.QOS_MASK + + if (this.packet.qos > 2) { + return this._emitError( + new Error('Packet must not have both QoS bits set to 1'), + ) + } + + this.packet.dup = (zero & constants.DUP_MASK) !== 0 + debugLog('_parseHeader: packet: %o', this.packet) + + this._list.consume(1) + + return true + } + + private _parseLength(): boolean { + // There is at least one byte in the list + const result = this._parseVarByteNum(true) + + if (result) { + this.packet.length = result.value + this._list.consume(result.bytes) + } + + debugLog('_parseLength %d', result.value) + return !!result + } + + private _parsePayload(): boolean { + debugLog('_parsePayload: payload %O', this._list) + let result = false + + // Do we have a payload? Do we have enough data to complete the payload? + // PINGs have no payload + if ( + this.packet.length === 0 || + this._list.length >= this.packet.length + ) { + this._pos = 0 + + switch (this.packet.cmd) { + case 'connect': + this._parseConnect() + break + case 'connack': + this._parseConnack() + break + case 'publish': + this._parsePublish() + break + case 'puback': + case 'pubrec': + case 'pubrel': + case 'pubcomp': + this._parseConfirmation() + break + case 'subscribe': + this._parseSubscribe() + break + case 'suback': + this._parseSuback() + break + case 'unsubscribe': + this._parseUnsubscribe() + break + case 'unsuback': + this._parseUnsuback() + break + case 'pingreq': + case 'pingresp': + // These are empty, nothing to do + break + case 'disconnect': + this._parseDisconnect() + break + case 'auth': + this._parseAuth() + break + default: + this._emitError(new Error('Not supported')) + } + + result = true + } + + debugLog('_parsePayload complete result: %s', result) + return result + } + + private _parseConnect(): Packet | void { + debugLog('_parseConnect') + let topic: string // Will topic + let payload: Buffer // Will payload + let password: Buffer // Password + let username: string // Username + const flags: { [key: string]: boolean } = {} + const packet = this.packet + + // Parse protocolId + const protocolId = this._parseString() + + if (protocolId === null) + return this._emitError(new Error('Cannot parse protocolId')) + if (protocolId !== 'MQTT' && protocolId !== 'MQIsdp') { + return this._emitError(new Error('Invalid protocolId')) + } + + packet.protocolId = protocolId + + // Parse constants version number + if (this._pos >= this._list.length) + return this._emitError(new Error('Packet too short')) + + packet.protocolVersion = this._list.readUInt8(this._pos) + + if (packet.protocolVersion >= 128) { + packet.bridgeMode = true + packet.protocolVersion -= 128 + } + + if ( + packet.protocolVersion !== 3 && + packet.protocolVersion !== 4 && + packet.protocolVersion !== 5 + ) { + return this._emitError(new Error('Invalid protocol version')) + } + + this._pos++ + + if (this._pos >= this._list.length) { + return this._emitError(new Error('Packet too short')) + } + + if (this._list.readUInt8(this._pos) & 0x1) { + // The Server MUST validate that the reserved flag in the CONNECT Control Packet is set to zero and disconnect the Client if it is not zero [MQTT-3.1.2-3] + return this._emitError( + new Error('Connect flag bit 0 must be 0, but got 1'), + ) + } + + // Parse connect flags + flags.username = + (this._list.readUInt8(this._pos) & constants.USERNAME_MASK) !== 0 + flags.password = + (this._list.readUInt8(this._pos) & constants.PASSWORD_MASK) !== 0 + flags.will = + (this._list.readUInt8(this._pos) & constants.WILL_FLAG_MASK) !== 0 + + const willRetain = !!( + this._list.readUInt8(this._pos) & constants.WILL_RETAIN_MASK + ) + const willQos = + (this._list.readUInt8(this._pos) & constants.WILL_QOS_MASK) >> + constants.WILL_QOS_SHIFT + + if (flags.will) { + packet.will = {} + packet.will.retain = willRetain + packet.will.qos = willQos + } else { + if (willRetain) { + return this._emitError( + new Error( + 'Will Retain Flag must be set to zero when Will Flag is set to 0', + ), + ) + } + if (willQos) { + return this._emitError( + new Error( + 'Will QoS must be set to zero when Will Flag is set to 0', + ), + ) + } + } + + packet.clean = + (this._list.readUInt8(this._pos) & constants.CLEAN_SESSION_MASK) !== + 0 + this._pos++ + + // Parse keepalive + packet.keepalive = this._parseNum() + if (packet.keepalive === -1) + return this._emitError(new Error('Packet too short')) + + // parse properties + if (packet.protocolVersion === 5) { + const properties = this._parseProperties() + if (Object.getOwnPropertyNames(properties).length) { + packet.properties = properties + } + } + + // Parse clientId + const clientId = this._parseString() + if (clientId === null) + return this._emitError(new Error('Packet too short')) + packet.clientId = clientId + debugLog('_parseConnect: packet.clientId: %s', packet.clientId) + + if (flags.will) { + if (packet.protocolVersion === 5) { + const willProperties = this._parseProperties() + if (Object.getOwnPropertyNames(willProperties).length) { + packet.will.properties = willProperties + } + } + // Parse will topic + topic = this._parseString() + if (topic === null) + return this._emitError(new Error('Cannot parse will topic')) + packet.will.topic = topic + debugLog('_parseConnect: packet.will.topic: %s', packet.will.topic) + + // Parse will payload + payload = this._parseBuffer() + if (payload === null) + return this._emitError(new Error('Cannot parse will payload')) + packet.will.payload = payload + debugLog( + '_parseConnect: packet.will.paylaod: %s', + packet.will.payload, + ) + } + + // Parse username + if (flags.username) { + username = this._parseString() + if (username === null) + return this._emitError(new Error('Cannot parse username')) + packet.username = username + debugLog('_parseConnect: packet.username: %s', packet.username) + } + + // Parse password + if (flags.password) { + password = this._parseBuffer() + if (password === null) + return this._emitError(new Error('Cannot parse password')) + packet.password = password + } + + // need for right parse auth packet and self set up + this.settings = packet + debugLog('_parseConnect: complete') + return packet + } + + private _parseConnack(): void { + debugLog('_parseConnack') + const packet = this.packet + + if (this._list.length < 1) return null + const flags = this._list.readUInt8(this._pos++) + if (flags > 1) { + return this._emitError( + new Error('Invalid connack flags, bits 7-1 must be set to 0'), + ) + } + packet.sessionPresent = !!(flags & constants.SESSIONPRESENT_MASK) + + if (this.settings.protocolVersion === 5) { + if (this._list.length >= 2) { + packet.reasonCode = this._list.readUInt8(this._pos++) + } else { + packet.reasonCode = 0 + } + } else { + if (this._list.length < 2) return null + packet.returnCode = this._list.readUInt8(this._pos++) + } + + if (packet.returnCode === -1 || packet.reasonCode === -1) + return this._emitError(new Error('Cannot parse return code')) + // mqtt 5 properties + if (this.settings.protocolVersion === 5) { + const properties = this._parseProperties() + if (Object.getOwnPropertyNames(properties).length) { + packet.properties = properties + } + } + debugLog('_parseConnack: complete') + } + + private _parsePublish(): void { + debugLog('_parsePublish') + const packet = this.packet + packet.topic = this._parseString() + + if (packet.topic === null) + return this._emitError(new Error('Cannot parse topic')) + + // Parse messageId + if (packet.qos > 0) + if (!this._parseMessageId()) { + return + } + + // Properties mqtt 5 + if (this.settings.protocolVersion === 5) { + const properties = this._parseProperties() + if (Object.getOwnPropertyNames(properties).length) { + packet.properties = properties + } + } + + packet.payload = this._list.slice(this._pos, packet.length) + debugLog('_parsePublish: payload from buffer list: %o', packet.payload) + } + + private _parseSubscribe(): void { + debugLog('_parseSubscribe') + const packet = this.packet + let topic: string + let options: number + let qos: number + let rh: number + let rap: boolean + let nl: boolean + let subscription: any + + packet.subscriptions = [] + + if (!this._parseMessageId()) { + return + } + + // Properties mqtt 5 + if (this.settings.protocolVersion === 5) { + const properties = this._parseProperties() + if (Object.getOwnPropertyNames(properties).length) { + packet.properties = properties + } + } + + if (packet.length <= 0) { + return this._emitError( + new Error('Malformed subscribe, no payload specified'), + ) + } + + while (this._pos < packet.length) { + // Parse topic + topic = this._parseString() + if (topic === null) + return this._emitError(new Error('Cannot parse topic')) + if (this._pos >= packet.length) + return this._emitError(new Error('Malformed Subscribe Payload')) + + options = this._parseByte() + + if (this.settings.protocolVersion === 5) { + if (options & 0xc0) { + return this._emitError( + new Error( + 'Invalid subscribe topic flag bits, bits 7-6 must be 0', + ), + ) + } + } else if (options & 0xfc) { + return this._emitError( + new Error( + 'Invalid subscribe topic flag bits, bits 7-2 must be 0', + ), + ) + } + + qos = options & constants.SUBSCRIBE_OPTIONS_QOS_MASK + if (qos > 2) { + return this._emitError( + new Error('Invalid subscribe QoS, must be <= 2'), + ) + } + nl = + ((options >> constants.SUBSCRIBE_OPTIONS_NL_SHIFT) & + constants.SUBSCRIBE_OPTIONS_NL_MASK) !== + 0 + rap = + ((options >> constants.SUBSCRIBE_OPTIONS_RAP_SHIFT) & + constants.SUBSCRIBE_OPTIONS_RAP_MASK) !== + 0 + rh = + (options >> constants.SUBSCRIBE_OPTIONS_RH_SHIFT) & + constants.SUBSCRIBE_OPTIONS_RH_MASK + + if (rh > 2) { + return this._emitError( + new Error('Invalid retain handling, must be <= 2'), + ) + } + + subscription = { topic, qos } + + // mqtt 5 options + if (this.settings.protocolVersion === 5) { + subscription.nl = nl + subscription.rap = rap + subscription.rh = rh + } else if (this.settings.bridgeMode) { + subscription.rh = 0 + subscription.rap = true + subscription.nl = true + } + + // Push pair to subscriptions + debugLog( + '_parseSubscribe: push subscription `%s` to subscription', + subscription, + ) + packet.subscriptions.push(subscription) + } + } + + private _parseSuback(): void { + debugLog('_parseSuback') + const packet = this.packet + this.packet.granted = [] + + if (!this._parseMessageId()) { + return + } + + // Properties mqtt 5 + if (this.settings.protocolVersion === 5) { + const properties = this._parseProperties() + if (Object.getOwnPropertyNames(properties).length) { + packet.properties = properties + } + } + + if (packet.length <= 0) { + return this._emitError( + new Error('Malformed suback, no payload specified'), + ) + } + + // Parse granted QoSes + while (this._pos < this.packet.length) { + const code = this._list.readUInt8(this._pos++) + if (this.settings.protocolVersion === 5) { + if (!constants.MQTT5_SUBACK_CODES[code]) { + return this._emitError(new Error('Invalid suback code')) + } + } else if (code > 2 && code !== 0x80) { + return this._emitError( + new Error('Invalid suback QoS, must be 0, 1, 2 or 128'), + ) + } + this.packet.granted.push(code) + } + } + + private _parseUnsubscribe(): void { + debugLog('_parseUnsubscribe') + const packet = this.packet + + packet.unsubscriptions = [] + + // Parse messageId + if (!this._parseMessageId()) { + return + } + + // Properties mqtt 5 + if (this.settings.protocolVersion === 5) { + const properties = this._parseProperties() + if (Object.getOwnPropertyNames(properties).length) { + packet.properties = properties + } + } + + if (packet.length <= 0) { + return this._emitError( + new Error('Malformed unsubscribe, no payload specified'), + ) + } + + while (this._pos < packet.length) { + // Parse topic + const topic = this._parseString() + if (topic === null) + return this._emitError(new Error('Cannot parse topic')) + + // Push topic to unsubscriptions + debugLog( + '_parseUnsubscribe: push topic `%s` to unsubscriptions', + topic, + ) + packet.unsubscriptions.push(topic) + } + } + + private _parseUnsuback(): void { + debugLog('_parseUnsuback') + const packet = this.packet + if (!this._parseMessageId()) + return this._emitError(new Error('Cannot parse messageId')) + + if ( + (this.settings.protocolVersion === 3 || + this.settings.protocolVersion === 4) && + packet.length !== 2 + ) { + return this._emitError( + new Error('Malformed unsuback, payload length must be 2'), + ) + } + if (packet.length <= 0) { + return this._emitError( + new Error('Malformed unsuback, no payload specified'), + ) + } + + // Properties mqtt 5 + if (this.settings.protocolVersion === 5) { + const properties = this._parseProperties() + if (Object.getOwnPropertyNames(properties).length) { + packet.properties = properties + } + // Parse granted QoSes + packet.granted = [] + + while (this._pos < this.packet.length) { + const code = this._list.readUInt8(this._pos++) + if (!constants.MQTT5_UNSUBACK_CODES[code]) { + return this._emitError(new Error('Invalid unsuback code')) + } + this.packet.granted.push(code) + } + } + } + + // parse packets like puback, pubrec, pubrel, pubcomp + private _parseConfirmation(): boolean { + debugLog('_parseConfirmation: packet.cmd: `%s`', this.packet.cmd) + const packet = this.packet + + this._parseMessageId() + + if (this.settings.protocolVersion === 5) { + if (packet.length > 2) { + // response code + packet.reasonCode = this._parseByte() + switch (this.packet.cmd) { + case 'puback': + case 'pubrec': + if ( + !constants.MQTT5_PUBACK_PUBREC_CODES[ + packet.reasonCode + ] + ) { + return this._emitError( + new Error( + `Invalid ${this.packet.cmd} reason code`, + ), + ) + } + break + case 'pubrel': + case 'pubcomp': + if ( + !constants.MQTT5_PUBREL_PUBCOMP_CODES[ + packet.reasonCode + ] + ) { + return this._emitError( + new Error( + `Invalid ${this.packet.cmd} reason code`, + ), + ) + } + break + } + debugLog( + '_parseConfirmation: packet.reasonCode `%d`', + packet.reasonCode, + ) + } else { + packet.reasonCode = 0 + } + + if (packet.length > 3) { + // properies mqtt 5 + const properties = this._parseProperties() + if (Object.getOwnPropertyNames(properties).length) { + packet.properties = properties + } + } + } + + return true + } + + // parse disconnect packet + private _parseDisconnect(): boolean { + const packet = this.packet + debugLog('_parseDisconnect') + + if (this.settings.protocolVersion === 5) { + // response code + if (this._list.length > 0) { + packet.reasonCode = this._parseByte() + if (!constants.MQTT5_DISCONNECT_CODES[packet.reasonCode]) { + this._emitError(new Error('Invalid disconnect reason code')) + } + } else { + packet.reasonCode = 0 + } + // properies mqtt 5 + const properties = this._parseProperties() + if (Object.getOwnPropertyNames(properties).length) { + packet.properties = properties + } + } + + debugLog('_parseDisconnect result: true') + return true + } + + // parse auth packet + private _parseAuth(): boolean { + debugLog('_parseAuth') + const packet = this.packet + + if (this.settings.protocolVersion !== 5) { + return this._emitError( + new Error('Not supported auth packet for this version MQTT'), + ) + } + + // response code + packet.reasonCode = this._parseByte() + if (!constants.MQTT5_AUTH_CODES[packet.reasonCode]) { + return this._emitError(new Error('Invalid auth reason code')) + } + // properies mqtt 5 + const properties = this._parseProperties() + if (Object.getOwnPropertyNames(properties).length) { + packet.properties = properties + } + + debugLog('_parseAuth: result: true') + return true + } + + private _parseMessageId(): boolean { + const packet = this.packet + + packet.messageId = this._parseNum() + + if (packet.messageId === null) { + this._emitError(new Error('Cannot parse messageId')) + return false + } + + debugLog('_parseMessageId: packet.messageId %d', packet.messageId) + return true + } + + private _parseString(maybeBuffer?: boolean): string | null { + const length = this._parseNum() + const end = length + this._pos + + if ( + length === -1 || + end > this._list.length || + end > this.packet.length + ) + return null + + const result = this._list.toString('utf8', this._pos, end) + this._pos += length + debugLog('_parseString: result: %s', result) + return result + } + + private _parseStringPair(): StringPair | null { + debugLog('_parseStringPair') + const name = this._parseString() + const value = this._parseString() + + if (name === null || value === null) return null + + return { name, value } + } + + private _parseBuffer(): Buffer | null { + const length = this._parseNum() + const end = length + this._pos + + if ( + length === -1 || + end > this._list.length || + end > this.packet.length + ) + return null + + const result = this._list.slice(this._pos, end) + + this._pos += length + debugLog('_parseBuffer: result: %o', result) + return result + } + + private _parseNum(): number { + if (this._list.length - this._pos < 2) return -1 + + const result = this._list.readUInt16BE(this._pos) + this._pos += 2 + debugLog('_parseNum: result: %s', result) + return result + } + + private _parse4ByteNum(): number { + if (this._list.length - this._pos < 4) return -1 + + const result = this._list.readUInt32BE(this._pos) + this._pos += 4 + debugLog('_parse4ByteNum: result: %s', result) + return result + } + + private _parseVarByteNum( + fullInfoFlag?: boolean, + ): ParseVarByteNumResult | number | false { + debugLog('_parseVarByteNum') + const maxBytes = 4 + let bytes = 0 + let mul = 1 + let value = 0 + let result: ParseVarByteNumResult | number | false = false + let current: number + const padding = this._pos ? this._pos : 0 + + while (bytes < maxBytes && padding + bytes < this._list.length) { + current = this._list.readUInt8(padding + bytes++) + value += mul * (current & constants.VARBYTEINT_MASK) + mul *= 0x80 + + if ((current & constants.VARBYTEINT_FIN_MASK) === 0) { + result = true + break + } + if (this._list.length <= bytes) { + break + } + } + + if (!result && bytes === maxBytes && this._list.length >= bytes) { + this._emitError(new Error('Invalid variable byte integer')) + } + + if (padding) { + this._pos += bytes + } + + if (result) { + if (fullInfoFlag) { + result = { bytes, value } + } else { + result = value + } + } else { + result = false + } + + debugLog('_parseVarByteNum: result: %o', result) + return result + } + + private _parseByte(): number { + let result: number + if (this._pos < this._list.length) { + result = this._list.readUInt8(this._pos) + this._pos++ + } + debugLog('_parseByte: result: %o', result) + return result + } + + private _parseByType(type: PropertyTypes): any { + debugLog('_parseByType: type: %s', type) + switch (type) { + case 'byte': { + return this._parseByte() !== 0 + } + case 'int8': { + return this._parseByte() + } + case 'int16': { + return this._parseNum() + } + case 'int32': { + return this._parse4ByteNum() + } + case 'var': { + return this._parseVarByteNum() + } + case 'string': { + return this._parseString() + } + case 'pair': { + return this._parseStringPair() + } + case 'binary': { + return this._parseBuffer() + } + } + } + + private _parseProperties(): Record { + debugLog('_parseProperties') + const length = this._parseVarByteNum() as number + const start = this._pos + const end = start + length + const result: Record = {} + + while (this._pos < end) { + const type = this._parseByte() + if (!type) { + this._emitError(new Error('Cannot parse property code type')) + return {} + } + const name = constants.propertiesCodes[type] + if (!name) { + this._emitError(new Error('Unknown property')) + return {} + } + // user properties process + if (name === 'userProperties') { + if (!result[name]) { + result[name] = Object.create(null) + } + const currentUserProperty = this._parseByType( + constants.propertiesTypes[name], + ) + if (result[name][currentUserProperty.name]) { + if (Array.isArray(result[name][currentUserProperty.name])) { + result[name][currentUserProperty.name].push( + currentUserProperty.value, + ) + } else { + const currentValue = + result[name][currentUserProperty.name] + result[name][currentUserProperty.name] = [currentValue] + result[name][currentUserProperty.name].push( + currentUserProperty.value, + ) + } + } else { + result[name][currentUserProperty.name] = + currentUserProperty.value + } + continue + } + if (result[name]) { + if (Array.isArray(result[name])) { + result[name].push( + this._parseByType(constants.propertiesTypes[name]), + ) + } else { + result[name] = [result[name]] + result[name].push( + this._parseByType(constants.propertiesTypes[name]), + ) + } + } else { + result[name] = this._parseByType( + constants.propertiesTypes[name], + ) + } + } + return result + } + + private _newPacket(): boolean { + debugLog('_newPacket') + if (this.packet) { + this._list.consume(this.packet.length) + debugLog( + '_newPacket: parser emit packet: packet.cmd: %s, packet.payload: %s, packet.length: %d', + this.packet.cmd, + this.packet.payload, + this.packet.length, + ) + this.emit('packet', this.packet) + } + debugLog('_newPacket: new packet') + this.packet = new Packet() + + this._pos = 0 + + return true + } + + private _emitError(err: Error): boolean { + debugLog('_emitError', err) + this.error = err + this.emit('error', err) + return false + } +} + +export default Parser diff --git a/src/types.ts b/src/types.ts new file mode 100644 index 0000000..3417128 --- /dev/null +++ b/src/types.ts @@ -0,0 +1,261 @@ +import EventEmitter from 'events' + +export type QoS = 0 | 1 | 2 + +export type PacketCmd = + | 'auth' + | 'connack' + | 'connect' + | 'disconnect' + | 'pingreq' + | 'pingresp' + | 'puback' + | 'pubcomp' + | 'publish' + | 'pubrel' + | 'pubrec' + | 'suback' + | 'subscribe' + | 'unsuback' + | 'unsubscribe' + +export type UserProperties = { [index: string]: string | string[] } + +export interface IPacket { + cmd: PacketCmd + messageId?: number + length?: number +} + +export interface IAuthPacket extends IPacket { + cmd: 'auth' + reasonCode: number + properties?: { + authenticationMethod?: string + authenticationData?: Buffer + reasonString?: string + userProperties?: UserProperties + } +} + +export interface IConnectPacket extends IPacket { + cmd: 'connect' + clientId: string + protocolVersion?: 131 | 132 | 4 | 5 | 3 + protocolId?: 'MQTT' | 'MQIsdp' + clean?: boolean + keepalive?: number + username?: string + password?: Buffer + will?: { + topic: string + payload: Buffer | string + qos?: QoS + retain?: boolean + properties?: { + willDelayInterval?: number + payloadFormatIndicator?: boolean + messageExpiryInterval?: number + contentType?: string + responseTopic?: string + correlationData?: Buffer + userProperties?: UserProperties + } + } + properties?: { + sessionExpiryInterval?: number + receiveMaximum?: number + maximumPacketSize?: number + topicAliasMaximum?: number + requestResponseInformation?: boolean + requestProblemInformation?: boolean + userProperties?: UserProperties + authenticationMethod?: string + authenticationData?: Buffer + } +} + +export interface IPublishPacket extends IPacket { + cmd: 'publish' + qos: QoS + dup: boolean + retain: boolean + topic: string + payload: string | Buffer + properties?: { + payloadFormatIndicator?: boolean + messageExpiryInterval?: number + topicAlias?: number + responseTopic?: string + correlationData?: Buffer + userProperties?: UserProperties + subscriptionIdentifier?: number | number[] + contentType?: string + } +} + +export interface IConnackPacket extends IPacket { + cmd: 'connack' + returnCode?: number + reasonCode?: number + sessionPresent: boolean + properties?: { + sessionExpiryInterval?: number + receiveMaximum?: number + maximumQoS?: number + retainAvailable?: boolean + maximumPacketSize?: number + assignedClientIdentifier?: string + topicAliasMaximum?: number + reasonString?: string + userProperties?: UserProperties + wildcardSubscriptionAvailable?: boolean + subscriptionIdentifiersAvailable?: boolean + sharedSubscriptionAvailable?: boolean + serverKeepAlive?: number + responseInformation?: string + serverReference?: string + authenticationMethod?: string + authenticationData?: Buffer + } +} + +export interface ISubscription { + topic: string + qos: QoS + nl?: boolean + rap?: boolean + rh?: number +} + +export interface ISubscribePacket extends IPacket { + cmd: 'subscribe' + subscriptions: ISubscription[] + properties?: { + reasonString?: string + subscriptionIdentifier?: number + userProperties?: UserProperties + } +} + +export interface ISubackPacket extends IPacket { + cmd: 'suback' + reasonCode?: number + properties?: { + reasonString?: string + userProperties?: UserProperties + } + granted: number[] | Record +} + +export interface IUnsubscribePacket extends IPacket { + cmd: 'unsubscribe' + properties?: { + reasonString?: string + userProperties?: UserProperties + } + unsubscriptions: string[] +} + +export interface IUnsubackPacket extends IPacket { + cmd: 'unsuback' + reasonCode?: number + properties?: { + reasonString?: string + userProperties?: UserProperties + } + granted: number[] +} + +export interface IPubackPacket extends IPacket { + cmd: 'puback' + reasonCode?: number + properties?: { + reasonString?: string + userProperties?: UserProperties + } +} + +export interface IPubcompPacket extends IPacket { + cmd: 'pubcomp' + reasonCode?: number + properties?: { + reasonString?: string + userProperties?: UserProperties + } +} + +export interface IPubrelPacket extends IPacket { + cmd: 'pubrel' + reasonCode?: number + properties?: { + reasonString?: string + userProperties?: UserProperties + } +} + +export interface IPubrecPacket extends IPacket { + cmd: 'pubrec' + reasonCode?: number + properties?: { + reasonString?: string + userProperties?: UserProperties + } +} + +export interface IPingreqPacket extends IPacket { + cmd: 'pingreq' +} + +export interface IPingrespPacket extends IPacket { + cmd: 'pingresp' +} + +export interface IDisconnectPacket extends IPacket { + cmd: 'disconnect' + reasonCode?: number + properties?: { + sessionExpiryInterval?: number + reasonString?: string + userProperties?: UserProperties + serverReference?: string + } +} + +export type AckPacket = + | IPubackPacket + | IPubcompPacket + | IPubrelPacket + | IPubrecPacket + +export type PacketWithProperties = + | IConnectPacket + | IPublishPacket + | IConnackPacket + | ISubscribePacket + | ISubackPacket + | IUnsubscribePacket + | IUnsubackPacket + | IPubackPacket + | IPubcompPacket + | IPubrelPacket + | IDisconnectPacket + | IPubrecPacket + | IAuthPacket + +export type Packet = + | IConnectPacket + | IPublishPacket + | IConnackPacket + | ISubscribePacket + | ISubackPacket + | IUnsubscribePacket + | IUnsubackPacket + | IPubackPacket + | IPubcompPacket + | IPubrelPacket + | IPingreqPacket + | IPingrespPacket + | IDisconnectPacket + | IPubrecPacket + | IAuthPacket diff --git a/src/writeToStream.ts b/src/writeToStream.ts new file mode 100644 index 0000000..ecd2187 --- /dev/null +++ b/src/writeToStream.ts @@ -0,0 +1,1443 @@ +import * as protocol from './constants' +import { Buffer } from 'buffer' +import numbers from './numbers' +import { nextTick } from 'process-nextick-args' +import * as debugModule from 'debug' +import { + AckPacket, + IAuthPacket, + IConnackPacket, + IConnectPacket, + IDisconnectPacket, + IPublishPacket, + ISubackPacket, + ISubscribePacket, + IUnsubackPacket, + IUnsubscribePacket, + Packet, + PacketWithProperties, +} from './types' +import { Writable } from 'stream' + +const debug = debugModule('mqtt-packet:writeToStream') +const empty = Buffer.allocUnsafe(0) +const zeroBuf = Buffer.from([0]) + +const numCache = numbers.cache +const generateNumber = numbers.generateNumber +const generateCache = numbers.generateCache +const genBufVariableByteInt = numbers.genBufVariableByteInt +const generate4ByteBuffer = numbers.generate4ByteBuffer + +let writeNumber: (stream: Writable, number: number) => boolean = + writeNumberCached +let toGenerate: boolean = true + +// Options interface +interface GenerateOptions { + protocolVersion?: number + properties?: Properties +} + +// Properties interface +type Properties = PacketWithProperties['properties'] + +export interface PropertiesData { + length: number + write: () => void +} + +/** + * Generate an MQTT packet + */ +function generate( + packet: Packet, + stream: Writable, + opts?: GenerateOptions, +): boolean { + debug('generate called') + if (stream.cork) { + stream.cork() + nextTick(uncork, stream) + } + + if (toGenerate) { + toGenerate = false + generateCache() + } + debug('generate: packet.cmd: %s', packet.cmd) + switch (packet.cmd) { + case 'connect': + return connect(packet as IConnectPacket, stream, opts) + case 'connack': + return connack(packet as IConnackPacket, stream, opts) + case 'publish': + return publish(packet as IPublishPacket, stream, opts) + case 'puback': + case 'pubrec': + case 'pubrel': + case 'pubcomp': + return confirmation(packet as AckPacket, stream, opts) + case 'subscribe': + return subscribe(packet as ISubscribePacket, stream, opts) + case 'suback': + return suback(packet as ISubackPacket, stream, opts) + case 'unsubscribe': + return unsubscribe(packet as IUnsubscribePacket, stream, opts) + case 'unsuback': + return unsuback(packet as IUnsubackPacket, stream, opts) + case 'pingreq': + case 'pingresp': + return emptyPacket(packet, stream, opts) + case 'disconnect': + return disconnect(packet as IDisconnectPacket, stream, opts) + case 'auth': + return auth(packet as IAuthPacket, stream, opts) + default: + stream.destroy(new Error('Unknown command')) + return false + } +} + +/** + * Controls numbers cache. + * Set to "false" to allocate buffers on-the-flight instead of pre-generated cache + */ +Object.defineProperty(generate, 'cacheNumbers', { + get(): boolean { + return writeNumber === writeNumberCached + }, + set(value: boolean) { + if (value) { + if (!numCache || Object.keys(numCache).length === 0) + toGenerate = true + writeNumber = writeNumberCached + } else { + toGenerate = false + writeNumber = writeNumberGenerated + } + }, +}) + +function uncork(stream: Writable): void { + stream.uncork?.() +} + +function connect( + packet: IConnectPacket, + stream: Writable, + opts?: GenerateOptions, +): boolean { + const settings: Partial = packet || {} + const protocolId = settings.protocolId || 'MQTT' + let protocolVersion = settings.protocolVersion || 4 + const will = settings.will + let clean = settings.clean + const keepalive = settings.keepalive || 0 + const clientId = settings.clientId || '' + const username = settings.username + const password = settings.password + /* mqtt5 new options */ + const properties = settings.properties + + if (clean === undefined) clean = true + + let length = 0 + + // Must be a string and non-falsy + if ( + !protocolId || + (typeof protocolId !== 'string' && !Buffer.isBuffer(protocolId)) + ) { + stream.destroy(new Error('Invalid protocolId')) + return false + } + length += protocolId.length + 2 + + // Must be 3 or 4 or 5 + if ( + protocolVersion !== 3 && + protocolVersion !== 4 && + protocolVersion !== 5 + ) { + stream.destroy(new Error('Invalid protocol version')) + return false + } + length += 1 + + // ClientId might be omitted in 3.1.1 and 5, but only if cleanSession is set to 1 + if ( + (typeof clientId === 'string' || Buffer.isBuffer(clientId)) && + (clientId || protocolVersion >= 4) && + (clientId || clean) + ) { + length += Buffer.byteLength(clientId) + 2 + } else { + if (protocolVersion < 4) { + stream.destroy(new Error('clientId must be supplied before 3.1.1')) + return false + } + if ((clean ? 1 : 0) === 0) { + stream.destroy( + new Error('clientId must be given if cleanSession set to 0'), + ) + return false + } + } + + // Must be a two byte number + if ( + typeof keepalive !== 'number' || + keepalive < 0 || + keepalive > 65535 || + keepalive % 1 !== 0 + ) { + stream.destroy(new Error('Invalid keepalive')) + return false + } + length += 2 + + // Connect flags + length += 1 + + let propertiesData: PropertiesData | null = null + let willProperties: PropertiesData | null = null + + // Properties + if (protocolVersion === 5) { + propertiesData = getProperties(stream, properties) + if (!propertiesData) { + return false + } + length += propertiesData.length + } + + // If will exists... + if (will) { + // It must be an object + if (typeof will !== 'object') { + stream.destroy(new Error('Invalid will')) + return false + } + // It must have topic typeof string + if (!will.topic || typeof will.topic !== 'string') { + stream.destroy(new Error('Invalid will topic')) + return false + } + length += Buffer.byteLength(will.topic) + 2 + + // Payload + length += 2 // payload length + if (will.payload) { + if (will.payload.length >= 0) { + if (typeof will.payload === 'string') { + length += Buffer.byteLength(will.payload) + } else { + length += will.payload.length + } + } else { + stream.destroy(new Error('Invalid will payload')) + return false + } + } + // will properties + if (protocolVersion === 5) { + willProperties = getProperties(stream, will.properties) + if (!willProperties) { + return false + } + length += willProperties.length + } + } + + // Username + let providedUsername = false + if (username != null) { + if (isStringOrBuffer(username)) { + providedUsername = true + length += Buffer.byteLength(username) + 2 + } else { + stream.destroy(new Error('Invalid username')) + return false + } + } + + // Password + if (password != null) { + if (!providedUsername) { + stream.destroy(new Error('Username is required to use password')) + return false + } + + if (isStringOrBuffer(password)) { + length += byteLength(password) + 2 + } else { + stream.destroy(new Error('Invalid password')) + return false + } + } + + // Generate header + stream.write(protocol.CONNECT_HEADER) + + // Generate length + writeVarByteInt(stream, length) + + // Generate protocol ID + writeStringOrBuffer(stream, protocolId) + + if (settings.bridgeMode) { + protocolVersion += 128 + } + + let versionBuffer: Buffer + switch (protocolVersion) { + case 131: + versionBuffer = protocol.VERSION131 + break + case 132: + versionBuffer = protocol.VERSION132 + break + case 4: + versionBuffer = protocol.VERSION4 + break + case 5: + versionBuffer = protocol.VERSION5 + break + default: + versionBuffer = protocol.VERSION3 + } + stream.write(versionBuffer) + + // Connect flags + let flags = 0 + flags |= username != null ? protocol.USERNAME_MASK : 0 + flags |= password != null ? protocol.PASSWORD_MASK : 0 + flags |= will && will.retain ? protocol.WILL_RETAIN_MASK : 0 + flags |= will && will.qos ? will.qos << protocol.WILL_QOS_SHIFT : 0 + flags |= will ? protocol.WILL_FLAG_MASK : 0 + flags |= clean ? protocol.CLEAN_SESSION_MASK : 0 + + stream.write(Buffer.from([flags])) + + // Keepalive + writeNumber(stream, keepalive) + + // Properties + if (protocolVersion === 5 && propertiesData) { + propertiesData.write() + } + + // Client ID + writeStringOrBuffer(stream, clientId) + + // Will + if (will) { + if (protocolVersion === 5 && willProperties) { + willProperties.write() + } + writeString(stream, will.topic) + writeStringOrBuffer(stream, will.payload) + } + + // Username and password + if (username != null) { + writeStringOrBuffer(stream, username) + } + if (password != null) { + writeStringOrBuffer(stream, password) + } + // This is a small packet that happens only once on a stream + // We assume the stream is always free to receive more data after this + return true +} + +function connack( + packet: IConnackPacket, + stream: Writable, + opts?: GenerateOptions, +): boolean { + const version = opts ? opts.protocolVersion : 4 + const settings: Partial = packet || {} + const rc = version === 5 ? settings.reasonCode : settings.returnCode + const properties = settings.properties + let length = 2 // length of rc and sessionHeader + + // Check return code + if (typeof rc !== 'number') { + stream.destroy(new Error('Invalid return code')) + return false + } + // mqtt5 properties + let propertiesData: PropertiesData | null = null + if (version === 5) { + propertiesData = getProperties(stream, properties) + if (!propertiesData) { + return false + } + length += propertiesData.length + } + + stream.write(protocol.CONNACK_HEADER) + // length + writeVarByteInt(stream, length) + stream.write( + settings.sessionPresent ? protocol.SESSIONPRESENT_HEADER : zeroBuf, + ) + + stream.write(Buffer.from([rc])) + if (propertiesData != null) { + propertiesData.write() + } + return true +} + +function publish( + packet: IPublishPacket, + stream: Writable, + opts?: GenerateOptions, +): boolean { + debug('publish: packet: %o', packet) + const version = opts ? opts.protocolVersion : 4 + const settings: Partial = packet || {} + const qos = settings.qos || 0 + const retain = settings.retain ? protocol.RETAIN_MASK : 0 + const topic = settings.topic + const payload = settings.payload || empty + const id = settings.messageId + const properties = settings.properties + + let length = 0 + + // Topic must be a non-empty string or Buffer + if (typeof topic === 'string') length += Buffer.byteLength(topic) + 2 + else if (Buffer.isBuffer(topic)) length += topic.length + 2 + else { + stream.destroy(new Error('Invalid topic')) + return false + } + + // Get the payload length + if (!Buffer.isBuffer(payload)) length += Buffer.byteLength(payload) + else length += payload.length + + // Message ID must a number if qos > 0 + if (qos && typeof id !== 'number') { + stream.destroy(new Error('Invalid messageId')) + return false + } + if (qos) length += 2 + + // mqtt5 properties + let propertiesData: PropertiesData | null = null + if (version === 5) { + propertiesData = getProperties(stream, properties) + if (!propertiesData) { + return false + } + length += propertiesData.length + } + + // Header + stream.write( + protocol.PUBLISH_HEADER[qos][settings.dup ? 1 : 0][retain ? 1 : 0], + ) + + // Remaining length + writeVarByteInt(stream, length) + + // Topic + writeNumber(stream, byteLength(topic)) + stream.write(topic) + + // Message ID + if (qos > 0) writeNumber(stream, id as number) + + // Properties + if (propertiesData != null) { + propertiesData.write() + } + + // Payload + debug('publish: payload: %o', payload) + return stream.write(payload) +} + +/* Puback, pubrec, pubrel and pubcomp */ +function confirmation( + packet: AckPacket, + stream: Writable, + opts?: GenerateOptions, +): boolean { + const version = opts ? opts.protocolVersion : 4 + const settings: Partial = packet || {} + const type = settings.cmd || 'puback' + const id = settings.messageId + const dup = settings.dup && type === 'pubrel' ? protocol.DUP_MASK : 0 + let qos = 0 + const reasonCode = settings.reasonCode || 0 + const properties = settings.properties + let length = version === 5 ? 3 : 2 + + if (type === 'pubrel') qos = 1 + + // Check message ID + if (typeof id !== 'number') { + stream.destroy(new Error('Invalid messageId')) + return false + } + + // Properties mqtt 5 + let propertiesData: PropertiesData | null = null + if (version === 5) { + // Confirm should not add empty property length with no properties (rfc 3.4.2.2.1) + if (typeof properties === 'object') { + propertiesData = getPropertiesByMaximumPacketSize( + stream, + properties, + opts, + length, + ) + if (!propertiesData) { + return false + } + length += propertiesData.length + } + } + + // Header + stream.write(protocol.ACKS[type][qos][dup][0]) + + // Length === 3 is only true of version === 5 and no properties; therefore if reasonCode === 0 we are allowed to skip both bytes - but if we write the reason code we also have to write property length [MQTT-3.4.2-1]. + if (length === 3) length += reasonCode !== 0 ? 1 : -1 + writeVarByteInt(stream, length) + + // Message ID + writeNumber(stream, id) + + // reason code in header - but only if it couldn't be omitted - indicated by length !== 2. + if (version === 5 && length !== 2) { + stream.write(Buffer.from([reasonCode])) + } + + // Properties mqtt 5 + if (propertiesData !== null) { + propertiesData.write() + } else if (length === 4) { + // we have no properties but have written a reason code - so we need to indicate empty properties by filling in a zero. + stream.write(Buffer.from([0])) + } + return true +} + +function subscribe( + packet: ISubscribePacket, + stream: Writable, + opts?: GenerateOptions, +): boolean { + debug('subscribe: packet: ') + const version = opts ? opts.protocolVersion : 4 + const settings: Partial = packet || {} + const dup = settings.dup ? protocol.DUP_MASK : 0 + const id = settings.messageId + const subs = settings.subscriptions + const properties = settings.properties + + let length = 0 + + // Check message ID + if (typeof id !== 'number') { + stream.destroy(new Error('Invalid messageId')) + return false + } + length += 2 + + // Properties mqtt 5 + let propertiesData: PropertiesData | null = null + if (version === 5) { + propertiesData = getProperties(stream, properties) + if (!propertiesData) { + return false + } + length += propertiesData.length + } + + // Check subscriptions + if (typeof subs === 'object' && subs.length) { + for (let i = 0; i < subs.length; i += 1) { + const itopic = subs[i].topic + const iqos = subs[i].qos + + if (typeof itopic !== 'string') { + stream.destroy( + new Error('Invalid subscriptions - invalid topic'), + ) + return false + } + if (typeof iqos !== 'number') { + stream.destroy(new Error('Invalid subscriptions - invalid qos')) + return false + } + + if (version === 5) { + const nl = subs[i].nl || false + if (typeof nl !== 'boolean') { + stream.destroy( + new Error('Invalid subscriptions - invalid No Local'), + ) + return false + } + const rap = subs[i].rap || false + if (typeof rap !== 'boolean') { + stream.destroy( + new Error( + 'Invalid subscriptions - invalid Retain as Published', + ), + ) + return false + } + const rh = subs[i].rh || 0 + if (typeof rh !== 'number' || rh > 2) { + stream.destroy( + new Error( + 'Invalid subscriptions - invalid Retain Handling', + ), + ) + return false + } + } + + length += Buffer.byteLength(itopic) + 2 + 1 + } + } else { + stream.destroy(new Error('Invalid subscriptions')) + return false + } + + // Generate header + debug('subscribe: writing to stream: %o', protocol.SUBSCRIBE_HEADER) + stream.write(protocol.SUBSCRIBE_HEADER[1][dup ? 1 : 0][0]) + + // Generate length + writeVarByteInt(stream, length) + + // Generate message ID + writeNumber(stream, id) + + // Properties mqtt 5 + if (propertiesData !== null) { + propertiesData.write() + } + + let result = true + + // Generate subs + for (const sub of subs) { + const jtopic = sub.topic + const jqos = sub.qos + const jnl = +(sub.nl || false) + const jrap = +(sub.rap || false) + const jrh = sub.rh || 0 + let joptions + + // Write topic string + writeString(stream, jtopic) + + // Options process + joptions = protocol.SUBSCRIBE_OPTIONS_QOS[jqos] + if (version === 5) { + joptions |= jnl ? protocol.SUBSCRIBE_OPTIONS_NL : 0 + joptions |= jrap ? protocol.SUBSCRIBE_OPTIONS_RAP : 0 + joptions |= jrh ? protocol.SUBSCRIBE_OPTIONS_RH[jrh] : 0 + } + // Write options + result = stream.write(Buffer.from([joptions])) + } + + return result +} + +function suback( + packet: ISubackPacket, + stream: Writable, + opts?: GenerateOptions, +): boolean { + const version = opts ? opts.protocolVersion : 4 + const settings = packet || {} + const id = settings.messageId + const granted = settings.granted + const properties = settings.properties + let length = 0 + + // Check message ID + if (typeof id !== 'number') { + stream.destroy(new Error('Invalid messageId')) + return false + } + length += 2 + + // Check granted qos vector + if (typeof granted === 'object' && granted.length) { + for (let i = 0; i < granted.length; i += 1) { + if (typeof granted[i] !== 'number') { + stream.destroy(new Error('Invalid qos vector')) + return false + } + length += 1 + } + } else { + stream.destroy(new Error('Invalid qos vector')) + return false + } + + // Properties mqtt 5 + let propertiesData: PropertiesData | null = null + if (version === 5) { + propertiesData = getPropertiesByMaximumPacketSize( + stream, + properties, + opts, + length, + ) + if (!propertiesData) { + return false + } + length += propertiesData.length + } + + // Header + stream.write(protocol.SUBACK_HEADER) + + // Length + writeVarByteInt(stream, length) + + // Message ID + writeNumber(stream, id) + + // Properties mqtt 5 + if (propertiesData !== null) { + propertiesData.write() + } + + return stream.write(Buffer.from(granted)) +} + +function unsubscribe( + packet: IUnsubscribePacket, + stream: Writable, + opts?: GenerateOptions, +): boolean { + const version = opts ? opts.protocolVersion : 4 + const settings = packet || {} + const id = settings.messageId + const dup = settings.dup ? protocol.DUP_MASK : 0 + const unsubs = settings.unsubscriptions + const properties = settings.properties + + let length = 0 + + // Check message ID + if (typeof id !== 'number') { + stream.destroy(new Error('Invalid messageId')) + return false + } + length += 2 + + // Check unsubs + if (typeof unsubs === 'object' && unsubs.length) { + for (let i = 0; i < unsubs.length; i += 1) { + if (typeof unsubs[i] !== 'string') { + stream.destroy(new Error('Invalid unsubscriptions')) + return false + } + length += Buffer.byteLength(unsubs[i]) + 2 + } + } else { + stream.destroy(new Error('Invalid unsubscriptions')) + return false + } + // Properties mqtt 5 + let propertiesData: PropertiesData | null = null + if (version === 5) { + propertiesData = getProperties(stream, properties) + if (!propertiesData) { + return false + } + length += propertiesData.length + } + + // Header + stream.write(protocol.UNSUBSCRIBE_HEADER[1][dup ? 1 : 0][0]) + + // Length + writeVarByteInt(stream, length) + + // Message ID + writeNumber(stream, id) + + // Properties mqtt 5 + if (propertiesData !== null) { + propertiesData.write() + } + + // Unsubs + let result = true + for (let j = 0; j < unsubs.length; j++) { + result = writeString(stream, unsubs[j]) + } + + return result +} + +function unsuback( + packet: IUnsubackPacket, + stream: Writable, + opts?: GenerateOptions, +): boolean { + const version = opts ? opts.protocolVersion : 4 + const settings = packet || {} + const id = settings.messageId + const dup = settings.dup ? protocol.DUP_MASK : 0 + const granted = settings.granted + const properties = settings.properties + const type = settings.cmd + const qos = 0 + + let length = 2 + + // Check message ID + if (typeof id !== 'number') { + stream.destroy(new Error('Invalid messageId')) + return false + } + + // Check granted + if (version === 5) { + if (typeof granted === 'object' && granted.length) { + for (let i = 0; i < granted.length; i += 1) { + if (typeof granted[i] !== 'number') { + stream.destroy(new Error('Invalid qos vector')) + return false + } + length += 1 + } + } else { + stream.destroy(new Error('Invalid qos vector')) + return false + } + } + + // Properties mqtt 5 + let propertiesData: PropertiesData | null = null + if (version === 5) { + propertiesData = getPropertiesByMaximumPacketSize( + stream, + properties, + opts, + length, + ) + if (!propertiesData) { + return false + } + length += propertiesData.length + } + + // Header + stream.write(protocol.ACKS[type][qos][dup][0]) + + // Length + writeVarByteInt(stream, length) + + // Message ID + writeNumber(stream, id) + + // Properties mqtt 5 + if (propertiesData !== null) { + propertiesData.write() + } + + // Payload + if (version === 5 && granted) { + stream.write(Buffer.from(granted)) + } + return true +} + +function emptyPacket( + packet: Packet, + stream: Writable, + opts?: GenerateOptions, +): boolean { + return stream.write(protocol.EMPTY[packet.cmd]) +} + +function disconnect( + packet: IDisconnectPacket, + stream: Writable, + opts?: GenerateOptions, +): boolean { + const version = opts ? opts.protocolVersion : 4 + const settings = packet || {} + const reasonCode = settings.reasonCode || 0 + const properties = settings.properties + let length = version === 5 ? 1 : 0 + + // Properties mqtt 5 + let propertiesData: PropertiesData | null = null + if (version === 5) { + propertiesData = getPropertiesByMaximumPacketSize( + stream, + properties, + opts, + length, + ) + if (!propertiesData) { + return false + } + length += propertiesData.length + } + + // Header + stream.write(Buffer.from([protocol.codes.disconnect << 4])) + + // Length + writeVarByteInt(stream, length) + + // Reason code in header + if (version === 5) { + stream.write(Buffer.from([reasonCode])) + } + + // Properties mqtt 5 + if (propertiesData !== null) { + propertiesData.write() + } + + return true +} + +function auth( + packet: IAuthPacket, + stream: Writable, + opts?: GenerateOptions, +): boolean { + const version = opts ? opts.protocolVersion : 4 + const settings = packet || {} + const reasonCode = settings.reasonCode || 0 + const properties = settings.properties + let length = version === 5 ? 1 : 0 + + if (version !== 5) + stream.destroy(new Error('Invalid mqtt version for auth packet')) + + // Properties mqtt 5 + const propertiesData = getPropertiesByMaximumPacketSize( + stream, + properties, + opts, + length, + ) + if (!propertiesData) { + return false + } + length += propertiesData.length + + // Header + stream.write(Buffer.from([protocol.codes.auth << 4])) + + // Length + writeVarByteInt(stream, length) + + // Reason code in header + stream.write(Buffer.from([reasonCode])) + + // Properties mqtt 5 + if (propertiesData !== null) { + propertiesData.write() + } + return true +} + +/** + * writeVarByteInt - write an MQTT style variable byte integer to the buffer + * + * @param stream - destination stream + * @param num - number to write + * @returns boolean indicating success + * + * @private + */ +const varByteIntCache: Record = {} +/** + * Write a variable byte integer to the stream + * @param stream - destination stream + * @param num - number to write + * @returns boolean indicating success + * @private + */ +function writeVarByteInt(stream: Writable, num: number): boolean { + if (num > protocol.VARBYTEINT_MAX) { + stream.destroy(new Error(`Invalid variable byte integer: ${num}`)) + return false + } + + let buffer = varByteIntCache[num] + + if (!buffer) { + buffer = genBufVariableByteInt(num) + if (num < 16384) varByteIntCache[num] = buffer + } + debug('writeVarByteInt: writing to stream: %o', buffer) + return stream.write(buffer) +} + +/** + * Write a string to the stream + * @param stream - destination stream + * @param string - string to write + * @returns boolean indicating success + * @private + */ +function writeString(stream: Writable, string: string): boolean { + const strlen = Buffer.byteLength(string) + writeNumber(stream, strlen) + + debug('writeString: %s', string) + return stream.write(string, 'utf8') +} + +/** + * Write a string pair to the stream + * @param stream - destination stream + * @param name - string name to write + * @param value - string value to write + * @returns boolean indicating success + * @private + */ +function writeStringPair(stream: Writable, name: string, value: string): void { + writeString(stream, name) + writeString(stream, value) +} + +/** + * Write a two byte number to the stream using cached number buffers + * @param stream - destination stream + * @param number - number to write + * @returns boolean indicating success + * @private + */ +function writeNumberCached(stream: Writable, number: number): boolean { + debug('writeNumberCached: number: %d', number) + debug('writeNumberCached: %o', numCache[number]) + return stream.write(numCache[number]) +} + +/** + * Write a two byte number to the stream by generating the buffer + * @param stream - destination stream + * @param number - number to write + * @returns boolean indicating success + * @private + */ +function writeNumberGenerated(stream: Writable, number: number): boolean { + const generatedNumber = generateNumber(number) + debug('writeNumberGenerated: %o', generatedNumber) + return stream.write(generatedNumber) +} + +/** + * Write a four byte number to the stream + * @param stream - destination stream + * @param number - number to write + * @returns boolean indicating success + * @private + */ +function write4ByteNumber(stream: Writable, number: number): boolean { + const generated4ByteBuffer = generate4ByteBuffer(number) + debug('write4ByteNumber: %o', generated4ByteBuffer) + return stream.write(generated4ByteBuffer) +} + +/** + * Write a String or Buffer with its length prefix + * @param stream - destination stream + * @param toWrite - String or Buffer + * @returns boolean indicating success + * @private + */ +function writeStringOrBuffer( + stream: Writable, + toWrite: string | Buffer | undefined, +): boolean { + if (typeof toWrite === 'string') { + return writeString(stream, toWrite) + } + if (toWrite) { + writeNumber(stream, toWrite.length) + return stream.write(toWrite) + } + return writeNumber(stream, 0) +} + +/** + * Get the properties object for MQTT 5 + * @param stream - destination stream + * @param properties - properties object + * @returns PropertiesData object or null if invalid + * @private + */ +function getProperties( + stream: Writable, + properties?: Properties, +): PropertiesData | null { + /* connect properties */ + if (typeof properties !== 'object' || properties?.length != null) { + return { + length: 1, + write(): void { + writeProperties(stream, {}, 0) + }, + } + } + + let propertiesLength = 0 + + function getLengthProperty(name: string, value: any): number | false { + const type = protocol.propertiesTypes[name] + let length = 0 + switch (type) { + case 'byte': { + if (typeof value !== 'boolean') { + stream.destroy(new Error(`Invalid ${name}: ${value}`)) + return false + } + length += 1 + 1 + break + } + case 'int8': { + if (typeof value !== 'number' || value < 0 || value > 0xff) { + stream.destroy(new Error(`Invalid ${name}: ${value}`)) + return false + } + length += 1 + 1 + break + } + case 'binary': { + if (value && value === null) { + stream.destroy(new Error(`Invalid ${name}: ${value}`)) + return false + } + length += 1 + Buffer.byteLength(value) + 2 + break + } + case 'int16': { + if (typeof value !== 'number' || value < 0 || value > 0xffff) { + stream.destroy(new Error(`Invalid ${name}: ${value}`)) + return false + } + length += 1 + 2 + break + } + case 'int32': { + if ( + typeof value !== 'number' || + value < 0 || + value > 0xffffffff + ) { + stream.destroy(new Error(`Invalid ${name}: ${value}`)) + return false + } + length += 1 + 4 + break + } + case 'var': { + // var byte integer is max 24 bits packed in 32 bits + if ( + typeof value !== 'number' || + value < 0 || + value > 0x0fffffff + ) { + stream.destroy(new Error(`Invalid ${name}: ${value}`)) + return false + } + length += 1 + Buffer.byteLength(genBufVariableByteInt(value)) + break + } + case 'string': { + if (typeof value !== 'string') { + stream.destroy(new Error(`Invalid ${name}: ${value}`)) + return false + } + length += 1 + 2 + Buffer.byteLength(value.toString()) + break + } + case 'pair': { + if (typeof value !== 'object') { + stream.destroy(new Error(`Invalid ${name}: ${value}`)) + return false + } + length += Object.getOwnPropertyNames(value).reduce( + (result, name) => { + const currentValue = value[name] + if (Array.isArray(currentValue)) { + result += currentValue.reduce( + (currentLength, value) => { + currentLength += + 1 + + 2 + + Buffer.byteLength(name.toString()) + + 2 + + Buffer.byteLength(value.toString()) + return currentLength + }, + 0, + ) + } else { + result += + 1 + + 2 + + Buffer.byteLength(name.toString()) + + 2 + + Buffer.byteLength(value[name].toString()) + } + return result + }, + 0, + ) + break + } + default: { + stream.destroy(new Error(`Invalid property ${name}: ${value}`)) + return false + } + } + return length + } + + if (properties) { + for (const propName in properties) { + let propLength = 0 + let propValueLength: number | false = 0 + const propValue = properties[propName] + + if (Array.isArray(propValue)) { + for ( + let valueIndex = 0; + valueIndex < propValue.length; + valueIndex++ + ) { + propValueLength = getLengthProperty( + propName, + propValue[valueIndex], + ) + if (propValueLength === false) { + return null + } + propLength += propValueLength + } + } else { + propValueLength = getLengthProperty(propName, propValue) + if (propValueLength === false) { + return null + } + propLength = propValueLength + } + + if (!propLength) return null + propertiesLength += propLength + } + } + + const propertiesLengthLength = Buffer.byteLength( + genBufVariableByteInt(propertiesLength), + ) + + return { + length: propertiesLengthLength + propertiesLength, + write(): void { + writeProperties(stream, properties, propertiesLength) + }, + } +} + +/** + * Get properties by maximum packet size + * @param stream - destination stream + * @param properties - properties object + * @param opts - options + * @param length - current length + * @returns PropertiesData object or null if invalid + * @private + */ +function getPropertiesByMaximumPacketSize( + stream: Writable, + properties?: Properties, + opts?: GenerateOptions, + length: number = 0, +): PropertiesData | null { + const mayEmptyProps = ['reasonString', 'userProperties'] + const maximumPacketSize = opts?.properties?.maximumPacketSize || 0 + + let propertiesData = getProperties(stream, properties) + if (maximumPacketSize) { + while ( + propertiesData && + length + propertiesData.length > maximumPacketSize + ) { + const currentMayEmptyProp = mayEmptyProps.shift() + if ( + currentMayEmptyProp && + properties && + properties[currentMayEmptyProp] + ) { + delete properties[currentMayEmptyProp] + propertiesData = getProperties(stream, properties) + } else { + return null + } + } + } + return propertiesData +} + +/** + * Write a property to the stream + * @param stream - destination stream + * @param propName - property name + * @param value - property value + * @returns boolean indicating success + * @private + */ +function writeProperty( + stream: Writable, + propName: string, + value: any, +): boolean { + const type = protocol.propertiesTypes[propName] + switch (type) { + case 'byte': { + stream.write(Buffer.from([protocol.properties[propName]])) + stream.write(Buffer.from([+value])) + break + } + case 'int8': { + stream.write(Buffer.from([protocol.properties[propName]])) + stream.write(Buffer.from([value])) + break + } + case 'binary': { + stream.write(Buffer.from([protocol.properties[propName]])) + writeStringOrBuffer(stream, value) + break + } + case 'int16': { + stream.write(Buffer.from([protocol.properties[propName]])) + writeNumber(stream, value) + break + } + case 'int32': { + stream.write(Buffer.from([protocol.properties[propName]])) + write4ByteNumber(stream, value) + break + } + case 'var': { + stream.write(Buffer.from([protocol.properties[propName]])) + writeVarByteInt(stream, value) + break + } + case 'string': { + stream.write(Buffer.from([protocol.properties[propName]])) + writeString(stream, value) + break + } + case 'pair': { + Object.getOwnPropertyNames(value).forEach((name) => { + const currentValue = value[name] + if (Array.isArray(currentValue)) { + currentValue.forEach((value) => { + stream.write( + Buffer.from([protocol.properties[propName]]), + ) + writeStringPair( + stream, + name.toString(), + value.toString(), + ) + }) + } else { + stream.write(Buffer.from([protocol.properties[propName]])) + writeStringPair( + stream, + name.toString(), + currentValue.toString(), + ) + } + }) + break + } + default: { + stream.destroy( + new Error(`Invalid property ${propName} value: ${value}`), + ) + return false + } + } + return true +} + +/** + * Write properties to stream + * @param stream - destination stream + * @param properties - properties object + * @param propertiesLength - properties length + * @private + */ +function writeProperties( + stream: Writable, + properties: Properties, + propertiesLength: number, +): void { + /* write properties to stream */ + writeVarByteInt(stream, propertiesLength) + for (const propName in properties) { + if ( + Object.prototype.hasOwnProperty.call(properties, propName) && + properties[propName] !== null + ) { + const value = properties[propName] + if (Array.isArray(value)) { + for ( + let valueIndex = 0; + valueIndex < value.length; + valueIndex++ + ) { + writeProperty(stream, propName, value[valueIndex]) + } + } else { + writeProperty(stream, propName, value) + } + } + } +} + +/** + * Get byte length of buffer or string + * @param bufOrString - buffer or string + * @returns byte length + * @private + */ +function byteLength(bufOrString: string | Buffer | undefined): number { + if (!bufOrString) return 0 + if (bufOrString instanceof Buffer) return bufOrString.length + return Buffer.byteLength(bufOrString) +} + +/** + * Check if a field is a string or buffer + * @param field - field to check + * @returns boolean indicating if field is a string or buffer + * @private + */ +function isStringOrBuffer(field: any): boolean { + return typeof field === 'string' || field instanceof Buffer +} diff --git a/test.js b/test/test.js similarity index 100% rename from test.js rename to test/test.js diff --git a/testRandom.js b/test/testRandom.js similarity index 100% rename from testRandom.js rename to test/testRandom.js diff --git a/tsconfig.build.json b/tsconfig.build.json new file mode 100644 index 0000000..a042068 --- /dev/null +++ b/tsconfig.build.json @@ -0,0 +1,5 @@ +{ + "extends": "./tsconfig.json", + "exclude": ["node_modules", "test", "dist", "build"], + } + \ No newline at end of file diff --git a/tsconfig.json b/tsconfig.json new file mode 100644 index 0000000..010f98e --- /dev/null +++ b/tsconfig.json @@ -0,0 +1,32 @@ +{ + "compilerOptions": { + "module": "commonjs", + "declaration": true, + "removeComments": true, + "emitDecoratorMetadata": true, + "experimentalDecorators": true, + "allowSyntheticDefaultImports": true, + "target": "es2017", + "sourceMap": true, + "outDir": "./build", + "baseUrl": "./", + "incremental": true, + "skipLibCheck": true, + "preserveSymlinks": true, + "esModuleInterop": true, + "resolveJsonModule": true, + "typeRoots": [ + "node_modules/@types" + ], + "types": [ + "node", + ], + }, + "include": [ + "src", + "test", + ], + "exclude": [ + "dist", + ] +} \ No newline at end of file diff --git a/types/index.d.ts b/types/index.d.ts deleted file mode 100644 index 96960a8..0000000 --- a/types/index.d.ts +++ /dev/null @@ -1,257 +0,0 @@ -import EventEmitter = NodeJS.EventEmitter -import WritableStream = NodeJS.WritableStream - -export declare type QoS = 0 | 1 | 2 - -export declare type PacketCmd = 'auth' | - 'connack' | - 'connect' | - 'disconnect' | - 'pingreq' | - 'pingresp' | - 'puback' | - 'pubcomp' | - 'publish' | - 'pubrel' | - 'pubrec' | - 'suback' | - 'subscribe' | - 'unsuback' | - 'unsubscribe' - -export declare type UserProperties = {[index: string]: string | string[]} - -export interface IPacket { - cmd: PacketCmd - messageId?: number - length?: number -} - -export interface IAuthPacket extends IPacket { - cmd: 'auth' - reasonCode: number, - properties?: { - authenticationMethod?: string, - authenticationData?: Buffer, - reasonString?: string, - userProperties?: UserProperties, - } -} - -export interface IConnectPacket extends IPacket { - cmd: 'connect' - clientId: string - protocolVersion?: 4 | 5 | 3 - protocolId?: 'MQTT' | 'MQIsdp' - clean?: boolean - keepalive?: number - username?: string - password?: Buffer - will?: { - topic: string - payload: Buffer | string - qos?: QoS - retain?: boolean - properties?: { - willDelayInterval?: number, - payloadFormatIndicator?: boolean, - messageExpiryInterval?: number, - contentType?: string, - responseTopic?: string, - correlationData?: Buffer, - userProperties?: UserProperties - } - } - properties?: { - sessionExpiryInterval?: number, - receiveMaximum?: number, - maximumPacketSize?: number, - topicAliasMaximum?: number, - requestResponseInformation?: boolean, - requestProblemInformation?: boolean, - userProperties?: UserProperties, - authenticationMethod?: string, - authenticationData?: Buffer - } -} - -export interface IPublishPacket extends IPacket { - cmd: 'publish' - qos: QoS - dup: boolean - retain: boolean - topic: string - payload: string | Buffer - properties?: { - payloadFormatIndicator?: boolean, - messageExpiryInterval?: number, - topicAlias?: number, - responseTopic?: string, - correlationData?: Buffer, - userProperties?: UserProperties, - subscriptionIdentifier?: number | number[], - contentType?: string - } -} - -export interface IConnackPacket extends IPacket { - cmd: 'connack' - returnCode?: number, - reasonCode?: number, - sessionPresent: boolean - properties?: { - sessionExpiryInterval?: number, - receiveMaximum?: number, - maximumQoS?: number, - retainAvailable?: boolean, - maximumPacketSize?: number, - assignedClientIdentifier?: string, - topicAliasMaximum?: number, - reasonString?: string, - userProperties?: UserProperties, - wildcardSubscriptionAvailable?: boolean, - subscriptionIdentifiersAvailable?: boolean, - sharedSubscriptionAvailable?: boolean, - serverKeepAlive?: number, - responseInformation?: string, - serverReference?: string, - authenticationMethod?: string, - authenticationData?: Buffer - } -} - -export interface ISubscription { - topic: string - qos: QoS, - nl?: boolean, - rap?: boolean, - rh?: number -} - -export interface ISubscribePacket extends IPacket { - cmd: 'subscribe' - subscriptions: ISubscription[], - properties?: { - reasonString?: string, - subscriptionIdentifier?: number, - userProperties?: UserProperties - } -} - -export interface ISubackPacket extends IPacket { - cmd: 'suback', - reasonCode?: number, - properties?: { - reasonString?: string, - userProperties?: UserProperties - }, - granted: number[] | Object[] -} - -export interface IUnsubscribePacket extends IPacket { - cmd: 'unsubscribe', - properties?: { - reasonString?: string, - userProperties?: UserProperties - }, - unsubscriptions: string[] -} - -export interface IUnsubackPacket extends IPacket { - cmd: 'unsuback', - reasonCode?: number, - properties?: { - reasonString?: string, - userProperties?: UserProperties - }, - granted: number[] -} - -export interface IPubackPacket extends IPacket { - cmd: 'puback', - reasonCode?: number, - properties?: { - reasonString?: string, - userProperties?: UserProperties - } -} - -export interface IPubcompPacket extends IPacket { - cmd: 'pubcomp', - reasonCode?: number, - properties?: { - reasonString?: string, - userProperties?: UserProperties - } -} - -export interface IPubrelPacket extends IPacket { - cmd: 'pubrel', - reasonCode?: number, - properties?: { - reasonString?: string, - userProperties?: UserProperties - } -} - -export interface IPubrecPacket extends IPacket { - cmd: 'pubrec', - reasonCode?: number, - properties?: { - reasonString?: string, - userProperties?: UserProperties - } -} - -export interface IPingreqPacket extends IPacket { - cmd: 'pingreq' -} - -export interface IPingrespPacket extends IPacket { - cmd: 'pingresp' -} - -export interface IDisconnectPacket extends IPacket { - cmd: 'disconnect', - reasonCode?: number, - properties?: { - sessionExpiryInterval?: number, - reasonString?: string, - userProperties?: UserProperties, - serverReference?: string - } -} - -export declare type Packet = IConnectPacket | - IPublishPacket | - IConnackPacket | - ISubscribePacket | - ISubackPacket | - IUnsubscribePacket | - IUnsubackPacket | - IPubackPacket | - IPubcompPacket | - IPubrelPacket | - IPingreqPacket | - IPingrespPacket | - IDisconnectPacket | - IPubrecPacket | - IAuthPacket - -export interface Parser extends EventEmitter { - on(event: 'packet', callback: (packet: Packet) => void): this - - on(event: 'error', callback: (error: any) => void): this - - parse(buffer: Buffer, opts?: Object): number -} - -export declare function parser(opts?: Object): Parser - -export declare function generate(packet: Packet, opts?: Object): Buffer - -export declare function writeToStream(object: Packet, stream: WritableStream, opts?: Object): boolean - -export declare namespace writeToStream { - let cacheNumbers: boolean -} diff --git a/writeToStream.js b/writeToStream.js deleted file mode 100644 index 74ca766..0000000 --- a/writeToStream.js +++ /dev/null @@ -1,1127 +0,0 @@ -const protocol = require('./constants') -const { Buffer } = require('buffer') -const empty = Buffer.allocUnsafe(0) -const zeroBuf = Buffer.from([0]) -const numbers = require('./numbers') -const nextTick = require('process-nextick-args').nextTick -const debug = require('debug')('mqtt-packet:writeToStream') - -const numCache = numbers.cache -const generateNumber = numbers.generateNumber -const generateCache = numbers.generateCache -const genBufVariableByteInt = numbers.genBufVariableByteInt -const generate4ByteBuffer = numbers.generate4ByteBuffer -let writeNumber = writeNumberCached -let toGenerate = true - -function generate (packet, stream, opts) { - debug('generate called') - if (stream.cork) { - stream.cork() - nextTick(uncork, stream) - } - - if (toGenerate) { - toGenerate = false - generateCache() - } - debug('generate: packet.cmd: %s', packet.cmd) - switch (packet.cmd) { - case 'connect': - return connect(packet, stream, opts) - case 'connack': - return connack(packet, stream, opts) - case 'publish': - return publish(packet, stream, opts) - case 'puback': - case 'pubrec': - case 'pubrel': - case 'pubcomp': - return confirmation(packet, stream, opts) - case 'subscribe': - return subscribe(packet, stream, opts) - case 'suback': - return suback(packet, stream, opts) - case 'unsubscribe': - return unsubscribe(packet, stream, opts) - case 'unsuback': - return unsuback(packet, stream, opts) - case 'pingreq': - case 'pingresp': - return emptyPacket(packet, stream, opts) - case 'disconnect': - return disconnect(packet, stream, opts) - case 'auth': - return auth(packet, stream, opts) - default: - stream.destroy(new Error('Unknown command')) - return false - } -} -/** - * Controls numbers cache. - * Set to "false" to allocate buffers on-the-flight instead of pre-generated cache - */ -Object.defineProperty(generate, 'cacheNumbers', { - get () { - return writeNumber === writeNumberCached - }, - set (value) { - if (value) { - if (!numCache || Object.keys(numCache).length === 0) toGenerate = true - writeNumber = writeNumberCached - } else { - toGenerate = false - writeNumber = writeNumberGenerated - } - } -}) - -function uncork (stream) { - stream.uncork() -} - -function connect (packet, stream, opts) { - const settings = packet || {} - const protocolId = settings.protocolId || 'MQTT' - let protocolVersion = settings.protocolVersion || 4 - const will = settings.will - let clean = settings.clean - const keepalive = settings.keepalive || 0 - const clientId = settings.clientId || '' - const username = settings.username - const password = settings.password - /* mqtt5 new oprions */ - const properties = settings.properties - - if (clean === undefined) clean = true - - let length = 0 - - // Must be a string and non-falsy - if (!protocolId || - (typeof protocolId !== 'string' && !Buffer.isBuffer(protocolId))) { - stream.destroy(new Error('Invalid protocolId')) - return false - } else length += protocolId.length + 2 - - // Must be 3 or 4 or 5 - if (protocolVersion !== 3 && protocolVersion !== 4 && protocolVersion !== 5) { - stream.destroy(new Error('Invalid protocol version')) - return false - } else length += 1 - - // ClientId might be omitted in 3.1.1 and 5, but only if cleanSession is set to 1 - if ((typeof clientId === 'string' || Buffer.isBuffer(clientId)) && - (clientId || protocolVersion >= 4) && (clientId || clean)) { - length += Buffer.byteLength(clientId) + 2 - } else { - if (protocolVersion < 4) { - stream.destroy(new Error('clientId must be supplied before 3.1.1')) - return false - } - if ((clean * 1) === 0) { - stream.destroy(new Error('clientId must be given if cleanSession set to 0')) - return false - } - } - - // Must be a two byte number - if (typeof keepalive !== 'number' || - keepalive < 0 || - keepalive > 65535 || - keepalive % 1 !== 0) { - stream.destroy(new Error('Invalid keepalive')) - return false - } else length += 2 - - // Connect flags - length += 1 - - let propertiesData - let willProperties - - // Properties - if (protocolVersion === 5) { - propertiesData = getProperties(stream, properties) - if (!propertiesData) { return false } - length += propertiesData.length - } - - // If will exists... - if (will) { - // It must be an object - if (typeof will !== 'object') { - stream.destroy(new Error('Invalid will')) - return false - } - // It must have topic typeof string - if (!will.topic || typeof will.topic !== 'string') { - stream.destroy(new Error('Invalid will topic')) - return false - } else { - length += Buffer.byteLength(will.topic) + 2 - } - - // Payload - length += 2 // payload length - if (will.payload) { - if (will.payload.length >= 0) { - if (typeof will.payload === 'string') { - length += Buffer.byteLength(will.payload) - } else { - length += will.payload.length - } - } else { - stream.destroy(new Error('Invalid will payload')) - return false - } - } - // will properties - willProperties = {} - if (protocolVersion === 5) { - willProperties = getProperties(stream, will.properties) - if (!willProperties) { return false } - length += willProperties.length - } - } - - // Username - let providedUsername = false - if (username != null) { - if (isStringOrBuffer(username)) { - providedUsername = true - length += Buffer.byteLength(username) + 2 - } else { - stream.destroy(new Error('Invalid username')) - return false - } - } - - // Password - if (password != null) { - if (!providedUsername) { - stream.destroy(new Error('Username is required to use password')) - return false - } - - if (isStringOrBuffer(password)) { - length += byteLength(password) + 2 - } else { - stream.destroy(new Error('Invalid password')) - return false - } - } - - // Generate header - stream.write(protocol.CONNECT_HEADER) - - // Generate length - writeVarByteInt(stream, length) - - // Generate protocol ID - writeStringOrBuffer(stream, protocolId) - - if (settings.bridgeMode) { - protocolVersion += 128 - } - - stream.write( - protocolVersion === 131 - ? protocol.VERSION131 - : protocolVersion === 132 - ? protocol.VERSION132 - : protocolVersion === 4 - ? protocol.VERSION4 - : protocolVersion === 5 - ? protocol.VERSION5 - : protocol.VERSION3 - ) - - // Connect flags - let flags = 0 - flags |= (username != null) ? protocol.USERNAME_MASK : 0 - flags |= (password != null) ? protocol.PASSWORD_MASK : 0 - flags |= (will && will.retain) ? protocol.WILL_RETAIN_MASK : 0 - flags |= (will && will.qos) ? will.qos << protocol.WILL_QOS_SHIFT : 0 - flags |= will ? protocol.WILL_FLAG_MASK : 0 - flags |= clean ? protocol.CLEAN_SESSION_MASK : 0 - - stream.write(Buffer.from([flags])) - - // Keepalive - writeNumber(stream, keepalive) - - // Properties - if (protocolVersion === 5) { - propertiesData.write() - } - - // Client ID - writeStringOrBuffer(stream, clientId) - - // Will - if (will) { - if (protocolVersion === 5) { - willProperties.write() - } - writeString(stream, will.topic) - writeStringOrBuffer(stream, will.payload) - } - - // Username and password - if (username != null) { - writeStringOrBuffer(stream, username) - } - if (password != null) { - writeStringOrBuffer(stream, password) - } - // This is a small packet that happens only once on a stream - // We assume the stream is always free to receive more data after this - return true -} - -function connack (packet, stream, opts) { - const version = opts ? opts.protocolVersion : 4 - const settings = packet || {} - const rc = version === 5 ? settings.reasonCode : settings.returnCode - const properties = settings.properties - let length = 2 // length of rc and sessionHeader - - // Check return code - if (typeof rc !== 'number') { - stream.destroy(new Error('Invalid return code')) - return false - } - // mqtt5 properties - let propertiesData = null - if (version === 5) { - propertiesData = getProperties(stream, properties) - if (!propertiesData) { return false } - length += propertiesData.length - } - - stream.write(protocol.CONNACK_HEADER) - // length - writeVarByteInt(stream, length) - stream.write(settings.sessionPresent ? protocol.SESSIONPRESENT_HEADER : zeroBuf) - - stream.write(Buffer.from([rc])) - if (propertiesData != null) { - propertiesData.write() - } - return true -} - -function publish (packet, stream, opts) { - debug('publish: packet: %o', packet) - const version = opts ? opts.protocolVersion : 4 - const settings = packet || {} - const qos = settings.qos || 0 - const retain = settings.retain ? protocol.RETAIN_MASK : 0 - const topic = settings.topic - const payload = settings.payload || empty - const id = settings.messageId - const properties = settings.properties - - let length = 0 - - // Topic must be a non-empty string or Buffer - if (typeof topic === 'string') length += Buffer.byteLength(topic) + 2 - else if (Buffer.isBuffer(topic)) length += topic.length + 2 - else { - stream.destroy(new Error('Invalid topic')) - return false - } - - // Get the payload length - if (!Buffer.isBuffer(payload)) length += Buffer.byteLength(payload) - else length += payload.length - - // Message ID must a number if qos > 0 - if (qos && typeof id !== 'number') { - stream.destroy(new Error('Invalid messageId')) - return false - } else if (qos) length += 2 - - // mqtt5 properties - let propertiesData = null - if (version === 5) { - propertiesData = getProperties(stream, properties) - if (!propertiesData) { return false } - length += propertiesData.length - } - - // Header - stream.write(protocol.PUBLISH_HEADER[qos][settings.dup ? 1 : 0][retain ? 1 : 0]) - - // Remaining length - writeVarByteInt(stream, length) - - // Topic - writeNumber(stream, byteLength(topic)) - stream.write(topic) - - // Message ID - if (qos > 0) writeNumber(stream, id) - - // Properties - if (propertiesData != null) { - propertiesData.write() - } - - // Payload - debug('publish: payload: %o', payload) - return stream.write(payload) -} - -/* Puback, pubrec, pubrel and pubcomp */ -function confirmation (packet, stream, opts) { - const version = opts ? opts.protocolVersion : 4 - const settings = packet || {} - const type = settings.cmd || 'puback' - const id = settings.messageId - const dup = (settings.dup && type === 'pubrel') ? protocol.DUP_MASK : 0 - let qos = 0 - const reasonCode = settings.reasonCode - const properties = settings.properties - let length = version === 5 ? 3 : 2 - - if (type === 'pubrel') qos = 1 - - // Check message ID - if (typeof id !== 'number') { - stream.destroy(new Error('Invalid messageId')) - return false - } - - // properies mqtt 5 - let propertiesData = null - if (version === 5) { - // Confirm should not add empty property length with no properties (rfc 3.4.2.2.1) - if (typeof properties === 'object') { - propertiesData = getPropertiesByMaximumPacketSize(stream, properties, opts, length) - if (!propertiesData) { return false } - length += propertiesData.length - } - } - - // Header - stream.write(protocol.ACKS[type][qos][dup][0]) - - // Length === 3 is only true of version === 5 and no properties; therefore if reasonCode === 0 we are allowed to skip both bytes - but if we write the reason code we also have to write property length [MQTT-3.4.2-1]. - if (length === 3) length += reasonCode !== 0 ? 1 : -1 - writeVarByteInt(stream, length) - - // Message ID - writeNumber(stream, id) - - // reason code in header - but only if it couldn't be omitted - indicated by length !== 2. - if (version === 5 && length !== 2) { - stream.write(Buffer.from([reasonCode])) - } - - // properties mqtt 5 - if (propertiesData !== null) { - propertiesData.write() - } else { - if (length === 4) { - // we have no properties but have written a reason code - so we need to indicate empty properties by filling in a zero. - stream.write(Buffer.from([0])) - } - } - return true -} - -function subscribe (packet, stream, opts) { - debug('subscribe: packet: ') - const version = opts ? opts.protocolVersion : 4 - const settings = packet || {} - const dup = settings.dup ? protocol.DUP_MASK : 0 - const id = settings.messageId - const subs = settings.subscriptions - const properties = settings.properties - - let length = 0 - - // Check message ID - if (typeof id !== 'number') { - stream.destroy(new Error('Invalid messageId')) - return false - } else length += 2 - - // properies mqtt 5 - let propertiesData = null - if (version === 5) { - propertiesData = getProperties(stream, properties) - if (!propertiesData) { return false } - length += propertiesData.length - } - - // Check subscriptions - if (typeof subs === 'object' && subs.length) { - for (let i = 0; i < subs.length; i += 1) { - const itopic = subs[i].topic - const iqos = subs[i].qos - - if (typeof itopic !== 'string') { - stream.destroy(new Error('Invalid subscriptions - invalid topic')) - return false - } - if (typeof iqos !== 'number') { - stream.destroy(new Error('Invalid subscriptions - invalid qos')) - return false - } - - if (version === 5) { - const nl = subs[i].nl || false - if (typeof nl !== 'boolean') { - stream.destroy(new Error('Invalid subscriptions - invalid No Local')) - return false - } - const rap = subs[i].rap || false - if (typeof rap !== 'boolean') { - stream.destroy(new Error('Invalid subscriptions - invalid Retain as Published')) - return false - } - const rh = subs[i].rh || 0 - if (typeof rh !== 'number' || rh > 2) { - stream.destroy(new Error('Invalid subscriptions - invalid Retain Handling')) - return false - } - } - - length += Buffer.byteLength(itopic) + 2 + 1 - } - } else { - stream.destroy(new Error('Invalid subscriptions')) - return false - } - - // Generate header - debug('subscribe: writing to stream: %o', protocol.SUBSCRIBE_HEADER) - stream.write(protocol.SUBSCRIBE_HEADER[1][dup ? 1 : 0][0]) - - // Generate length - writeVarByteInt(stream, length) - - // Generate message ID - writeNumber(stream, id) - - // properies mqtt 5 - if (propertiesData !== null) { - propertiesData.write() - } - - let result = true - - // Generate subs - for (const sub of subs) { - const jtopic = sub.topic - const jqos = sub.qos - const jnl = +sub.nl - const jrap = +sub.rap - const jrh = sub.rh - let joptions - - // Write topic string - writeString(stream, jtopic) - - // options process - joptions = protocol.SUBSCRIBE_OPTIONS_QOS[jqos] - if (version === 5) { - joptions |= jnl ? protocol.SUBSCRIBE_OPTIONS_NL : 0 - joptions |= jrap ? protocol.SUBSCRIBE_OPTIONS_RAP : 0 - joptions |= jrh ? protocol.SUBSCRIBE_OPTIONS_RH[jrh] : 0 - } - // Write options - result = stream.write(Buffer.from([joptions])) - } - - return result -} - -function suback (packet, stream, opts) { - const version = opts ? opts.protocolVersion : 4 - const settings = packet || {} - const id = settings.messageId - const granted = settings.granted - const properties = settings.properties - let length = 0 - - // Check message ID - if (typeof id !== 'number') { - stream.destroy(new Error('Invalid messageId')) - return false - } else length += 2 - - // Check granted qos vector - if (typeof granted === 'object' && granted.length) { - for (let i = 0; i < granted.length; i += 1) { - if (typeof granted[i] !== 'number') { - stream.destroy(new Error('Invalid qos vector')) - return false - } - length += 1 - } - } else { - stream.destroy(new Error('Invalid qos vector')) - return false - } - - // properies mqtt 5 - let propertiesData = null - if (version === 5) { - propertiesData = getPropertiesByMaximumPacketSize(stream, properties, opts, length) - if (!propertiesData) { return false } - length += propertiesData.length - } - - // header - stream.write(protocol.SUBACK_HEADER) - - // Length - writeVarByteInt(stream, length) - - // Message ID - writeNumber(stream, id) - - // properies mqtt 5 - if (propertiesData !== null) { - propertiesData.write() - } - - return stream.write(Buffer.from(granted)) -} - -function unsubscribe (packet, stream, opts) { - const version = opts ? opts.protocolVersion : 4 - const settings = packet || {} - const id = settings.messageId - const dup = settings.dup ? protocol.DUP_MASK : 0 - const unsubs = settings.unsubscriptions - const properties = settings.properties - - let length = 0 - - // Check message ID - if (typeof id !== 'number') { - stream.destroy(new Error('Invalid messageId')) - return false - } else { - length += 2 - } - // Check unsubs - if (typeof unsubs === 'object' && unsubs.length) { - for (let i = 0; i < unsubs.length; i += 1) { - if (typeof unsubs[i] !== 'string') { - stream.destroy(new Error('Invalid unsubscriptions')) - return false - } - length += Buffer.byteLength(unsubs[i]) + 2 - } - } else { - stream.destroy(new Error('Invalid unsubscriptions')) - return false - } - // properies mqtt 5 - let propertiesData = null - if (version === 5) { - propertiesData = getProperties(stream, properties) - if (!propertiesData) { return false } - length += propertiesData.length - } - - // Header - stream.write(protocol.UNSUBSCRIBE_HEADER[1][dup ? 1 : 0][0]) - - // Length - writeVarByteInt(stream, length) - - // Message ID - writeNumber(stream, id) - - // properies mqtt 5 - if (propertiesData !== null) { - propertiesData.write() - } - - // Unsubs - let result = true - for (let j = 0; j < unsubs.length; j++) { - result = writeString(stream, unsubs[j]) - } - - return result -} - -function unsuback (packet, stream, opts) { - const version = opts ? opts.protocolVersion : 4 - const settings = packet || {} - const id = settings.messageId - const dup = settings.dup ? protocol.DUP_MASK : 0 - const granted = settings.granted - const properties = settings.properties - const type = settings.cmd - const qos = 0 - - let length = 2 - - // Check message ID - if (typeof id !== 'number') { - stream.destroy(new Error('Invalid messageId')) - return false - } - - // Check granted - if (version === 5) { - if (typeof granted === 'object' && granted.length) { - for (let i = 0; i < granted.length; i += 1) { - if (typeof granted[i] !== 'number') { - stream.destroy(new Error('Invalid qos vector')) - return false - } - length += 1 - } - } else { - stream.destroy(new Error('Invalid qos vector')) - return false - } - } - - // properies mqtt 5 - let propertiesData = null - if (version === 5) { - propertiesData = getPropertiesByMaximumPacketSize(stream, properties, opts, length) - if (!propertiesData) { return false } - length += propertiesData.length - } - - // Header - stream.write(protocol.ACKS[type][qos][dup][0]) - - // Length - writeVarByteInt(stream, length) - - // Message ID - writeNumber(stream, id) - - // properies mqtt 5 - if (propertiesData !== null) { - propertiesData.write() - } - - // payload - if (version === 5) { - stream.write(Buffer.from(granted)) - } - return true -} - -function emptyPacket (packet, stream, opts) { - return stream.write(protocol.EMPTY[packet.cmd]) -} - -function disconnect (packet, stream, opts) { - const version = opts ? opts.protocolVersion : 4 - const settings = packet || {} - const reasonCode = settings.reasonCode - const properties = settings.properties - let length = version === 5 ? 1 : 0 - - // properies mqtt 5 - let propertiesData = null - if (version === 5) { - propertiesData = getPropertiesByMaximumPacketSize(stream, properties, opts, length) - if (!propertiesData) { return false } - length += propertiesData.length - } - - // Header - stream.write(Buffer.from([protocol.codes.disconnect << 4])) - - // Length - writeVarByteInt(stream, length) - - // reason code in header - if (version === 5) { - stream.write(Buffer.from([reasonCode])) - } - - // properies mqtt 5 - if (propertiesData !== null) { - propertiesData.write() - } - - return true -} - -function auth (packet, stream, opts) { - const version = opts ? opts.protocolVersion : 4 - const settings = packet || {} - const reasonCode = settings.reasonCode - const properties = settings.properties - let length = version === 5 ? 1 : 0 - - if (version !== 5) stream.destroy(new Error('Invalid mqtt version for auth packet')) - - // properies mqtt 5 - const propertiesData = getPropertiesByMaximumPacketSize(stream, properties, opts, length) - if (!propertiesData) { return false } - length += propertiesData.length - - // Header - stream.write(Buffer.from([protocol.codes.auth << 4])) - - // Length - writeVarByteInt(stream, length) - - // reason code in header - stream.write(Buffer.from([reasonCode])) - - // properies mqtt 5 - if (propertiesData !== null) { - propertiesData.write() - } - return true -} - -/** - * writeVarByteInt - write an MQTT style variable byte integer to the buffer - * - * @param buffer - destination - * @param pos - offset - * @param length - length (>0) - * @returns number of bytes written - * - * @api private - */ - -const varByteIntCache = {} -function writeVarByteInt (stream, num) { - if (num > protocol.VARBYTEINT_MAX) { - stream.destroy(new Error(`Invalid variable byte integer: ${num}`)) - return false - } - - let buffer = varByteIntCache[num] - - if (!buffer) { - buffer = genBufVariableByteInt(num) - if (num < 16384) varByteIntCache[num] = buffer - } - debug('writeVarByteInt: writing to stream: %o', buffer) - return stream.write(buffer) -} - -/** - * writeString - write a utf8 string to the buffer - * - * @param buffer - destination - * @param pos - offset - * @param string - string to write - * @return number of bytes written - * - * @api private - */ - -function writeString (stream, string) { - const strlen = Buffer.byteLength(string) - writeNumber(stream, strlen) - - debug('writeString: %s', string) - return stream.write(string, 'utf8') -} - -/** - * writeStringPair - write a utf8 string pairs to the buffer - * - * @param buffer - destination - * @param name - string name to write - * @param value - string value to write - * @return number of bytes written - * - * @api private - */ -function writeStringPair (stream, name, value) { - writeString(stream, name) - writeString(stream, value) -} - -/** - * writeNumber - write a two byte number to the buffer - * - * @param buffer - destination - * @param pos - offset - * @param number - number to write - * @return number of bytes written - * - * @api private - */ -function writeNumberCached (stream, number) { - debug('writeNumberCached: number: %d', number) - debug('writeNumberCached: %o', numCache[number]) - return stream.write(numCache[number]) -} -function writeNumberGenerated (stream, number) { - const generatedNumber = generateNumber(number) - debug('writeNumberGenerated: %o', generatedNumber) - return stream.write(generatedNumber) -} -function write4ByteNumber (stream, number) { - const generated4ByteBuffer = generate4ByteBuffer(number) - debug('write4ByteNumber: %o', generated4ByteBuffer) - return stream.write(generated4ByteBuffer) -} -/** - * writeStringOrBuffer - write a String or Buffer with the its length prefix - * - * @param buffer - destination - * @param pos - offset - * @param toWrite - String or Buffer - * @return number of bytes written - */ -function writeStringOrBuffer (stream, toWrite) { - if (typeof toWrite === 'string') { - writeString(stream, toWrite) - } else if (toWrite) { - writeNumber(stream, toWrite.length) - stream.write(toWrite) - } else writeNumber(stream, 0) -} - -function getProperties (stream, properties) { - /* connect properties */ - if (typeof properties !== 'object' || properties.length != null) { - return { - length: 1, - write () { - writeProperties(stream, {}, 0) - } - } - } - let propertiesLength = 0 - function getLengthProperty (name, value) { - const type = protocol.propertiesTypes[name] - let length = 0 - switch (type) { - case 'byte': { - if (typeof value !== 'boolean') { - stream.destroy(new Error(`Invalid ${name}: ${value}`)) - return false - } - length += 1 + 1 - break - } - case 'int8': { - if (typeof value !== 'number' || value < 0 || value > 0xff) { - stream.destroy(new Error(`Invalid ${name}: ${value}`)) - return false - } - length += 1 + 1 - break - } - case 'binary': { - if (value && value === null) { - stream.destroy(new Error(`Invalid ${name}: ${value}`)) - return false - } - length += 1 + Buffer.byteLength(value) + 2 - break - } - case 'int16': { - if (typeof value !== 'number' || value < 0 || value > 0xffff) { - stream.destroy(new Error(`Invalid ${name}: ${value}`)) - return false - } - length += 1 + 2 - break - } - case 'int32': { - if (typeof value !== 'number' || value < 0 || value > 0xffffffff) { - stream.destroy(new Error(`Invalid ${name}: ${value}`)) - return false - } - length += 1 + 4 - break - } - case 'var': { - // var byte integer is max 24 bits packed in 32 bits - if (typeof value !== 'number' || value < 0 || value > 0x0fffffff) { - stream.destroy(new Error(`Invalid ${name}: ${value}`)) - return false - } - length += 1 + Buffer.byteLength(genBufVariableByteInt(value)) - break - } - case 'string': { - if (typeof value !== 'string') { - stream.destroy(new Error(`Invalid ${name}: ${value}`)) - return false - } - length += 1 + 2 + Buffer.byteLength(value.toString()) - break - } - case 'pair': { - if (typeof value !== 'object') { - stream.destroy(new Error(`Invalid ${name}: ${value}`)) - return false - } - length += Object.getOwnPropertyNames(value).reduce((result, name) => { - const currentValue = value[name] - if (Array.isArray(currentValue)) { - result += currentValue.reduce((currentLength, value) => { - currentLength += 1 + 2 + Buffer.byteLength(name.toString()) + 2 + Buffer.byteLength(value.toString()) - return currentLength - }, 0) - } else { - result += 1 + 2 + Buffer.byteLength(name.toString()) + 2 + Buffer.byteLength(value[name].toString()) - } - return result - }, 0) - break - } - default: { - stream.destroy(new Error(`Invalid property ${name}: ${value}`)) - return false - } - } - return length - } - if (properties) { - for (const propName in properties) { - let propLength = 0 - let propValueLength = 0 - const propValue = properties[propName] - if (Array.isArray(propValue)) { - for (let valueIndex = 0; valueIndex < propValue.length; valueIndex++) { - propValueLength = getLengthProperty(propName, propValue[valueIndex]) - if (!propValueLength) { return false } - propLength += propValueLength - } - } else { - propValueLength = getLengthProperty(propName, propValue) - if (!propValueLength) { return false } - propLength = propValueLength - } - if (!propLength) return false - propertiesLength += propLength - } - } - const propertiesLengthLength = Buffer.byteLength(genBufVariableByteInt(propertiesLength)) - - return { - length: propertiesLengthLength + propertiesLength, - write () { - writeProperties(stream, properties, propertiesLength) - } - } -} - -function getPropertiesByMaximumPacketSize (stream, properties, opts, length) { - const mayEmptyProps = ['reasonString', 'userProperties'] - const maximumPacketSize = opts && opts.properties && opts.properties.maximumPacketSize ? opts.properties.maximumPacketSize : 0 - - let propertiesData = getProperties(stream, properties) - if (maximumPacketSize) { - while (length + propertiesData.length > maximumPacketSize) { - const currentMayEmptyProp = mayEmptyProps.shift() - if (currentMayEmptyProp && properties[currentMayEmptyProp]) { - delete properties[currentMayEmptyProp] - propertiesData = getProperties(stream, properties) - } else { - return false - } - } - } - return propertiesData -} - -function writeProperty (stream, propName, value) { - const type = protocol.propertiesTypes[propName] - switch (type) { - case 'byte': { - stream.write(Buffer.from([protocol.properties[propName]])) - stream.write(Buffer.from([+value])) - break - } - case 'int8': { - stream.write(Buffer.from([protocol.properties[propName]])) - stream.write(Buffer.from([value])) - break - } - case 'binary': { - stream.write(Buffer.from([protocol.properties[propName]])) - writeStringOrBuffer(stream, value) - break - } - case 'int16': { - stream.write(Buffer.from([protocol.properties[propName]])) - writeNumber(stream, value) - break - } - case 'int32': { - stream.write(Buffer.from([protocol.properties[propName]])) - write4ByteNumber(stream, value) - break - } - case 'var': { - stream.write(Buffer.from([protocol.properties[propName]])) - writeVarByteInt(stream, value) - break - } - case 'string': { - stream.write(Buffer.from([protocol.properties[propName]])) - writeString(stream, value) - break - } - case 'pair': { - Object.getOwnPropertyNames(value).forEach(name => { - const currentValue = value[name] - if (Array.isArray(currentValue)) { - currentValue.forEach(value => { - stream.write(Buffer.from([protocol.properties[propName]])) - writeStringPair(stream, name.toString(), value.toString()) - }) - } else { - stream.write(Buffer.from([protocol.properties[propName]])) - writeStringPair(stream, name.toString(), currentValue.toString()) - } - }) - break - } - default: { - stream.destroy(new Error(`Invalid property ${propName} value: ${value}`)) - return false - } - } -} - -function writeProperties (stream, properties, propertiesLength) { - /* write properties to stream */ - writeVarByteInt(stream, propertiesLength) - for (const propName in properties) { - if (Object.prototype.hasOwnProperty.call(properties, propName) && properties[propName] !== null) { - const value = properties[propName] - if (Array.isArray(value)) { - for (let valueIndex = 0; valueIndex < value.length; valueIndex++) { - writeProperty(stream, propName, value[valueIndex]) - } - } else { - writeProperty(stream, propName, value) - } - } - } -} - -function byteLength (bufOrString) { - if (!bufOrString) return 0 - else if (bufOrString instanceof Buffer) return bufOrString.length - else return Buffer.byteLength(bufOrString) -} - -function isStringOrBuffer (field) { - return typeof field === 'string' || field instanceof Buffer -} - -module.exports = generate