@@ -16,6 +16,7 @@ limitations under the License.
1616#include < algorithm>
1717#include < chrono>
1818#include < cstdlib>
19+ #include < exception>
1920#include < iomanip>
2021#include < map>
2122#include < memory>
@@ -49,9 +50,10 @@ AIBrixBlobStorage::AIBrixBlobStorage(
4950 global_gc_enabled_(global_gc_enabled),
5051 global_gc_interval_s_(std::chrono::seconds(global_gc_interval_s)),
5152 global_ttl_s_(std::chrono::seconds(global_ttl_s)),
52- ghost_fifo_(capacity_),
53- small_fifo_(capacity_ * kSmallFifoCapacityRatio ),
54- main_fifo_(capacity_ - capacity_ * kSmallFifoCapacityRatio ,
53+ ghost_fifo_(capacity_ / chunk_size),
54+ small_fifo_(capacity_ / chunk_size * kSmallFifoCapacityRatio ),
55+ main_fifo_(capacity_ / chunk_size -
56+ capacity_ / chunk_size * kSmallFifoCapacityRatio ,
5557 kMinEviction ) {
5658 kv_cache_ns_ = std::regex_replace (kv_cache_ns_, std::regex (" /" ), " _" );
5759 kv_cache_ns_ = std::regex_replace (kv_cache_ns_ + " _" , std::regex (" _+" ), " _" );
@@ -221,32 +223,34 @@ Status AIBrixBlobStorage::GetTokenChunkHashes(
221223 return Status::OK ();
222224}
223225
224- #define DEFINE_TASK_FN (FN, OP, CB ) \
225- auto FN = [this , &prefix, &tokens, &kv_tensors, cb = CB]( \
226- size_t i, \
227- std::shared_ptr<KVCacheChunkBuilder> builder) -> Status { \
228- auto chunk_size = this ->chunk_size_ ; \
229- if (builder == nullptr ) { \
230- return Status::OK (); \
231- } \
232- \
233- std::vector<int > my_prefix (prefix.begin (), prefix.end ()); \
234- if (i > 0 ) { \
235- my_prefix.insert (my_prefix.end (), tokens.begin (), \
236- tokens.begin () + i * chunk_size); \
237- } \
238- std::vector<int > my_tokens (tokens.begin () + i * chunk_size, \
239- tokens.begin () + (i + 1 ) * chunk_size); \
240- \
241- std::vector<std::vector<std::pair<LLMKV, LLMKV>>> my_kv_tensors ( \
242- kv_tensors.begin () + i * chunk_size, \
243- kv_tensors.begin () + (i + 1 ) * chunk_size); \
244- \
245- auto status = builder->OP (my_prefix, my_tokens, my_kv_tensors); \
246- if (status.ok ()) { \
247- cb (i, my_kv_tensors); \
248- } \
249- return status; \
226+ #define DEFINE_TASK_FN (FN, OP, CB ) \
227+ auto FN = [this , &prefix, &tokens, &kv_tensors, cb = CB]( \
228+ size_t i, \
229+ std::shared_ptr<KVCacheChunkBuilder> builder) -> Status { \
230+ auto chunk_size = this ->chunk_size_ ; \
231+ if (builder == nullptr ) { \
232+ return Status::OK (); \
233+ } \
234+ \
235+ std::vector<int > my_prefix (prefix.begin (), prefix.end ()); \
236+ if (i > 0 ) { \
237+ my_prefix.insert (my_prefix.end (), tokens.begin (), \
238+ tokens.begin () + i * chunk_size); \
239+ } \
240+ std::vector<int > my_tokens (tokens.begin () + i * chunk_size, \
241+ tokens.begin () + (i + 1 ) * chunk_size); \
242+ \
243+ std::vector<std::vector<std::pair<LLMKV, LLMKV>>> my_kv_tensors ( \
244+ kv_tensors.begin () + i * chunk_size, \
245+ kv_tensors.begin () + (i + 1 ) * chunk_size); \
246+ \
247+ try { \
248+ auto status = builder->OP (my_prefix, my_tokens, my_kv_tensors); \
249+ if (status.ok ()) { \
250+ cb (i, my_kv_tensors); \
251+ } \
252+ return status; \
253+ } catch (const std::exception& e) { return Status::IOError (e.what ()); } \
250254 }
251255
252256#define WAIT_TASK_RESULTS (TIDS, COUNTER, FIRST_ERROR, OBJ_NAMES ) \
@@ -864,12 +868,16 @@ Status AIBrixBlobStorage::GlobalGCFunc() {
864868 return ; \
865869 } \
866870 LOG (INFO) << #NAME " started" ; \
867- Status status = self->NAME ##Func (); \
868- if (!status.ok ()) { \
869- LOG (ERROR) << #NAME " failed: " << status.ToString (); \
870- /* Not a fatal error and wait for next time */ \
871- } else { \
872- LOG (INFO) << #NAME " completed" ; \
871+ try { \
872+ Status status = self->NAME ##Func (); \
873+ if (!status.ok ()) { \
874+ LOG (ERROR) << #NAME " failed: " << status.ToString (); \
875+ /* Not a fatal error and wait for next time */ \
876+ } else { \
877+ LOG (INFO) << #NAME " completed" ; \
878+ } \
879+ } catch (const std::exception& e) { \
880+ LOG (ERROR) << #NAME " failed: " << e.what (); \
873881 } \
874882 last_time = std::chrono::duration_cast<std::chrono::seconds>( \
875883 std::chrono::system_clock::now ().time_since_epoch ()) \
0 commit comments