Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,4 @@ node_modules
# Optional REPL history
.node_repl_history
lib
.idea
7 changes: 5 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@
"description": "Hudson-Taylor TCP transport",
"main": "lib/index.js",
"scripts": {
"test": "semistandard && mocha -r babel-register -R spec --bail",
"lint": "semistandard | snazzy",
"check": "mocha -r babel-register -R spec --bail",
"test": "npm run lint && npm run check",
"build": "babel src -d lib",
"prepublish": "npm run build"
},
Expand All @@ -29,7 +31,8 @@
"babel-register": "^6.5.2",
"mocha": "^2.4.5",
"openport": "0.0.4",
"semistandard": "^7.0.5"
"semistandard": "^7.0.5",
"snazzy": "^6.0.0"
},
"dependencies": {}
}
29 changes: 29 additions & 0 deletions src/chunkDecoder.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
'use strict';

import {decode, CONSTANTS} from './protocol';

const chunkDecoder = (onDecoded) => {
let left = Buffer.alloc(0);

return function onDataChunk (chunk) {
let start = 0;
while (chunk.indexOf(CONSTANTS.PACKET_END, start) !== -1 && start < chunk.length) {
const offset = chunk.indexOf(CONSTANTS.PACKET_END, start);
let packet;
if (start === 0 && left.length) {
packet = Buffer.concat([left, chunk.slice(start, offset)]);
left = Buffer.alloc(0);
} else {
packet = chunk.slice(start, offset);
}
const decoded = decode(packet);
onDecoded(decoded);
start = offset + 1;
}
if (start < chunk.length) {
left = Buffer.concat([left, chunk.slice(start)]);
}
};
};

export default chunkDecoder;
51 changes: 23 additions & 28 deletions src/index.js
Original file line number Diff line number Diff line change
@@ -1,22 +1,21 @@
import net from 'net';
import crypto from 'crypto';
import {encode} from './protocol';
import chunkDecoder from './chunkDecoder';

function TCPTransportServer (config) {
let _TCPTransportServer = function (fn) {
this.server = net.createServer(function (c) {
c.on('data', function (_response) {
let response = JSON.parse(_response);
let { id, name, data } = response;

fn(name, data, function (error, d) {
let response = JSON.stringify({
data: d,
id,
error
});
c.on('data', chunkDecoder((request) => {
fn(request.header.method, request.data, function (error, d) {
const response = encode({
method: request.header.method,
id: request.header.id,
data: d
}, error);
c.write(response);
});
});
}));
});
};

Expand Down Expand Up @@ -59,27 +58,24 @@ function TCPTransportClient (config) {
_TCPTransportClient.prototype.connect = function (done) {
// open a persistent connection to the server
this.conn = net.createConnection(config.port, config.host);
this.conn.setEncoding('utf8');
// this.conn.setEncoding(null);
this.conn.on('connect', () => {
this.connected = true;
done();
});
this.conn.on('data', (d) => {
let response = JSON.parse(d);
let { id, error, data } = response;

// find callback we stashed
let fn = this.fns[id];
this.conn.on('data', chunkDecoder((d) => {
let fn = this.fns[d.header.id];
if (!fn) {
// unknown, drop
return;
}
if (error) {
return fn(error);
if (d.header.error) {
return fn(d.header.error);
} else {
return fn(null, data);
return fn(null, d.data);
}
});
}));
};

_TCPTransportClient.prototype.disconnect = function (done) {
Expand All @@ -97,14 +93,13 @@ function TCPTransportClient (config) {
error: 'disconnected'
});
}
// Chances of this returning the same id
// as one in use is extremely negligible.
let id = crypto.randomBytes(10).toString('hex');
let request = JSON.stringify({
name: method,
data,
id
const id = crypto.randomBytes(10).toString('hex');
const request = encode({
method,
id,
data
});

// stash callback for later
this.fns[id] = callback;
this.conn.write(request);
Expand Down
81 changes: 81 additions & 0 deletions src/protocol.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
'use strict';

import crypto from 'crypto';

const HEADER_START = '\u0001';
const DATA_START = '\u0002';
const DATA_END = '\u0003';
const PACKET_END = '\u0004';
const HEADER_DELIMITER = '/';
const FORMAT = {
'JSON': 1,
'TEXT': 2
};

export const CONSTANTS = {
HEADER_START,
DATA_START,
DATA_END,
PACKET_END,
FORMAT
};

export const encode = ({method, data, id}, error) => {
id = id || crypto.randomBytes(10).toString('hex');
const format = typeof data === 'string'
? CONSTANTS.FORMAT.TEXT
: CONSTANTS.FORMAT.JSON;

let dataBuf;
const errString = error ? JSON.stringify(error) : '';
switch (format) {
case FORMAT.JSON:
dataBuf = Buffer.from(JSON.stringify(data));
break;
case FORMAT.TEXT:
dataBuf = Buffer.from(data);
break;
}

return Buffer.concat([
Buffer.from(HEADER_START),
Buffer.from([id, method, dataBuf.length, format, errString].join(HEADER_DELIMITER)),
Buffer.from(DATA_START),
dataBuf,
Buffer.from(DATA_END),
Buffer.from(PACKET_END)
]);
};

export const decode = (packet) => {
const headerIndex = {
start: packet.indexOf(HEADER_START) + HEADER_START.length,
end: packet.indexOf(DATA_START)
};
const dataIndex = {
start: packet.indexOf(DATA_START) + DATA_START.length,
end: packet.indexOf(Buffer.from(DATA_END + PACKET_END))
};
let [id, method, dataLength, format, errString] = packet.slice(headerIndex.start, headerIndex.end).toString().split(HEADER_DELIMITER);
format = parseInt(format, 10);
dataLength = parseInt(dataLength, 10);
let data;
switch (format) {
case FORMAT.JSON:
data = JSON.parse(packet.slice(dataIndex.start, dataIndex.end).toString());
break;
case FORMAT.TEXT:
data = packet.slice(dataIndex.start, dataIndex.end).toString();
break;
}
const header = {
id, method, dataLength, format
};
if (errString) {
header.error = JSON.parse(errString);
}
return {
header,
data
};
};
92 changes: 63 additions & 29 deletions test/index.js
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
/*global describe, it, before */
import assert from 'assert';
import net from 'net';

import openport from 'openport';

import TCP from '../src';
import * as protocol from '../src/protocol';
import chunkDecoder from '../src/chunkDecoder';

describe('TCP Transport', function () {
let transport;
Expand Down Expand Up @@ -40,7 +41,8 @@ describe('TCP Transport', function () {
});

it('should return error if server cannot bind to port', function (done) {
let netServer = net.createServer(() => {});
let netServer = net.createServer(() => {
});

netServer.listen(port, host, function (err) {
assert.ifError(err);
Expand Down Expand Up @@ -111,8 +113,8 @@ describe('TCP Transport', function () {

it('should call fn when request is received', function (done) {
let _method = 'echo';
let _data = { hello: 'world' };
let _data2 = { something: 'else' };
let _data = {hello: 'world'};
let _data2 = {something: 'else'};

server = new transport.Server(function (method, data, callback) {
assert.equal(method, _method);
Expand All @@ -123,23 +125,59 @@ describe('TCP Transport', function () {
server.listen(function (err) {
assert.ifError(err);

let request = JSON.stringify({
id: 1,
name: _method,
data: _data
let request = protocol.encode({
method: _method,
data: _data,
id: 1
});

let clientSocket = net.createConnection(port, host);
clientSocket.setEncoding('utf8');
clientSocket.write(request);

clientSocket.on('data', function (_response) {
clientSocket.on('data', chunkDecoder((response) => {
clientSocket.end();
let response = JSON.parse(_response);
assert.ifError(response.error);
assert.deepEqual(response.data, _data2);
server.stop(done);
}));
});
});

it('should handle TCP fragmentation', function (done) {
let _method = 'echo';
const MAX_MTU = 65535; // loopback interface MTU
let _data = '';
let _data2 = '';
for (let i = 0; i < MAX_MTU / 2; i++) {
_data += `A${i}.`;
_data2 += `B${i}.`;
}

server = new transport.Server(function (method, data, callback) {
assert.equal(method, _method);
assert.equal(data, _data);
callback(null, _data2);
});

server.listen(function (err) {
assert.ifError(err);

let request = protocol.encode({
method: _method,
data: _data,
id: 1
});

let clientSocket = net.createConnection(port, host);
clientSocket.write(request);

clientSocket.on('data', chunkDecoder((response) => {
clientSocket.end();
assert.ifError(response.error);
assert.equal(response.data, _data2, 0);
server.stop(done);
}));
});
});
});
Expand Down Expand Up @@ -177,18 +215,16 @@ describe('TCP Transport', function () {

it('should be able to call method', function (done) {
let _method = 'hello';
let _data = { something: 'world' };
let _data = 'world';

let netServer = net.createServer(function (socket) {
socket.setEncoding('utf8');
socket.on('data', function (_data) {
var data = JSON.parse(_data);
socket.end(JSON.stringify({
id: data.id,
data: data.data,
error: null
}));
});
socket.on('data', chunkDecoder((response) => {
socket.end(protocol.encode({
method: response.header.method,
id: response.header.id,
data: response.data
}, null));
}));
});

let client = new transport.Client();
Expand Down Expand Up @@ -238,15 +274,13 @@ describe('TCP Transport', function () {
let _err = 'err!';

let netServer = net.createServer(function (socket) {
socket.setEncoding('utf8');
socket.on('data', function (_data) {
var data = JSON.parse(_data);
socket.end(JSON.stringify({
id: data.id,
data: null,
error: _err
}));
});
socket.on('data', chunkDecoder((response) => {
socket.end(protocol.encode({
method: response.header.method,
id: response.header.id,
data: response.data
}, _err));
}));
});

let client = new transport.Client();
Expand Down