rpc : reuse compute graphs #15405
Conversation
|
The time spent on serializing/deserializing the graph is quite small compared to the graph compute time. I have measured ~1% tg improvement on a 1 Gbps connection:
master
PR
Not sure if it's worth the complexity we are adding here ... |
|
When I'm running over RPC, I'm noticing the amount of data sent from the main host to the RPC servers is very asymmetric during token generation, e.g.:
Is this the compute graph getting sent (which I can only think must include all the sin/cos values for the positional encodings, or something similar, to have this large a disparity?) and the hidden state(s) getting sent back? |
|
You can set For |
Thanks! I'll give this a try and see if I can see what is getting sent. |
|
Running the client with |
|
I think I've deciphered this now (it was confusing because I was also running speculative decoding!).
So this is the hidden state stored as This appears to be 2x the context length, so I assume it is something holding position encodings. So overall we are sending which is a |
|
The size of the compute graph is substantial in your case (423040 bytes), so reusing the graph (what this PR is about) could bring noticeable improvement for you. I will rebase this PR, so you can test in your environment. |
Thanks! |
Store compute graphs on the server side and reuse them when possible. Compute graphs are kept in a ring buffer with fixed size, so we can avoid serializing and deserializing the same graph every time. Add two new commands:

* `RPC_CMD_GRAPH_COMPUTE_AND_STORE` -- store the graph, compute it and return its ID
* `RPC_CMD_GRAPH_RECOMPUTE` -- recompute the graph with the given ID

Currently there is no good way to associate an ID with `ggml_cgraph`, so we abuse `tensor->extra` of the first node for this purpose.
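For illustration, here is a minimal sketch of how a fixed-size ring buffer of stored graphs could work on the server side; the `graph_cache` struct and every name in it are assumptions made for this example, not the PR's actual code.

```cpp
// Illustrative only: a fixed-size ring buffer mapping small integer IDs to
// stored graphs, overwriting the oldest entry when full. All names here
// (graph_cache, MAX_STORED_GRAPHS, ...) are hypothetical.
#include <array>
#include <cstddef>
#include <cstdint>

struct ggml_cgraph;  // opaque here; defined by ggml

struct graph_cache {
    static constexpr size_t MAX_STORED_GRAPHS = 8;

    struct entry {
        uint64_t      id    = 0;       // 0 means "empty slot"
        ggml_cgraph * graph = nullptr;
    };

    std::array<entry, MAX_STORED_GRAPHS> slots{};
    uint64_t next_id = 1;

    // store a graph, return the ID the client can later pass to RECOMPUTE
    uint64_t store(ggml_cgraph * graph) {
        const uint64_t id = next_id++;
        slots[id % MAX_STORED_GRAPHS] = { id, graph };  // evicts the oldest entry
        return id;
    }

    // look up a stored graph; returns nullptr if the slot was since reused
    ggml_cgraph * get(uint64_t id) const {
        const entry & e = slots[id % MAX_STORED_GRAPHS];
        return e.id == id ? e.graph : nullptr;
    }
};
```

With a scheme like this, a recompute request whose ID has already been evicted would have to fall back to a full graph transfer.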
Force-pushed from 537b237 to 2272e04
|
@jukofyork I have rebased the PR and I would appreciate it if you could test it in your environment. With this patch the compute graph is stored on the server side and there is no need to send it for every token. Note that you need to rebuild both the client and server parts. |
Thanks! I'm away for a couple of days, so it will be Thursday/Friday before I can boot up the machines to test it.
|
This makes quite a big difference for me: master
PR
|
|
@jukofyork this is great news. I didn't have a chance to test this patch with larger graphs, so thanks for confirming it makes a significant improvement.

@slaren Is there a better way to associate an ID with `ggml_cgraph`? |
|
I've been looking at the RPC code and wonder if there might be a way to simplify/unify all this. It's pretty cheap to hash data using #16753 (and with a few hours' work I can probably decipher the 128-bit version and get an extra 1.5x performance for SSE2...), so what about if we removed the existing tensor-specific hashing and instead did it in the `send_rpc_cmd()` functions:

```cpp
// RPC request : | rpc_cmd (1 byte) | request_size (8 bytes) | request_data (request_size bytes) |
// No response
static bool send_rpc_cmd(const std::shared_ptr<socket_t> & sock, enum rpc_cmd cmd, const void * input, size_t input_size) {
uint8_t cmd_byte = cmd;
if (!send_data(sock->fd, &cmd_byte, sizeof(cmd_byte))) {
return false;
}
if (!send_data(sock->fd, &input_size, sizeof(input_size))) {
return false;
}
if (!send_data(sock->fd, input, input_size)) {
return false;
}
return true;
}
// RPC request : | rpc_cmd (1 byte) | request_size (8 bytes) | request_data (request_size bytes) |
// RPC response: | response_size (8 bytes) | response_data (response_size bytes) |
static bool send_rpc_cmd(const std::shared_ptr<socket_t> & sock, enum rpc_cmd cmd, const void * input, size_t input_size, void * output, size_t output_size) {
if (!send_rpc_cmd(sock, cmd, input, input_size)) {
return false;
}
// TODO: currently the output_size is always known, do we need support for commands with variable output size?
// even if we do, we can skip sending output_size from the server for commands with known output size
uint64_t out_size;
if (!recv_data(sock->fd, &out_size, sizeof(out_size))) {
return false;
}
if (out_size != output_size) {
return false;
}
if (!recv_data(sock->fd, output, output_size)) {
return false;
}
return true;
}
```

So: the RPC servers would still have the (optional) non-volatile cache for large transfers, and the code above would then need to negotiate these transfers, but since these are the large tensors the added latency would be insignificant; this is essentially what the existing hash-based tensor cache already does. But we would also introduce a volatile LRU cache:
This seems to have lots of benefits:
The only thing I'm not so sure about is if we are ever sending any pointers (or non-static node indices, etc) over the RPC channel? If so then that will make things much harder and probably a lot more work (but not impossible, eg: using the ideas from https://theboostcpplibraries.com/boost.serialization-pointers-and-references). |
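To make the "volatile LRU cache" part of the proposal concrete, here is a small self-contained sketch of a bounded cache keyed by a 64-bit payload hash; `payload_lru_cache` and its whole interface are hypothetical, not code from this discussion.

```cpp
// Hypothetical bounded LRU cache keyed by a 64-bit payload hash: the receiver
// keeps at most max_entries payloads and drops the least-recently-used one
// when the cap is exceeded.
#include <cstddef>
#include <cstdint>
#include <list>
#include <unordered_map>
#include <utility>
#include <vector>

class payload_lru_cache {
public:
    explicit payload_lru_cache(size_t max_entries) : max_entries(max_entries) {}

    // return the cached payload for this hash, or nullptr if it is not cached
    const std::vector<uint8_t> * get(uint64_t hash) {
        auto it = index.find(hash);
        if (it == index.end()) {
            return nullptr;
        }
        order.splice(order.begin(), order, it->second);  // mark as most recently used
        return &it->second->second;
    }

    void put(uint64_t hash, std::vector<uint8_t> payload) {
        if (get(hash) != nullptr) {
            return;  // already cached (and now marked as most recently used)
        }
        order.emplace_front(hash, std::move(payload));
        index[hash] = order.begin();
        if (order.size() > max_entries) {
            index.erase(order.back().first);  // evict the least-recently-used payload
            order.pop_back();
        }
    }

private:
    using entry_t = std::pair<uint64_t, std::vector<uint8_t>>;
    size_t max_entries;
    std::list<entry_t> order;                                        // MRU at the front
    std::unordered_map<uint64_t, std::list<entry_t>::iterator> index;
};
```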
|
I made a lot of ninja edits just now, so if you're reading from the email notification, please see the comment instead! |
|
I am not sure I understand your proposal. As of now, there are only two commands that benefit from caching -- Also note that my implementation is reusing compute graphs without serializing them first, so we don't waste CPU time for serialization and hashing. |
I don't think there is a good way to do this at the moment. This implementation is not reliable either. To do this in a reliable way, you would need to check every node of the graph to determine if it is changed, similar to what the CUDA backend does to implement CUDA graphs. In the future with the graph plan API this may be unnecessary.
With |
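As a rough illustration of the per-node check described above (and not the CUDA backend's actual implementation), comparing two graphs node by node might look something like this:

```cpp
// Simplified sketch: two graphs are treated as "the same" only if every node
// has the same op, type, shape, strides, sources and data pointer. Comparing
// raw pointers assumes tensors are allocated at stable addresses across graphs.
#include "ggml.h"

static bool graphs_match(ggml_cgraph * a, ggml_cgraph * b) {
    if (ggml_graph_n_nodes(a) != ggml_graph_n_nodes(b)) {
        return false;
    }
    for (int i = 0; i < ggml_graph_n_nodes(a); ++i) {
        const ggml_tensor * ta = ggml_graph_node(a, i);
        const ggml_tensor * tb = ggml_graph_node(b, i);
        if (ta->op != tb->op || ta->type != tb->type || ta->data != tb->data) {
            return false;
        }
        for (int d = 0; d < GGML_MAX_DIMS; ++d) {
            if (ta->ne[d] != tb->ne[d] || ta->nb[d] != tb->nb[d]) {
                return false;
            }
        }
        for (int s = 0; s < GGML_MAX_SRC; ++s) {
            if (ta->src[s] != tb->src[s]) {
                return false;
            }
        }
    }
    return true;
}
```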
I tried this a few days ago but it didn't make any difference for me. Not sure if it's the extra |
I've got to go out, but I will try to give a more detailed example of what I mean when I get back in. The basic idea is just to extract all the "communication channel" stuff (requiring tunable heuristics, etc.) away from the clean RPC interface. The eg: The simple |
Here's what I tried for both `send_msg()` and `send_rpc_cmd()`:

```cpp
static bool send_msg(sockfd_t sockfd, const void * msg, size_t msg_size) {
const size_t header_size = sizeof(msg_size);
std::vector<uint8_t> buf;
buf.resize(header_size + msg_size);
// header
memcpy(buf.data(), &msg_size, sizeof(msg_size));
// payload
if (msg_size > 0) {
memcpy(buf.data() + header_size, msg, msg_size);
}
// single send
return send_data(sockfd, buf.data(), buf.size());
}

static bool send_rpc_cmd(const std::shared_ptr<socket_t> & sock, enum rpc_cmd cmd, const void * input, size_t input_size) {
const size_t header_size = 1 + sizeof(input_size);
std::vector<uint8_t> buf;
buf.resize(header_size + input_size);
// header
buf[0] = static_cast<uint8_t>(cmd);
memcpy(buf.data() + 1, &input_size, sizeof(input_size));
// payload
if (input_size > 0) {
memcpy(buf.data() + header_size, input, input_size);
}
// single send (send_data may still chunk very large buffers, which is fine)
return send_data(sock->fd, buf.data(), buf.size());
}
```
Actually this now seems to be giving me quite a boost! With this PR and that branch merged, I'm now getting this:
compared to this the other day:
It may be something else that has changed, so I will check next week to be sure, but it seems odd that I got around 15 tokens/s when I tested this branch a couple of days ago. |
|
I made a draft PR #16892 to see if this makes any difference to others, or if it only helps with this specific PR, etc. |
|
I also had a go at writing a (very!) hacky proof of concept for the volatile hash cache last night (llama.cpp/ggml/src/ggml-rpc/ggml-rpc.cpp, line 431 at 3561f8c):

```cpp
// Try use the volatile cache when data size is larger than this threshold
const size_t MIN_CACHE_THRESHOLD = 20 * 1024;
const size_t MAX_CACHE_THRESHOLD = 1024 * 1024;
static bool send_data(sockfd_t sockfd, const void * data, size_t size) {
static std::unordered_set<uint64_t> sent_hashes;
if (size > MIN_CACHE_THRESHOLD && size < MAX_CACHE_THRESHOLD) {
uint64_t hash = generate_hash((const uint8_t*)data, size);
bool is_new = sent_hashes.find(hash) == sent_hashes.end();
uint8_t flag = is_new ? 1 : 0;
if (send(sockfd, (const char*)&flag, sizeof(flag), 0) != sizeof(flag)) {
return false;
}
if (send(sockfd, (const char*)&hash, sizeof(hash), 0) != sizeof(hash)) {
return false;
}
if (!is_new) {
return true;
}
sent_hashes.insert(hash);
}
size_t bytes_sent = 0;
while (bytes_sent < size) {
size_t size_to_send = std::min(size - bytes_sent, MAX_CHUNK_SIZE);
ssize_t n = send(sockfd, (const char *)data + bytes_sent, size_to_send, 0);
if (n < 0) {
GGML_LOG_ERROR("send failed (bytes_sent=%zu, size_to_send=%zu)\n",
bytes_sent, size_to_send);
return false;
}
bytes_sent += (size_t)n;
}
return true;
}
static bool recv_data(sockfd_t sockfd, void * data, size_t size) {
static std::unordered_map<uint64_t, std::vector<uint8_t>> recv_cache;
uint64_t hash = 0;
if (size > MIN_CACHE_THRESHOLD && size < MAX_CACHE_THRESHOLD) {
uint8_t flag;
if (recv(sockfd, (char*)&flag, sizeof(flag), 0) != sizeof(flag)) {
return false;
}
if (recv(sockfd, (char*)&hash, sizeof(hash), 0) != sizeof(hash)) {
return false;
}
if (flag == 0) {
auto it = recv_cache.find(hash);
if (it != recv_cache.end()) {
memcpy(data, it->second.data(), size);
return true;
}
return false;
}
}
size_t bytes_recv = 0;
while (bytes_recv < size) {
size_t size_to_recv = std::min(size - bytes_recv, MAX_CHUNK_SIZE);
ssize_t n = recv(sockfd, (char *)data + bytes_recv, size_to_recv, 0);
if (n < 0) {
GGML_LOG_ERROR("recv failed (bytes_recv=%zu, size_to_recv=%zu)\n",
bytes_recv, size_to_recv);
return false;
}
if (n == 0) {
LOG_DBG("recv returned 0 (peer closed?)\n");
return false;
}
bytes_recv += (size_t)n;
}
if (size > MIN_CACHE_THRESHOLD && size < MAX_CACHE_THRESHOLD) {
recv_cache[hash] = std::vector<uint8_t>((uint8_t*)data, (uint8_t*)data + size);
}
return true;
}
```

It did give a small improvement (from 15.5 tokens/s to 17.5 tokens/s), but either the hashing still has a lot of overhead or the repeated serialisation of the compute graph, as mentioned by @rgerganov, is very costly, as with this PR I get 19.5 tokens/s in comparison! I then destroyed the branch trying to hack in the code to join the packets after the push linked above, so I will have to take another look at this next week and likely start fresh. I did find that the other tensors that were exactly 2x the context length (and which I suspected were something to do with the position embeddings) are the same each time and could also be cached (this finally produced a balanced flow over the network of about 4-5 MB/s, which is purely the hidden-state data). |
It's a pity, as this PR does seem to help a lot (maybe mainly for large MoE models though?). From my hacky experiments last night, it definitely looks like @rgerganov is correct that the serialisation+hashing still carries significant overhead, even with a fast hash. By "visiting every node", does that mean the data of every node needs comparing (as in the full ~450kb of data for my example), or is it just a set of "node IDs" or similar that need comparing? |
|
I wonder if you can simply reuse the approach that I prototyped some time ago for the Metal backend to determine when a graph is the same as the previous one. At the time, I thought it was hacky, but I don't actually see a failure case. It didn't end up being used because it didn't lead to any improvements for Metal (#14570), but here it could be a reasonable stopgap until the graph plan API lands. |
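As a rough sketch of that stopgap idea, the client could keep the previously serialized graph bytes and compare before resending; `graph_unchanged` below is a hypothetical helper, not the Metal prototype's actual code:

```cpp
// Hypothetical helper: remember the bytes of the previously sent serialized
// graph and report whether the new one is byte-identical. The static state
// makes this a single-connection sketch only.
#include <cstdint>
#include <vector>

static bool graph_unchanged(const std::vector<uint8_t> & serialized_graph) {
    static std::vector<uint8_t> prev;       // previously sent serialized graph
    if (serialized_graph == prev) {
        return true;                        // byte-identical: safe to ask the server to reuse it
    }
    prev = serialized_graph;                // remember for the next comparison
    return false;
}
```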