Skip to content

Conversation

@karlseguin
Copy link
Collaborator

…utting down

Previously, we could have multiple in-flight messages from the server to a single client. This isn't safe and can lead to message interleaving. While write / send are atomic, they are only atomic for the N bytes which they write, which may not be the entire buffer. Consider this writeAll function:

pub fn writeAll(socket: socket_t, bytes: []const u8) !void {
    var index: usize = 0;
    while (index < bytes.len) {
        index += try posix.write(socket, bytes[index..]);
    }
}

If we're trying to send "abc123", this could take anywhere from 1 to 6 calls to posix.write (it would take 6 calls, for example, if every call to posix.write only wrote a single byte). Now if you're trying to write other data to this same socket at the same time, messages will get interleaved.

In order for this to work, the client now has a send_queue (doubly linked list). When one message is sent, it sends the next.

In addition to the above change, the Client is now self-contained with respect to its lifetime. This is necessary so that completions which come in AFTER our concept of its lifetime ends, can still be processed. I think all types that receive completions need to follow this model. This relies on the fact that kqueue (which I know for a fact) and io_uring (which people seem to imply) handle socket shutdown properly. It's still a bit messy because of timeout and not wanting to wait until timeout to accept new connections, but needing to wait until timeout to cleanup the client.

The self-contained nature of Client makes it difficult to test as a generic. I removed Client(T). Tests now use real sockets. Some tests had to be removed because they're too difficult to test over a real connection :(

…utting down

Previously, we could have multiple in-flight messages from the server to a
single client. This isn't safe and can lead to message interleaving. While
write / send are atomic, they are only atomic for the N bytes which they write,
which may not be the entire buffer. Consider this writeAll function:

```
pub fn writeAll(socket: socket_t, bytes: []const u8) !void {
    var index: usize = 0;
    while (index < bytes.len) {
        index += try posix.write(socket, bytes[index..]);
    }
}
```

If we're trying to send "abc123", this could take anywhere from 1 to 6 calls
to posix.write (it would take 6 calls, for example, if every call to
posix.write only wrote a single byte). Now if you're trying to write other data
to this same socket at the same time, messages _will_ get interleaved.

In order for this to work, the client now has a send_queue (doubly linked list).
When one message is sent, it sends the next.

In addition to the above change, the Client is now self-contained with respect
to its lifetime. This is necessary so that completions which come in AFTER our
concept of its lifetime ends, can still be processed. I think all types that
receive completions need to follow this model. This relies on the fact that
kqueue (which I know for a fact) and io_uring (which people seem to imply) handle
socket shutdown properly. It's still a bit messy because of timeout and not
wanting to wait until timeout to accept new connections, but needing to wait
until timeout to cleanup the client.

The self-contained nature of Client makes it difficult to test as a generic. I
removed Client(T). Tests now use real sockets. Some tests had to be removed
because they're too difficult to test over a real connection :(
@krichprollsch krichprollsch merged commit ccacac0 into lightpanda-io:main Mar 10, 2025
9 checks passed
@github-actions github-actions bot locked and limited conversation to collaborators Mar 10, 2025
@karlseguin karlseguin deleted the serialized_writes branch March 11, 2025 02:19
Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants