Skip to content

Commit 9f70ab7

Browse files
committed
Fix bugs in stream code
1 parent 666cf3f commit 9f70ab7

File tree

1 file changed

+35
-9
lines changed

1 file changed

+35
-9
lines changed

lib/streams.js

Lines changed: 35 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,6 @@
11
/**
22
* Copyright (c) 2016 Fabio Massaioli and other contributors
33
*
4-
* Code from Node http module:
5-
* Copyright Joyent, Inc. and other Node contributors
6-
*
74
* Permission is hereby granted, free of charge, to any person obtaining a copy of
85
* this software and associated documentation files (the "Software"), to deal in
96
* the Software without restriction, including without limitation the rights to
@@ -30,6 +27,7 @@ var stream = require('stream'),
3027

3128
exports.InputStream = InputStream;
3229
exports.OutputStream = OutputStream;
30+
exports.IOStream = IOStream;
3331

3432
/**
3533
* function InputStream([buffer])
@@ -73,9 +71,11 @@ function OutputStream(conn, id, recordType) {
7371
}
7472
util.inherits(OutputStream, stream.Writable);
7573

76-
OutputStream.prototype.close = function () {
77-
this._open = false;
78-
this.emit('close');
74+
OutputStream.prototype._close = function (hadError) {
75+
if (this._open) {
76+
this._open = false;
77+
this.emit('close', hadError ? true : false);
78+
}
7979
};
8080

8181
OutputStream.prototype._write = function (chunk, encoding, callback) {
@@ -90,6 +90,11 @@ OutputStream.prototype._write = function (chunk, encoding, callback) {
9090
chunk = new Buffer(chunk, encoding);
9191
}
9292

93+
if (chunk.length === 0) {
94+
callback.call(this);
95+
return;
96+
}
97+
9398
if (chunk.length <= 65535) {
9499
chunks.push(chunk);
95100
} else {
@@ -102,15 +107,34 @@ OutputStream.prototype._write = function (chunk, encoding, callback) {
102107
}
103108

104109
while (chunks.length > 1) {
105-
this._conn.stream.writeRecord(this._id, new this.type(chunks.shift()));
110+
this._conn.stream.writeRecord(
111+
this._id, new this.recordType(chunks.shift()));
106112
}
107113

108114
this._conn.stream.writeRecord(
109115
this._id,
110-
new this.type(chunks.shift()),
116+
new this.recordType(chunks.shift()),
111117
callback.bind(this));
112118
};
113119

120+
OutputStream.prototype.write = function () {
121+
if (!this._open) {
122+
this.emit('error', new Error("Output stream is not open"));
123+
return;
124+
}
125+
126+
return stream.Writable.prototype.write.apply(this, arguments);
127+
}
128+
129+
OutputStream.prototype.end = function () {
130+
if (!this._open) {
131+
this.emit('error', new Error("Output stream is not open"));
132+
return;
133+
}
134+
135+
return stream.Writable.prototype.end.apply(this, arguments);
136+
}
137+
114138
/**
115139
* function IOStream([buffer])
116140
* Duplex stream interface for FastCGI input streams
@@ -134,5 +158,7 @@ util.inherits(IOStream, stream.Duplex);
134158
IOStream.prototype._data = InputStream.prototype._data;
135159
IOStream.prototype._read = InputStream.prototype._read;
136160

137-
IOStream.prototype.close = OutputStream.prototype.close;
161+
IOStream.prototype._close = OutputStream.prototype._close;
138162
IOStream.prototype._write = OutputStream.prototype._write;
163+
IOStream.prototype.write = OutputStream.prototype.write;
164+
IOStream.prototype.end = OutputStream.prototype.end;

0 commit comments

Comments
 (0)