Skip to content

Commit 8f0b3b8

Browse files
committed
Refactor streams
1 parent d76d95c commit 8f0b3b8

File tree

1 file changed

+19
-70
lines changed

1 file changed

+19
-70
lines changed

lib/streams.js

Lines changed: 19 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -33,98 +33,51 @@ exports.OutputStream = OutputStream;
3333

3434
/**
3535
* function InputStream([buffer])
36-
* Readable stream interface for buffer arrays
36+
* Readable stream interface for FastCGI input streams
3737
*/
3838

39-
function InputStream(buffer) {
40-
if (!(this instanceof InputStream)) {
41-
return new InputStream(array);
42-
}
43-
39+
function InputStream() {
4440
stream.Readable.call(this);
4541

46-
this.buffer = buffer || [];
47-
this.writable = true;
42+
this._buffer = [];
43+
this._canPush = true;
4844
}
4945
util.inherits(InputStream, stream.Readable);
5046

51-
InputStream.prototype.append = function (chunk) {
52-
if (this.writable) {
53-
this.writable = this.push(chunk);
47+
InputStream.prototype._data = function (chunk) {
48+
if (this._canPush) {
49+
this._canPush = this.push(chunk);
5450
} else {
55-
this.buffer.push(chunk);
51+
this._buffer.push(chunk);
5652
}
5753
};
5854

5955
InputStream.prototype._read = function (size) {
60-
while (this.buffer.length && (this.writable = this.push(this.buffer.shift())));
56+
while (this._buffer.length && (this._canPush = this.push(this._buffer.shift())));
6157
};
6258

6359
/**
64-
* function OutputStream(conn, req, res, type)
60+
* function OutputStream(conn, type)
6561
* Writable stream interface for FastCGI output streams
6662
*/
6763

68-
function OutputStream(conn, req, res, type) {
69-
if (!(this instanceof OutputStream)) {
70-
return new OutputStream(conn, req, res, type);
71-
}
72-
64+
function OutputStream(conn, id, recordType) {
7365
stream.Writable.call(this);
7466

75-
this.conn = conn;
76-
this.req = req;
77-
this.res = this._httpMessage = res; // NOTE: http.OutgoingMessage needs connection._httpMessage = response
78-
this.type = type || fcgi.records.StdOut;
67+
this.recordType = recordType || fcgi.records.StdOut;
7968

80-
this.timeout = 0;
69+
this._conn = conn;
70+
this._id = id;
8171

82-
// NOTE: http.OutgoingMessage needs connection.writable = true
83-
this._open = this.writable = true;
84-
this.on('drain', this.res.emit.bind(res, 'drain'));
72+
this._open = true;
8573
}
8674
util.inherits(OutputStream, stream.Writable);
8775

8876
OutputStream.prototype.close = function () {
89-
this._open = this.writable = false;
77+
this._open = false;
9078
this.emit('close');
9179
};
9280

93-
OutputStream.prototype.setTimeout = function (msecs, callback) {
94-
if (this._timeout_ref) {
95-
clearTimeout(this._timeout_ref);
96-
}
97-
98-
this.timeout = msecs;
99-
if (callback) {
100-
this.once('timeout', callback);
101-
}
102-
103-
if (msecs > 0) {
104-
this._timeout_ref = setTimeout(function (self) {
105-
self.emit('timeout');
106-
}, msecs, this);
107-
} else {
108-
this._timeout_ref = undefined;
109-
}
110-
};
111-
112-
OutputStream.prototype._resetTimeout = function () {
113-
if (this.timeout || this._timeout_ref) {
114-
if (this._timeout_ref) {
115-
clearTimeout(this._timeout_ref);
116-
}
117-
118-
if (this.timeout > 0) {
119-
this._timeout_ref = setTimeout(function (self) {
120-
self.emit('timeout');
121-
}, this.timeout, this);
122-
} else {
123-
this._timeout_ref = undefined;
124-
}
125-
}
126-
};
127-
12881
OutputStream.prototype._write = function (chunk, encoding, callback) {
12982
if (!this._open) {
13083
callback(new Error("Output stream is not open"));
@@ -133,8 +86,6 @@ OutputStream.prototype._write = function (chunk, encoding, callback) {
13386

13487
var chunks = [];
13588

136-
this._resetTimeout();
137-
13889
if (!Buffer.isBuffer(chunk)) {
13990
chunk = new Buffer(chunk, encoding);
14091
}
@@ -151,13 +102,11 @@ OutputStream.prototype._write = function (chunk, encoding, callback) {
151102
}
152103

153104
while (chunks.length > 1) {
154-
this.conn.stream.writeRecord(
155-
this.req.id,
156-
new this.type(chunks.shift()));
105+
this._conn.stream.writeRecord(this._id, new this.type(chunks.shift()));
157106
}
158107

159-
this.conn.stream.writeRecord(
160-
this.req.id,
108+
this._conn.stream.writeRecord(
109+
this._id,
161110
new this.type(chunks.shift()),
162111
callback.bind(this));
163112
};

0 commit comments

Comments
 (0)