Skip to content

Commit 9e607c2

Browse files
committed
Simple proof of concept for the volatile caching
1 parent d9f3da6 commit 9e607c2

File tree

1 file changed

+51
-0
lines changed

1 file changed

+51
-0
lines changed

ggml/src/ggml-rpc/ggml-rpc.cpp

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -428,7 +428,30 @@ static std::shared_ptr<socket_t> create_server_socket(const char * host, int por
428428
return sock;
429429
}
430430

431+
// Try use the volatile cache when data size is larger than this threshold
432+
const size_t CACHE_THRESHOLD = 1024 * 1024;
433+
431434
static bool send_data(sockfd_t sockfd, const void * data, size_t size) {
435+
static std::unordered_set<uint64_t> sent_hashes;
436+
437+
if (size >= CACHE_THRESHOLD) {
438+
uint64_t hash = generate_hash((const uint8_t*)data, size);
439+
bool is_new = sent_hashes.find(hash) == sent_hashes.end();
440+
441+
uint8_t flag = is_new ? 1 : 0;
442+
if (send(sockfd, (const char*)&flag, sizeof(flag), 0) != sizeof(flag)) {
443+
return false;
444+
}
445+
if (send(sockfd, (const char*)&hash, sizeof(hash), 0) != sizeof(hash)) {
446+
return false;
447+
}
448+
449+
if (!is_new) {
450+
return true;
451+
}
452+
sent_hashes.insert(hash);
453+
}
454+
432455
size_t bytes_sent = 0;
433456
while (bytes_sent < size) {
434457
size_t size_to_send = std::min(size - bytes_sent, MAX_CHUNK_SIZE);
@@ -444,6 +467,29 @@ static bool send_data(sockfd_t sockfd, const void * data, size_t size) {
444467
}
445468

446469
static bool recv_data(sockfd_t sockfd, void * data, size_t size) {
470+
static std::unordered_map<uint64_t, std::vector<uint8_t>> recv_cache;
471+
472+
uint64_t hash = 0;
473+
474+
if (size >= CACHE_THRESHOLD) {
475+
uint8_t flag;
476+
if (recv(sockfd, (char*)&flag, sizeof(flag), 0) != sizeof(flag)) {
477+
return false;
478+
}
479+
if (recv(sockfd, (char*)&hash, sizeof(hash), 0) != sizeof(hash)) {
480+
return false;
481+
}
482+
483+
if (flag == 0) {
484+
auto it = recv_cache.find(hash);
485+
if (it != recv_cache.end()) {
486+
memcpy(data, it->second.data(), size);
487+
return true;
488+
}
489+
return false;
490+
}
491+
}
492+
447493
size_t bytes_recv = 0;
448494
while (bytes_recv < size) {
449495
size_t size_to_recv = std::min(size - bytes_recv, MAX_CHUNK_SIZE);
@@ -459,6 +505,11 @@ static bool recv_data(sockfd_t sockfd, void * data, size_t size) {
459505
}
460506
bytes_recv += (size_t)n;
461507
}
508+
509+
if (size >= CACHE_THRESHOLD) {
510+
recv_cache[hash] = std::vector<uint8_t>((uint8_t*)data, (uint8_t*)data + size);
511+
}
512+
462513
return true;
463514
}
464515

0 commit comments

Comments
 (0)