Skip to content

Commit 2d29f71

Browse files
committed
Refactored io, testing readTypeAndValue error
1 parent fee506f commit 2d29f71

File tree

3 files changed

+73
-80
lines changed

3 files changed

+73
-80
lines changed

dist/io.js

Lines changed: 23 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,16 @@ function toArrayBuffer(buffer) {
1717
? arrayBuffer // if Buffer occupies whole ArrayBuffer, no need to slice it
1818
: arrayBuffer.slice(byteOffset, byteOffset + byteLength);
1919
}
20+
function concatStream(stream, callback) {
21+
const segments = [];
22+
stream
23+
.on('data', chunk => segments.push(chunk instanceof Buffer ? chunk : Buffer.from(chunk)))
24+
.on('error', function (err) {
25+
this.destroy();
26+
callback(err, null);
27+
})
28+
.on('end', () => callback(null, toArrayBuffer(Buffer.concat(segments))));
29+
}
2030
/**
2131
* Writes the contents of `type.toBuffer()` ([[Type.toBuffer]])
2232
* to a writable stream and then closes the stream.
@@ -205,18 +215,12 @@ exports.writeTypeAndValue = writeTypeAndValue;
205215
function readType(inStream, callback) {
206216
assert_1.default.instanceOf(inStream, stream_1.Readable);
207217
assert_1.default.instanceOf(callback, Function);
208-
const segments = [];
209-
inStream
210-
.on('data', chunk => segments.push(chunk instanceof Buffer ? chunk : Buffer.from(chunk)))
211-
.on('error', function (err) {
212-
this.destroy();
213-
callback(err, null);
214-
})
215-
.on('end', () => {
216-
const buffer = Buffer.concat(segments);
218+
concatStream(inStream, (err, buffer) => {
219+
if (err)
220+
return callback(err, null);
217221
let type;
218222
try {
219-
type = r.type(toArrayBuffer(buffer), false);
223+
type = r.type(buffer, false);
220224
}
221225
catch (e) {
222226
return callback(e, null);
@@ -260,18 +264,12 @@ exports.readType = readType;
260264
function readValue({ type, inStream }, callback) {
261265
assert_1.default.instanceOf(inStream, stream_1.Readable);
262266
assert_1.default.instanceOf(callback, Function);
263-
const segments = [];
264-
inStream
265-
.on('data', chunk => segments.push(chunk))
266-
.on('error', function (err) {
267-
this.destroy();
268-
callback(err, null);
269-
})
270-
.on('end', () => {
271-
const buffer = Buffer.concat(segments);
267+
concatStream(inStream, (err, buffer) => {
268+
if (err)
269+
return callback(err, null);
272270
let value;
273271
try {
274-
value = type.readValue(toArrayBuffer(buffer));
272+
value = type.readValue(buffer);
275273
}
276274
catch (e) {
277275
return callback(e, null);
@@ -305,26 +303,20 @@ exports.readValue = readValue;
305303
function readTypeAndValue(inStream, callback) {
306304
assert_1.default.instanceOf(inStream, stream_1.Readable);
307305
assert_1.default.instanceOf(callback, Function);
308-
const segments = [];
309-
inStream
310-
.on('data', chunk => segments.push(chunk))
311-
.on('error', function (err) {
312-
this.destroy();
313-
callback(err, null, null);
314-
})
315-
.on('end', () => {
316-
const buffer = Buffer.concat(segments);
306+
concatStream(inStream, (err, buffer) => {
307+
if (err)
308+
return callback(err, null, null);
317309
let type;
318310
//Using consumeType() in order to get the length of the type (the start of the value)
319311
try {
320-
type = r._consumeType(toArrayBuffer(buffer), 0);
312+
type = r._consumeType(buffer, 0);
321313
}
322314
catch (e) {
323315
return callback(e, null, null);
324316
}
325317
let value;
326318
try {
327-
value = type.value.readValue(toArrayBuffer(buffer), type.length);
319+
value = type.value.readValue(buffer, type.length);
328320
}
329321
catch (e) {
330322
return callback(e, null, null);

io.ts

Lines changed: 43 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,21 @@ function toArrayBuffer(buffer: Buffer): ArrayBuffer {
1919
? arrayBuffer // if Buffer occupies whole ArrayBuffer, no need to slice it
2020
: arrayBuffer.slice(byteOffset, byteOffset + byteLength)
2121
}
22+
type ArrayBufferCallback = (err: Error | null, buffer: ArrayBuffer | null) => void
23+
function concatStream(stream: Readable, callback: ArrayBufferCallback) {
24+
const segments: Buffer[] = []
25+
stream
26+
.on('data', chunk =>
27+
segments.push(chunk instanceof Buffer ? chunk : Buffer.from(chunk))
28+
)
29+
.on('error', function(this: typeof stream, err) {
30+
this.destroy()
31+
callback(err, null)
32+
})
33+
.on('end', () =>
34+
callback(null, toArrayBuffer(Buffer.concat(segments)))
35+
)
36+
}
2237

2338
export interface WriteParams<E> {
2439
type: Type<E>
@@ -245,22 +260,14 @@ export declare namespace writeTypeAndValue {
245260
export function readType<E>(inStream: Readable, callback: TypeCallback<E>) {
246261
assert.instanceOf(inStream, Readable)
247262
assert.instanceOf(callback, Function)
248-
const segments: Buffer[] = []
249-
inStream
250-
.on('data', chunk =>
251-
segments.push(chunk instanceof Buffer ? chunk : Buffer.from(chunk))
252-
)
253-
.on('error', function(this: typeof inStream, err: Error) {
254-
this.destroy()
255-
callback(err, null)
256-
})
257-
.on('end', () => {
258-
const buffer = Buffer.concat(segments)
259-
let type: Type<E>
260-
try { type = r.type(toArrayBuffer(buffer), false) }
261-
catch (e) { return callback(e, null) }
262-
callback(null, type)
263-
})
263+
concatStream(inStream, (err, buffer) => {
264+
if (err) return callback(err, null)
265+
266+
let type: Type<E>
267+
try { type = r.type(buffer!, false) }
268+
catch (e) { return callback(e, null) }
269+
callback(null, type)
270+
})
264271
}
265272
export declare namespace readType {
266273
function __promisify__<E>(inStream: Readable): Promise<Type<E>>
@@ -301,20 +308,14 @@ export declare namespace readType {
301308
export function readValue<E>({type, inStream}: ReadValueParams<E>, callback: ValueCallback<E>) {
302309
assert.instanceOf(inStream, Readable)
303310
assert.instanceOf(callback, Function)
304-
const segments: Buffer[] = []
305-
inStream
306-
.on('data', chunk => segments.push(chunk as Buffer))
307-
.on('error', function(this: typeof inStream, err: Error) {
308-
this.destroy()
309-
callback(err, null)
310-
})
311-
.on('end', () => {
312-
const buffer = Buffer.concat(segments)
313-
let value: E
314-
try { value = type.readValue(toArrayBuffer(buffer)) }
315-
catch (e) { return callback(e, null) }
316-
callback(null, value)
317-
})
311+
concatStream(inStream, (err, buffer) => {
312+
if (err) return callback(err, null)
313+
314+
let value: E
315+
try { value = type.readValue(buffer!) }
316+
catch (e) { return callback(e, null) }
317+
callback(null, value)
318+
})
318319
}
319320
export declare namespace readValue {
320321
function __promisify__<E>(params: ReadValueParams<E>): Promise<E>
@@ -345,24 +346,18 @@ export declare namespace readValue {
345346
export function readTypeAndValue<E>(inStream: Readable, callback: TypeAndValueCallback<E>) {
346347
assert.instanceOf(inStream, Readable)
347348
assert.instanceOf(callback, Function)
348-
const segments: Buffer[] = []
349-
inStream
350-
.on('data', chunk => segments.push(chunk as Buffer))
351-
.on('error', function(this: typeof inStream, err: Error) {
352-
this.destroy()
353-
callback(err, null, null)
354-
})
355-
.on('end', () => {
356-
const buffer = Buffer.concat(segments)
357-
let type: ReadResult<Type<E>>
358-
//Using consumeType() in order to get the length of the type (the start of the value)
359-
try { type = r._consumeType(toArrayBuffer(buffer), 0) }
360-
catch (e) { return callback(e, null, null) }
361-
let value: E
362-
try { value = type.value.readValue(toArrayBuffer(buffer), type.length) }
363-
catch (e) { return callback(e, null, null) }
364-
callback(null, type.value, value)
365-
})
349+
concatStream(inStream, (err, buffer) => {
350+
if (err) return callback(err, null, null)
351+
352+
let type: ReadResult<Type<E>>
353+
//Using consumeType() in order to get the length of the type (the start of the value)
354+
try { type = r._consumeType(buffer!, 0) }
355+
catch (e) { return callback(e, null, null) }
356+
let value: E
357+
try { value = type.value.readValue(buffer!, type.length) }
358+
catch (e) { return callback(e, null, null) }
359+
callback(null, type.value, value)
360+
})
366361
}
367362
//Custom promisifiy function because Promise cannot resolve to 2 values
368363
(readTypeAndValue as any)[promisify.custom] = <E>(inStream: Readable) =>

test/io/type-and-value.ts

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,10 +73,16 @@ const readPromise = promisify(io.readTypeAndValue)<Map<string, number>>(new Buff
7373
assert.equal(readType, type)
7474
assert.equal(readValue, value)
7575
})
76+
const readErrorPromise = promisify(io.readTypeAndValue)<string>(
77+
new BufferStream(bufferFrom([0x41, 0x61, 0x62, 0x63]))
78+
)
79+
.then(_ => { throw new Error('Expected error to be thrown') })
80+
.catch(err => assert.errorMessage(err, 'Buffer is not long enough'))
7681
export = Promise.all([
7782
writePromise,
7883
writeWithoutCallback,
7984
writeTypeErrorPromise,
8085
writeValueErrorPromise,
81-
readPromise
86+
readPromise,
87+
readErrorPromise
8288
])

0 commit comments

Comments
 (0)