Skip to content

Commit 4178632

Browse files
committed
add a flush method, for case when input terminates without a newline
1 parent 0ca2a9a commit 4178632

File tree

2 files changed

+28
-15
lines changed

2 files changed

+28
-15
lines changed

src/net/linefeeder.spec.ts

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -5,37 +5,40 @@ import linefeeder = require('./linefeeder');
55
import queue = require('../handler/queue');
66

77
describe('LineFeeder', function() {
8-
var bufferQueue: queue.Queue<ArrayBuffer, void>;
9-
var lines: linefeeder.LineFeeder;
8+
let bufferQueue: queue.Queue<ArrayBuffer, void>;
9+
let lines: linefeeder.LineFeeder;
1010

1111
beforeEach(() => {
1212
bufferQueue = new queue.Queue<ArrayBuffer, void>();
1313
lines = new linefeeder.LineFeeder(bufferQueue);
1414
});
1515

1616
it('one and done', (done) => {
17-
var s = 'hello world';
18-
bufferQueue.handle(arraybuffers.stringToArrayBuffer(s + '\n'));
17+
const s = 'hello world';
18+
bufferQueue.handle(arraybuffers.stringToArrayBuffer(s));
19+
lines.flush();
1920

2021
lines.setSyncNextHandler((result: string) => {
2122
expect(result).toEqual(s);
2223
done();
2324
});
2425
});
2526

26-
it('dangling lines', (done) => {
27-
var s = 'hello world';
27+
it('lines of multiple buffers', (done) => {
28+
const s = 'hello world';
2829
bufferQueue.handle(arraybuffers.stringToArrayBuffer('hello '));
2930
bufferQueue.handle(arraybuffers.stringToArrayBuffer('world\n'));
31+
lines.flush();
3032

3133
lines.setSyncNextHandler((result: string) => {
3234
expect(result).toEqual(s);
3335
done();
3436
});
3537
});
3638

37-
it('multiple lines in one buffer', (done) => {
39+
it('buffers of multiple lines', (done) => {
3840
bufferQueue.handle(arraybuffers.stringToArrayBuffer('a\nb\n'));
41+
lines.flush();
3942

4043
lines.setSyncNextHandler((result: string) => {
4144
expect(result).toEqual('a');

src/net/linefeeder.ts

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -9,19 +9,29 @@ export class LineFeeder extends queue.Queue<string, void> {
99
private static DELIMITER = arraybuffers.decodeByte(
1010
arraybuffers.stringToArrayBuffer('\n'));
1111

12-
constructor(source: queue.Queue<ArrayBuffer, void>) {
12+
private leftover_ = new ArrayBuffer(0);
13+
14+
constructor(private source_: queue.Queue<ArrayBuffer, void>) {
1315
super();
14-
let leftover = new ArrayBuffer(0);
15-
source.setSyncHandler((buffer: ArrayBuffer) => {
16-
leftover = arraybuffers.concat([leftover, buffer]);
17-
let i = arraybuffers.indexOf(leftover, LineFeeder.DELIMITER);
16+
source_.setSyncHandler((buffer: ArrayBuffer) => {
17+
this.leftover_ = arraybuffers.concat([this.leftover_, buffer]);
18+
let i = arraybuffers.indexOf(this.leftover_, LineFeeder.DELIMITER);
1819
while (i !== -1) {
19-
let parts = arraybuffers.split(leftover, i);
20+
let parts = arraybuffers.split(this.leftover_, i);
2021
let line = arraybuffers.arrayBufferToString(parts[0]);
21-
leftover = parts[1].slice(1);
22-
i = arraybuffers.indexOf(leftover, LineFeeder.DELIMITER);
22+
this.leftover_ = parts[1].slice(1);
23+
i = arraybuffers.indexOf(this.leftover_, LineFeeder.DELIMITER);
2324
this.handle(line);
2425
}
2526
});
2627
}
28+
29+
// Causes any pending line to be emitted. Intended to be called when the
30+
// underlying stream has terminated, possibly without any terminating
31+
// newline.
32+
public flush = () => {
33+
var s = arraybuffers.arrayBufferToString(this.leftover_);
34+
this.leftover_ = new ArrayBuffer(0);
35+
this.handle(s);
36+
}
2737
}

0 commit comments

Comments
 (0)