Skip to content

Commit f46a58a

Browse files
authored
Merge pull request #69 from vibe-d/lazy_chunked_input_stream
Make ChunkedInputStream lazy.
2 parents 9f1de28 + 0e9aed6 commit f46a58a

File tree

4 files changed

+59
-28
lines changed

4 files changed

+59
-28
lines changed

source/vibe/http/common.d

Lines changed: 38 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -389,19 +389,22 @@ final class ChunkedInputStream : InputStream
389389
private {
390390
InterfaceProxy!InputStream m_in;
391391
ulong m_bytesInCurrentChunk = 0;
392+
bool m_chunkHeaderRead = false;
392393
}
393394

394395
/// private
395396
this(InterfaceProxy!InputStream stream, bool dummy)
396397
{
397398
assert(!!stream);
398399
m_in = stream;
399-
readChunk();
400400
}
401401

402-
@property bool empty() const { return m_bytesInCurrentChunk == 0; }
402+
@property bool empty() { return leastSize == 0; }
403403

404-
@property ulong leastSize() const { return m_bytesInCurrentChunk; }
404+
@property ulong leastSize() {
405+
if (!m_chunkHeaderRead) readChunk();
406+
return m_bytesInCurrentChunk;
407+
}
405408

406409
@property bool dataAvailableForRead() { return m_bytesInCurrentChunk > 0 && m_in.dataAvailableForRead; }
407410

@@ -417,21 +420,20 @@ final class ChunkedInputStream : InputStream
417420
size_t nbytes = 0;
418421

419422
while (dst.length > 0) {
420-
enforceBadRequest(m_bytesInCurrentChunk > 0, "Reading past end of chunked HTTP stream.");
423+
enforceBadRequest(leastSize > 0, "Reading past end of chunked HTTP stream.");
421424

422425
auto sz = cast(size_t)min(m_bytesInCurrentChunk, dst.length);
423426
m_in.read(dst[0 .. sz]);
424427
dst = dst[sz .. $];
425428
m_bytesInCurrentChunk -= sz;
426429
nbytes += sz;
427430

428-
// FIXME: this blocks, but shouldn't for IOMode.once/immediat
429431
if( m_bytesInCurrentChunk == 0 ){
430432
// skip current chunk footer and read next chunk
431433
ubyte[2] crlf;
432434
m_in.read(crlf);
433435
enforceBadRequest(crlf[0] == '\r' && crlf[1] == '\n');
434-
readChunk();
436+
m_chunkHeaderRead = false;
435437
}
436438

437439
if (mode != IOMode.all) break;
@@ -444,12 +446,13 @@ final class ChunkedInputStream : InputStream
444446

445447
private void readChunk()
446448
{
447-
assert(m_bytesInCurrentChunk == 0);
449+
assert(m_bytesInCurrentChunk == 0 && !m_chunkHeaderRead);
448450
// read chunk header
449451
logTrace("read next chunk header");
450452
auto ln = () @trusted { return cast(string)m_in.readLine(); } ();
451453
logTrace("got chunk header: %s", ln);
452454
m_bytesInCurrentChunk = parse!ulong(ln, 16u);
455+
m_chunkHeaderRead = true;
453456

454457
if( m_bytesInCurrentChunk == 0 ){
455458
// empty chunk denotes the end
@@ -633,6 +636,34 @@ FreeListRef!ChunkedOutputStream createChunkedOutputStreamFL(OS, Allocator)(OS de
633636
return FreeListRef!ChunkedOutputStream(interfaceProxy!OutputStream(destination_stream), allocator, true);
634637
}
635638

639+
unittest {
640+
import vibe.stream.memory : createMemoryStream;
641+
642+
ubyte[] bytes = new ubyte[](100*100);
643+
foreach (i, ref bt; bytes) bt = (i + i / 100) % 256;
644+
ubyte[] buf = new ubyte[](bytes.length);
645+
646+
auto dst = createMemoryStream(new ubyte[4*1024*1024], true, 0);
647+
648+
// write data with varying buffer and chunk sizes
649+
auto outp = createChunkedOutputStreamFL(dst);
650+
foreach (i; 0 .. 100) {
651+
outp.write(bytes[0 .. i * 100]);
652+
if (i % 5 == 0)
653+
outp.flush();
654+
}
655+
outp.finalize();
656+
657+
// ensure decoding yields the same byte sequence
658+
dst.seek(0);
659+
auto inp = createChunkedInputStreamFL(dst);
660+
foreach (i; 0 .. 100) {
661+
inp.read(buf[0 .. i * 100]);
662+
assert(buf[0 .. i * 100] == bytes[0 .. i * 100]);
663+
}
664+
}
665+
666+
636667
/// Parses the cookie from a header field, returning the name of the cookie.
637668
/// Implements an algorithm equivalent to https://tools.ietf.org/html/rfc6265#section-5.2
638669
/// Returns: the cookie name as return value, populates the dst argument or allocates on the GC for the tuple overload.

source/vibe/http/websockets.d

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -906,6 +906,7 @@ scope:
906906

907907
try m_conn.close();
908908
catch (Exception e) logException(e, "Failed to close WebSocket connection");
909+
m_conn = ConnectionStream.init;
909910
try m_readCondition.notifyAll();
910911
catch (Exception e) assert(false, e.msg);
911912
}
@@ -924,7 +925,7 @@ scope:
924925
send((scope msg) { msg.write(nativeToLittleEndian(++m_lastPingIndex)); }, FrameOpcode.ping);
925926
logDebugV("Ping sent");
926927
} catch (Exception e) {
927-
logError("Failed to acquire write mutex for sending a WebSocket ping frame: %s", e.msg);
928+
logError("Failed to send WebSocket ping frame: %s", e.msg);
928929
}
929930
}
930931
}
Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,2 @@
11
name "websocket_packetmaxlength"
22
dependency "vibe-http" path="../../"
3-
versions "VibeDefaultMain"

tests/vibe.http.websocket.packetmaxlength/source/app.d

Lines changed: 19 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -3,39 +3,39 @@ import vibe.core.log;
33
import vibe.inet.url;
44
import vibe.http.server;
55
import vibe.http.websockets;
6+
import core.time : seconds;
67

7-
shared static this()
8+
9+
void main()
810
{
11+
auto tm = setTimer(10.seconds, { assert(false, "Test timed out."); });
12+
913
//test max payload
1014
auto settings = new HTTPServerSettings;
1115
settings.port = 0;
1216
settings.bindAddresses = ["127.0.0.1"];
1317
settings.webSocketPayloadMaxLength = 10;
14-
immutable serverAddr = listenHTTP(settings, handleWebSockets((scope ws) {
18+
auto listener = listenHTTP(settings, handleWebSockets((scope ws) {
1519
assert(ws.connected); // issue #2104
1620
assert(ws.receiveText() == "1234567890");
1721
ws.send("ok");
1822
try{
1923
ws.receiveText(); //at this point the connection should close
2024
assert(false);
21-
}catch(Exception e)
22-
{
23-
}
25+
} catch(Exception e) {} // expected
2426
ws.close();
25-
})).bindAddresses[0];
26-
27-
runTask({
28-
scope(exit) exitEventLoop(true);
27+
}));
2928

30-
try connectWebSocket(URL("http://" ~ serverAddr.toString), (scope ws) {
31-
assert(ws.connected);
32-
ws.send("1234567890");
33-
assert(ws.receiveText() == "ok");
34-
ws.send("1234567890a"); //should cause connection to close
35-
assert(!ws.waitForData);
36-
ws.close();
37-
logInfo("WebSocket max payload test successful");
38-
});
39-
catch (Exception e) assert(false, e.msg);
29+
try connectWebSocket(URL("http://" ~ listener.bindAddresses[0].toString), (scope ws) {
30+
assert(ws.connected);
31+
ws.send("1234567890");
32+
assert(ws.receiveText() == "ok");
33+
ws.send("1234567890a"); //should cause connection to close
34+
assert(!ws.waitForData);
35+
ws.close();
36+
logInfo("WebSocket max payload test successful");
4037
});
38+
catch (Exception e) assert(false, e.msg);
39+
40+
listener.stopListening();
4141
}

0 commit comments

Comments
 (0)