27 changes: 14 additions & 13 deletions benchmark/profile_pipeline_api.py
@@ -275,6 +275,7 @@ def parse_args():
ArgumentHelper.num_tokens_per_iter(tb_group)
ArgumentHelper.max_prefill_iters(tb_group)
ArgumentHelper.communicator(tb_group)
ArgumentHelper.async_(tb_group)

args = parser.parse_args()
return args
@@ -285,19 +286,19 @@ def main():
random.seed(args.seed)
os.environ['TM_LOG_LEVEL'] = args.log_level
if args.backend == 'turbomind':
engine_config = TurbomindEngineConfig(
max_batch_size=args.concurrency,
tp=args.tp,
cache_max_entry_count=args.cache_max_entry_count,
session_len=args.session_len,
cache_block_seq_len=args.cache_block_seq_len,
model_format=args.model_format,
quant_policy=args.quant_policy,
num_tokens_per_iter=args.num_tokens_per_iter,
max_prefill_iters=args.max_prefill_iters,
enable_prefix_caching=args.enable_prefix_caching,
communicator=args.communicator,
)
engine_config = TurbomindEngineConfig(max_batch_size=args.concurrency,
tp=args.tp,
cache_max_entry_count=args.cache_max_entry_count,
session_len=args.session_len,
cache_block_seq_len=args.cache_block_seq_len,
model_format=args.model_format,
quant_policy=args.quant_policy,
num_tokens_per_iter=args.num_tokens_per_iter,
max_prefill_iters=args.max_prefill_iters,
enable_prefix_caching=args.enable_prefix_caching,
communicator=args.communicator,
enable_metrics=False,
async_=args.async_)
elif args.backend == 'pytorch':
engine_config = PytorchEngineConfig(
cache_max_entry_count=args.cache_max_entry_count,
9 changes: 3 additions & 6 deletions src/turbomind/engine/engine.cc
@@ -374,9 +374,6 @@ void Engine::Impl::Accept(const Requests& rs, vector<Signal>& signals)
{
auto& s = states_.at(0);

const int offset = s.rc.size();
int index = offset;

vector<unique_ptr<RequestCache>> incoming;
incoming.reserve(rs.size());

@@ -522,7 +519,7 @@ void Engine::Impl::Schedule()

// dbg("Schedule");

auto outcome = seq_mgr_->Materialize(
seq_mgr_->Materialize(
sequences, context_length, alpha, priorities, param_.max_forward_token_num, param_.max_context_token_num);

vector<int> idxs(sequences.size());
@@ -703,12 +700,12 @@ void Engine::Impl::Update(BatchData& b, std::vector<Signal>& signals)
s.tokens.insert(s.tokens.end(), c.token_ids + c.seq_len - new_tokens, c.token_ids + c.seq_len);
}
if (TM_UNLIKELY(finished[i])) {
signals.push_back([this, r = c.req, l = c.seq_len] { //
signals.push_back([r = c.req, l = c.seq_len] { //
UpdateState(*r, Request::kFinish, l);
});
}
else if (c.req->stream_output) {
signals.push_back([this, r = c.req, l = c.seq_len] { //
signals.push_back([r = c.req, l = c.seq_len] { //
UpdateState(*r, Request::kOk, l);
});
}
56 changes: 20 additions & 36 deletions src/turbomind/models/llama/BlockManager.cc
@@ -1,32 +1,15 @@
// Copyright (c) OpenMMLab. All rights reserved.

#include <algorithm>

#include "src/turbomind/models/llama/BlockManager.h"
#include "src/turbomind/utils/cuda_utils.h"
#include "src/turbomind/utils/debug_utils.h"
#include "src/turbomind/utils/logger.h"
#include "src/turbomind/utils/string_utils.h"
#include <algorithm>
#include <iterator>
#include <stdexcept>

namespace turbomind {

size_t GetSyncFreeMemSize(Barrier& barrier, std::atomic<size_t>& value)
{
size_t free{};
size_t total{};
check_cuda_error(cudaMemGetInfo(&free, &total));

// atomicMin
auto old = value.load();
while (old > free && !value.compare_exchange_weak(old, free)) {}

// wait for all ranks
barrier.wait();

return value.load();
}

BlockManager::BlockManager(
size_t block_size, double block_count, int chunk_size, core::Allocator allocator, GetFreeMemSize get_free_size):
block_size_(block_size), allocator_(allocator)
@@ -106,18 +89,18 @@ size_t BlockManager::GetBlockCount(size_t block_size, double ratio, GetFreeMemSi

void BlockManager::Move(std::vector<int>& src, const std::vector<int>& delta, std::vector<int>& dst)
{
FT_CHECK(src.size() >= delta.size());
TM_CHECK_GE(src.size(), delta.size());
std::vector<int> src1(src.size() - delta.size());
{
auto end = std::set_difference(src.begin(), src.end(), delta.begin(), delta.end(), src1.begin());
FT_CHECK(end == src1.end());
TM_CHECK(end == src1.end());
}
src.swap(src1);

std::vector<int> dst1(dst.size() + delta.size());
{
auto end = std::set_union(dst.begin(), dst.end(), delta.begin(), delta.end(), dst1.begin());
FT_CHECK(end == dst1.end());
TM_CHECK(end == dst1.end());
}
dst.swap(dst1);
}
@@ -136,10 +119,11 @@ auto BlockManager::Allocate(int count) -> std::pair<BlockIds, UniqueIds>
for (int i = 0; i < count; ++i) {
int idx = free_ids_[i];
auto& b = blocks_[idx];
FT_CHECK(is_free(b)); // pre-condition: uc == 0 && ts == 0
TM_CHECK(is_free(b)); // pre-condition: uc == 0 && ts == 0
b.use_count = 1;
b.unique_id = unique_id_++;
FT_CHECK(is_active(b)); // post-condition
b.timestamp = timestamp_++;
Copilot AI commented on Jan 15, 2026:

The timestamp is now being set during Allocate, but this appears to be a new addition. The timestamp should logically be set when blocks are allocated, but verify this doesn't cause issues with LRU eviction logic, especially if blocks are allocated but not immediately used.

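
For context on the comment above: in this file only cached blocks (use_count == 0, timestamp != 0) are eviction candidates, and Evict() below orders them by timestamp with a partial sort, so a freshly allocated block is active (use_count == 1) and cannot be selected until it is unlocked back into the cached set. A minimal standalone sketch of that selection step, using simplified types that are not part of the patch:

#include <algorithm>
#include <cstdint>
#include <vector>

struct Block {
    int      use_count{};
    uint64_t timestamp{};
};

// Mirror of the victim-selection idea in BlockManager::Evict: partially sort the
// cached ids so the `count` oldest timestamps come first, then keep only those.
std::vector<int> PickVictims(const std::vector<Block>& blocks, std::vector<int> cached_ids, int count)
{
    std::nth_element(cached_ids.begin(), cached_ids.begin() + count, cached_ids.end(), [&](int i, int j) {
        return blocks[i].timestamp < blocks[j].timestamp;  // least recently touched first
    });
    cached_ids.resize(count);
    return cached_ids;
}
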
TM_CHECK(is_active(b)); // post-condition
block_ids[i] = idx;
unique_ids[i] = b.unique_id;
}
@@ -153,7 +137,7 @@ auto BlockManager::Allocate(int count) -> std::pair<BlockIds, UniqueIds>

void BlockManager::Evict(int count)
{
FT_CHECK(count <= cached_ids_.size());
TM_CHECK_LE(count, cached_ids_.size());
std::vector<int> idxs(cached_ids_);
// get first `count` cached ids according to timestamp
std::nth_element(idxs.begin(), idxs.begin() + count, idxs.end(), [&](int i, int j) {
@@ -167,10 +151,10 @@ void BlockManager::Evict(int count)
// set as free
for (const auto& idx : idxs) {
auto& b = blocks_[idx];
FT_CHECK(is_cached(b));
TM_CHECK(is_cached(b)); // pre-condition
b.unique_id = 0;
b.timestamp = 0;
FT_CHECK(is_free(b));
TM_CHECK(is_free(b)); // post-condition
}

Move(cached_ids_, idxs, free_ids_);
@@ -184,10 +168,10 @@ void BlockManager::Free(BlockIds ids)

for (const auto& i : ids) {
auto& b = blocks_[i];
FT_CHECK(is_cached(b)); // uc == 0 && ts != 0
TM_CHECK(is_cached(b)); // pre-condition
b.unique_id = 0;
b.timestamp = 0;
FT_CHECK(is_free(b));
TM_CHECK(is_free(b)); // post-condition
}

Move(cached_ids_, ids, free_ids_);
@@ -200,10 +184,10 @@ int BlockManager::Unlock(const BlockIds& ids)

for (const auto& i : ids) {
auto& b = blocks_[i];
FT_CHECK(is_active(b)); // pre-condition: uc > 0
TM_CHECK(is_active(b)); // pre-condition
if (--b.use_count == 0) {
unlock.push_back(b.id);
FT_CHECK(is_cached(b)); // post-condition
TM_CHECK(is_cached(b)); // post-condition
}
}

@@ -224,7 +208,7 @@ int BlockManager::Lock(const BlockIds& ids)
auto& b = blocks_[i];
if (++b.use_count == 1) {
lock.push_back(i);
FT_CHECK(is_active(b));
TM_CHECK(is_active(b)); // post-condition
}
}

@@ -240,14 +224,14 @@ int BlockManager::Lock(const BlockIds& ids)
void BlockManager::Touch(const BlockIds& ids)
{
std::for_each(ids.crbegin(), ids.crend(), [this](int i) {
FT_CHECK(is_active(blocks_[i]));
TM_CHECK(is_active(blocks_[i]));
blocks_[i].timestamp = timestamp_++;
});
}

int BlockManager::Verify(const std::vector<int>& block_ids, const std::vector<uint64_t>& unique_ids)
{
FT_CHECK(block_ids.size() == unique_ids.size());
TM_CHECK_EQ(block_ids.size(), unique_ids.size());
int valid = block_ids.size();
for (int i = 0; i < block_ids.size(); ++i) {
if (unique_id(block_ids[i]) != unique_ids[i]) {
@@ -260,8 +244,8 @@ int BlockManager::Verify(const std::vector<int>& block_ids, const std::vector<ui
miss += (unique_id(block_ids[i]) != unique_ids[i]);
}
// All later blocks should have been invalidated
FT_CHECK_WITH_INFO(miss == (int)block_ids.size() - valid,
fmtstr("count = %d, valid = %d, miss = %d", (int)block_ids.size(), valid, miss));
TM_CHECK_EQ(miss, (int)block_ids.size() - valid)
<< fmtstr("count = %d, valid = %d, miss = %d", (int)block_ids.size(), valid, miss);
return valid;
}

2 changes: 0 additions & 2 deletions src/turbomind/models/llama/BlockManager.h
@@ -68,8 +68,6 @@ struct Snapshot {

using GetFreeMemSize = std::function<size_t()>;

size_t GetSyncFreeMemSize(Barrier& barrier, std::atomic<size_t>& value);

class BlockManager {
public:
explicit BlockManager(
56 changes: 26 additions & 30 deletions src/turbomind/models/llama/BlockTrie.cc
@@ -22,37 +22,35 @@ BlockTrie::BlockTrie(size_t block_len, std::shared_ptr<BlockManager> block_manag

std::tuple<BlockIds, UniqueIds> BlockTrie::Match(const Sequence& seq)
{
BlockIds matched_blocks;
UniqueIds matched_unique_ids;
BlockIds block_ids;
UniqueIds unique_ids;

std::shared_ptr<TrieNode> curr_node = root_;
int num_matched = 0;
auto node = root_;
auto first = seq.prompt.begin();

// Warning: Do not use "<=" operator even when seq.prompt length is evenly
// divisible by block_seq_len_. This may produce an input_length of zero for
// the sequence, violating the precondition checked in LlamaBatch::Forward.
while (num_matched + block_seq_len_ < seq.prompt.size()) {
std::vector<int> curr_tokens(seq.prompt.begin() + num_matched,
seq.prompt.begin() + num_matched + block_seq_len_);
size_t hash_key = hash(curr_tokens);

auto it = curr_node->children.find(hash_key);

if (it == curr_node->children.end()) {
break;
// divisible by block_seq_len_. The model needs at least one input token to generate output.
while (first + block_seq_len_ < seq.prompt.end()) {
const std::vector<int> segment{first, first + block_seq_len_};
const size_t hash_key = hash(segment);
if (const auto it = node->children.find(hash_key); it != node->children.end()) {
if (segment == it->second->tokens) {
block_ids.push_back(it->second->block_id);
unique_ids.push_back(it->second->block_unique_id);
node = it->second;
first += block_seq_len_;
}
else {
TM_LOG_WARNING("hash collision detected");
break;
}
}

if (curr_tokens != it->second->tokens) {
TM_LOG_WARNING("hash key cache hit, but tokens are not the same");
else {
break;
}

matched_blocks.emplace_back(it->second->block_id);
matched_unique_ids.emplace_back(it->second->block_unique_id);
curr_node = it->second;
num_matched += block_seq_len_;
}
return std::make_tuple(matched_blocks, matched_unique_ids);

return std::make_tuple(block_ids, unique_ids);
}
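
The loop condition above is the subtle part of this hunk: matching must stop before the whole prompt is consumed so that at least one input token remains for the forward pass. A minimal standalone sketch (hypothetical lengths, not part of the patch) of how '<' and '<=' differ when the prompt length is an exact multiple of block_seq_len_:

#include <cstdio>

int main()
{
    const int block_seq_len = 64;   // hypothetical block length
    const int prompt_len    = 128;  // evenly divisible by block_seq_len

    int matched_lt = 0, matched_le = 0;
    while (matched_lt + block_seq_len < prompt_len)  matched_lt += block_seq_len;
    while (matched_le + block_seq_len <= prompt_len) matched_le += block_seq_len;

    // '<' leaves 64 input tokens; '<=' would leave 0, and the engine needs at least 1.
    std::printf("remaining input tokens: '<' -> %d, '<=' -> %d\n",
                prompt_len - matched_lt, prompt_len - matched_le);
    return 0;
}
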

std::tuple<BlockIds, UniqueIds> BlockTrie::Cache(const Sequence& seq, const std::vector<int>& tokens)
Expand All @@ -62,7 +60,6 @@ std::tuple<BlockIds, UniqueIds> BlockTrie::Cache(const Sequence& seq, const std:
TM_CHECK_LE(seq.cache_len, seq.blocks.size() * block_seq_len_);

auto node = root_;
int idx = 0;

BlockIds cache_block_ids;
UniqueIds cache_block_unique_ids;
@@ -75,15 +72,14 @@ std::tuple<BlockIds, UniqueIds> BlockTrie::Cache(const Sequence& seq, const std:
auto start = tokens.begin() + idx * block_seq_len_;
auto end = start + block_seq_len_;

std::vector<int> curr_tokens(start, end);
// TODO(lvhan): add salt to ensure the hash security
size_t hash_key = hash(curr_tokens);
const std::vector<int> segment(start, end);
const size_t hash_key = hash(segment); // TODO(lvhan): add salt to ensure the hash security

int block_id = seq.blocks[idx];
uint64_t block_unique_id = seq.block_unique_ids[idx];

if (auto it = node->children.find(hash_key); it != node->children.end()) {
if (curr_tokens == it->second->tokens) { // fast-forward
if (segment == it->second->tokens) { // fast-forward
node = it->second;
node->block_id = block_id;
node->block_unique_id = block_unique_id;
Expand All @@ -97,7 +93,7 @@ std::tuple<BlockIds, UniqueIds> BlockTrie::Cache(const Sequence& seq, const std:
// insert new node
node = node->children.emplace_hint(it, hash_key, std::make_shared<TrieNode>())->second;
node->hash_key = hash_key;
node->tokens = curr_tokens;
node->tokens = segment;
node->block_id = block_id;
node->block_unique_id = block_unique_id;
new_cached += block_seq_len_;