diff --git a/ucm/store/CMakeLists.txt b/ucm/store/CMakeLists.txt index 418e8f855..e899a0f90 100644 --- a/ucm/store/CMakeLists.txt +++ b/ucm/store/CMakeLists.txt @@ -5,6 +5,7 @@ target_link_libraries(storeintf INTERFACE storedetail infra_status) add_subdirectory(nfsstore) add_subdirectory(pcstore) add_subdirectory(posix) +add_subdirectory(ds3fs) add_subdirectory(cache) add_subdirectory(mooncakestore) add_subdirectory(test) diff --git a/ucm/store/ds3fs/CMakeLists.txt b/ucm/store/ds3fs/CMakeLists.txt new file mode 100644 index 000000000..0149fe7fa --- /dev/null +++ b/ucm/store/ds3fs/CMakeLists.txt @@ -0,0 +1,30 @@ +find_path(HF3FS_USRBIO_INCLUDE_DIR NAMES hf3fs_usrbio.h PATHS /usr/local/3fs/src/lib/api NO_DEFAULT_PATH) +find_library(HF3FS_USRBIO_LIBRARY NAMES hf3fs_api_shared PATHS /usr/local/3fs/src/lib/rs/hf3fs-usrbio-sys/lib NO_DEFAULT_PATH) + +if(HF3FS_USRBIO_INCLUDE_DIR AND HF3FS_USRBIO_LIBRARY) + file(GLOB_RECURSE UCM_DS3FS_STORE_CC_SOURCE_FILES "./cc/*.cc") + add_library(ds3fsstore STATIC ${UCM_DS3FS_STORE_CC_SOURCE_FILES}) + target_include_directories(ds3fsstore PUBLIC + ${CMAKE_CURRENT_SOURCE_DIR}/cc + ) + + target_include_directories(ds3fsstore PRIVATE ${HF3FS_USRBIO_INCLUDE_DIR}) + target_link_libraries(ds3fsstore PRIVATE ${HF3FS_USRBIO_LIBRARY}) + target_link_libraries(ds3fsstore PUBLIC storeintf infra_logger) + + file(GLOB_RECURSE UCM_DS3FS_STORE_CPY_SOURCE_FILES "./cpy/*.cc") + pybind11_add_module(ucmds3fsstore ${UCM_DS3FS_STORE_CPY_SOURCE_FILES}) + target_link_libraries(ucmds3fsstore PRIVATE ds3fsstore) + + file(RELATIVE_PATH INSTALL_REL_PATH ${CMAKE_SOURCE_DIR} ${CMAKE_CURRENT_SOURCE_DIR}) + install(TARGETS ucmds3fsstore LIBRARY DESTINATION ${INSTALL_REL_PATH} COMPONENT ucm) +else() + message(STATUS "ds3fsstore: Skipping build - required HF3FS dependencies not found") + if(NOT HF3FS_USRBIO_INCLUDE_DIR) + message(STATUS " - Missing: hf3fs_usrbio.h ") + endif() + if(NOT HF3FS_USRBIO_LIBRARY) + message(STATUS " - Missing: hf3fs_api_shared library ") + endif() + message(STATUS " Please ensure HF3FS dependencies are installed or build paths are correct") +endif() \ No newline at end of file diff --git a/ucm/store/ds3fs/cc/ds3fs_file.cc b/ucm/store/ds3fs/cc/ds3fs_file.cc new file mode 100644 index 000000000..71c60a814 --- /dev/null +++ b/ucm/store/ds3fs/cc/ds3fs_file.cc @@ -0,0 +1,98 @@ +/** + * MIT License + * + * Copyright (c) 2025 Huawei Technologies Co., Ltd. All rights reserved. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + * */ +#include "ds3fs_file.h" +#include +#include + +namespace UC::Ds3fsStore { + +static constexpr auto NewFilePerm = (S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH); +static constexpr auto NewDirPerm = (S_IRWXU | S_IRGRP | S_IROTH); + +Ds3fsFile::~Ds3fsFile() +{ + if (handle_ != -1) { Close(); } +} + +Status Ds3fsFile::MkDir() +{ + auto ret = mkdir(path_.c_str(), NewDirPerm); + auto eno = errno; + if (ret != 0) [[unlikely]] { + if (eno == EEXIST) { return Status::DuplicateKey(); } + return Status::OsApiError(std::to_string(eno)); + } + return Status::OK(); +} + +Status Ds3fsFile::RmDir() +{ + auto ret = rmdir(path_.c_str()); + auto eno = errno; + if (ret != 0) [[unlikely]] { return Status::OsApiError(std::to_string(eno)); } + return Status::OK(); +} + +Status Ds3fsFile::Rename(const std::string& newName) +{ + auto ret = rename(path_.c_str(), newName.c_str()); + auto eno = errno; + if (ret != 0) [[unlikely]] { + if (eno == ENOENT) { return Status::NotFound(); } + return Status::OsApiError(std::to_string(eno)); + } + return Status::OK(); +} + +Status Ds3fsFile::Access(const int32_t mode) +{ + auto ret = access(path_.c_str(), mode); + auto eno = errno; + if (ret != 0) [[unlikely]] { + if (eno == ENOENT) { return Status::NotFound(); } + return Status::OsApiError(std::to_string(eno)); + } + return Status::OK(); +} + +Status Ds3fsFile::Open(const uint32_t flags) +{ + handle_ = open(path_.c_str(), flags, NewFilePerm); + auto eno = errno; + if (handle_ < 0) [[unlikely]] { + if (eno == EEXIST) { return Status::DuplicateKey(); } + return Status::OsApiError(std::to_string(eno)); + } + return Status::OK(); +} + +void Ds3fsFile::Close() +{ + close(handle_); + handle_ = -1; +} + +void Ds3fsFile::Remove() { remove(path_.c_str()); } + +} // namespace UC::Ds3fsStore diff --git a/ucm/store/ds3fs/cc/ds3fs_file.h b/ucm/store/ds3fs/cc/ds3fs_file.h new file mode 100644 index 000000000..f3cb3d819 --- /dev/null +++ b/ucm/store/ds3fs/cc/ds3fs_file.h @@ -0,0 +1,75 @@ +/** + * MIT License + * + * Copyright (c) 2025 Huawei Technologies Co., Ltd. All rights reserved. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + * */ +#ifndef UNIFIEDCACHE_DS3FS_STORE_CC_DS3FS_FILE_H +#define UNIFIEDCACHE_DS3FS_STORE_CC_DS3FS_FILE_H + +#include +#include "status/status.h" + +namespace UC::Ds3fsStore { + +class Ds3fsFile { +public: + struct AccessMode { + static constexpr int32_t READ = R_OK; + static constexpr int32_t WRITE = W_OK; + static constexpr int32_t EXIST = F_OK; + static constexpr int32_t EXECUTE = X_OK; + }; + struct OpenFlag { + static constexpr uint32_t READ_ONLY = O_RDONLY; + static constexpr uint32_t WRITE_ONLY = O_WRONLY; + static constexpr uint32_t READ_WRITE = O_RDWR; + static constexpr uint32_t CREATE = O_CREAT; + static constexpr uint32_t DIRECT = O_DIRECT; + static constexpr uint32_t APPEND = O_APPEND; + static constexpr uint32_t EXCL = O_EXCL; + }; + +private: + std::string path_{}; + int32_t handle_{-1}; + +public: + explicit Ds3fsFile(std::string path) : path_{std::move(path)} {} + ~Ds3fsFile(); + const std::string& Path() const { return path_; } + int32_t ReleaseHandle() + { + int32_t fd = handle_; + handle_ = -1; + return fd; + } + Status MkDir(); + Status RmDir(); + Status Rename(const std::string& newName); + Status Access(const int32_t mode); + Status Open(const uint32_t flags); + void Close(); + void Remove(); +}; + +} // namespace UC::Ds3fsStore + +#endif diff --git a/ucm/store/ds3fs/cc/ds3fs_store.cc b/ucm/store/ds3fs/cc/ds3fs_store.cc new file mode 100644 index 000000000..39f1228a1 --- /dev/null +++ b/ucm/store/ds3fs/cc/ds3fs_store.cc @@ -0,0 +1,152 @@ +/** + * MIT License + * + * Copyright (c) 2025 Huawei Technologies Co., Ltd. All rights reserved. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + * */ +#include "ds3fs_store.h" +#include +#include "logger/logger.h" +#include "space_manager.h" +#include "trans_manager.h" + +namespace UC::Ds3fsStore { + +class Ds3fsStoreImpl { +public: + SpaceManager spaceMgr; + TransManager transMgr; + bool transEnable{false}; + +public: + Status Setup(const Config& config) + { + auto s = CheckConfig(config); + if (s.Failure()) [[unlikely]] { + UC_ERROR("Failed to check config params: {}.", s); + return s; + } + s = spaceMgr.Setup(config); + if (s.Failure()) [[unlikely]] { return s; } + transEnable = config.deviceId >= 0; + if (transEnable) { + s = transMgr.Setup(config, spaceMgr.GetLayout()); + if (s.Failure()) [[unlikely]] { return s; } + } + ShowConfig(config); + return Status::OK(); + } + +private: + Status CheckConfig(const Config& config) + { + if (config.storageBackends.empty()) { + return Status::InvalidParam("invalid storage backends"); + } + if (config.deviceId < -1) { + return Status::InvalidParam("invalid device({})", config.deviceId); + } + if (config.deviceId == -1) { return Status::OK(); } + if (config.tensorSize == 0 || config.shardSize < config.tensorSize || + config.blockSize < config.shardSize || config.shardSize % config.tensorSize != 0 || + config.blockSize % config.shardSize != 0) { + return Status::InvalidParam("invalid size({},{},{})", config.tensorSize, + config.shardSize, config.blockSize); + } + if (config.streamNumber == 0) { + return Status::InvalidParam("invalid stream number({})", config.streamNumber); + } + return Status::OK(); + } + void ShowConfig(const Config& config) + { + constexpr const char* ns = "Ds3fsStore"; + std::string buildType = UCM_BUILD_TYPE; + if (buildType.empty()) { buildType = "Release"; } + UC_INFO("{}-{}({}).", ns, UCM_COMMIT_ID, buildType); + UC_INFO("Set {}::StorageBackends to {}.", ns, config.storageBackends[0]); + UC_INFO("Set {}::DeviceId to {}.", ns, config.deviceId); + if (config.deviceId == -1) { return; } + UC_INFO("Set {}::TensorSize to {}.", ns, config.tensorSize); + UC_INFO("Set {}::ShardSize to {}.", ns, config.shardSize); + UC_INFO("Set {}::BlockSize to {}.", ns, config.blockSize); + UC_INFO("Set {}::IoDirect to {}.", ns, config.ioDirect); + UC_INFO("Set {}::StreamNumber to {}.", ns, config.streamNumber); + UC_INFO("Set {}::TimeoutMs to {}.", ns, config.timeoutMs); + } +}; + +Ds3fsStore::~Ds3fsStore() = default; + +Status Ds3fsStore::Setup(const Config& config) +{ + try { + impl_ = std::make_shared(); + } catch (const std::exception& e) { + UC_ERROR("Failed({}) to make ds3fs store object.", e.what()); + return Status::Error(e.what()); + } + return impl_->Setup(config); +} + +std::string Ds3fsStore::Readme() const { return "Ds3fsStore"; } + +Expected> Ds3fsStore::Lookup(const Detail::BlockId* blocks, size_t num) +{ + return impl_->spaceMgr.Lookup(blocks, num); +} + +void Ds3fsStore::Prefetch(const Detail::BlockId* blocks, size_t num) {} + +Expected Ds3fsStore::Load(Detail::TaskDesc task) +{ + if (!impl_->transEnable) { return Status::Error("transfer is not enable"); } + auto res = impl_->transMgr.Submit({TransTask::Type::LOAD, std::move(task)}); + if (!res) [[unlikely]] { + UC_ERROR("Failed({}) to submit load task({}).", res.Error(), task.brief); + } + return res; +} + +Expected Ds3fsStore::Dump(Detail::TaskDesc task) +{ + if (!impl_->transEnable) { return Status::Error("transfer is not enable"); } + auto res = impl_->transMgr.Submit({TransTask::Type::DUMP, std::move(task)}); + if (!res) [[unlikely]] { + UC_ERROR("Failed({}) to submit dump task({}).", res.Error(), task.brief); + } + return res; +} + +Expected Ds3fsStore::Check(Detail::TaskHandle taskId) +{ + auto res = impl_->transMgr.Check(taskId); + if (!res) [[unlikely]] { UC_ERROR("Failed({}) to check task({}).", res.Error(), taskId); } + return res; +} + +Status Ds3fsStore::Wait(Detail::TaskHandle taskId) +{ + auto s = impl_->transMgr.Wait(taskId); + if (s.Failure()) [[unlikely]] { UC_ERROR("Failed({}) to wait task({}).", s, taskId); } + return s; +} + +} // namespace UC::Ds3fsStore diff --git a/ucm/store/ds3fs/cc/ds3fs_store.h b/ucm/store/ds3fs/cc/ds3fs_store.h new file mode 100644 index 000000000..a84d55806 --- /dev/null +++ b/ucm/store/ds3fs/cc/ds3fs_store.h @@ -0,0 +1,52 @@ +/** + * MIT License + * + * Copyright (c) 2025 Huawei Technologies Co., Ltd. All rights reserved. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + * */ +#ifndef UNIFIEDCACHE_STORE_CC_DS3FS_STORE_H +#define UNIFIEDCACHE_STORE_CC_DS3FS_STORE_H + +#include +#include "global_config.h" +#include "ucmstore_v1.h" + +namespace UC::Ds3fsStore { + +class Ds3fsStoreImpl; +class Ds3fsStore : public StoreV1 { +public: + ~Ds3fsStore() override; + Status Setup(const Config& config); + std::string Readme() const override; + Expected> Lookup(const Detail::BlockId* blocks, size_t num) override; + void Prefetch(const Detail::BlockId* blocks, size_t num) override; + Expected Load(Detail::TaskDesc task) override; + Expected Dump(Detail::TaskDesc task) override; + Expected Check(Detail::TaskHandle taskId) override; + Status Wait(Detail::TaskHandle taskId) override; + +private: + std::shared_ptr impl_; +}; + +} // namespace UC::Ds3fsStore + +#endif diff --git a/ucm/store/ds3fs/cc/global_config.h b/ucm/store/ds3fs/cc/global_config.h new file mode 100644 index 000000000..a97ac51dd --- /dev/null +++ b/ucm/store/ds3fs/cc/global_config.h @@ -0,0 +1,48 @@ +/** + * MIT License + * + * Copyright (c) 2025 Huawei Technologies Co., Ltd. All rights reserved. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + * */ +#ifndef UNIFIEDCACHE_DS3FS_STORE_CC_GLOBAL_CONFIG_H +#define UNIFIEDCACHE_DS3FS_STORE_CC_GLOBAL_CONFIG_H + +#include +#include + +namespace UC::Ds3fsStore { + +struct Config { + std::vector storageBackends{}; + int32_t deviceId{-1}; + size_t tensorSize{0}; + size_t shardSize{0}; + size_t blockSize{0}; + bool ioDirect{true}; + size_t streamNumber{32}; + size_t timeoutMs{30000}; + size_t iorEntries{1}; + int32_t iorDepth{1}; + int32_t numaId{-1}; +}; + +} // namespace UC::Ds3fsStore + +#endif diff --git a/ucm/store/ds3fs/cc/space_layout.cc b/ucm/store/ds3fs/cc/space_layout.cc new file mode 100644 index 000000000..972c642d5 --- /dev/null +++ b/ucm/store/ds3fs/cc/space_layout.cc @@ -0,0 +1,94 @@ +/** + * MIT License + * + * Copyright (c) 2025 Huawei Technologies Co., Ltd. All rights reserved. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + * */ +#include "space_layout.h" +#include +#include +#include "ds3fs_file.h" +#include "logger/logger.h" + +namespace UC::Ds3fsStore { + +Status SpaceLayout::Setup(const std::string& storageBackend) +{ + auto normalizedPath = storageBackend; + if (normalizedPath.back() != '/') { normalizedPath += '/'; } + + for (const auto& root : this->RelativeRoots()) { + Ds3fsFile dir(normalizedPath + root); + auto status = dir.MkDir(); + if (status == Status::DuplicateKey()) { status = Status::OK(); } + if (status.Failure()) { return status; } + } + + this->storageBackend_ = normalizedPath; + return Status::OK(); +} + +std::string SpaceLayout::DataFilePath(const Detail::BlockId& blockId, bool activated) const +{ + const auto& file = DataFileName(blockId); + const auto& parent = DataParentName(file, activated); + return fmt::format("{}{}/{}", storageBackend_, parent, file); +} + +Status SpaceLayout::CommitFile(const Detail::BlockId& blockId, bool success) const +{ + const auto& file = DataFileName(blockId); + const auto& activated = fmt::format("{}{}/{}", storageBackend_, TempFileRoot(), file); + auto s = Status::OK(); + if (success) { + const auto& parent = fmt::format("{}{}", storageBackend_, DataParentName(file, false)); + const auto& archived = fmt::format("{}/{}", parent, file); + Ds3fsFile dir(parent); + s = dir.MkDir(); + if (s == Status::OK() || s == Status::DuplicateKey()) { + Ds3fsFile activatedFile(activated); + s = activatedFile.Rename(archived); + } + } + if (!success || s.Failure()) { + Ds3fsFile activatedFile(activated); + activatedFile.Remove(); + } + return s; +} + +std::vector SpaceLayout::RelativeRoots() const { return {TempFileRoot()}; } + +std::string SpaceLayout::DataParentName(const std::string& blockFile, bool activated) const +{ + if (activated) { return TempFileRoot(); } + return blockFile.substr(0, 2); +} + +std::string SpaceLayout::DataFileRoot() const { return "data"; } + +std::string SpaceLayout::TempFileRoot() const { return ".temp"; } + +std::string SpaceLayout::DataFileName(const Detail::BlockId& blockId) const +{ + return fmt::format("{:02x}", fmt::join(blockId, "")); +} + +} // namespace UC::Ds3fsStore \ No newline at end of file diff --git a/ucm/store/ds3fs/cc/space_layout.h b/ucm/store/ds3fs/cc/space_layout.h new file mode 100644 index 000000000..05bbea3cf --- /dev/null +++ b/ucm/store/ds3fs/cc/space_layout.h @@ -0,0 +1,51 @@ +/** + * MIT License + * + * Copyright (c) 2025 Huawei Technologies Co., Ltd. All rights reserved. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + * */ +#ifndef UNIFIEDCACHE_DS3FS_SPACE_LAYOUT_H +#define UNIFIEDCACHE_DS3FS_SPACE_LAYOUT_H + +#include "status/status.h" +#include "type/types.h" + +namespace UC::Ds3fsStore { + +class SpaceLayout { +public: + Status Setup(const std::string& storageBackend); + std::string DataFilePath(const Detail::BlockId& blockId, bool activated) const; + Status CommitFile(const Detail::BlockId& blockId, bool success) const; + +private: + std::vector RelativeRoots() const; + std::string DataParentName(const std::string& blockFile, bool activated) const; + std::string DataFileRoot() const; + std::string TempFileRoot() const; + std::string DataFileName(const Detail::BlockId& blockId) const; + +private: + std::string storageBackend_; +}; + +} // namespace UC::Ds3fsStore + +#endif \ No newline at end of file diff --git a/ucm/store/ds3fs/cc/space_manager.cc b/ucm/store/ds3fs/cc/space_manager.cc new file mode 100644 index 000000000..d5b32ac4f --- /dev/null +++ b/ucm/store/ds3fs/cc/space_manager.cc @@ -0,0 +1,56 @@ +/** + * MIT License + * + * Copyright (c) 2025 Huawei Technologies Co., Ltd. All rights reserved. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + * */ +#include "space_manager.h" +#include "ds3fs_file.h" +#include "logger/logger.h" + +namespace UC::Ds3fsStore { + +Status SpaceManager::Setup(const Config& config) +{ + return layout_.Setup(config.storageBackends[0]); +} + +std::vector SpaceManager::Lookup(const Detail::BlockId* blocks, size_t num) +{ + std::vector res(num); + for (size_t i = 0; i < num; i++) { res[i] = Lookup(blocks + i); } + return res; +} + +uint8_t SpaceManager::Lookup(const Detail::BlockId* block) +{ + const auto& path = layout_.DataFilePath(*block, false); + Ds3fsFile file(path); + constexpr auto mode = + Ds3fsFile::AccessMode::EXIST | Ds3fsFile::AccessMode::READ | Ds3fsFile::AccessMode::WRITE; + auto s = file.Access(mode); + if (s.Failure()) { + if (s != Status::NotFound()) { UC_ERROR("Failed({}) to access file({}).", s, path); } + return false; + } + return true; +} + +} // namespace UC::Ds3fsStore \ No newline at end of file diff --git a/ucm/store/ds3fs/cc/space_manager.h b/ucm/store/ds3fs/cc/space_manager.h new file mode 100644 index 000000000..f8947647d --- /dev/null +++ b/ucm/store/ds3fs/cc/space_manager.h @@ -0,0 +1,46 @@ +/** + * MIT License + * + * Copyright (c) 2025 Huawei Technologies Co., Ltd. All rights reserved. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + * */ +#ifndef UNIFIEDCACHE_SPACE_MANAGER_H +#define UNIFIEDCACHE_SPACE_MANAGER_H + +#include "global_config.h" +#include "space_layout.h" + +namespace UC::Ds3fsStore { + +class SpaceManager { + SpaceLayout layout_; + +public: + Status Setup(const Config& config); + std::vector Lookup(const Detail::BlockId* blocks, size_t num); + const SpaceLayout* GetLayout() const { return &layout_; } + +private: + uint8_t Lookup(const Detail::BlockId* block); +}; + +} // namespace UC::Ds3fsStore + +#endif \ No newline at end of file diff --git a/ucm/store/ds3fs/cc/trans_manager.h b/ucm/store/ds3fs/cc/trans_manager.h new file mode 100644 index 000000000..092c5a94f --- /dev/null +++ b/ucm/store/ds3fs/cc/trans_manager.h @@ -0,0 +1,65 @@ +/** + * MIT License + * + * Copyright (c) 2025 Huawei Technologies Co., Ltd. All rights reserved. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + * */ +#ifndef UNIFIEDCACHE_DS3FS_STORE_CC_TRANS_MANAGER_H +#define UNIFIEDCACHE_DS3FS_STORE_CC_TRANS_MANAGER_H + +#include "logger/logger.h" +#include "template/task_wrapper.h" +#include "trans_queue.h" + +namespace UC::Ds3fsStore { + +class TransManager : public Detail::TaskWrapper { + TransQueue queue_; + size_t shardSize_; + +public: + Status Setup(const Config& config, const SpaceLayout* layout) + { + timeoutMs_ = config.timeoutMs; + shardSize_ = config.shardSize; + return queue_.Setup(config, &failureSet_, layout); + } + +protected: + void Dispatch(TaskPtr t, WaiterPtr w) override + { + const auto id = t->id; + const auto& brief = t->desc.brief; + const auto num = t->desc.size(); + const auto size = shardSize_ * num; + const auto tp = w->startTp; + UC_DEBUG("Ds3fs task({},{},{},{}) dispatching.", id, brief, num, size); + w->SetEpilog([id, brief = std::move(brief), num, size, tp] { + auto cost = NowTime::Now() - tp; + UC_DEBUG("Ds3fs task({},{},{},{}) finished, cost {:.3f}ms.", id, brief, num, size, + cost * 1e3); + }); + queue_.Push(t, w); + } +}; + +} // namespace UC::Ds3fsStore + +#endif diff --git a/ucm/store/ds3fs/cc/trans_queue.cc b/ucm/store/ds3fs/cc/trans_queue.cc new file mode 100644 index 000000000..bb6ce167a --- /dev/null +++ b/ucm/store/ds3fs/cc/trans_queue.cc @@ -0,0 +1,227 @@ +/** + * MIT License + * + * Copyright (c) 2025 Huawei Technologies Co., Ltd. All rights reserved. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + * */ +#include "trans_queue.h" +#include +#include +#include "ds3fs_file.h" +#include "logger/logger.h" + +namespace UC::Ds3fsStore { + +Status TransQueue::Setup(const Config& config, TaskIdSet* failureSet, const SpaceLayout* layout) +{ + failureSet_ = failureSet; + layout_ = layout; + ioSize_ = config.tensorSize; + shardSize_ = config.shardSize; + nShardPerBlock_ = config.blockSize / config.shardSize; + ioDirect_ = config.ioDirect; + mountPoint_ = config.storageBackends[0]; + iorEntries_ = config.iorEntries; + iorDepth_ = config.iorDepth; + numaId_ = config.numaId; + + auto success = pool_.SetNWorker(config.streamNumber) + .SetWorkerInitFn([this](auto& ctx) { return InitWorkerContext(ctx); }) + .SetWorkerFn([this](auto& ios, auto& ctx) { Worker(ios, ctx); }) + .Run(); + if (!success) [[unlikely]] { + return Status::Error(fmt::format("workers({}) start failed", config.streamNumber)); + } + return Status::OK(); +} + +bool TransQueue::InitWorkerContext(std::unique_ptr& ctx) +{ + ctx = std::make_unique(); + + auto s = ctx->Init(mountPoint_, ioSize_, iorEntries_, iorDepth_, numaId_); + if (s.Failure()) [[unlikely]] { + UC_ERROR("Failed to initialize worker context: {}", s); + return false; + } + + return true; +} + +void TransQueue::Push(TaskPtr task, WaiterPtr waiter) +{ + waiter->Set(task->desc.size()); + std::list ios; + for (auto&& shard : task->desc) { + ios.emplace_back({task->id, task->type, std::move(shard), waiter}); + } + ios.front().firstIo = true; + pool_.Push(ios); +} + +void TransQueue::Worker(IoUnit& ios, const std::unique_ptr& ctx) +{ + if (ios.firstIo) { + auto wait = NowTime::Now() - ios.waiter->startTp; + UC_DEBUG("Ds3fs task({}) start running, wait {:.3f}ms.", ios.owner, wait * 1e3); + } + if (failureSet_->Contains(ios.owner)) { + ios.waiter->Done(); + return; + } + auto s = Status::OK(); + if (ios.type == TransTask::Type::DUMP) { + s = H2S(ios, ctx); + if (ios.shard.index + 1 == nShardPerBlock_) { + layout_->CommitFile(ios.shard.owner, s.Success()); + } + } else { + s = S2H(ios, ctx); + } + if (s.Failure()) [[unlikely]] { failureSet_->Insert(ios.owner); } + ios.waiter->Done(); +} + +Status TransQueue::H2S(IoUnit& ios, const std::unique_ptr& ctx) +{ + const auto& path = layout_->DataFilePath(ios.shard.owner, true); + auto flags = Ds3fsFile::OpenFlag::CREATE | Ds3fsFile::OpenFlag::WRITE_ONLY; + if (ioDirect_) { flags |= Ds3fsFile::OpenFlag::DIRECT; } + + Ds3fsFile file{path}; + auto s = file.Open(flags); + if (s.Failure()) [[unlikely]] { + UC_ERROR("Failed to open file({}): {}", path, s); + return s; + } + + int fd = file.ReleaseHandle(); + + FdGuard fdGuard; + s = fdGuard.Register(fd); + if (s.Failure()) [[unlikely]] { + UC_ERROR("Failed to register fd({}) for file({}): {}", fd, path, s); + return s; + } + + auto offset = shardSize_ * ios.shard.index; + return DoIoWrite(ctx, fd, offset, ios); +} + +Status TransQueue::S2H(IoUnit& ios, const std::unique_ptr& ctx) +{ + const auto& path = layout_->DataFilePath(ios.shard.owner, false); + auto flags = Ds3fsFile::OpenFlag::READ_ONLY; + if (ioDirect_) { flags |= Ds3fsFile::OpenFlag::DIRECT; } + + Ds3fsFile file{path}; + auto s = file.Open(flags); + if (s.Failure()) [[unlikely]] { + UC_ERROR("Failed to open file({}): {}", path, s); + return s; + } + + int fd = file.ReleaseHandle(); + + FdGuard fdGuard; + s = fdGuard.Register(fd); + if (s.Failure()) [[unlikely]] { + UC_ERROR("Failed to register fd({}) for file({}): {}", fd, path, s); + return s; + } + + auto offset = shardSize_ * ios.shard.index; + return DoIoRead(ctx, fd, offset, ios); +} + +Status TransQueue::DoIoRead(const std::unique_ptr& ctx, int fd, size_t offset, + IoUnit& ios) +{ + int prepRes = hf3fs_prep_io(ctx->iorRead.Get(), ctx->iov.Get(), true, ctx->iov.Base(), fd, + offset, ioSize_, nullptr); + if (prepRes < 0) [[unlikely]] { + UC_ERROR("Failed to prep read io: result={}", prepRes); + return Status::OsApiError(fmt::format("Failed to prep read io: {}", prepRes)); + } + + int submitRes = hf3fs_submit_ios(ctx->iorRead.Get()); + if (submitRes < 0) [[unlikely]] { + UC_ERROR("Failed to submit read io: result={}", submitRes); + return Status::OsApiError(fmt::format("Failed to submit read ios: {}", submitRes)); + } + + struct hf3fs_cqe cqe; + int waitRes = hf3fs_wait_for_ios(ctx->iorRead.Get(), &cqe, 1, 1, nullptr); + if (waitRes <= 0) [[unlikely]] { + UC_ERROR("Failed to wait for read io: result={}", waitRes); + return Status::OsApiError(fmt::format("Failed to wait for read ios: {}", waitRes)); + } + + auto s = CheckIoResult(cqe, layout_->DataFilePath(ios.shard.owner, false), offset, true); + if (s.Failure()) { return s; } + + std::memcpy(reinterpret_cast(ios.shard.addrs[0]), ctx->iov.Base(), ioSize_); + return Status::OK(); +} + +Status TransQueue::DoIoWrite(const std::unique_ptr& ctx, int fd, size_t offset, + IoUnit& ios) +{ + std::memcpy(ctx->iov.Base(), reinterpret_cast(ios.shard.addrs[0]), ioSize_); + + int prepRes = hf3fs_prep_io(ctx->iorWrite.Get(), ctx->iov.Get(), false, ctx->iov.Base(), fd, + offset, ioSize_, nullptr); + if (prepRes < 0) [[unlikely]] { + UC_ERROR("Failed to prep write io: result={}", prepRes); + return Status::OsApiError(fmt::format("Failed to prep write io: {}", prepRes)); + } + + int submitRes = hf3fs_submit_ios(ctx->iorWrite.Get()); + if (submitRes < 0) [[unlikely]] { + UC_ERROR("Failed to submit write io: result={}", submitRes); + return Status::OsApiError(fmt::format("Failed to submit write ios: {}", submitRes)); + } + + struct hf3fs_cqe cqe; + int waitRes = hf3fs_wait_for_ios(ctx->iorWrite.Get(), &cqe, 1, 1, nullptr); + if (waitRes <= 0) [[unlikely]] { + UC_ERROR("Failed to wait for write io: result={}", waitRes); + return Status::OsApiError(fmt::format("Failed to wait for write ios: {}", waitRes)); + } + + auto s = CheckIoResult(cqe, layout_->DataFilePath(ios.shard.owner, true), offset, false); + if (s.Failure()) { return s; } + + return Status::OK(); +} + +Status TransQueue::CheckIoResult(const hf3fs_cqe& cqe, const std::string& path, size_t offset, + bool isRead) +{ + if (cqe.result < 0) [[unlikely]] { + const char* op = isRead ? "Read" : "Write"; + UC_ERROR("{} operation failed: result={}, offset={}, size={}, path={}", op, cqe.result, + offset, ioSize_, path); + return Status::OsApiError(fmt::format("{} operation failed: {}", op, cqe.result)); + } + return Status::OK(); +} + +} // namespace UC::Ds3fsStore \ No newline at end of file diff --git a/ucm/store/ds3fs/cc/trans_queue.h b/ucm/store/ds3fs/cc/trans_queue.h new file mode 100644 index 000000000..12c87988f --- /dev/null +++ b/ucm/store/ds3fs/cc/trans_queue.h @@ -0,0 +1,183 @@ +/** + * MIT License + * + * Copyright (c) 2025 Huawei Technologies Co., Ltd. All rights reserved. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + * */ +#ifndef UNIFIEDCACHE_DS3FS_STORE_CC_TRANS_QUEUE_H +#define UNIFIEDCACHE_DS3FS_STORE_CC_TRANS_QUEUE_H + +#include +#include +#include +#include +#include +#include "global_config.h" +#include "space_layout.h" +#include "template/hashset.h" +#include "thread/latch.h" +#include "thread/thread_pool.h" +#include "trans_task.h" + +namespace UC::Ds3fsStore { + +class TransQueue { + using TaskIdSet = HashSet; + using TaskPtr = std::shared_ptr; + using WaiterPtr = std::shared_ptr; + +private: + struct IoUnit { + Detail::TaskHandle owner; + TransTask::Type type; + Detail::Shard shard; + std::shared_ptr waiter; + bool firstIo{false}; + }; + + class IovGuard { + struct hf3fs_iov iov_; + bool valid_{false}; + + public: + IovGuard() = default; + Status Create(const std::string& mountPoint, size_t size, int numaId) + { + int res = hf3fs_iovcreate(&iov_, mountPoint.c_str(), size, 0, numaId); + if (res < 0) { + return Status::OsApiError(fmt::format("Failed to create IOV: {}", res)); + } + valid_ = true; + return Status::OK(); + } + ~IovGuard() + { + if (valid_) hf3fs_iovdestroy(&iov_); + } + struct hf3fs_iov* Get() { return &iov_; } + void* Base() { return iov_.base; } + IovGuard(const IovGuard&) = delete; + IovGuard& operator=(const IovGuard&) = delete; + }; + + class IorGuard { + struct hf3fs_ior ior_; + bool valid_{false}; + + public: + IorGuard() = default; + Status Create(const std::string& mountPoint, size_t entries, bool isRead, int depth, + int numaId) + { + int res = + hf3fs_iorcreate4(&ior_, mountPoint.c_str(), entries, isRead, depth, 0, numaId, 0); + if (res < 0) { + return Status::OsApiError(fmt::format("Failed to create IOR: {}", res)); + } + valid_ = true; + return Status::OK(); + } + ~IorGuard() + { + if (valid_) hf3fs_iordestroy(&ior_); + } + struct hf3fs_ior* Get() { return &ior_; } + IorGuard(const IorGuard&) = delete; + IorGuard& operator=(const IorGuard&) = delete; + }; + + class FdGuard { + int fd_{-1}; + + public: + FdGuard() = default; + Status Register(int fd) + { + auto res = hf3fs_reg_fd(fd, 0); + if (res > 0) { + return Status::OsApiError(fmt::format("Failed to register fd({}): {}", fd, res)); + } + fd_ = fd; + return Status::OK(); + } + ~FdGuard() + { + if (fd_ >= 0) { + hf3fs_dereg_fd(fd_); + close(fd_); + } + } + FdGuard(const FdGuard&) = delete; + FdGuard& operator=(const FdGuard&) = delete; + }; + + struct WorkerContext { + IovGuard iov; + IorGuard iorRead; + IorGuard iorWrite; + + bool initialized{false}; + + Status Init(const std::string& mountPoint, size_t ioSize, size_t iorEntries, int iorDepth, + int numaId) + { + auto s = iov.Create(mountPoint, ioSize, numaId); + if (s.Failure()) { return s; } + + s = iorRead.Create(mountPoint, iorEntries, true, iorDepth, numaId); + if (s.Failure()) { return s; } + + s = iorWrite.Create(mountPoint, iorEntries, false, iorDepth, numaId); + if (s.Failure()) { return s; } + + initialized = true; + return Status::OK(); + } + }; + + TaskIdSet* failureSet_; + const SpaceLayout* layout_; + ThreadPool> pool_; + size_t ioSize_; + size_t shardSize_; + size_t nShardPerBlock_; + bool ioDirect_; + std::string mountPoint_; + size_t iorEntries_; + int iorDepth_; + int numaId_; + +public: + Status Setup(const Config& config, TaskIdSet* failureSet, const SpaceLayout* layout); + void Push(TaskPtr task, WaiterPtr waiter); + +private: + bool InitWorkerContext(std::unique_ptr& ctx); + void Worker(IoUnit& ios, const std::unique_ptr& ctx); + Status H2S(IoUnit& ios, const std::unique_ptr& ctx); + Status S2H(IoUnit& ios, const std::unique_ptr& ctx); + Status DoIoRead(const std::unique_ptr& ctx, int fd, size_t offset, IoUnit& ios); + Status DoIoWrite(const std::unique_ptr& ctx, int fd, size_t offset, IoUnit& ios); + Status CheckIoResult(const hf3fs_cqe& cqe, const std::string& path, size_t offset, bool isRead); +}; + +} // namespace UC::Ds3fsStore + +#endif \ No newline at end of file diff --git a/ucm/store/ds3fs/cc/trans_task.h b/ucm/store/ds3fs/cc/trans_task.h new file mode 100644 index 000000000..0624aec43 --- /dev/null +++ b/ucm/store/ds3fs/cc/trans_task.h @@ -0,0 +1,52 @@ +/** + * MIT License + * + * Copyright (c) 2025 Huawei Technologies Co., Ltd. All rights reserved. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + * */ +#ifndef UNIFIEDCACHE_DS3FS_STORE_CC_TRANS_TASK_H +#define UNIFIEDCACHE_DS3FS_STORE_CC_TRANS_TASK_H + +#include +#include "type/types.h" + +namespace UC::Ds3fsStore { + +class TransTask { +public: + enum class Type : uint8_t { LOAD, DUMP }; + Detail::TaskHandle id{0}; + Type type{Type::DUMP}; + Detail::TaskDesc desc; + +public: + TransTask(Type type, Detail::TaskDesc desc) : id{NextId()}, type{type}, desc{std::move(desc)} {} + +private: + static size_t NextId() noexcept + { + static std::atomic id{1}; + return id.fetch_add(1, std::memory_order_relaxed); + }; +}; + +} // namespace UC::Ds3fsStore + +#endif diff --git a/ucm/store/ds3fs/connector.py b/ucm/store/ds3fs/connector.py new file mode 100644 index 000000000..2db2bdc4a --- /dev/null +++ b/ucm/store/ds3fs/connector.py @@ -0,0 +1,156 @@ +# -*- coding: utf-8 -*- +# +# MIT License +# +# Copyright (c) 2025 Huawei Technologies Co., Ltd. All rights reserved. +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. +# +import array +from dataclasses import dataclass +from typing import Dict, List + +import numpy as np +import torch + +from ucm.store.ds3fs import ucmds3fsstore +from ucm.store.ucmstore_v1 import Task, UcmKVStoreBaseV1 + + +@dataclass +class Ds3fsTransTask(Task): + task_id: int + + +class UcmDs3fsStore(UcmKVStoreBaseV1): + def __init__(self, config: Dict[str, object]) -> None: + super().__init__(config) + + key_mapping = { + "storage_backends": "storageBackends", + "device_id": "deviceId", + "tensor_size": "tensorSize", + "shard_size": "shardSize", + "block_size": "blockSize", + "io_direct": "ioDirect", + "stream_number": "streamNumber", + "timeout_ms": "timeoutMs", + "ior_entries": "iorEntries", + "ior_depth": "iorDepth", + "numa_id": "numaId", + } + self.store = ucmds3fsstore.Ds3fsStore() + param = ucmds3fsstore.Ds3fsStore.Config() + for key, value in config.items(): + attr = key_mapping.get(key) + if attr and hasattr(param, attr): + if key == "storage_backends" and isinstance(value, list): + value = self._normalize_storage_backends(value) + setattr(param, attr, value) + self.store.Setup(param) + + @staticmethod + def _normalize_storage_backends(backends: List[str]) -> List[str]: + normalized = [] + for backend in backends: + path = backend + if path.endswith("/"): + path = path[:-1] + if "/" in path: + last_slash = path.rfind("/") + last_dir = path[last_slash + 1 :] + if last_dir in ("kv", "rope"): + path = path[:last_slash] + normalized.append(path) + return normalized + + def cc_store(self) -> int: + return self.store.Self() + + def lookup(self, block_ids: List[bytes]) -> List[bool]: + flat = np.frombuffer(b"".join(block_ids), dtype=np.uint8) + res = self.store.Lookup(flat) + return np.frombuffer(res, dtype=bool) + + def prefetch(self, block_ids: List[bytes]) -> None: + flat = np.frombuffer(b"".join(block_ids), dtype=np.uint8) + self.store.Prefetch(flat) + + def _tensor_normalize(self, tensors: List[List[torch.Tensor]]) -> np.ndarray: + n_rows = len(tensors) + n_cols = len(tensors[0]) + flat = np.fromiter( + (t for row in tensors for t in row), dtype=object, count=n_rows * n_cols + ) + ptrs = np.vectorize(torch.Tensor.data_ptr, otypes=[np.uint64])(flat) + return ptrs.reshape(n_rows, n_cols) + + def load( + self, + block_ids: List[bytes], + shard_index: List[int], + dst_tensor: List[List[torch.Tensor]], + ) -> Task: + ids = np.frombuffer(b"".join(block_ids), dtype=np.uint8) + indexes = array.array("Q", shard_index) + addrs = self._tensor_normalize(dst_tensor) + task_id = self.store.Load(ids, indexes, addrs) + return Ds3fsTransTask(task_id) + + def dump( + self, + block_ids: List[bytes], + shard_index: List[int], + src_tensor: List[List[torch.Tensor]], + ) -> Task: + ids = np.frombuffer(b"".join(block_ids), dtype=np.uint8) + indexes = array.array("Q", shard_index) + addrs = self._tensor_normalize(src_tensor) + task_id = self.store.Dump(ids, indexes, addrs) + return Ds3fsTransTask(task_id) + + def load_data( + self, + block_ids: List[bytes], + shard_index: List[int], + dst_addr: List[List[int]], + ) -> Task: + ids = np.frombuffer(b"".join(block_ids), dtype=np.uint8) + indexes = array.array("Q", shard_index) + addrs = np.array(dst_addr, np.uint64) + task_id = self.store.Load(ids, indexes, addrs) + return Ds3fsTransTask(task_id) + + def dump_data( + self, + block_ids: List[bytes], + shard_index: List[int], + src_addr: List[List[int]], + ) -> Task: + ids = np.frombuffer(b"".join(block_ids), dtype=np.uint8) + indexes = array.array("Q", shard_index) + addrs = np.array(src_addr, np.uint64) + task_id = self.store.Dump(ids, indexes, addrs) + return Ds3fsTransTask(task_id) + + def wait(self, task: Task) -> None: + return self.store.Wait(task.task_id) + + def check(self, task: Task) -> bool: + return self.store.Check(task.task_id) diff --git a/ucm/store/ds3fs/cpy/ds3fs_store.py.cc b/ucm/store/ds3fs/cpy/ds3fs_store.py.cc new file mode 100644 index 000000000..be070ee2d --- /dev/null +++ b/ucm/store/ds3fs/cpy/ds3fs_store.py.cc @@ -0,0 +1,61 @@ +/** + * MIT License + * + * Copyright (c) 2025 Huawei Technologies Co., Ltd. All rights reserved. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + * */ +#include "ds3fs_store.h" +#include "template/store_binder.h" + +PYBIND11_MODULE(ucmds3fsstore, module) +{ + namespace py = pybind11; + using namespace UC::Ds3fsStore; + using Ds3fsStorePy = UC::Detail::StoreBinder; + module.attr("project") = UCM_PROJECT_NAME; + module.attr("version") = UCM_PROJECT_VERSION; + module.attr("commit_id") = UCM_COMMIT_ID; + module.attr("build_type") = UCM_BUILD_TYPE; + auto store = py::class_>(module, "Ds3fsStore"); + auto config = py::class_(store, "Config"); + config.def(py::init<>()); + config.def_readwrite("storageBackends", &Config::storageBackends); + config.def_readwrite("deviceId", &Config::deviceId); + config.def_readwrite("tensorSize", &Config::tensorSize); + config.def_readwrite("shardSize", &Config::shardSize); + config.def_readwrite("blockSize", &Config::blockSize); + config.def_readwrite("ioDirect", &Config::ioDirect); + config.def_readwrite("streamNumber", &Config::streamNumber); + config.def_readwrite("timeoutMs", &Config::timeoutMs); + config.def_readwrite("iorEntries", &Config::iorEntries); + config.def_readwrite("iorDepth", &Config::iorDepth); + config.def_readwrite("numaId", &Config::numaId); + store.def(py::init<>()); + store.def("Self", &Ds3fsStorePy::Self); + store.def("Setup", &Ds3fsStorePy::Setup); + store.def("Lookup", &Ds3fsStorePy::Lookup, py::arg("ids").noconvert()); + store.def("Prefetch", &Ds3fsStorePy::Prefetch, py::arg("ids").noconvert()); + store.def("Load", &Ds3fsStorePy::Load, py::arg("ids").noconvert(), + py::arg("indexes").noconvert(), py::arg("addrs").noconvert()); + store.def("Dump", &Ds3fsStorePy::Dump, py::arg("ids").noconvert(), + py::arg("indexes").noconvert(), py::arg("addrs").noconvert()); + store.def("Check", &Ds3fsStorePy::Check); + store.def("Wait", &Ds3fsStorePy::Wait); +} diff --git a/ucm/store/pipeline/connector.py b/ucm/store/pipeline/connector.py index 5185869ea..41f553413 100644 --- a/ucm/store/pipeline/connector.py +++ b/ucm/store/pipeline/connector.py @@ -28,12 +28,26 @@ import torch from ucm.store.cache.connector import UcmCacheStore +from ucm.store.ds3fs.connector import UcmDs3fsStore from ucm.store.posix.connector import UcmPosixStore from ucm.store.ucmstore_v1 import Task, UcmKVStoreBaseV1 PipelineBuilder = Callable[[Dict[str, object], List[UcmKVStoreBaseV1]], None] +def _build_cache_ds3fs_pipeline( + config: Dict[str, object], store: List[UcmKVStoreBaseV1] +) -> None: + ds3fs_config = copy.deepcopy(config) + if int(config["device_id"]) >= 0: + ds3fs_config |= {"tensor_size": config["shard_size"]} + ds3fs_store = UcmDs3fsStore(ds3fs_config) + store.append(ds3fs_store) + cache_config = copy.deepcopy(config) | {"store_backend": ds3fs_store.cc_store()} + cache_store = UcmCacheStore(cache_config) + store.append(cache_store) + + def _build_cache_posix_pipeline( config: Dict[str, object], store: List[UcmKVStoreBaseV1] ) -> None: @@ -49,6 +63,7 @@ def _build_cache_posix_pipeline( PIPELINE_REGISTRY: Dict[str, PipelineBuilder] = { "Cache|Posix": _build_cache_posix_pipeline, + "Cache|Ds3fs": _build_cache_ds3fs_pipeline, } diff --git a/ucm/store/test/e2e/ds3fs_test.py b/ucm/store/test/e2e/ds3fs_test.py new file mode 100644 index 000000000..099298b50 --- /dev/null +++ b/ucm/store/test/e2e/ds3fs_test.py @@ -0,0 +1,153 @@ +# -*- coding: utf-8 -*- +# +# MIT License +# +# Copyright (c) 2025 Huawei Technologies Co., Ltd. All rights reserved. +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. +# +import os +import secrets +import time +from typing import List + +import cupy +import numpy as np + +from ucm.store.ds3fs.connector import UcmDs3fsStore +from ucm.store.ucmstore import Task + + +class Ds3fsStoreOnly: + def __init__( + self, + block_size: int, + storage_backends: List[str], + ): + ds3fs_config = {} + ds3fs_config["storage_backends"] = storage_backends + ds3fs_config["device_id"] = 0 + ds3fs_config["tensor_size"] = block_size + ds3fs_config["shard_size"] = block_size + ds3fs_config["block_size"] = block_size + ds3fs_config["io_direct"] = True + ds3fs_config["stream_number"] = 32 + self.ds3fs = UcmDs3fsStore(ds3fs_config) + + def lookup(self, block_ids: List[bytes]) -> List[bool]: + return self.ds3fs.lookup(block_ids) + + def prefetch(self, block_ids: List[bytes]) -> None: + return self.ds3fs.prefetch(block_ids) + + def load_data( + self, + block_ids: List[bytes], + shard_index: List[int], + dst_addr: List[List[int]], + ) -> Task: + return self.ds3fs.load_data(block_ids, shard_index, dst_addr) + + def dump_data( + self, + block_ids: List[bytes], + shard_index: List[int], + src_addr: List[List[int]], + ) -> Task: + return self.ds3fs.dump_data(block_ids, shard_index, src_addr) + + def wait(self, task: Task) -> None: + return self.ds3fs.wait(task) + + def check(self, task: Task) -> bool: + return self.ds3fs.check(task) + + +def e2e_test( + store: Ds3fsStoreOnly, + block_size: int, + block_num: int, +): + block_ids = [secrets.token_bytes(16) for _ in range(block_num)] + + founds = store.lookup(block_ids) + assert not all(founds), "Blocks should not exist before dump" + + shard_indexes = [0 for _ in range(block_num)] + + src_data = [] + src_arrays = [] + src_mems = [] + for i in range(block_num): + mem = cupy.cuda.alloc_pinned_memory(block_size) + arr = np.frombuffer(mem, dtype=np.uint8, count=block_size) + arr.flags.writeable = True + arr[:] = np.random.randint(0, 256, block_size, dtype=np.uint8) + src_data.append([mem.ptr]) + src_arrays.append(arr.copy()) + src_mems.append(mem) + + task = store.dump_data(block_ids, shard_indexes, src_data) + store.wait(task) + + founds = store.lookup(block_ids) + assert all(founds), "Blocks should exist after dump" + + dst_data = [] + dst_arrays = [] + dst_mems = [] + for i in range(block_num): + mem = cupy.cuda.alloc_pinned_memory(block_size) + arr = np.frombuffer(mem, dtype=np.uint8, count=block_size) + arr.flags.writeable = True + arr[:] = 0 + dst_data.append([mem.ptr]) + dst_arrays.append(arr) + dst_mems.append(mem) + + task = store.load_data(block_ids, shard_indexes, dst_data) + store.wait(task) + + for i, (src_arr, dst_arr) in enumerate(zip(src_arrays, dst_arrays)): + if not np.array_equal(src_arr, dst_arr): + diff_mask = src_arr != dst_arr + num_diff = diff_mask.sum() + print(f"DIFF at block {i}: {num_diff} bytes differ") + print(f" src sample: {src_arr[diff_mask][:10]}") + print(f" dst sample: {dst_arr[diff_mask][:10]}") + assert False, f"Data mismatch at block {i}" + + +def main(): + block_size = 1048576 * 16 + block_num = 256 + storage_backends = ["."] + test_batch_number = 64 + + store = Ds3fsStoreOnly(block_size, storage_backends) + + for i in range(test_batch_number): + e2e_test(store, block_size, block_num) + + time.sleep(10) + + +if __name__ == "__main__": + os.environ["UC_LOGGER_LEVEL"] = "debug" + main()