Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions kt-kernel/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,13 @@ python -m sglang.launch_server \

See [KT-Kernel Parameters](#kt-kernel-parameters) section below for detailed parameter tuning guidelines.

**Load from Specified CPU Cores:**
- Uses the `KT_NUMA_CPU_OFFSET` environment variable to load models from specified CPU cores.
- Example: Load models from cores with cpuid 8 (NUMA 0) and 48 (NUMA 1), while cores with cpuid 0-7 and 40-47 remain idle:
```bash
export KT_NUMA_CPU_OFFSET=8
```
Comment on lines +195 to +200
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The explanation for KT_NUMA_CPU_OFFSET could be more explicit about how the offset is applied across multiple NUMA nodes. The current example is a bit brief. To avoid ambiguity for users with different hardware configurations, consider clarifying that the offset is applied to the logical core index within each NUMA node. A more detailed example, similar to the one in the pull request description, would be very helpful.


### Complete Example: Qwen3-30B-A3B

This example demonstrates the full workflow from downloading weights to launching the server, showing both **AMX backend** and **LLAMAFILE backend** options.
Expand Down
48 changes: 39 additions & 9 deletions kt-kernel/cpu_backend/worker_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include <chrono>
#include <cstdio>
#include <stdexcept>
#include <cstdlib> // Added for getenv

#include "hwloc.h"

Expand All @@ -45,7 +46,20 @@ InNumaPool::InNumaPool(int max_thread_num, int numa_id, int threads_id_start) {
hwloc_bitmap_t cpuset;
hwloc_topology_init(&topology);
hwloc_topology_load(topology);
printf("In Numa Worker Pool at NUMA %d, %d threads\n", numa_node_of_cpu(sched_getcpu()), max_thread_num);

// Calculate the physical CPU ID for log display
int real_cpu_id = -1;
hwloc_obj_t display_numa_obj = hwloc_get_obj_by_type(topology, HWLOC_OBJ_NUMANODE, numa_id);
if (display_numa_obj) {
hwloc_obj_t start_core_obj = hwloc_get_obj_inside_cpuset_by_type(topology, display_numa_obj->cpuset, HWLOC_OBJ_CORE, threads_id_start);
if (start_core_obj) {
// Get the first physical CPU index from the core's cpuset
real_cpu_id = hwloc_bitmap_first(start_core_obj->cpuset);
}
}

printf("In Numa Worker Pool at NUMA %d, %d threads, loading model from CPU %d\n", numa_id, max_thread_num, real_cpu_id != -1 ? real_cpu_id : threads_id_start);

total_worker_count = max_thread_num;
set_restricted_worker_count(total_worker_count);
thread_state_ = std::unique_ptr<ThreadState[]>(new ThreadState[max_thread_num]);
Expand All @@ -62,7 +76,7 @@ InNumaPool::InNumaPool(int max_thread_num, int numa_id, int threads_id_start) {
if (res_set_name != 0) {
fprintf(stderr, "Failed to set thread name: %s\n", strerror(res_set_name));
}
// 检查线程是否成功命名
// ����߳��Ƿ�ɹ�����
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

This comment is garbled, likely due to a character encoding issue. For better code maintainability and consistency with other comments in this file, please translate it to English.

Suggested change
// ����߳��Ƿ�ɹ�����
// Check if the thread name was set successfully

char name[16];
pthread_getname_np(native_handle, name, sizeof(name));
if (strcmp(name, thread_name.c_str()) == 0) {
Expand Down Expand Up @@ -205,7 +219,7 @@ void InNumaPool::worker_thread(int thread_id, int numa_id) {
set_memory_to_numa(numa_id);
}
auto start = std::chrono::high_resolution_clock::now();
WorkerPool::thread_local_id = thread_id; // 设置线程本地变量
WorkerPool::thread_local_id = thread_id; // �����̱߳��ر���
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

This comment is garbled, likely due to a character encoding issue. To improve readability and maintain consistency, it should be translated to English.

Suggested change
WorkerPool::thread_local_id = thread_id; // �����̱߳��ر���
WorkerPool::thread_local_id = thread_id; // Set thread-local variable

while (true) {
ThreadStatus status = thread_state_[thread_id].status.load(std::memory_order_acquire);
if (status == ThreadStatus::WORKING) {
Expand Down Expand Up @@ -266,7 +280,14 @@ void NumaJobDistributor::init(std::vector<int> numa_ids, std::vector<int> thread
}

workers.resize(numa_count);
std::vector<int> numa_threads_count(numa_count, 0);

// Set offset from environment variable
int offset = 0;
if (const char* env_p = std::getenv("KT_NUMA_CPU_OFFSET")) {
offset = std::atoi(env_p);
}
Comment on lines +285 to +288
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

std::atoi is not safe for converting strings to integers because it doesn't handle parsing errors. For example, if KT_NUMA_CPU_OFFSET is set to a non-numeric value, atoi will silently return 0. It's better to use std::strtol which allows for error checking.

This logic is also duplicated in WorkerPool::init at lines 416-420. It would be best to extract this into a helper function to avoid code duplication and ensure consistent behavior (e.g., one location prints a message, the other doesn't).

  int offset = 0;
  if (const char* env_p = std::getenv("KT_NUMA_CPU_OFFSET")) {
    char* end;
    long val = std::strtol(env_p, &end, 10);
    if (end == env_p || *end != '\0') {
      fprintf(stderr, "Warning: Invalid value for KT_NUMA_CPU_OFFSET: '%s'. Using default 0.\n", env_p);
    } else {
      offset = static_cast<int>(val);
    }
  }


std::vector<int> numa_threads_count(numa_count, offset);
for (int i = 0; i < numa_count; i++) {
workers[i] = std::thread(&NumaJobDistributor::worker_thread, this, i);
auto this_numa = numa_ids[i];
Expand All @@ -289,19 +310,19 @@ void NumaJobDistributor::init(std::vector<int> numa_ids, std::vector<int> thread
// throw std::runtime_error("Core not found inside NUMA node");
continue;
}
// 精简 cpuset
// ���� cpuset
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

This comment is garbled due to a character encoding issue. Please translate it to English for consistency and readability.

    // Simplify cpuset

auto cpuset_simple = hwloc_bitmap_alloc();
hwloc_bitmap_copy(cpuset_simple, core_obj->cpuset);
hwloc_bitmap_singlify(cpuset_simple);
// 打印绑定的具体的 CPU 物理索引
// ��ӡ�󶨵ľ���� CPU ��������
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

This comment is garbled, likely due to a character encoding issue. It should be translated to English to improve code clarity.

    // Print the specific physical CPU index to which it is bound

unsigned long i_in;
// hwloc_bitmap_foreach_begin(i_in, cpuset_simple) { printf("Thread %d bound to CPU %ld\n", start_id, i_in); }
// hwloc_bitmap_foreach_end();
auto res = hwloc_set_thread_cpubind(topology, native_handle, cpuset_simple, HWLOC_CPUBIND_STRICT);
if (res != 0) {
fprintf(stderr, "Failed to set thread CPU binding: %s\n", strerror(errno));
}
// 检查线程是否绑定到指定的 核上了
// ����߳��Ƿ�󶨵�ָ���� ������
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

This comment is garbled due to a character encoding issue. For better readability and maintainability, please translate it to English.

    // Check if the thread is bound to the specified core

hwloc_cpuset_t cpuset = hwloc_bitmap_alloc();
hwloc_get_thread_cpubind(topology, native_handle, cpuset, HWLOC_CPUBIND_THREAD);
// hwloc_bitmap_foreach_begin(i_in, cpuset) { printf("Thread %d is bound to CPU %ld\n", start_id, i_in); }
Expand Down Expand Up @@ -390,11 +411,20 @@ void WorkerPool::init(WorkerPoolConfig config) {
for (int i = 0; i < config.subpool_count; i++) {
numa_worker_pools.push_back(nullptr);
}
std::vector<int> numa_threads_count(config.subpool_count, 0);

// Set offset from environment variable
int offset = 0;
if (const char* env_p = std::getenv("KT_NUMA_CPU_OFFSET")) {
offset = std::atoi(env_p);
printf("KT_NUMA_CPU_OFFSET successfully set to %d\n", offset);
}
Comment on lines +416 to +420
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

This logic for reading the KT_NUMA_CPU_OFFSET environment variable is a duplicate of the code in NumaJobDistributor::init (lines 285-288) and uses the unsafe std::atoi function.

As mentioned in the other comment, this should be refactored into a single helper function that uses std::strtol for safe parsing. This will improve robustness and maintainability by centralizing the logic.

  int offset = 0;
  if (const char* env_p = std::getenv("KT_NUMA_CPU_OFFSET")) {
    char* end;
    long val = std::strtol(env_p, &end, 10);
    if (end == env_p || *end != '\0') {
      fprintf(stderr, "Warning: Invalid value for KT_NUMA_CPU_OFFSET: '%s'. Using default 0.\n", env_p);
    } else {
      offset = static_cast<int>(val);
      printf("KT_NUMA_CPU_OFFSET successfully set to %d\n", offset);
    }
  }


std::vector<int> numa_threads_count(config.subpool_count, offset);
for (int i = 0; i < config.subpool_count; i++) {
auto this_numa = config.subpool_numa_map[i];
auto this_thread_count = config.subpool_thread_count[i];
auto this_thread_id_start = numa_threads_count[this_numa];

std::thread([this, i, this_numa, this_thread_count, this_thread_id_start]() {
set_to_numa(this_numa);
numa_worker_pools[i] =
Expand Down Expand Up @@ -455,4 +485,4 @@ void WorkerPool::do_work_stealing_job(int task_num, std::function<void(int)> ini

void WorkerPool::do_work_stealing_job(int task_num, std::function<void(int)> compute_func) {
do_work_stealing_job(task_num, nullptr, compute_func, nullptr);
}
}