
Commit 5c9be64

[fbuffers] Incremental loading of future files
Handles the logic for incrementally loading files and tensors in model shards.
1 parent 0f08432 commit 5c9be64

4 files changed: +389 −2 lines changed


src/llama-model-load-input.cpp

Lines changed: 42 additions & 1 deletion
@@ -1,4 +1,6 @@
 #include "llama-model-load-input.h"
+#include <sstream>
+#include "llama-mmap.h"
 
 namespace load_input_variant {
 
@@ -12,12 +14,51 @@ const char * identifier(load_input_t & load_input) {
 }
 
 fname_load_input split_name_from_variant(load_input_t & load_input) {
+    if (std::holds_alternative<buffer_future_load_input>(load_input)) {
+        auto future_input = std::get<buffer_future_load_input>(load_input);
+        return fname_load_input{ future_input.promise_key, future_input.splits };
+    }
     auto file_input = std::get<fname_load_input>(load_input);
     return file_input;
 }
 
 bool variant_supports_split_load(load_input_t & load_input) {
-    return std::holds_alternative<fname_load_input>(load_input);
+    return std::holds_alternative<fname_load_input>(load_input) ||
+           std::holds_alternative<buffer_future_load_input>(load_input);
+}
+
+bool variant_supports_split_load_from_memory(load_input_t & load_input) {
+    return std::holds_alternative<buffer_future_load_input>(load_input);
+}
+
+std::optional<std::set<std::string>> parse_tensor_list_from_future(load_input_t & load_input) {
+    std::set<std::string> tensor_names;
+
+    if (!std::holds_alternative<buffer_future_load_input>(load_input)) {
+        return std::nullopt;
+    }
+
+    const auto & future_input = std::get<buffer_future_load_input>(load_input);
+
+    // Open and read the tensor list file
+    llama_future_file_buffer_ro tensor_file(future_input.tensor_list_file, future_input.context);
+    std::unique_ptr<llama_file_buffer_ro> file_buffer = tensor_file.extract();
+
+    // Read the entire buffer as bytes and convert to string
+    std::vector<uint8_t> buffer;
+    std::basic_istream<uint8_t> stream(file_buffer->streambuf.get());
+    std::istreambuf_iterator<uint8_t> begin(stream), end;
+    buffer.assign(begin, end);
+
+    // Convert bytes to string and split by newlines
+    std::string content(reinterpret_cast<const char *>(buffer.data()), buffer.size());
+    std::istringstream line_stream(content);
+    std::string line;
+    while (std::getline(line_stream, line)) {
+        tensor_names.insert(line);
+    }
+
+    return tensor_names;
 }
 
 } // namespace load_input_variant
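
For reference, parse_tensor_list_from_future above treats the tensor-list file as plain text with one tensor name per line. A minimal standalone sketch of just that parsing step (the sample tensor names are illustrative, not taken from this commit):

#include <iostream>
#include <set>
#include <sstream>
#include <string>

int main() {
    // Illustrative content; a real tensor-list file would enumerate the model's tensors.
    const std::string content = "token_embd.weight\nblk.0.attn_q.weight\noutput.weight\n";

    std::set<std::string> tensor_names;
    std::istringstream line_stream(content);
    std::string line;
    while (std::getline(line_stream, line)) {
        tensor_names.insert(line);  // same newline splitting as parse_tensor_list_from_future
    }

    std::cout << "parsed " << tensor_names.size() << " tensor names\n";  // prints 3
    return 0;
}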

src/llama-model-load-input.h

Lines changed: 16 additions & 1 deletion
@@ -3,6 +3,8 @@
 #include <stdint.h>
 
 #include <memory>
+#include <optional>
+#include <set>
 #include <string>
 #include <variant>
 #include <vector>
@@ -18,14 +20,27 @@ struct buffer_load_input {
     std::unique_ptr<std::basic_streambuf<uint8_t>> & streambuf;
 };
 
+struct buffer_future_load_input {
+    const std::string & promise_key;
+    const std::string & context;
+    std::vector<std::string> & splits;
+    const std::string & tensor_list_file;
+};
+
 } // namespace load_input_variant
 
-using load_input_t = std::variant<load_input_variant::fname_load_input, load_input_variant::buffer_load_input>;
+using load_input_t = std::variant<load_input_variant::fname_load_input, load_input_variant::buffer_load_input,
+                                  load_input_variant::buffer_future_load_input>;
 
 namespace load_input_variant {
 const char * identifier(load_input_t & load_input);
 
 fname_load_input split_name_from_variant(load_input_t & load_input);
 
 bool variant_supports_split_load(load_input_t & load_input);
+
+bool variant_supports_split_load_from_memory(load_input_t & load_input);
+
+/// @brief Parse tensor list from future file or nullopt if not a future file
+std::optional<std::set<std::string>> parse_tensor_list_from_future(load_input_t & load_input);
 } // namespace load_input_variant
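
A minimal usage sketch of the new variant and helpers declared above. The key, context, and file names are hypothetical placeholders; since buffer_future_load_input only stores references, the caller must keep these objects alive while the load_input_t is in use:

#include <cassert>
#include <string>
#include <vector>

#include "llama-model-load-input.h"

int main() {
    // Hypothetical inputs; buffer_future_load_input holds references to them, not copies.
    std::string promise_key      = "model-00001-of-00002.gguf";
    std::string context          = "example-context";
    std::string tensor_list_file = "model.tensor-list";
    std::vector<std::string> splits;

    load_input_t input = load_input_variant::buffer_future_load_input{
        promise_key, context, splits, tensor_list_file
    };

    // Future-buffer inputs support split loading, including from memory.
    assert(load_input_variant::variant_supports_split_load(input));
    assert(load_input_variant::variant_supports_split_load_from_memory(input));

    // Maps the future input back to a plain fname_load_input keyed by the promise key.
    auto as_fname = load_input_variant::split_name_from_variant(input);
    (void) as_fname;
    return 0;
}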

src/llama-model-load.cpp

Lines changed: 204 additions & 0 deletions
@@ -1,5 +1,6 @@
 #include "llama-model-load.h"
 
+#include <cstdint>
 #include <memory>
 #include <stdexcept>
 #include <variant>
@@ -19,6 +20,16 @@ gguf_file_load::gguf_file_load(struct ggml_context ** ctx, load_input_t load_inp
             throw std::runtime_error(format("%s: failed to load model from %s", __func__, file_input.fname.c_str()));
         }
         file = std::make_unique<llama_file_disk>(file_input.fname.c_str(), "ro");
+    } else if (std::holds_alternative<buffer_future_load_input>(load_input)) {
+        const auto & future_input = std::get<buffer_future_load_input>(load_input);
+        auto future_file =
+            std::make_unique<llama_future_file_buffer_ro>(future_input.promise_key, future_input.context);
+        std::unique_ptr<llama_file_buffer_ro> file_buffer = future_file->extract();
+        meta.reset(gguf_init_from_buffer(*file_buffer->streambuf, params));
+        if (!meta) {
+            throw std::runtime_error(format("%s: failed to load model from buffer", __func__));
+        }
+        file = std::move(file_buffer);
     } else {
         const auto & buffer_input = std::get<buffer_load_input>(load_input);
         meta.reset(gguf_init_from_buffer(*buffer_input.streambuf, params));
@@ -28,3 +39,196 @@ gguf_file_load::gguf_file_load(struct ggml_context ** ctx, load_input_t load_inp
         file = std::make_unique<llama_file_buffer_ro>(std::move(buffer_input.streambuf));
     }
 }
+
+gguf_file_load SplitLoad::load_split_gguf(struct ggml_context ** ctx, const char * fname_split,
+                                          load_input_t & load_input, std::vector<std::string> & splits) {
+    using namespace load_input_variant;
+    if (std::holds_alternative<fname_load_input>(load_input)) {
+        return gguf_file_load(ctx, fname_load_input{ fname_split, splits });
+    }
+    if (std::holds_alternative<buffer_future_load_input>(load_input)) {
+        auto future_input = std::get<buffer_future_load_input>(load_input);
+        return gguf_file_load(
+            ctx, buffer_future_load_input{ fname_split, future_input.context, splits, future_input.tensor_list_file });
+    }
+    return gguf_file_load(ctx, load_input);
+}
+
+SplitLoad::SplitLoad(load_input_t & load_input, load_input_variant::fname_load_input base_split, uint16_t idx,
+                     std::string kv_split_no) :
+    load_input(load_input),
+    base_split(base_split),
+    idx(idx),
+    kv_split_no(std::move(kv_split_no)) {}
+
+IncrementalSplitsTensorLoad::IncrementalSplitsTensorLoad(struct ggml_context * ctx, struct llama_model_loader & ml,
+                                                         gguf_file_load & base_split,
+                                                         std::set<std::string> tensor_list) :
+    expected_tensors(std::move(tensor_list)) {
+    ml.process_loaded_gguf(ctx, base_split, 0);
+    _process_split(ctx, ml, 0);
+}
+
+struct ggml_context * SplitLoad::load(llama_model_loader & ml) {
+    if (loaded) {
+        return ml.contexts[idx].get();
+    }
+
+    struct ggml_context * ctx = ml.contexts.back().get();
+
+    const char * fname_split = base_split.splits[idx].c_str();
+    LLAMA_LOG_INFO("loading split-file %s\n", fname_split);
+
+    gguf_file_load split_gguf = gguf_file_load(load_split_gguf(&ctx, fname_split, load_input, base_split.splits));
+    gguf_context_ptr & split_meta = split_gguf.meta;
+
+    if (idx > 0) {
+        const int kid = gguf_find_key(split_meta.get(), kv_split_no.c_str());
+        if (kid < 0) {
+            throw std::runtime_error(format("missing key %s in GGUF split %s", kv_split_no.c_str(), fname_split));
+        }
+        int idx_gguf = gguf_get_val_u16(split_meta.get(), kid);
+        if (idx_gguf != idx) {
+            throw std::runtime_error(
+                format("invalid split file idx: %d (file: %s), expected %d", idx_gguf, fname_split, idx));
+        }
+    }
+
+    // Check that this split's idx matches the expected position in ml.files
+    if (!ml.files.empty() && idx != ml.files.size()) {
+        throw std::runtime_error(
+            format("invalid split file loading order: got idx %d but expected %zu based on ml.files size", idx,
+                   ml.files.size()));
+    }
+
+    ml.process_loaded_gguf(ctx, split_gguf, idx);
+
+    loaded = true;
+    return ctx;
+}
+
+void IncrementalSplitsTensorLoad::add_split(SplitLoad splitLoad) {
+    // +1 because first split is expected to have been already loaded (not delayed)
+    split_info[delayed_files.size() + 1] = SplitInfo();
+    delayed_files.emplace_back(std::move(splitLoad));
+}
+
+void IncrementalSplitsTensorLoad::_load_split(struct llama_model_loader & ml, uint16_t idx) {
+    // -1 because first split is expected to have been already loaded (not delayed and not present in delayed_files)
+    const struct ggml_context * ctx = delayed_files[idx - 1].load(ml);
+    _process_split(ctx, ml, idx);
+}
+
+void IncrementalSplitsTensorLoad::_process_split(const struct ggml_context * ctx, struct llama_model_loader & ml,
+                                                 uint16_t idx) {
+    SplitInfo & split = split_info[idx];
+
+    for (ggml_tensor * cur = ggml_get_first_tensor(ctx); cur; cur = ggml_get_next_tensor(ctx, cur)) {
+        std::string tensor_name = std::string(cur->name);
+        split.total_tensor_count++;
+
+        // Add tensor info with initial loaded state as false
+        tensor_info[tensor_name] = TensorInfo{ idx, false };
+
+        auto it = ml.weights_map.find(tensor_name);
+        if (it == ml.weights_map.end()) {
+            throw std::runtime_error(format("tensor '%s' not found in weights_map", tensor_name.c_str()));
+        }
+        split.data_size += ggml_nbytes(it->second.tensor);
+    }
+}
+
+uint16_t IncrementalSplitsTensorLoad::load_tensor_metadata(struct llama_model_loader & ml, const char * tensor_name,
+                                                           ggml_tensor ** out_tensor_metadata) {
+    LLAMA_LOG_CMAKE_DEBUG("%s: loading tensor %s (tensor_meta=%p, delayed_loaded=%zu, delayed_files.size=%zu)\n",
+                          __func__, tensor_name, (void *) *out_tensor_metadata, delayed_loaded, delayed_files.size());
+    if (expected_tensors.find(tensor_name) == expected_tensors.end()) {
+        throw std::runtime_error(format("unknown tensor not expected in split files: %s", tensor_name));
+    }
+    while (!(*out_tensor_metadata) && delayed_loaded < delayed_files.size()) {
+        // +1 because first split is expected to have been already loaded (not delayed)
+        _load_split(ml, delayed_loaded + 1);
+        *out_tensor_metadata = ml.get_tensor_meta(tensor_name);
+        delayed_loaded++;
+        if (*out_tensor_metadata) {
+            LLAMA_LOG_CMAKE_DEBUG("%s: tensor %s found in file %zu\n", __func__, tensor_name, delayed_loaded);
+        }
+        if (delayed_loaded == delayed_files.size() && ml.weights_map.size() != expected_n_tensors()) {
+            throw std::runtime_error(
+                format("finished incrementally loading all splits but expected %zu tensors, got %zu",
+                       expected_n_tensors(), ml.weights_map.size()));
+        }
+    }
+    uint16_t split_idx = get_split_idx_for_tensor(tensor_name);
+
+    // Mark tensor as loaded and increment split's loaded count
+    auto tensor_it = tensor_info.find(tensor_name);
+    if (!tensor_it->second.is_loaded) {
+        tensor_it->second.is_loaded = true;
+        split_info[split_idx].loaded_tensor_count++;
+    }
+
+    return split_idx;
+}
+
+uint16_t IncrementalSplitsTensorLoad::get_split_idx_for_tensor(const char * tensor_name) const {
+    return _get_tensor_info_iterator(tensor_name)->second.split_idx;
+}
+
+std::size_t IncrementalSplitsTensorLoad::get_split_data_size(uint16_t split_idx) const {
+    return _get_split_info_iterator(split_idx)->second.data_size;
+}
+
+void IncrementalSplitsTensorLoad::print_currently_known_tensors() const {
+    LLAMA_LOG_INFO("Current incremental loaded tensors:\n");
+    for (const auto & it : tensor_info) {
+        LLAMA_LOG_INFO("Tensor '%s' in split %d (loaded: %s)\n", it.first.c_str(), it.second.split_idx,
+                       it.second.is_loaded ? "yes" : "no");
+    }
+}
+
+bool IncrementalSplitsTensorLoad::all_tensors_are_loaded(uint16_t split_idx) const {
+    auto it = _get_split_info_iterator(split_idx);
+    const SplitInfo & split = it->second;
+    LLAMA_LOG_CMAKE_DEBUG("Loaded tensor count for split %d: %u/%u\n", split_idx, split.loaded_tensor_count,
+                          split.total_tensor_count);
+    return split.all_tensors_loaded();
+}
+
+std::size_t IncrementalSplitsTensorLoad::expected_n_tensors() {
+    return expected_tensors.size();
+}
+
+void IncrementalSplitsTensorLoad::release_split(struct llama_model_loader & ml, uint16_t split_idx) {
+    // Let destructor of the smart pointer do the release of memory
+    ml.files[split_idx] = nullptr;
+}
+
+std::map<std::string, IncrementalSplitsTensorLoad::TensorInfo>::const_iterator
+IncrementalSplitsTensorLoad::_get_tensor_info_iterator(const char * tensor_name) const {
+    auto it = tensor_info.find(tensor_name);
+    if (it == tensor_info.end()) {
+        throw std::runtime_error(format("tensor '%s' not found in tensor_info map", tensor_name));
+    }
+    return it;
+}
+
+std::map<uint16_t, IncrementalSplitsTensorLoad::SplitInfo>::const_iterator
+IncrementalSplitsTensorLoad::_get_split_info_iterator(uint16_t split_idx) const {
+    auto it = split_info.find(split_idx);
+    if (it == split_info.end()) {
+        throw std::runtime_error(format("split index %d not found in split_info map", split_idx));
+    }
+    return it;
+}
+
+bool IncrementalSplitsTensorLoad::SplitInfo::all_tensors_loaded() const {
+    return loaded_tensor_count >= total_tensor_count;
+}
+
+bool IncrementalSplitsTensorLoad::tensor_ignored(const std::optional<IncrementalSplitsTensorLoad> & splits_tensor_load,
+                                                 const char * tensor_name) {
+    return !splits_tensor_load.has_value() ||
+           (splits_tensor_load.has_value() &&
+            splits_tensor_load->expected_tensors.find(tensor_name) == splits_tensor_load->expected_tensors.end());
+}
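
A hypothetical driver sketch (not part of this commit) of how the pieces above could fit together, assuming the caller already has a llama_model_loader, the base split's gguf_file_load, and the parsed tensor list; the "split.no" key name and the loop structure are assumptions for illustration only:

void load_tensors_incrementally(llama_model_loader & ml, load_input_t & load_input,
                                load_input_variant::fname_load_input base_split, struct ggml_context * ctx,
                                gguf_file_load & base_gguf, const std::set<std::string> & tensor_list) {
    // Split 0 (the base split) is processed immediately by the constructor.
    IncrementalSplitsTensorLoad inc_load(ctx, ml, base_gguf, tensor_list);

    // Register the remaining splits without reading them yet.
    for (uint16_t idx = 1; idx < (uint16_t) base_split.splits.size(); ++idx) {
        inc_load.add_split(SplitLoad(load_input, base_split, idx, "split.no" /* assumed key name */));
    }

    // Request tensors by name: splits are opened lazily until the requested tensor's
    // metadata appears, and released once all of their tensors have been consumed.
    for (const std::string & name : tensor_list) {
        ggml_tensor * meta = ml.get_tensor_meta(name.c_str());
        uint16_t split_idx = inc_load.load_tensor_metadata(ml, name.c_str(), &meta);

        // ... create the model tensor and copy its data from `meta` here ...

        if (inc_load.all_tensors_are_loaded(split_idx)) {
            inc_load.release_split(ml, split_idx);  // drop the split's file buffer
        }
    }
}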
