Skip to content

Commit 431db85

Browse files
authored
Merge pull request #978 from lightpanda-io/dynamic_cdp_read_buffer
Make the CDP read buffer heap allocated & dynamic
2 parents 94960cc + 1ebac06 commit 431db85

File tree

1 file changed

+49
-10
lines changed

1 file changed

+49
-10
lines changed

src/server.zig

Lines changed: 49 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ const MAX_HTTP_REQUEST_SIZE = 4096;
3636
// max message size
3737
// +14 for max websocket payload overhead
3838
// +140 for the max control packet that might be interleaved in a message
39-
const MAX_MESSAGE_SIZE = 512 * 1024 + 14;
39+
const MAX_MESSAGE_SIZE = 512 * 1024 + 14 + 140;
4040

4141
pub const Server = struct {
4242
app: *App,
@@ -188,12 +188,15 @@ pub const Client = struct {
188188
// we expect the socket to come to us as nonblocking
189189
std.debug.assert(socket_flags & nonblocking == nonblocking);
190190

191+
var reader = try Reader(true).init(server.allocator);
192+
errdefer reader.deinit();
193+
191194
return .{
192195
.socket = socket,
193196
.server = server,
197+
.reader = reader,
194198
.mode = .{ .http = {} },
195199
.socket_flags = socket_flags,
196-
.reader = .{ .allocator = server.allocator },
197200
.send_arena = ArenaAllocator.init(server.allocator),
198201
};
199202
}
@@ -537,14 +540,23 @@ fn Reader(comptime EXPECT_MASK: bool) type {
537540

538541
// we add 140 to allow 1 control message (ping/pong/close) to be
539542
// fragmented into a normal message.
540-
buf: [MAX_MESSAGE_SIZE + 140]u8 = undefined,
543+
buf: []u8,
541544

542545
fragments: ?Fragments = null,
543546

544547
const Self = @This();
545548

549+
fn init(allocator: Allocator) !Self {
550+
const buf = try allocator.alloc(u8, 16 * 1024);
551+
return .{
552+
.buf = buf,
553+
.allocator = allocator,
554+
};
555+
}
556+
546557
fn deinit(self: *Self) void {
547558
self.cleanup();
559+
self.allocator.free(self.buf);
548560
}
549561

550562
fn cleanup(self: *Self) void {
@@ -613,9 +625,14 @@ fn Reader(comptime EXPECT_MASK: bool) type {
613625
}
614626
} else if (message_len > MAX_MESSAGE_SIZE) {
615627
return error.TooLarge;
616-
}
617-
618-
if (buf.len < message_len) {
628+
} else if (message_len > self.buf.len) {
629+
const len = self.buf.len;
630+
self.buf = try growBuffer(self.allocator, self.buf, message_len);
631+
buf = self.buf[0..len];
632+
// we need more data
633+
return null;
634+
} else if (buf.len < message_len) {
635+
// we need more data
619636
return null;
620637
}
621638

@@ -753,13 +770,32 @@ fn Reader(comptime EXPECT_MASK: bool) type {
753770

754771
// We're here because we either don't have enough bytes of the next
755772
// message, or we know that it won't fit in our buffer as-is.
756-
std.mem.copyForwards(u8, &self.buf, partial);
773+
std.mem.copyForwards(u8, self.buf, partial);
757774
self.pos = 0;
758775
self.len = partial_bytes;
759776
}
760777
};
761778
}
762779

780+
fn growBuffer(allocator: Allocator, buf: []u8, required_capacity: usize) ![]u8 {
781+
// from std.ArrayList
782+
var new_capacity = buf.len;
783+
while (true) {
784+
new_capacity +|= new_capacity / 2 + 8;
785+
if (new_capacity >= required_capacity) break;
786+
}
787+
788+
log.debug(.app, "CDP buffer growth", .{ .from = buf.len, .to = new_capacity });
789+
790+
if (allocator.resize(buf, new_capacity)) {
791+
return buf.ptr[0..new_capacity];
792+
}
793+
const new_buffer = try allocator.alloc(u8, new_capacity);
794+
@memcpy(new_buffer[0..buf.len], buf);
795+
allocator.free(buf);
796+
return new_buffer;
797+
}
798+
763799
const Fragments = struct {
764800
type: Message.Type,
765801
message: std.ArrayListUnmanaged(u8),
@@ -1037,8 +1073,8 @@ test "Client: read invalid websocket message" {
10371073
);
10381074
}
10391075

1040-
// length of message is 0000 0401, i.e: 1024 * 512 + 1
1041-
try assertWebSocketError(1009, &.{ 129, 255, 0, 0, 0, 0, 0, 8, 0, 1, 'm', 'a', 's', 'k' });
1076+
// length of message is 0000 0810, i.e: 1024 * 512 + 265
1077+
try assertWebSocketError(1009, &.{ 129, 255, 0, 0, 0, 0, 0, 8, 1, 0, 'm', 'a', 's', 'k' });
10421078

10431079
// continuation type message must come after a normal message
10441080
// even when not a fin frame
@@ -1260,7 +1296,10 @@ fn createTestClient() !TestClient {
12601296
try posix.setsockopt(stream.handle, posix.SOL.SOCKET, posix.SO.SNDTIMEO, &timeout);
12611297
return .{
12621298
.stream = stream,
1263-
.reader = .{ .allocator = testing.allocator },
1299+
.reader = .{
1300+
.allocator = testing.allocator,
1301+
.buf = try testing.allocator.alloc(u8, 1024 * 16),
1302+
},
12641303
};
12651304
}
12661305

0 commit comments

Comments
 (0)