Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions ucm/store/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ target_link_libraries(storeintf INTERFACE storedetail infra_status)
add_subdirectory(nfsstore)
add_subdirectory(pcstore)
add_subdirectory(posix)
add_subdirectory(ds3fs)
add_subdirectory(cache)
add_subdirectory(mooncakestore)
add_subdirectory(test)
30 changes: 30 additions & 0 deletions ucm/store/ds3fs/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
find_path(HF3FS_USRBIO_INCLUDE_DIR NAMES hf3fs_usrbio.h PATHS /usr/local/3fs/src/lib/api NO_DEFAULT_PATH)
find_library(HF3FS_USRBIO_LIBRARY NAMES hf3fs_api_shared PATHS /usr/local/3fs/src/lib/rs/hf3fs-usrbio-sys/lib NO_DEFAULT_PATH)

if(HF3FS_USRBIO_INCLUDE_DIR AND HF3FS_USRBIO_LIBRARY)
file(GLOB_RECURSE UCM_DS3FS_STORE_CC_SOURCE_FILES "./cc/*.cc")
add_library(ds3fsstore STATIC ${UCM_DS3FS_STORE_CC_SOURCE_FILES})
target_include_directories(ds3fsstore PUBLIC
${CMAKE_CURRENT_SOURCE_DIR}/cc
)

target_include_directories(ds3fsstore PRIVATE ${HF3FS_USRBIO_INCLUDE_DIR})
target_link_libraries(ds3fsstore PRIVATE ${HF3FS_USRBIO_LIBRARY})
target_link_libraries(ds3fsstore PUBLIC storeintf infra_logger)

file(GLOB_RECURSE UCM_DS3FS_STORE_CPY_SOURCE_FILES "./cpy/*.cc")
pybind11_add_module(ucmds3fsstore ${UCM_DS3FS_STORE_CPY_SOURCE_FILES})
target_link_libraries(ucmds3fsstore PRIVATE ds3fsstore)

file(RELATIVE_PATH INSTALL_REL_PATH ${CMAKE_SOURCE_DIR} ${CMAKE_CURRENT_SOURCE_DIR})
install(TARGETS ucmds3fsstore LIBRARY DESTINATION ${INSTALL_REL_PATH} COMPONENT ucm)
else()
message(STATUS "ds3fsstore: Skipping build - required HF3FS dependencies not found")
if(NOT HF3FS_USRBIO_INCLUDE_DIR)
message(STATUS " - Missing: hf3fs_usrbio.h ")
endif()
if(NOT HF3FS_USRBIO_LIBRARY)
message(STATUS " - Missing: hf3fs_api_shared library ")
endif()
message(STATUS " Please ensure HF3FS dependencies are installed or build paths are correct")
endif()
98 changes: 98 additions & 0 deletions ucm/store/ds3fs/cc/ds3fs_file.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/**
* MIT License
*
* Copyright (c) 2025 Huawei Technologies Co., Ltd. All rights reserved.
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
* */
#include "ds3fs_file.h"
#include <sys/stat.h>
#include <unistd.h>

namespace UC::Ds3fsStore {

static constexpr auto NewFilePerm = (S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH);
static constexpr auto NewDirPerm = (S_IRWXU | S_IRGRP | S_IROTH);

Ds3fsFile::~Ds3fsFile()
{
if (handle_ != -1) { Close(); }
}

Status Ds3fsFile::MkDir()
{
auto ret = mkdir(path_.c_str(), NewDirPerm);
auto eno = errno;
if (ret != 0) [[unlikely]] {
if (eno == EEXIST) { return Status::DuplicateKey(); }
return Status::OsApiError(std::to_string(eno));
}
return Status::OK();
}

Status Ds3fsFile::RmDir()
{
auto ret = rmdir(path_.c_str());
auto eno = errno;
if (ret != 0) [[unlikely]] { return Status::OsApiError(std::to_string(eno)); }
return Status::OK();
}

Status Ds3fsFile::Rename(const std::string& newName)
{
auto ret = rename(path_.c_str(), newName.c_str());
auto eno = errno;
if (ret != 0) [[unlikely]] {
if (eno == ENOENT) { return Status::NotFound(); }
return Status::OsApiError(std::to_string(eno));
}
return Status::OK();
}

Status Ds3fsFile::Access(const int32_t mode)
{
auto ret = access(path_.c_str(), mode);
auto eno = errno;
if (ret != 0) [[unlikely]] {
if (eno == ENOENT) { return Status::NotFound(); }
return Status::OsApiError(std::to_string(eno));
}
return Status::OK();
}

Status Ds3fsFile::Open(const uint32_t flags)
{
handle_ = open(path_.c_str(), flags, NewFilePerm);
auto eno = errno;
if (handle_ < 0) [[unlikely]] {
if (eno == EEXIST) { return Status::DuplicateKey(); }
return Status::OsApiError(std::to_string(eno));
}
return Status::OK();
}

void Ds3fsFile::Close()
{
close(handle_);
handle_ = -1;
}

void Ds3fsFile::Remove() { remove(path_.c_str()); }

} // namespace UC::Ds3fsStore
75 changes: 75 additions & 0 deletions ucm/store/ds3fs/cc/ds3fs_file.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/**
* MIT License
*
* Copyright (c) 2025 Huawei Technologies Co., Ltd. All rights reserved.
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
* */
#ifndef UNIFIEDCACHE_DS3FS_STORE_CC_DS3FS_FILE_H
#define UNIFIEDCACHE_DS3FS_STORE_CC_DS3FS_FILE_H

#include <fcntl.h>
#include "status/status.h"

namespace UC::Ds3fsStore {

class Ds3fsFile {
public:
struct AccessMode {
static constexpr int32_t READ = R_OK;
static constexpr int32_t WRITE = W_OK;
static constexpr int32_t EXIST = F_OK;
static constexpr int32_t EXECUTE = X_OK;
};
struct OpenFlag {
static constexpr uint32_t READ_ONLY = O_RDONLY;
static constexpr uint32_t WRITE_ONLY = O_WRONLY;
static constexpr uint32_t READ_WRITE = O_RDWR;
static constexpr uint32_t CREATE = O_CREAT;
static constexpr uint32_t DIRECT = O_DIRECT;
static constexpr uint32_t APPEND = O_APPEND;
static constexpr uint32_t EXCL = O_EXCL;
};

private:
std::string path_{};
int32_t handle_{-1};

public:
explicit Ds3fsFile(std::string path) : path_{std::move(path)} {}
~Ds3fsFile();
const std::string& Path() const { return path_; }
int32_t ReleaseHandle()
{
int32_t fd = handle_;
handle_ = -1;
return fd;
}
Status MkDir();
Status RmDir();
Status Rename(const std::string& newName);
Status Access(const int32_t mode);
Status Open(const uint32_t flags);
void Close();
void Remove();
};

} // namespace UC::Ds3fsStore

#endif
152 changes: 152 additions & 0 deletions ucm/store/ds3fs/cc/ds3fs_store.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
/**
* MIT License
*
* Copyright (c) 2025 Huawei Technologies Co., Ltd. All rights reserved.
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
* */
#include "ds3fs_store.h"
#include <fmt/ranges.h>
#include "logger/logger.h"
#include "space_manager.h"
#include "trans_manager.h"

namespace UC::Ds3fsStore {

class Ds3fsStoreImpl {
public:
SpaceManager spaceMgr;
TransManager transMgr;
bool transEnable{false};

public:
Status Setup(const Config& config)
{
auto s = CheckConfig(config);
if (s.Failure()) [[unlikely]] {
UC_ERROR("Failed to check config params: {}.", s);
return s;
}
s = spaceMgr.Setup(config);
if (s.Failure()) [[unlikely]] { return s; }
transEnable = config.deviceId >= 0;
if (transEnable) {
s = transMgr.Setup(config, spaceMgr.GetLayout());
if (s.Failure()) [[unlikely]] { return s; }
}
ShowConfig(config);
return Status::OK();
}

private:
Status CheckConfig(const Config& config)
{
if (config.storageBackends.empty()) {
return Status::InvalidParam("invalid storage backends");
}
if (config.deviceId < -1) {
return Status::InvalidParam("invalid device({})", config.deviceId);
}
if (config.deviceId == -1) { return Status::OK(); }
if (config.tensorSize == 0 || config.shardSize < config.tensorSize ||
config.blockSize < config.shardSize || config.shardSize % config.tensorSize != 0 ||
config.blockSize % config.shardSize != 0) {
return Status::InvalidParam("invalid size({},{},{})", config.tensorSize,
config.shardSize, config.blockSize);
}
if (config.streamNumber == 0) {
return Status::InvalidParam("invalid stream number({})", config.streamNumber);
}
return Status::OK();
}
void ShowConfig(const Config& config)
{
constexpr const char* ns = "Ds3fsStore";
std::string buildType = UCM_BUILD_TYPE;
if (buildType.empty()) { buildType = "Release"; }
UC_INFO("{}-{}({}).", ns, UCM_COMMIT_ID, buildType);
UC_INFO("Set {}::StorageBackends to {}.", ns, config.storageBackends[0]);
UC_INFO("Set {}::DeviceId to {}.", ns, config.deviceId);
if (config.deviceId == -1) { return; }
UC_INFO("Set {}::TensorSize to {}.", ns, config.tensorSize);
UC_INFO("Set {}::ShardSize to {}.", ns, config.shardSize);
UC_INFO("Set {}::BlockSize to {}.", ns, config.blockSize);
UC_INFO("Set {}::IoDirect to {}.", ns, config.ioDirect);
UC_INFO("Set {}::StreamNumber to {}.", ns, config.streamNumber);
UC_INFO("Set {}::TimeoutMs to {}.", ns, config.timeoutMs);
}
};

Ds3fsStore::~Ds3fsStore() = default;

Status Ds3fsStore::Setup(const Config& config)
{
try {
impl_ = std::make_shared<Ds3fsStoreImpl>();
} catch (const std::exception& e) {
UC_ERROR("Failed({}) to make ds3fs store object.", e.what());
return Status::Error(e.what());
}
return impl_->Setup(config);
}

std::string Ds3fsStore::Readme() const { return "Ds3fsStore"; }

Expected<std::vector<uint8_t>> Ds3fsStore::Lookup(const Detail::BlockId* blocks, size_t num)
{
return impl_->spaceMgr.Lookup(blocks, num);
}

void Ds3fsStore::Prefetch(const Detail::BlockId* blocks, size_t num) {}

Expected<Detail::TaskHandle> Ds3fsStore::Load(Detail::TaskDesc task)
{
if (!impl_->transEnable) { return Status::Error("transfer is not enable"); }
auto res = impl_->transMgr.Submit({TransTask::Type::LOAD, std::move(task)});
if (!res) [[unlikely]] {
UC_ERROR("Failed({}) to submit load task({}).", res.Error(), task.brief);
}
return res;
}

Expected<Detail::TaskHandle> Ds3fsStore::Dump(Detail::TaskDesc task)
{
if (!impl_->transEnable) { return Status::Error("transfer is not enable"); }
auto res = impl_->transMgr.Submit({TransTask::Type::DUMP, std::move(task)});
if (!res) [[unlikely]] {
UC_ERROR("Failed({}) to submit dump task({}).", res.Error(), task.brief);
}
return res;
}

Expected<bool> Ds3fsStore::Check(Detail::TaskHandle taskId)
{
auto res = impl_->transMgr.Check(taskId);
if (!res) [[unlikely]] { UC_ERROR("Failed({}) to check task({}).", res.Error(), taskId); }
return res;
}

Status Ds3fsStore::Wait(Detail::TaskHandle taskId)
{
auto s = impl_->transMgr.Wait(taskId);
if (s.Failure()) [[unlikely]] { UC_ERROR("Failed({}) to wait task({}).", s, taskId); }
return s;
}

} // namespace UC::Ds3fsStore
Loading