diff --git a/examples/server/lock-free.hpp b/examples/server/lock-free.hpp
new file mode 100644
index 000000000..8d225ce6c
--- /dev/null
+++ b/examples/server/lock-free.hpp
@@ -0,0 +1,889 @@
+/*
+  hash_map -- Lock-Free Hash Map port from folly::AtomicUnorderedInsertMap for C++.
+
+  Copyright (c) 2010-2017
+
+  This library is released under the MIT License.
+  Please see LICENSE file or visit https://github.com/ez8-co/atomic for details.
+ */
+#pragma once
+
+#include <memory>
+#include <functional>
+#include <stdexcept>
+#include <cstring>
+#include <cassert>
+
+#ifdef _MSC_VER
+  #include <intrin.h>
+  #define LIKELY(x)   (x)
+  #define UNLIKELY(x) (x)
+#else
+  #define LIKELY(x)   (__builtin_expect((x), 1))
+  #define UNLIKELY(x) (__builtin_expect((x), 0))
+#endif
+
+#if __cplusplus >= 201103L || _MSC_VER >= 1700
+  #include <atomic>
+#else
+namespace std {
+
+  typedef enum memory_order {
+    memory_order_relaxed,
+    memory_order_consume,
+    memory_order_acquire,
+    memory_order_release,
+    memory_order_acq_rel,
+    memory_order_seq_cst
+  } memory_order;
+
+#ifdef _MSC_VER
+  template <typename T, size_t N = sizeof(T)>
+  struct interlocked {};
+
+  template <typename T>
+  struct interlocked<T, 4> {
+    static inline T incre(T volatile* x) {
+      return static_cast<T>(_InterlockedIncrement(reinterpret_cast<long volatile*>(x)));
+    }
+    static inline T decre(T volatile* x) {
+      return static_cast<T>(_InterlockedDecrement(reinterpret_cast<long volatile*>(x)));
+    }
+    static inline T add(T volatile* x, T delta) {
+      return static_cast<T>(_InterlockedExchangeAdd(reinterpret_cast<long volatile*>(x), delta));
+    }
+    static inline T compare_exchange(T volatile* x, const T new_val, const T expected_val) {
+      return static_cast<T>(
+        _InterlockedCompareExchange(reinterpret_cast<long volatile*>(x),
+                                    static_cast<long>(new_val), static_cast<long>(expected_val)));
+    }
+    static inline T exchange(T volatile* x, const T new_val) {
+      return static_cast<T>(
+        _InterlockedExchange(
+          reinterpret_cast<long volatile*>(x), static_cast<long>(new_val)));
+    }
+  };
+
+  template <typename T>
+  struct interlocked<T, 8> {
+    static inline T incre(T volatile* x) {
+#ifdef WIN64
+      return static_cast<T>(_InterlockedIncrement64(reinterpret_cast<__int64 volatile*>(x)));
+#else
+      return add(x, 1);
+#endif // WIN64
+    }
+    static inline T decre(T volatile* x) {
+#ifdef WIN64
+      return static_cast<T>(_InterlockedDecrement64(reinterpret_cast<__int64 volatile*>(x)));
+#else
+      return add(x, -1);
+#endif // WIN64
+    }
+    static inline T add(T volatile* x, T delta) {
+#ifdef WIN64
+      return static_cast<T>(_InterlockedExchangeAdd64(reinterpret_cast<__int64 volatile*>(x), delta));
+#else
+      __int64 old_val, new_val;
+      do {
+        old_val = static_cast<__int64>(*x);
+        new_val = old_val + static_cast<__int64>(delta);
+      } while (_InterlockedCompareExchange64(
+                   reinterpret_cast<__int64 volatile*>(x), new_val, old_val) !=
+               old_val);
+      return static_cast<T>(new_val);
+#endif // WIN64
+    }
+    static inline T compare_exchange(T volatile* x, const T new_val, const T expected_val) {
+      return static_cast<T>(
+        _InterlockedCompareExchange64(reinterpret_cast<__int64 volatile*>(x),
+                                      static_cast<__int64>(new_val), static_cast<__int64>(expected_val)));
+    }
+    static inline T exchange(T volatile* x, const T new_val) {
+#ifdef WIN64
+      return static_cast<T>(
+        _InterlockedExchange64(reinterpret_cast<__int64 volatile*>(x),
+                               static_cast<__int64>(new_val)));
+#else
+      __int64 old_val;
+      do {
+        old_val = static_cast<__int64>(*x);
+      } while (_InterlockedCompareExchange64(
+                   reinterpret_cast<__int64 volatile*>(x), new_val, old_val) !=
+               old_val);
+      return static_cast<T>(old_val);
+#endif // WIN64
+    }
+  };
+
+#else
+
+  template <typename T>
+  struct hash {};
+
+  template <>
+  struct hash<size_t> {
+    inline size_t operator()(size_t v) const { return v; }
+  };
+
+#endif
+
+  template <typename T>
+  class atomic {
+  public:
+    atomic() : value_(static_cast<T>(0)) {}
+    explicit atomic(const T value) : value_(value) {}
+
+    T operator++() {
+#ifdef _MSC_VER
+      return interlocked<T>::incre(&value_);
+#else
+      return __atomic_add_fetch(&value_, 1, __ATOMIC_SEQ_CST);
+#endif
+    }
+
+    T operator++(int) {
+      T v = load(); ++(*this); return v;
+    }
+
+    T operator--() {
+#ifdef _MSC_VER
+      return interlocked<T>::decre(&value_);
+#else
+      return __atomic_sub_fetch(&value_, 1, __ATOMIC_SEQ_CST);
+#endif
+    }
+
+    T operator+=(T v) {
+#ifdef _MSC_VER
+      return interlocked<T>::add(&value_, v);
+#else
+      return __atomic_add_fetch(&value_, v, __ATOMIC_SEQ_CST);
+#endif
+    }
+
+    bool compare_exchange_strong(T& expected_val, T new_val, memory_order order = memory_order_seq_cst) {
+#ifdef _MSC_VER
+      return expected_val == interlocked<T>::compare_exchange(&value_, new_val, expected_val);
+#else
+      return __atomic_compare_exchange_n(&value_, &expected_val, new_val, 0, order, __ATOMIC_SEQ_CST);
+#endif
+    }
+
+    void store(const T new_val, memory_order order = memory_order_seq_cst) {
+#ifdef _MSC_VER
+      interlocked<T>::exchange(&value_, new_val);
+#else
+      __atomic_store_n(&value_, new_val, order);
+#endif
+    }
+
+    T load(memory_order order = memory_order_seq_cst) const {
+#ifdef _MSC_VER
+      return interlocked<T>::add(const_cast<T volatile*>(&value_), 0);
+#else
+      return __atomic_load_n(&value_, order);
+#endif
+    }
+
+    T operator=(const T new_value) {
+      store(new_value);
+      return new_value;
+    }
+
+    operator T() const {
+      return load();
+    }
+
+  private:
+    volatile T value_;
+  };
+}
+#endif
+
+/*
+* Copyright 2013-present Facebook, Inc.
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+namespace lock_free {
+
+  size_t nextPowTwo(size_t v) {
+#ifdef _MSC_VER
+    unsigned long x = 0;
+    _BitScanForward(&x, v - 1);
+#else
+    int x = __builtin_clzll(v - 1);
+#endif
+    return v ? (size_t(1) << (v - 1 ? (((sizeof(unsigned long long) << 3) - 1) ^ x) + 1 : 0)) : 1;
+  }
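
The bit trick in nextPowTwo is easy to misread: for v > 1 it shifts 1 left by 64 - clz(v - 1), i.e. one past the highest set bit of v - 1. A standalone reference check (a hypothetical test harness, not part of the patch) pins down the intended mapping:

    #include <cassert>
    #include <cstddef>

    // Reference semantics for lock_free::nextPowTwo: round v up to the next
    // power of two; 0 and 1 both map to 1.
    static size_t next_pow_two_ref(size_t v) {
        size_t p = 1;
        while (p < v) p <<= 1;
        return p;
    }

    int main() {
        assert(next_pow_two_ref(0)  == 1);
        assert(next_pow_two_ref(1)  == 1);
        assert(next_pow_two_ref(3)  == 4);
        assert(next_pow_two_ref(64) == 64);
        assert(next_pow_two_ref(65) == 128);
        return 0;
    }
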
+
+  template <
+      typename Key,
+      typename Value,
+      typename Hash = std::hash<Key>,
+      typename KeyEqual = std::equal_to<Key>,
+      template <typename> class Atom = std::atomic,
+      typename IndexType = size_t,
+      typename Allocator = std::allocator<char> >
+
+  struct hash_map {
+
+    typedef Key key_type;
+    typedef Value mapped_type;
+    typedef std::pair<Key, Value> value_type;
+    typedef std::size_t size_type;
+    typedef std::ptrdiff_t difference_type;
+    typedef Hash hasher;
+    typedef KeyEqual key_equal;
+    typedef const value_type& const_reference;
+
+    typedef struct ConstIterator : public std::iterator<std::forward_iterator_tag, value_type> {
+      ConstIterator(const hash_map& owner, IndexType slot)
+        : owner_(owner)
+        , slot_(slot)
+      {}
+
+      const value_type& operator*() const {
+        return owner_.slots_[slot_].keyValue();
+      }
+
+      const value_type* operator->() const {
+        return &owner_.slots_[slot_].keyValue();
+      }
+
+      // pre-increment
+      const ConstIterator& operator++() {
+        while (slot_ > 0) {
+          --slot_;
+          if (owner_.slots_[slot_].state() == LINKED) {
+            break;
+          }
+        }
+        return *this;
+      }
+
+      // post-increment
+      ConstIterator operator++(int /* dummy */) {
+        ConstIterator prev = *this;
+        ++*this;
+        return prev;
+      }
+
+      bool operator==(const ConstIterator& rhs) const {
+        return slot_ == rhs.slot_;
+      }
+      bool operator!=(const ConstIterator& rhs) const {
+        return !(*this == rhs);
+      }
+
+    private:
+      const hash_map& owner_;
+      IndexType slot_;
+    } const_iterator;
+
+    friend ConstIterator;
+
+    hash_map(size_t maxSize,
+             float maxLoadFactor = 0.8f,
+             const Allocator& alloc = Allocator())
+      : allocator_(alloc)
+    {
+      size_t capacity = size_t(maxSize / (maxLoadFactor > 1.0f ? 1.0f : maxLoadFactor) + 128);
+      size_t avail = size_t(1) << (8 * sizeof(IndexType) - 2);
+      if (capacity > avail && maxSize < avail) {
+        // we'll do our best
+        capacity = avail;
+      }
+      if (capacity < maxSize || capacity > avail) {
+        throw std::invalid_argument(
+            "hash_map capacity must fit in IndexType with 2 bits "
+            "left over");
+      }
+
+      numSlots_ = capacity;
+      slotMask_ = nextPowTwo(capacity * 4) - 1;
+      mmapRequested_ = sizeof(Slot) * capacity;
+      slots_ = reinterpret_cast<Slot*>(allocator_.allocate(mmapRequested_));
+      memset(slots_, 0, mmapRequested_);
+      // mark the zero-th slot as in-use but not valid, since that happens
+      // to be our nil value
+      slots_[0].stateUpdate(EMPTY, CONSTRUCTING);
+    }
+
+    ~hash_map() {
+      for (size_t i = 1; i < numSlots_; ++i) {
+        slots_[i].~Slot();
+      }
+      allocator_.deallocate(reinterpret_cast<char*>(slots_), mmapRequested_);
+    }
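
To make the constructor's sizing policy concrete, here is the arithmetic it performs for the {10000} maps instantiated later in server.cpp (a walk-through assuming IndexType = size_t and the default 0.8 load factor; the numbers are illustrative, not part of the patch):

    size_t capacity = size_t(10000 / 0.8f + 128);            // 12628 slots of storage
    size_t avail    = size_t(1) << (8 * sizeof(size_t) - 2); // 2^62: max index with 2 bits spare
    size_t slotMask = nextPowTwo(capacity * 4) - 1;          // nextPowTwo(50512) - 1 = 0xFFFF
    // keyToSlotIdx() masks the hash with slotMask, then folds the result
    // into [0, numSlots_) by repeated subtraction.
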
+
+    template <typename Func, typename V>
+    std::pair<const_iterator, bool> findOrConstruct(const Key& key, Func func, const V* value) {
+      IndexType const slot = keyToSlotIdx(key);
+      IndexType prev = slots_[slot].headAndState_.load(std::memory_order_acquire);
+
+      IndexType existing = find(key, slot);
+      if (existing)
+        return std::make_pair(ConstIterator(*this, existing), false);
+
+      IndexType idx = allocateNear(slot);
+      // allocation failed, return fake element
+      if (!idx)
+        return std::make_pair(ConstIterator(*this, idx), false);
+      new (&slots_[idx].keyValue().first) Key(key);
+      func(static_cast<void*>(&slots_[idx].keyValue().second), value);
+
+      while (true) {
+        slots_[idx].next_ = prev >> 2;
+
+        // we can merge the head update and the CONSTRUCTING -> LINKED update
+        // into a single CAS if slot == idx (which should happen often)
+        IndexType after = idx << 2;
+        if (slot == idx)
+          after += LINKED;
+        else
+          after += (prev & 3);
+
+        if (slots_[slot].headAndState_.compare_exchange_strong(prev, after)) {
+          // success
+          if (idx != slot)
+            slots_[idx].stateUpdate(CONSTRUCTING, LINKED);
+          return std::make_pair(ConstIterator(*this, idx), true);
+        }
+        // compare_exchange_strong updates its first arg on failure, so
+        // there is no need to reread prev
+
+        existing = find(key, slot);
+        if (existing) {
+          // our allocated key and value are no longer needed
+          slots_[idx].keyValue().first.~Key();
+          slots_[idx].keyValue().second.~Value();
+          slots_[idx].stateUpdate(CONSTRUCTING, EMPTY);
+
+          return std::make_pair(ConstIterator(*this, existing), false);
+        }
+      }
+    }
+
+    template <typename K, typename V>
+    std::pair<const_iterator, bool> insert(const K& key, const V& value) {
+      return findOrConstruct(key, &hash_map::copyCtor<V>, &value);
+    }
+
+    const_iterator find(const Key& key) const {
+      return ConstIterator(*this, find(key, keyToSlotIdx(key)));
+    }
+
+    const_iterator cbegin() const {
+      IndexType slot = numSlots_ - 1;
+      while (slot > 0 && slots_[slot].state() != LINKED) {
+        --slot;
+      }
+      return ConstIterator(*this, slot);
+    }
+
+    const_iterator cend() const {
+      return ConstIterator(*this, 0);
+    }
+
+    const_iterator begin() const {
+      return this->cbegin();
+    }
+
+    const_iterator end() const {
+      return this->cend();
+    }
+
+    // Added by orca.zhang@yahoo.com
+    void clear() {
+      for (size_t i = 1; i < numSlots_; ++i) {
+        slots_[i].~Slot();
+      }
+      memset(slots_, 0, mmapRequested_);
+      slots_[0].stateUpdate(EMPTY, CONSTRUCTING);
+    }
+
+    // Added by orca.zhang@yahoo.com
+    bool erase(const Key& key) const {
+      KeyEqual ke;
+      IndexType slot = keyToSlotIdx(key);
+      IndexType hs = slots_[slot].headAndState_.load(std::memory_order_acquire);
+      IndexType last_slot = 0;
+      for (IndexType idx = hs >> 2; idx != 0; idx = slots_[idx].next_) {
+        if (ke(key, slots_[idx].keyValue().first)) {
+          if (!last_slot)
+            // unlink the chain head: next_ holds a plain index, so it must be
+            // shifted back into the index field while the state bits are kept
+            slots_[slot].headAndState_ = (slots_[idx].next_ << 2) | (hs & 3);
+          else
+            slots_[last_slot].next_ = slots_[idx].next_;
+          slots_[idx].~Slot();
+          slots_[idx].stateUpdate(LINKED, EMPTY);
+          return true;
+        }
+        last_slot = idx;
+      }
+      return false;
+    }
+
+  private:
+    enum {
+      kMaxAllocationTries = 1000, // after this we throw
+    };
+
+    typedef IndexType BucketState;
+
+    enum {
+      EMPTY = 0,
+      CONSTRUCTING = 1,
+      LINKED = 2,
+    };
+
+    /// Lock-free insertion is easiest by prepending to collision chains.
+    /// A large chaining hash table takes two cache misses instead of
+    /// one, however.  Our solution is to colocate the bucket storage and
+    /// the head storage, so that even though we are traversing chains we
+    /// are likely to stay within the same cache line.  Just make sure to
+    /// traverse head before looking at any keys.  This strategy gives us
+    /// 32 bit pointers and fast iteration.
+    struct Slot {
+      /// The bottom two bits are the BucketState, the rest is the index
+      /// of the first bucket for the chain whose keys map to this slot.
+      /// When things are going well the head usually links to this slot,
+      /// but that doesn't always have to happen.
+      Atom<IndexType> headAndState_;
+
+      /// The next bucket in the chain
+      IndexType next_;
+
+      /// Key and Value
+      unsigned char raw_[sizeof(value_type)];
+
+      ~Slot() {
+        BucketState s = state();
+        assert(s == EMPTY || s == LINKED);
+        if (s == LINKED) {
+          keyValue().first.~Key();
+          keyValue().second.~Value();
+        }
+      }
+
+      BucketState state() const {
+        return BucketState(headAndState_.load(std::memory_order_acquire) & 3);
+      }
+
+      void stateUpdate(BucketState before, BucketState after) {
+        assert(state() == before);
+        headAndState_ += (after - before);
+      }
+
+      value_type& keyValue() {
+        assert(state() != EMPTY);
+        union {
+          unsigned char* p;
+          value_type* v;
+        } u;
+        u.p = raw_;
+        return *u.v;
+      }
+
+      const value_type& keyValue() const {
+        assert(state() != EMPTY);
+        union {
+          unsigned char* p;
+          value_type* v;
+        } u;
+        u.p = raw_;
+        return *u.v;
+      }
+    };
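
Everything hangs on the packing of Slot::headAndState_. The helpers below (illustrative only, not part of the header) spell out the encoding that findOrConstruct manipulates with a single CAS:

    #include <cstddef>

    enum BucketState { EMPTY = 0, CONSTRUCTING = 1, LINKED = 2 };

    // headAndState_ layout: [ chain-head index : N-2 bits ][ state : 2 bits ]
    static size_t      pack(size_t head, BucketState s) { return (head << 2) | s; }
    static size_t      head_of(size_t hs)               { return hs >> 2; }
    static BucketState state_of(size_t hs)              { return BucketState(hs & 3); }

    // When the new element lands in its own hash slot (idx == slot), one
    // compare_exchange publishes the new chain head and performs the
    // CONSTRUCTING -> LINKED transition at the same time, exactly as
    // findOrConstruct() does above.
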
+
+    // We manually manage the slot memory so we can bypass initialization
+    // (by getting a zero-filled mmap chunk) and optionally destruction of
+    // the slots
+
+    size_t mmapRequested_;
+    size_t numSlots_;
+
+    /// tricky, see keyToSlotIdx
+    size_t slotMask_;
+
+    Allocator allocator_;
+    Slot* slots_;
+
+    IndexType keyToSlotIdx(const Key& key) const {
+      size_t h = hasher()(key);
+      h &= slotMask_;
+      while (h >= numSlots_) {
+        h -= numSlots_;
+      }
+      return h;
+    }
+
+    IndexType find(const Key& key, IndexType slot) const {
+      KeyEqual ke;
+      IndexType hs = slots_[slot].headAndState_.load(std::memory_order_acquire);
+      for (slot = hs >> 2; slot != 0; slot = slots_[slot].next_) {
+        if (ke(key, slots_[slot].keyValue().first)) {
+          return slot;
+        }
+      }
+      return 0;
+    }
+
+    /// Allocates a slot and returns its index.  Tries to put it near
+    /// slots_[start].
+    IndexType allocateNear(IndexType start) {
+      for (IndexType tries = 0; tries < kMaxAllocationTries; ++tries) {
+        IndexType slot = allocationAttempt(start, tries);
+        IndexType prev = slots_[slot].headAndState_.load(std::memory_order_acquire);
+        if ((prev & 3) == EMPTY &&
+            slots_[slot].headAndState_.compare_exchange_strong(
+                prev, prev + CONSTRUCTING - EMPTY)) {
+          return slot;
+        }
+      }
+      return 0; // return fake element rather than throw exception to ignore overflow
+      // throw std::bad_alloc();
+    }
+
+    /// Returns the slot we should attempt to allocate after tries failed
+    /// tries, starting from the specified slot.  This is pulled out so we
+    /// can specialize it differently during deterministic testing
+    IndexType allocationAttempt(IndexType start, IndexType tries) const {
+      if (LIKELY(tries < 8 && start + tries < numSlots_)) {
+        return IndexType(start + tries);
+      } else {
+        IndexType rv;
+        if (sizeof(IndexType) <= 4) {
+          rv = IndexType(rand() % numSlots_);
+        } else {
+          rv = IndexType(((int64_t(rand()) << 32) + rand()) % numSlots_);
+        }
+        assert(rv < numSlots_);
+        return rv;
+      }
+    }
+
+    template <typename V>
+    static void copyCtor(void* raw, const V* v) {
+      assert(v);
+      new (raw) Value(*v);
+    }
+  };
+
+  /// MutableAtom is a tiny wrapper that gives you the option of atomically
+  /// updating values inserted into a hash_map<K, MutableAtom<V>>.  This
+  /// relies on hash_map's guarantee that it doesn't move values.
+  template <typename T, template <typename> class Atom = std::atomic>
+  struct MutableAtom {
+    mutable Atom<T> data;
+    explicit MutableAtom(const T& init) : data(init) {}
+  };
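
Because the map never moves a value once it is constructed, wrapping the mapped type in MutableAtom allows atomic in-place updates through a const_iterator. A hypothetical concurrent counter map, written against the findOrConstruct signature above (construct_zero and bump are invented for the example):

    lock_free::hash_map<int, lock_free::MutableAtom<int> > counters(1024);

    // placement-constructs the atomic in its final, never-moved location
    static void construct_zero(void* raw, const int* /*unused*/) {
        new (raw) lock_free::MutableAtom<int>(0);
    }

    void bump(int key) {
        int dummy = 0;
        auto it = counters.findOrConstruct(key, &construct_zero, &dummy).first;
        ++it->second.data; // lock-free increment; the value never relocates
    }
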
+
+  /// MutableData is a tiny wrapper that gives you the option of using an
+  /// external concurrency control mechanism to update values inserted
+  /// into a hash_map.
+  template <typename T>
+  struct MutableData {
+    mutable T data;
+    explicit MutableData(const T& init) : data(init) {}
+  };
+
+  /**
+   * A very simple atomic single-linked list primitive.
+   *
+   * Usage:
+   *
+   *   class MyClass {
+   *     _linked_list_hook<MyClass> hook_;
+   *   };
+   *
+   *   _linked_list<MyClass, &MyClass::hook_> list;
+   *   list.insert(&a);
+   *   list.sweep([] (MyClass* c) { doSomething(c); });
+   */
+  template <class T>
+  struct _linked_list_hook {
+    T* next{nullptr};
+  };
+
+  template <class T, _linked_list_hook<T> T::*HookMember>
+  class _linked_list {
+  public:
+    _linked_list() {}
+
+    _linked_list(const _linked_list&) = delete;
+    _linked_list& operator=(const _linked_list&) = delete;
+
+    _linked_list(_linked_list&& other) noexcept
+        : head_(other.head_.exchange(nullptr, std::memory_order_acq_rel)) {}
+
+    // Absent because would be too error-prone to use correctly because of
+    // the requirement that lists are empty upon destruction.
+    _linked_list& operator=(_linked_list&& other) noexcept = delete;
+
+    /**
+     * Move the currently held elements to a new list.
+     * The current list becomes empty, but concurrent threads
+     * might still add new elements to it.
+     *
+     * Equivalent to calling a move constructor, but more linter-friendly
+     * in case you still need the old list.
+     */
+    _linked_list spliceAll() { return std::move(*this); }
+
+    /**
+     * Move-assign the current list to `other`, then reverse-sweep
+     * the old list with the provided callback `func`.
+     *
+     * A safe replacement for the move assignment operator, which is absent
+     * because of the resource leak concerns.
+     */
+    template <typename F>
+    void reverseSweepAndAssign(_linked_list&& other, F&& func) {
+      auto otherHead = other.head_.exchange(nullptr, std::memory_order_acq_rel);
+      auto head = head_.exchange(otherHead, std::memory_order_acq_rel);
+      unlinkAll(head, std::forward<F>(func));
+    }
+
+    /**
+     * Note: The list must be empty on destruction.
+     */
+    ~_linked_list() { assert(empty()); }
+
+    /**
+     * Returns the current head of the list.
+     *
+     * WARNING: The returned pointer might not be valid if the list
+     * is modified concurrently!
+     */
+    T* unsafeHead() const { return head_.load(std::memory_order_acquire); }
+
+    /**
+     * Returns true if the list is empty.
+     *
+     * WARNING: This method's return value is only valid for a snapshot
+     * of the state, it might become stale as soon as it's returned.
+     */
+    bool empty() const { return unsafeHead() == nullptr; }
+
+    /**
+     * Atomically insert t at the head of the list.
+     * @return True if the inserted element is the only one in the list
+     *         after the call.
+     */
+    bool insertHead(T* t) {
+      assert(next(t) == nullptr);
+
+      auto oldHead = head_.load(std::memory_order_relaxed);
+      do {
+        next(t) = oldHead;
+        /* oldHead is updated by the call below.
+
+           NOTE: we don't use next(t) instead of oldHead directly due to
+           compiler bugs (GCC prior to 4.8.3 (bug 60272), clang (bug 18899),
+           MSVC (bug 819819); source:
+           http://en.cppreference.com/w/cpp/atomic/atomic/compare_exchange */
+      } while (!head_.compare_exchange_weak(
+          oldHead, t, std::memory_order_release, std::memory_order_relaxed));
+
+      return oldHead == nullptr;
+    }
+
+    /**
+     * Replaces the head with nullptr,
+     * and calls func() on the removed elements in the order from tail to head.
+     * Returns false if the list was empty.
+     */
+    template <typename F>
+    bool sweepOnce(F&& func) {
+      if (auto head = head_.exchange(nullptr, std::memory_order_acq_rel)) {
+        auto rhead = reverse(head);
+        unlinkAll(rhead, std::forward<F>(func));
+        return true;
+      }
+      return false;
+    }
+
+    /**
+     * Repeatedly replaces the head with nullptr,
+     * and calls func() on the removed elements in the order from tail to head.
+     * Stops when the list is empty.
+     */
+    template <typename F>
+    void sweep(F&& func) {
+      while (sweepOnce(func)) {
+      }
+    }
+
+    /**
+     * Similar to sweep() but calls func() on elements in LIFO order.
+     *
+     * func() is called for all elements in the list at the moment
+     * reverseSweep() is called.  Unlike sweep() it does not loop to ensure the
+     * list is empty at some point after the last invocation.  This way callers
+     * can reason about the ordering: elements inserted since the last call to
+     * reverseSweep() will be provided in LIFO order.
+     *
+     * Example: if elements are inserted in the order 1-2-3, the callback is
+     * invoked 3-2-1.  If the callback moves elements onto a stack, popping off
+     * the stack will produce the original insertion order 1-2-3.
+     */
+    template <typename F>
+    void reverseSweep(F&& func) {
+      // We don't loop like sweep() does because the overall order of callbacks
+      // would be strand-wise LIFO which is meaningless to callers.
+      auto head = head_.exchange(nullptr, std::memory_order_acq_rel);
+      unlinkAll(head, std::forward<F>(func));
+    }
+
+  private:
+    std::atomic<T*> head_{nullptr};
+
+    static T*& next(T* t) { return (t->*HookMember).next; }
+
+    /* Reverses a linked list, returning the pointer to the new head
+       (old tail) */
+    static T* reverse(T* head) {
+      T* rhead = nullptr;
+      while (head != nullptr) {
+        auto t = head;
+        head = next(t);
+        next(t) = rhead;
+        rhead = t;
+      }
+      return rhead;
+    }
+
+    /* Unlinks all elements in the linked list fragment pointed to by `head',
+     * calling func() on every element */
+    template <typename F>
+    static void unlinkAll(T* head, F&& func) {
+      while (head != nullptr) {
+        auto t = head;
+        head = next(t);
+        next(t) = nullptr;
+        func(t);
+      }
+    }
+  };
+
+  /**
+   * A very simple atomic single-linked list primitive.
+   *
+   * Usage:
+   *
+   *   linked_list<MyClass> list;
+   *   list.insertHead(a);
+   *   list.sweep([] (MyClass&& c) { doSomething(c); });
+   */
+  template <class T>
+  class linked_list {
+  public:
+    linked_list() {}
+    linked_list(const linked_list&) = delete;
+    linked_list& operator=(const linked_list&) = delete;
+    linked_list(linked_list&& other) noexcept = default;
+    linked_list& operator=(linked_list&& other) noexcept {
+      list_.reverseSweepAndAssign(std::move(other.list_), [](Wrapper* node) {
+        delete node;
+      });
+      return *this;
+    }
+
+    ~linked_list() {
+      sweep([](T&&) {});
+    }
+
+    bool empty() const { return list_.empty(); }
+
+    /**
+     * Atomically insert t at the head of the list.
+     * @return True if the inserted element is the only one in the list
+     *         after the call.
+     */
+    bool insertHead(T t) {
+      auto wrapper = std::make_unique<Wrapper>(std::move(t));
+
+      return list_.insertHead(wrapper.release());
+    }
+
+    /**
+     * Repeatedly pops elements from the head,
+     * and calls func() on the removed elements in the order from tail to head.
+     * Stops when the list is empty.
+     */
+    template <typename F>
+    void sweep(F&& func) {
+      list_.sweep([&](Wrapper* wrapperPtr) mutable {
+        std::unique_ptr<Wrapper> wrapper(wrapperPtr);
+
+        func(std::move(wrapper->data));
+      });
+    }
+
+    /**
+     * Sweeps the list a single time, as a single point in time swap with the
+     * current contents of the list.
+     *
+     * Unlike sweep() it does not loop to ensure the list is empty at some point
+     * after the last invocation.
+     *
+     * Returns false if the list is empty.
+     */
+    template <typename F>
+    bool sweepOnce(F&& func) {
+      return list_.sweepOnce([&](Wrapper* wrappedPtr) {
+        std::unique_ptr<Wrapper> wrapper(wrappedPtr);
+        func(std::move(wrapper->data));
+      });
+    }
+
+    /**
+     * Similar to sweep() but calls func() on elements in LIFO order.
+     *
+     * func() is called for all elements in the list at the moment
+     * reverseSweep() is called.  Unlike sweep() it does not loop to ensure the
+     * list is empty at some point after the last invocation.  This way callers
+     * can reason about the ordering: elements inserted since the last call to
+     * reverseSweep() will be provided in LIFO order.
+     *
+     * Example: if elements are inserted in the order 1-2-3, the callback is
+     * invoked 3-2-1.  If the callback moves elements onto a stack, popping off
+     * the stack will produce the original insertion order 1-2-3.
+     */
+    template <typename F>
+    void reverseSweep(F&& func) {
+      list_.reverseSweep([&](Wrapper* wrapperPtr) mutable {
+        std::unique_ptr<Wrapper> wrapper(wrapperPtr);
+
+        func(std::move(wrapper->data));
+      });
+    }
+
+  private:
+    struct Wrapper {
+      explicit Wrapper(T&& t) : data(std::move(t)) {}
+
+      _linked_list_hook<Wrapper> hook;
+      T data;
+    };
+    _linked_list<Wrapper, &Wrapper::hook> list_;
+  };
+
+} // namespace lock_free
\ No newline at end of file
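
Before moving on to the server changes, it helps to pin down the ordering contract the scheduler below relies on (a minimal sketch, not part of the patch):

    #include <cstdio>

    lock_free::linked_list<int> q;

    void demo() {
        q.insertHead(1);
        q.insertHead(2);
        q.insertHead(3);

        // insertHead prepends, and sweep reverses each detached batch, so
        // callbacks arrive in insertion order: 1, 2, 3
        q.sweep([](int && v) { printf("%d\n", v); });

        // reverseSweep would instead deliver 3, 2, 1 (LIFO), as documented above.
    }
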
diff --git a/examples/server/server.cpp b/examples/server/server.cpp
index 360f571e4..bef3df5f4 100644
--- a/examples/server/server.cpp
+++ b/examples/server/server.cpp
@@ -33,6 +33,8 @@
 #include "prompt-formats.js.hpp"
 #include "json-schema-to-grammar.mjs.hpp"
 
+#include "lock-free.hpp"
+
 #include <atomic>
 #include <condition_variable>
 #include <cstddef>
@@ -103,11 +105,13 @@ struct server_task_result {
     bool error;
 };
 
+using server_task_result_ptr = std::shared_ptr<server_task_result>;
+
 struct server_task_multi {
     int id = -1;
 
-    std::set<int>                   subtasks_remaining;
-    std::vector<server_task_result> results;
+    lock_free::hash_map<int, int>                     subtasks_remaining = {10000};
+    lock_free::hash_map<int, server_task_result_ptr> results             = {10000};
 };
 
 struct slot_params {
@@ -378,14 +382,15 @@ struct server_metrics {
 };
 
 struct server_queue {
-    int id = 0;
+    std::atomic<int> id = 0;
     bool running;
 
     // queues
-    std::vector<server_task> queue_tasks;
-    std::vector<server_task> queue_tasks_deferred;
+    lock_free::linked_list<server_task> queue_tasks;
+    lock_free::linked_list<server_task> queue_tasks_deferred;
+    std::atomic<int> n_queue_tasks_deferred = 0;
 
-    std::vector<server_task_multi> queue_multitasks;
+    lock_free::linked_list<server_task_multi> queue_multitasks;
 
     std::mutex mutex_tasks;
     std::condition_variable condition_tasks;
@@ -397,25 +402,23 @@ struct server_queue {
 
     // Add a new task to the end of the queue
     int post(server_task task) {
-        std::unique_lock<std::mutex> lock(mutex_tasks);
         if (task.id == -1) {
            task.id = id++;
            LOG_VERBOSE("new task id", {{"new_id", task.id}});
        }
-        queue_tasks.push_back(std::move(task));
+        queue_tasks.insertHead(std::move(task));
         condition_tasks.notify_one();
         return task.id;
     }
 
     // Add a new task, but defer until one slot is available
     void defer(server_task task) {
-        std::unique_lock<std::mutex> lock(mutex_tasks);
-        queue_tasks_deferred.push_back(std::move(task));
+        queue_tasks_deferred.insertHead(std::move(task));
+        n_queue_tasks_deferred++;
     }
 
     // Get the next id for creating a new task
     int get_new_id() {
-        std::unique_lock<std::mutex> lock(mutex_tasks);
         int new_id = id++;
         LOG_VERBOSE("new task id", {{"new_id", new_id}});
         return new_id;
@@ -439,16 +442,14 @@ struct server_queue {
 
     // Call when the state of one slot is changed
     void notify_slot_changed() {
         // move deferred tasks back to main loop
-        std::unique_lock<std::mutex> lock(mutex_tasks);
-        for (auto & task : queue_tasks_deferred) {
-            queue_tasks.push_back(std::move(task));
-        }
-        queue_tasks_deferred.clear();
+        queue_tasks_deferred.sweep([&](server_task && task) {
+            queue_tasks.insertHead(std::move(task));
+        });
+        n_queue_tasks_deferred = 0;
     }
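
The deferred-task hand-off above reduces to moving every node of one lock-free list onto another. In isolation (hypothetical int payloads standing in for server_task) the pattern is:

    lock_free::linked_list<int> main_q, deferred_q;

    void park(int task)  { deferred_q.insertHead(task); } // any producer thread

    void release_all() {
        // one atomic exchange detaches the whole deferred batch, then each
        // task is re-queued on the main list in its original arrival order
        deferred_q.sweep([&](int && task) {
            main_q.insertHead(std::move(task));
        });
    }

Re-inserting into a *different* list is safe with sweep(); re-inserting into the list being swept is not, which is why the multitask code below uses sweepOnce().
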
 
     // end the start_loop routine
     void terminate() {
-        std::unique_lock<std::mutex> lock(mutex_tasks);
         running = false;
         condition_tasks.notify_all();
     }
@@ -467,33 +468,27 @@ struct server_queue {
             LOG_VERBOSE("new task may arrive", {});
 
             while (true) {
-                std::unique_lock<std::mutex> lock(mutex_tasks);
                 if (queue_tasks.empty()) {
-                    lock.unlock();
                     break;
                 }
-                server_task task = queue_tasks.front();
-                queue_tasks.erase(queue_tasks.begin());
-                lock.unlock();
-                LOG_VERBOSE("callback_new_task", {{"id_task", task.id}});
-                callback_new_task(task);
+
+                queue_tasks.sweepOnce([&](server_task && task) {
+                    LOG_VERBOSE("callback_new_task", {{"id_task", task.id}});
+                    callback_new_task(task);
+                });
             }
 
             LOG_VERBOSE("update_multitasks", {});
 
             // check if we have any finished multitasks
-            auto queue_iterator = queue_multitasks.begin();
-            while (queue_iterator != queue_multitasks.end()) {
-                if (queue_iterator->subtasks_remaining.empty()) {
+            // sweepOnce (not sweep): unfinished multitasks are re-inserted below,
+            // and sweep() would keep draining its own re-insertions forever
+            queue_multitasks.sweepOnce([&](server_task_multi && multitask) {
+                if (multitask.subtasks_remaining.cbegin() == multitask.subtasks_remaining.cend()) {
                     // all subtasks done == multitask is done
-                    server_task_multi current_multitask = *queue_iterator;
-                    callback_finish_multitask(current_multitask);
-                    // remove this multitask
-                    queue_iterator = queue_multitasks.erase(queue_iterator);
+                    callback_finish_multitask(multitask);
                 } else {
-                    ++queue_iterator;
+                    queue_multitasks.insertHead(multitask);
                 }
-            }
+            });
 
             // all tasks in the current loop are processed, slots data is now ready
             LOG_VERBOSE("callback_update_slots", {});
@@ -501,17 +496,15 @@ struct server_queue {
 
             callback_update_slots();
 
             LOG_VERBOSE("wait for new task", {});
-            {
-                std::unique_lock<std::mutex> lock(mutex_tasks);
-                if (queue_tasks.empty()) {
-                    if (!running) {
-                        LOG_VERBOSE("ending start_loop", {});
-                        return;
-                    }
-                    condition_tasks.wait(lock, [&]{
-                        return (!queue_tasks.empty() || !running);
-                    });
+            if (queue_tasks.empty()) {
+                if (!running) {
+                    LOG_VERBOSE("ending start_loop", {});
+                    return;
                 }
+                std::unique_lock<std::mutex> lock(mutex_tasks);
+                condition_tasks.wait(lock, [&]{
+                    return (!queue_tasks.empty() || !running);
+                });
             }
         }
     }
@@ -522,34 +515,35 @@ struct server_queue {
 
     // add a multitask by specifying the id of all subtask (subtask is a server_task)
     void add_multitask(int id_multi, std::vector<int> & sub_ids) {
-        std::lock_guard<std::mutex> lock(mutex_tasks);
-        server_task_multi multi;
+        server_task_multi multi = {};
         multi.id = id_multi;
-        std::copy(sub_ids.begin(), sub_ids.end(), std::inserter(multi.subtasks_remaining, multi.subtasks_remaining.end()));
-        queue_multitasks.push_back(multi);
+        for (auto & id : sub_ids) {
+            multi.subtasks_remaining.insert(id, 0);
+        }
+        queue_multitasks.insertHead(multi);
    }
 
     // update the remaining subtasks, while appending results to multitask
-    void update_multitask(int id_multi, int id_sub, server_task_result & result) {
-        std::lock_guard<std::mutex> lock(mutex_tasks);
-        for (auto & multitask : queue_multitasks) {
+    void update_multitask(int id_multi, int id_sub, server_task_result_ptr & result) {
+        // sweepOnce: every entry goes back on the list, so sweep() would loop
+        queue_multitasks.sweepOnce([&](server_task_multi && multitask) {
             if (multitask.id == id_multi) {
                 multitask.subtasks_remaining.erase(id_sub);
-                multitask.results.push_back(result);
+                multitask.results.insert(id_sub, std::move(result));
             }
-        }
+            queue_multitasks.insertHead(multitask);
+        });
     }
 };
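
update_multitask pops every multitask off the list, patches the matching one, and pushes each entry back. Because the callback re-inserts into the very list being drained, it has to use sweepOnce() — which processes one detached snapshot — rather than sweep(), which loops until the list is empty and would keep chasing its own re-insertions. Reduced to a skeleton (Multi is a stand-in for server_task_multi):

    struct Multi { int id; /* subtasks, results, ... */ };
    lock_free::linked_list<Multi> multis;

    void touch(int id) {
        multis.sweepOnce([&](Multi && m) {    // drains one snapshot of the list
            if (m.id == id) { /* mutate m */ }
            multis.insertHead(std::move(m));  // put every entry back
        });
    }
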
 
 struct server_response {
-    typedef std::function<void(int, int, server_task_result&)>     callback_multitask_t;
+    typedef std::function<void(int, int, server_task_result_ptr&)> callback_multitask_t;
     callback_multitask_t callback_update_multitask;
 
     // for keeping track of all tasks waiting for the result
-    std::set<int> waiting_task_ids;
+    lock_free::hash_map<int, int> waiting_task_ids = {10000};
 
-    // the main result queue
-    std::vector<server_task_result> queue_results;
+    // the main result queue (using ptr for polymorphism)
+    lock_free::hash_map<int, server_task_result_ptr> queue_results = {10000};
 
     std::mutex mutex_results;
     std::condition_variable condition_results;
@@ -557,35 +551,31 @@ struct server_response {
 
     // add the id_task to the list of tasks waiting for response
     void add_waiting_task_id(int id_task) {
         LOG_VERBOSE("waiting for task id", {{"id_task", id_task}});
-
-        std::unique_lock<std::mutex> lock(mutex_results);
-        waiting_task_ids.insert(id_task);
+        waiting_task_ids.insert(id_task, 0);
     }
 
     // when the request is finished, we can remove task associated with it
     void remove_waiting_task_id(int id_task) {
         LOG_VERBOSE("remove waiting for task id", {{"id_task", id_task}});
-
-        std::unique_lock<std::mutex> lock(mutex_results);
         waiting_task_ids.erase(id_task);
+        // make sure to clean up all pending results
+        queue_results.erase(id_task);
     }
 
     // This function blocks the thread until there is a response for this id_task
-    server_task_result recv(int id_task) {
+    server_task_result_ptr recv(int id_task) {
         while (true) {
+            auto iter = queue_results.find(id_task);
+            if (iter != queue_results.cend()) {
+                server_task_result_ptr res = iter->second;
+                queue_results.erase(id_task);
+                return res;
+            }
+
             std::unique_lock<std::mutex> lock(mutex_results);
             condition_results.wait(lock, [&]{
-                return !queue_results.empty();
+                return queue_results.cbegin() != queue_results.cend();
             });
-
-            for (int i = 0; i < (int) queue_results.size(); i++) {
-                if (queue_results[i].id == id_task) {
-                    assert(queue_results[i].id_multi == -1);
-                    server_task_result res = queue_results[i];
-                    queue_results.erase(queue_results.begin() + i);
-                    return res;
-                }
-            }
         }
 
         // should never reach here
     }
 
     // Send a new result to a waiting id_task
-    void send(server_task_result result) {
-        LOG_VERBOSE("send new result", {{"id_task", result.id}});
-
-        std::unique_lock<std::mutex> lock(mutex_results);
-        for (const auto & id_task : waiting_task_ids) {
-            // LOG_TEE("waiting task id %i \n", id_task);
-            // for now, tasks that have associated parent multitasks just get erased once multitask picks up the result
-            if (result.id_multi == id_task) {
-                LOG_VERBOSE("callback_update_multitask", {{"id_task", id_task}});
-                callback_update_multitask(id_task, result.id, result);
-                continue;
-            }
-
-            if (result.id == id_task) {
-                LOG_VERBOSE("queue_results.push_back", {{"id_task", id_task}});
-                queue_results.push_back(result);
-                condition_results.notify_all();
-                return;
-            }
-        }
+    void send(server_task_result_ptr result) {
+        LOG_VERBOSE("send new result", {{"id_task", result->id}});
+
+        if (waiting_task_ids.find(result->id_multi) != waiting_task_ids.cend()) {
+            LOG_VERBOSE("callback_update_multitask", {{"id_task", result->id_multi}});
+            callback_update_multitask(result->id_multi, result->id, result);
+        }
+
+        if (waiting_task_ids.find(result->id) != waiting_task_ids.cend()) {
+            LOG_VERBOSE("queue_results.push_back", {{"id_task", result->id}});
+            queue_results.insert(result->id, std::move(result));
+            condition_results.notify_all();
+        }
     }
 };
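
Results now travel as std::shared_ptr because a single result object can be referenced from two places at once — the per-task queue_results map and, for subtasks, the owning multitask's results map — and, as the diff's comment notes, the indirection also leaves room for polymorphic result types. In miniature (illustrative only, assuming a multitask and a queue_results in scope):

    using server_task_result_ptr = std::shared_ptr<server_task_result>;

    server_task_result_ptr res = std::make_shared<server_task_result>();
    res->id       = 7;
    res->id_multi = 3;

    // aggregator and per-task queue share a single allocation; whichever
    // consumer drops its reference last frees it
    multitask.results.insert(res->id, res);
    queue_results.insert(res->id, std::move(res));
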
@@ -1380,23 +1363,23 @@ struct server_context {
             {"error", error},
         });
 
-        server_task_result res;
-        res.id       = id_task;
-        res.id_multi = id_multi;
-        res.stop     = false;
-        res.error    = true;
-        res.data     = format_error_response(error, type);
+        server_task_result_ptr res = std::make_shared<server_task_result>();
+        res->id       = id_task;
+        res->id_multi = id_multi;
+        res->stop     = false;
+        res->error    = true;
+        res->data     = format_error_response(error, type);
 
         queue_results.send(res);
     }
 
     void send_partial_response(server_slot & slot, completion_token_output tkn) {
-        server_task_result res;
-        res.id       = slot.id_task;
-        res.id_multi = slot.id_multi;
-        res.error    = false;
-        res.stop     = false;
-        res.data     = json {
+        server_task_result_ptr res = std::make_shared<server_task_result>();
+        res->id       = slot.id_task;
+        res->id_multi = slot.id_multi;
+        res->error    = false;
+        res->stop     = false;
+        res->data     = json {
             {"content",    tkn.text_to_send},
             {"stop",       false},
             {"id_slot",    slot.id},
@@ -1416,24 +1399,24 @@ struct server_context {
             }
 
             slot.n_sent_token_probs = probs_stop_pos;
 
-            res.data["completion_probabilities"] = probs_vector_to_json(ctx, probs_output);
+            res->data["completion_probabilities"] = probs_vector_to_json(ctx, probs_output);
         }
 
         if (slot.oaicompat) {
-            res.data["oaicompat_token_ctr"] = slot.n_decoded;
-            res.data["model"]               = slot.oaicompat_model;
+            res->data["oaicompat_token_ctr"] = slot.n_decoded;
+            res->data["model"]               = slot.oaicompat_model;
         }
 
         queue_results.send(res);
     }
 
     void send_final_response(const server_slot & slot) {
-        server_task_result res;
-        res.id       = slot.id_task;
-        res.id_multi = slot.id_multi;
-        res.error    = false;
-        res.stop     = true;
-        res.data     = json {
+        server_task_result_ptr res = std::make_shared<server_task_result>();
+        res->id       = slot.id_task;
+        res->id_multi = slot.id_multi;
+        res->error    = false;
+        res->stop     = true;
+        res->data     = json {
             {"content", !slot.params.stream ? slot.generated_text : ""},
             {"id_slot", slot.id},
             {"stop",    true},
@@ -1466,23 +1449,23 @@ struct server_context {
                 slot.generated_token_probs.end());
             }
 
-            res.data["completion_probabilities"] = probs_vector_to_json(ctx, probs);
+            res->data["completion_probabilities"] = probs_vector_to_json(ctx, probs);
         }
 
         if (slot.oaicompat) {
-            res.data["oaicompat_token_ctr"] = slot.n_decoded;
-            res.data["model"]               = slot.oaicompat_model;
+            res->data["oaicompat_token_ctr"] = slot.n_decoded;
+            res->data["model"]               = slot.oaicompat_model;
         }
 
         queue_results.send(res);
     }
 
     void send_embedding(const server_slot & slot, const llama_batch & batch) {
-        server_task_result res;
-        res.id       = slot.id_task;
-        res.id_multi = slot.id_multi;
-        res.error    = false;
-        res.stop     = true;
+        server_task_result_ptr res = std::make_shared<server_task_result>();
+        res->id       = slot.id_task;
+        res->id_multi = slot.id_multi;
+        res->error    = false;
+        res->stop     = true;
 
         const int n_embd = llama_n_embd(model);
@@ -1504,7 +1487,7 @@ struct server_context {
                     {"seq_id", batch.seq_id[i][0]}
                 });
 
-                res.data = json {
+                res->data = json {
                     {"embedding", std::vector<float>(n_embd, 0.0f)},
                 };
 
@@ -1513,7 +1496,7 @@ struct server_context {
 
             llama_embd_normalize(embd, embd_res.data(), n_embd);
 
-            res.data = json {
+            res->data = json {
                 {"embedding", embd_res},
             };
         }
@@ -1704,15 +1687,15 @@ struct server_context {
                         {"slots", slots_data}
                     });
 
-                    server_task_result res;
-                    res.id       = task.id;
-                    res.id_multi = task.id_multi;
-                    res.stop     = true;
-                    res.error    = false;
-                    res.data     = {
+                    server_task_result_ptr res = std::make_shared<server_task_result>();
+                    res->id       = task.id;
+                    res->id_multi = task.id_multi;
+                    res->stop     = true;
+                    res->error    = false;
+                    res->data     = {
                         { "idle",       n_idle_slots },
                         { "processing", n_processing_slots },
-                        { "deferred",   queue_tasks.queue_tasks_deferred.size() },
+                        { "deferred",   queue_tasks.n_queue_tasks_deferred.load() },
                         { "t_start",    metrics.t_start},
 
                         { "n_prompt_tokens_processed_total",  metrics.n_prompt_tokens_processed_total},
@@ -1762,11 +1745,11 @@ struct server_context {
                     const int64_t t_end = ggml_time_us();
                    const double t_save_ms = (t_end - t_start) / 1000.0;
 
-                    server_task_result result;
-                    result.id    = task.id;
-                    result.stop  = true;
-                    result.error = false;
-                    result.data  = json {
+                    server_task_result_ptr result = std::make_shared<server_task_result>();
+                    result->id    = task.id;
+                    result->stop  = true;
+                    result->error = false;
+                    result->data  = json {
                         { "id_slot",  id_slot },
                         { "filename", filename },
                         { "n_saved",  token_count }, // tokens saved
@@ -1810,11 +1793,11 @@ struct server_context {
                     const int64_t t_end = ggml_time_us();
                     const double t_restore_ms = (t_end - t_start) / 1000.0;
 
-                    server_task_result result;
-                    result.id    = task.id;
-                    result.stop  = true;
-                    result.error = false;
-                    result.data  = json {
+                    server_task_result_ptr result = std::make_shared<server_task_result>();
+                    result->id    = task.id;
+                    result->stop  = true;
+                    result->error = false;
+                    result->data  = json {
                         { "id_slot",    id_slot },
                         { "filename",   filename },
                         { "n_restored", token_count }, // tokens restored
@@ -1845,11 +1828,11 @@ struct server_context {
                     llama_kv_cache_seq_rm(ctx, slot->id + 1, -1, -1);
                     slot->cache_tokens.clear();
 
-                    server_task_result result;
-                    result.id    = task.id;
-                    result.stop  = true;
-                    result.error = false;
-                    result.data  = json {
+                    server_task_result_ptr result = std::make_shared<server_task_result>();
+                    result->id    = task.id;
+                    result->stop  = true;
+                    result->error = false;
+                    result->data  = json {
                         { "id_slot",  id_slot },
                         { "n_erased", n_erased }
                     };
@@ -1858,9 +1841,9 @@ struct server_context {
             case SERVER_TASK_TYPE_SET_LORA:
                 {
                     llama_lora_adapters_apply(ctx, lora_adapters);
-                    server_task_result result;
-                    result.id = task.id;
-                    result.data = json{{ "success", true }};
+                    server_task_result_ptr result = std::make_shared<server_task_result>();
+                    result->id = task.id;
+                    result->data = json{{ "success", true }};
                     queue_results.send(result);
                 } break;
         }
@@ -1868,18 +1851,18 @@ struct server_context {
 
     void on_finish_multitask(const server_task_multi & multitask) {
         // all subtasks done == multitask is done
-        server_task_result result;
-        result.id    = multitask.id;
-        result.stop  = true;
-        result.error = false;
+        server_task_result_ptr result = std::make_shared<server_task_result>();
+        result->id    = multitask.id;
+        result->stop  = true;
+        result->error = false;
 
         // collect json results into one json result
         std::vector<json> result_jsons;
-        for (const auto & subres : multitask.results) {
-            result_jsons.push_back(subres.data);
-            result.error = result.error && subres.error;
+        for (auto & pair : multitask.results) {
+            result_jsons.push_back(pair.second->data);
+            result->error = result->error && pair.second->error;
         }
-        result.data = json {
+        result->data = json {
             { "results", result_jsons }
         };
@@ -2732,11 +2715,11 @@ int main(int argc, char ** argv) {
         ctx_server.queue_tasks.post(task);
 
         // get the result
-        server_task_result result = ctx_server.queue_results.recv(task.id);
+        server_task_result_ptr result = ctx_server.queue_results.recv(task.id);
         ctx_server.queue_results.remove_waiting_task_id(task.id);
 
-        const int n_idle_slots       = result.data.at("idle");
-        const int n_processing_slots = result.data.at("processing");
+        const int n_idle_slots       = result->data.at("idle");
+        const int n_processing_slots = result->data.at("processing");
 
         json health = {
             {"status",     "ok"},
@@ -2746,7 +2729,7 @@ int main(int argc, char ** argv) {
         res.status = 200; // HTTP OK
 
         if (params.endpoint_slots && req.has_param("include_slots")) {
-            health["slots"] = result.data.at("slots");
+            health["slots"] = result->data.at("slots");
         }
 
         if (n_idle_slots == 0) {
@@ -2787,10 +2770,10 @@ int main(int argc, char ** argv) {
         ctx_server.queue_tasks.post(task);
 
         // get the result
-        server_task_result result = ctx_server.queue_results.recv(task.id);
+        server_task_result_ptr result = ctx_server.queue_results.recv(task.id);
         ctx_server.queue_results.remove_waiting_task_id(task.id);
 
-        res.set_content(result.data.at("slots").dump(), "application/json");
+        res.set_content(result->data.at("slots").dump(), "application/json");
         res.status = 200; // HTTP OK
     };
@@ -2812,10 +2795,10 @@ int main(int argc, char ** argv) {
         ctx_server.queue_tasks.post(task);
 
         // get the result
-        server_task_result result = ctx_server.queue_results.recv(task.id);
+        server_task_result_ptr result = ctx_server.queue_results.recv(task.id);
         ctx_server.queue_results.remove_waiting_task_id(task.id);
 
-        json data = result.data;
+        json data = result->data;
 
         const uint64_t n_prompt_tokens_processed = data.at("n_prompt_tokens_processed");
         const uint64_t t_prompt_processing       = data.at("t_prompt_processing");
@@ -2915,13 +2898,13 @@ int main(int argc, char ** argv) {
         const int id_task = ctx_server.queue_tasks.post(task);
         ctx_server.queue_results.add_waiting_task_id(id_task);
 
-        server_task_result result = ctx_server.queue_results.recv(id_task);
+        server_task_result_ptr result = ctx_server.queue_results.recv(id_task);
         ctx_server.queue_results.remove_waiting_task_id(id_task);
 
-        if (result.error) {
-            res_error(res, result.data);
+        if (result->error) {
+            res_error(res, result->data);
         } else {
-            res.set_content(result.data.dump(), "application/json");
+            res.set_content(result->data.dump(), "application/json");
         }
     };
@@ -2945,13 +2928,13 @@ int main(int argc, char ** argv) {
         const int id_task = ctx_server.queue_tasks.post(task);
         ctx_server.queue_results.add_waiting_task_id(id_task);
 
-        server_task_result result = ctx_server.queue_results.recv(id_task);
+        server_task_result_ptr result = ctx_server.queue_results.recv(id_task);
         ctx_server.queue_results.remove_waiting_task_id(id_task);
 
-        if (result.error) {
-            res_error(res, result.data);
+        if (result->error) {
+            res_error(res, result->data);
         } else {
-            res.set_content(result.data.dump(), "application/json");
+            res.set_content(result->data.dump(), "application/json");
         }
     };
@@ -2965,13 +2948,13 @@ int main(int argc, char ** argv) {
         const int id_task = ctx_server.queue_tasks.post(task);
         ctx_server.queue_results.add_waiting_task_id(id_task);
 
-        server_task_result result = ctx_server.queue_results.recv(id_task);
+        server_task_result_ptr result = ctx_server.queue_results.recv(id_task);
         ctx_server.queue_results.remove_waiting_task_id(id_task);
 
-        if (result.error) {
-            res_error(res, result.data);
+        if (result->error) {
+            res_error(res, result->data);
         } else {
-            res.set_content(result.data.dump(), "application/json");
+            res.set_content(result->data.dump(), "application/json");
         }
     };
@@ -3027,6 +3010,13 @@ int main(int argc, char ** argv) {
             return;
         }
 
+        // FIXME: ignore title summarization query
+        if (req.body.find("") != std::string::npos) {
+            res.set_content("{ \"success\": true }", "application/json");
+            res.status = 200; // HTTP OK
+            return;
+        }
+
         res.set_header("Access-Control-Allow-Origin", req.get_header_value("Origin"));
 
         json data = json::parse(req.body);
@@ -3037,22 +3027,22 @@ int main(int argc, char ** argv) {
         ctx_server.request_completion(id_task, -1, data, false, false);
 
         if (!json_value(data, "stream", false)) {
-            server_task_result result = ctx_server.queue_results.recv(id_task);
-            if (!result.error && result.stop) {
-                res.set_content(result.data.dump(-1, ' ', false, json::error_handler_t::replace), "application/json; charset=utf-8");
+            server_task_result_ptr result = ctx_server.queue_results.recv(id_task);
+            if (!result->error && result->stop) {
+                res.set_content(result->data.dump(-1, ' ', false, json::error_handler_t::replace), "application/json; charset=utf-8");
             } else {
-                res_error(res, result.data);
+                res_error(res, result->data);
             }
 
             ctx_server.queue_results.remove_waiting_task_id(id_task);
         } else {
             const auto chunked_content_provider = [id_task, &ctx_server](size_t, httplib::DataSink & sink) {
                 while (true) {
-                    server_task_result result = ctx_server.queue_results.recv(id_task);
-                    if (!result.error) {
+                    server_task_result_ptr result = ctx_server.queue_results.recv(id_task);
+                    if (!result->error) {
                         const std::string str =
                             "data: " +
-                            result.data.dump(-1, ' ', false, json::error_handler_t::replace) +
+                            result->data.dump(-1, ' ', false, json::error_handler_t::replace) +
                             "\n\n";
 
                         LOG_VERBOSE("data stream", {
@@ -3064,13 +3054,13 @@ int main(int argc, char ** argv) {
                             return false;
                         }
 
-                        if (result.stop) {
+                        if (result->stop) {
                             break;
                         }
                     } else {
                         const std::string str =
                             "error: " +
-                            result.data.dump(-1, ' ', false, json::error_handler_t::replace) +
+                            result->data.dump(-1, ' ', false, json::error_handler_t::replace) +
                             "\n\n";
 
                         LOG_VERBOSE("data stream", {
@@ -3137,22 +3127,22 @@ int main(int argc, char ** argv) {
         const auto completion_id = gen_chatcmplid();
 
         if (!json_value(data, "stream", false)) {
-            server_task_result result = ctx_server.queue_results.recv(id_task);
+            server_task_result_ptr result = ctx_server.queue_results.recv(id_task);
 
-            if (!result.error && result.stop) {
-                json result_oai = format_final_response_oaicompat(data, result.data, completion_id);
+            if (!result->error && result->stop) {
+                json result_oai = format_final_response_oaicompat(data, result->data, completion_id);
 
                 res.set_content(result_oai.dump(-1, ' ', false, json::error_handler_t::replace), "application/json; charset=utf-8");
             } else {
-                res_error(res, result.data);
+                res_error(res, result->data);
             }
             ctx_server.queue_results.remove_waiting_task_id(id_task);
         } else {
             const auto chunked_content_provider = [id_task, &ctx_server, completion_id](size_t, httplib::DataSink & sink) {
                 while (true) {
-                    server_task_result result = ctx_server.queue_results.recv(id_task);
-                    if (!result.error) {
-                        std::vector<json> result_array = format_partial_response_oaicompat(result.data, completion_id);
+                    server_task_result_ptr result = ctx_server.queue_results.recv(id_task);
+                    if (!result->error) {
+                        std::vector<json> result_array = format_partial_response_oaicompat(result->data, completion_id);
 
                         for (auto it = result_array.begin(); it != result_array.end(); ++it) {
                             if (!it->empty()) {
@@ -3167,13 +3157,13 @@ int main(int argc, char ** argv) {
                             }
                         }
                     }
-                    if (result.stop) {
+                    if (result->stop) {
                         break;
                     }
                 } else {
                     const std::string str =
                         "error: " +
-                        result.data.dump(-1, ' ', false, json::error_handler_t::replace) +
+                        result->data.dump(-1, ' ', false, json::error_handler_t::replace) +
                         "\n\n";
 
                     LOG_VERBOSE("data stream", {{"to_send", str}});
 
                     if (!sink.write(str.c_str(), str.size())) {
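
For reference, the "data: " + ... + "\n\n" concatenations above emit standard server-sent-events framing; a streaming client sees chunks shaped like this (payload fields abridged and illustrative):

    data: {"content":"Hel","stop":false,"id_slot":0}

    data: {"content":"lo","stop":true,"id_slot":0}

An "error: "-prefixed frame carrying the error JSON is emitted on failure, after which the sink is closed.
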
@@ -3214,22 +3204,22 @@ int main(int argc, char ** argv) {
         ctx_server.request_completion(id_task, -1, data, true, false);
 
         if (!json_value(data, "stream", false)) {
-            server_task_result result = ctx_server.queue_results.recv(id_task);
-            if (!result.error && result.stop) {
-                res.set_content(result.data.dump(-1, ' ', false, json::error_handler_t::replace), "application/json; charset=utf-8");
+            server_task_result_ptr result = ctx_server.queue_results.recv(id_task);
+            if (!result->error && result->stop) {
+                res.set_content(result->data.dump(-1, ' ', false, json::error_handler_t::replace), "application/json; charset=utf-8");
             } else {
-                res_error(res, result.data);
+                res_error(res, result->data);
             }
             ctx_server.queue_results.remove_waiting_task_id(id_task);
         } else {
             const auto chunked_content_provider = [id_task, &ctx_server](size_t, httplib::DataSink & sink) {
                 while (true) {
-                    server_task_result result = ctx_server.queue_results.recv(id_task);
-                    if (!result.error) {
+                    server_task_result_ptr result = ctx_server.queue_results.recv(id_task);
+                    if (!result->error) {
                         const std::string str =
                             "data: " +
-                            result.data.dump(-1, ' ', false, json::error_handler_t::replace) +
+                            result->data.dump(-1, ' ', false, json::error_handler_t::replace) +
                             "\n\n";
 
                         LOG_VERBOSE("data stream", {
@@ -3241,7 +3231,7 @@ int main(int argc, char ** argv) {
                             return false;
                         }
 
-                        if (result.stop) {
+                        if (result->stop) {
                             break;
                         }
                     } else {
@@ -3317,19 +3307,19 @@ int main(int argc, char ** argv) {
             ctx_server.request_completion(id_task, -1, {{"prompt", prompt}}, false, true);
 
             // get the result
-            server_task_result result = ctx_server.queue_results.recv(id_task);
+            server_task_result_ptr result = ctx_server.queue_results.recv(id_task);
             ctx_server.queue_results.remove_waiting_task_id(id_task);
 
-            if (!result.error) {
-                if (result.data.count("results")) {
+            if (!result->error) {
+                if (result->data.count("results")) {
                     // result for multi-task
-                    responses = result.data.at("results");
+                    responses = result->data.at("results");
                 } else {
                     // result for single task
-                    responses = std::vector<json>{result.data};
+                    responses = std::vector<json>{result->data};
                 }
             } else {
                 // error received, ignore everything else
-                res_error(res, result.data);
+                res_error(res, result->data);
                 return;
             }
         }
@@ -3383,10 +3373,10 @@ int main(int argc, char ** argv) {
         const int id_task = ctx_server.queue_tasks.post(task);
         ctx_server.queue_results.add_waiting_task_id(id_task);
 
-        server_task_result result = ctx_server.queue_results.recv(id_task);
+        server_task_result_ptr result = ctx_server.queue_results.recv(id_task);
         ctx_server.queue_results.remove_waiting_task_id(id_task);
 
-        res.set_content(result.data.dump(), "application/json");
+        res.set_content(result->data.dump(), "application/json");
         res.status = 200; // HTTP OK
     };
 
diff --git a/src/llama.cpp b/src/llama.cpp
index 76039f8e3..c57748cee 100644
--- a/src/llama.cpp
+++ b/src/llama.cpp
@@ -16308,7 +16308,7 @@ static void llama_kv_cache_update_internal(struct llama_context & lctx) {
 
     // apply K-shift if needed
     if (lctx.model.hparams.rope_type != LLAMA_ROPE_TYPE_NONE && lctx.kv_self.has_shift) {
-        if (lctx.model.arch == LLM_ARCH_DEEPSEEK2) { // not supported due to MLA
+        if (lctx.model.arch == LLM_ARCH_DEEPSEEK2 && lctx.cparams.mla_attn) { // not supported due to MLA
             GGML_ABORT("Deepseek2 does not support K-shift");
         }