diff --git a/ucm/store/pcstore/cc/domain/trans/share_buffer.cc b/ucm/store/pcstore/cc/domain/trans/share_buffer.cc index 86c3647ff..ddcf69546 100644 --- a/ucm/store/pcstore/cc/domain/trans/share_buffer.cc +++ b/ucm/store/pcstore/cc/domain/trans/share_buffer.cc @@ -29,11 +29,12 @@ #include #include "file/file.h" #include "logger/logger.h" -#include "trans/buffer.h" +#include "trans/device.h" namespace UC { static constexpr int32_t SHARE_BUFFER_MAGIC = (('S' << 16) | ('b' << 8) | 1); +static constexpr size_t INVALID_POSITION = size_t(-1); struct ShareMutex { pthread_mutex_t mutex; @@ -87,12 +88,24 @@ struct ShareBlockHeader { ShareBlockStatus status; size_t offset; void* Data() { return reinterpret_cast(this) + offset; } + void Refer() + { + if (this->ref == 0 && this->status != ShareBlockStatus::LOADED) { + this->status = ShareBlockStatus::INIT; + } + this->ref++; + } + void Occupy(const std::string& block) + { + this->id.Set(block); + this->ref = 1; + this->status = ShareBlockStatus::INIT; + } }; struct ShareBufferHeader { ShareMutex mutex; std::atomic magic; - int32_t ref; size_t blockSize; size_t blockNumber; ShareBlockHeader headers[0]; @@ -128,13 +141,14 @@ void CleanUpShmFileExceptMe(const std::string& me) } Status ShareBuffer::Setup(const size_t blockSize, const size_t blockNumber, const bool ioDirect, - const size_t nSharer, const std::string& uniqueId) + const std::string& uniqueId) { this->blockSize_ = blockSize; this->blockNumber_ = blockNumber; this->ioDirect_ = ioDirect; - this->nSharer_ = nSharer; this->addr_ = nullptr; + tmpBufMaker_ = Trans::Device{}.MakeBuffer(); + if (!tmpBufMaker_) { return Status::OutOfMemory(); } this->shmName_ = ShmPrefix() + uniqueId; CleanUpShmFileExceptMe(this->shmName_); auto file = File::Make(this->shmName_); @@ -149,31 +163,19 @@ Status ShareBuffer::Setup(const size_t blockSize, const size_t blockNumber, cons ShareBuffer::~ShareBuffer() { if (!this->addr_) { return; } - auto bufferHeader = (ShareBufferHeader*)this->addr_; - bufferHeader->mutex.Lock(); - auto ref = (--bufferHeader->ref); - bufferHeader->mutex.Unlock(); void* dataAddr = static_cast(this->addr_) + this->DataOffset(); Trans::Buffer::UnregisterHostBuffer(dataAddr); const auto shmSize = this->ShmSize(); File::MUnmap(this->addr_, shmSize); - if (ref == 0) { File::ShmUnlink(this->shmName_); } + File::ShmUnlink(this->shmName_); } std::shared_ptr ShareBuffer::MakeReader(const std::string& block, const std::string& path) { - auto index = this->AcquireBlock(block); - try { - void* addr = this->BlockAt(index); - return std::shared_ptr( - new Reader{block, path, blockSize_, ioDirect_, nSharer_, addr}, - [this, index](auto) { this->ReleaseBlock(index); }); - } catch (...) { - this->ReleaseBlock(index); - UC_ERROR("Failed to create reader."); - return nullptr; - } + auto pos = this->AcquireBlock(block); + if (pos != INVALID_POSITION) { return MakeSharedReader(block, path, pos); } + return MakeLocalReader(block, path); } size_t ShareBuffer::DataOffset() const @@ -198,7 +200,6 @@ Status ShareBuffer::InitShmBuffer(IFile* file) auto bufferHeader = (ShareBufferHeader*)this->addr_; bufferHeader->magic = 1; bufferHeader->mutex.Init(); - bufferHeader->ref = this->nSharer_; bufferHeader->blockSize = this->blockSize_; bufferHeader->blockNumber = this->blockNumber_; const auto dataOffset = this->DataOffset(); @@ -215,7 +216,7 @@ Status ShareBuffer::InitShmBuffer(IFile* file) auto dataSize = shmSize - dataOffset; auto status = Trans::Buffer::RegisterHostBuffer(dataAddr, dataSize); if (status.Success()) { return Status::OK(); } - UC_ERROR("Failed({}) to regitster host buffer({}).", status.ToString(), dataSize); + UC_ERROR("Failed({}) to register host buffer({}).", status.ToString(), dataSize); return Status::Error(); } @@ -246,7 +247,7 @@ Status ShareBuffer::LoadShmBuffer(IFile* file) auto dataSize = shmSize - dataOffset; auto status = Trans::Buffer::RegisterHostBuffer(dataAddr, dataSize); if (status.Success()) { return Status::OK(); } - UC_ERROR("Failed({}) to regitster host buffer({}).", status.ToString(), dataSize); + UC_ERROR("Failed({}) to register host buffer({}).", status.ToString(), dataSize); return Status::Error(); } @@ -255,36 +256,39 @@ size_t ShareBuffer::AcquireBlock(const std::string& block) static std::hash hasher{}; auto pos = hasher(block) % this->blockNumber_; auto bufferHeader = (ShareBufferHeader*)this->addr_; - auto reusedIdx = this->blockNumber_; + auto reusedPos = INVALID_POSITION; bufferHeader->mutex.Lock(); - for (size_t i = 0;; i++) { - if (!bufferHeader->headers[pos].id.Used()) { - if (reusedIdx == this->blockNumber_) { reusedIdx = pos; } - break; - } - if (bufferHeader->headers[pos].id == block) { - reusedIdx = pos; - break; + for (size_t i = 0; i < this->blockNumber_; i++) { + auto header = bufferHeader->headers + pos; + header->mutex.Lock(); + if (header->id == block) { + header->Refer(); + header->mutex.Unlock(); + bufferHeader->mutex.Unlock(); + return pos; } - if (bufferHeader->headers[pos].ref <= 0) { - if (reusedIdx == this->blockNumber_) { reusedIdx = pos; } + if (!header->id.Used()) { + if (reusedPos != INVALID_POSITION) { + header->mutex.Unlock(); + break; + } + header->Occupy(block); + header->mutex.Unlock(); + bufferHeader->mutex.Unlock(); + return pos; } + if (header->ref <= 0 && reusedPos == INVALID_POSITION) { reusedPos = pos; } + header->mutex.Unlock(); pos = (pos + 1) % this->blockNumber_; - if (i == this->blockNumber_) { - UC_WARN("Buffer({}) used out.", this->blockNumber_); - i = 0; - } } - auto blockHeader = bufferHeader->headers + reusedIdx; - blockHeader->mutex.Lock(); - if (blockHeader->ref <= 0) { - blockHeader->id.Set(block); - blockHeader->ref = this->nSharer_; - blockHeader->status = ShareBlockStatus::INIT; + if (reusedPos != INVALID_POSITION) { + auto header = bufferHeader->headers + reusedPos; + header->mutex.Lock(); + header->Occupy(block); + header->mutex.Unlock(); } - blockHeader->mutex.Unlock(); bufferHeader->mutex.Unlock(); - return reusedIdx; + return reusedPos; } void ShareBuffer::ReleaseBlock(const size_t index) @@ -301,7 +305,67 @@ void* ShareBuffer::BlockAt(const size_t index) return bufferHeader->headers + index; } +std::shared_ptr ShareBuffer::MakeLocalReader(const std::string& block, + const std::string& path) +{ + auto addr = tmpBufMaker_->MakeHostBuffer(blockSize_); + if (!addr) [[unlikely]] { + UC_ERROR("Failed to make buffer({}) on host.", blockSize_); + return nullptr; + } + Reader* reader = nullptr; + try { + reader = new Reader{block, path, blockSize_, ioDirect_, false, addr.get()}; + return std::shared_ptr(reader, + [addr = std::move(addr)](Reader* reader) { delete reader; }); + } catch (const std::exception& e) { + if (reader) { delete reader; } + UC_ERROR("Failed({}) to create reader.", e.what()); + return nullptr; + } +} + +std::shared_ptr ShareBuffer::MakeSharedReader(const std::string& block, + const std::string& path, + size_t position) +{ + void* addr = this->BlockAt(position); + Reader* reader = nullptr; + try { + reader = new Reader{block, path, blockSize_, ioDirect_, true, addr}; + return std::shared_ptr(reader, [this, position](Reader* reader) { + delete reader; + this->ReleaseBlock(position); + }); + } catch (...) { + this->ReleaseBlock(position); + if (reader) { delete reader; } + UC_ERROR("Failed to create reader."); + return nullptr; + } +} + Status ShareBuffer::Reader::Ready4Read() +{ + if (shared_) { return Ready4ReadOnSharedBuffer(); } + return Ready4ReadOnLocalBuffer(); +} + +uintptr_t ShareBuffer::Reader::GetData() +{ + if (shared_) { + auto header = (ShareBlockHeader*)this->addr_; + return (uintptr_t)header->Data(); + } + return (uintptr_t)this->addr_; +} + +Status ShareBuffer::Reader::Ready4ReadOnLocalBuffer() +{ + return File::Read(this->path_, 0, this->length_, this->GetData(), this->ioDirect_); +} + +Status ShareBuffer::Reader::Ready4ReadOnSharedBuffer() { auto header = (ShareBlockHeader*)this->addr_; if (header->status == ShareBlockStatus::LOADED) { return Status::OK(); } @@ -324,10 +388,4 @@ Status ShareBuffer::Reader::Ready4Read() return s; } -uintptr_t ShareBuffer::Reader::GetData() -{ - auto header = (ShareBlockHeader*)this->addr_; - return (uintptr_t)header->Data(); -} - } // namespace UC diff --git a/ucm/store/pcstore/cc/domain/trans/share_buffer.h b/ucm/store/pcstore/cc/domain/trans/share_buffer.h index 8827468d0..8c34631eb 100644 --- a/ucm/store/pcstore/cc/domain/trans/share_buffer.h +++ b/ucm/store/pcstore/cc/domain/trans/share_buffer.h @@ -29,6 +29,7 @@ #include #include "file/ifile.h" #include "status/status.h" +#include "trans/buffer.h" namespace UC { @@ -39,7 +40,7 @@ class ShareBuffer { std::string path_; size_t length_; bool ioDirect_; - size_t nSharer_; + bool shared_; void* addr_; public: @@ -48,21 +49,23 @@ class ShareBuffer { private: Reader(const std::string& block, const std::string& path, const size_t length, - const bool ioDirect, const size_t nSharer, void* addr) + const bool ioDirect, const bool shared, void* addr) : block_{block}, path_{path}, length_{length}, ioDirect_{ioDirect}, - nSharer_{nSharer}, + shared_{shared}, addr_{addr} { } friend class ShareBuffer; + Status Ready4ReadOnLocalBuffer(); + Status Ready4ReadOnSharedBuffer(); }; public: Status Setup(const size_t blockSize, const size_t blockNumber, const bool ioDirect, - const size_t nSharer, const std::string& uniqueId); + const std::string& uniqueId); ~ShareBuffer(); std::shared_ptr MakeReader(const std::string& block, const std::string& path); @@ -74,14 +77,17 @@ class ShareBuffer { size_t AcquireBlock(const std::string& block); void ReleaseBlock(const size_t index); void* BlockAt(const size_t index); + std::shared_ptr MakeLocalReader(const std::string& block, const std::string& path); + std::shared_ptr MakeSharedReader(const std::string& block, const std::string& path, + size_t position); private: size_t blockSize_; size_t blockNumber_; bool ioDirect_; - size_t nSharer_; std::string shmName_; - void* addr_; + void* addr_{nullptr}; + std::unique_ptr tmpBufMaker_{nullptr}; }; } // namespace UC diff --git a/ucm/store/pcstore/cc/domain/trans/trans_manager.cc b/ucm/store/pcstore/cc/domain/trans/trans_manager.cc index 63ee887b6..d0ab526a1 100644 --- a/ucm/store/pcstore/cc/domain/trans/trans_manager.cc +++ b/ucm/store/pcstore/cc/domain/trans/trans_manager.cc @@ -34,7 +34,7 @@ Status TransManager::Setup(const size_t rankSize, const int32_t deviceId, const { auto s = Status::OK(); if (rankSize > 1) { - s = this->shareQueue_.Setup(rankSize, deviceId, streamNumber, blockSize, ioSize, ioDirect, + s = this->shareQueue_.Setup(deviceId, streamNumber, blockSize, ioSize, ioDirect, bufferNumber, layout, &this->failureSet_, uniqueId); if (s.Failure()) { return s; } } diff --git a/ucm/store/pcstore/cc/domain/trans/trans_share_queue.cc b/ucm/store/pcstore/cc/domain/trans/trans_share_queue.cc index 840bb6b5e..03efd2073 100644 --- a/ucm/store/pcstore/cc/domain/trans/trans_share_queue.cc +++ b/ucm/store/pcstore/cc/domain/trans/trans_share_queue.cc @@ -39,18 +39,17 @@ TransShareQueue::~TransShareQueue() } } -Status TransShareQueue::Setup(const size_t nSharer, const int32_t deviceId, - const size_t streamNumber, const size_t blockSize, - const size_t ioSize, const bool ioDirect, const size_t bufferNumber, - const SpaceLayout* layout, TaskSet* failureSet, - const std::string& uniqueId) +Status TransShareQueue::Setup(const int32_t deviceId, const size_t streamNumber, + const size_t blockSize, const size_t ioSize, const bool ioDirect, + const size_t bufferNumber, const SpaceLayout* layout, + TaskSet* failureSet, const std::string& uniqueId) { this->deviceId_ = deviceId; this->streamNumber_ = streamNumber; this->ioSize_ = ioSize; this->layout_ = layout; this->failureSet_ = failureSet; - auto status = this->buffer_.Setup(blockSize, bufferNumber, ioDirect, nSharer, uniqueId); + auto status = this->buffer_.Setup(blockSize, bufferNumber, ioDirect, uniqueId); if (status.Failure()) { return status; } std::list> start(streamNumber); std::list> fut; @@ -67,9 +66,9 @@ Status TransShareQueue::Setup(const size_t nSharer, const int32_t deviceId, void TransShareQueue::Dispatch(TaskPtr task, WaiterPtr waiter) { - std::lock_guard lg(this->mutex_); + std::list blkTasks; task->ForEachGroup( - [task, waiter, this](const std::string& block, std::vector& shards) { + [task, waiter, this, &blkTasks](const std::string& block, std::vector& shards) { BlockTask blockTask; blockTask.reader = this->buffer_.MakeReader(block, this->layout_->DataFilePath(block, false)); @@ -82,8 +81,10 @@ void TransShareQueue::Dispatch(TaskPtr task, WaiterPtr waiter) waiter->Done([task, ioSize] { UC_DEBUG("{}", task->Epilog(ioSize)); }); } }; - this->wait_.push_back(blockTask); + blkTasks.push_back(std::move(blockTask)); }); + std::lock_guard lg(this->mutex_); + this->wait_.splice(this->wait_.end(), blkTasks); this->cv_.notify_all(); } diff --git a/ucm/store/pcstore/cc/domain/trans/trans_share_queue.h b/ucm/store/pcstore/cc/domain/trans/trans_share_queue.h index 7d76cad0a..ad5dfa722 100644 --- a/ucm/store/pcstore/cc/domain/trans/trans_share_queue.h +++ b/ucm/store/pcstore/cc/domain/trans/trans_share_queue.h @@ -61,10 +61,9 @@ class TransShareQueue { public: ~TransShareQueue(); - Status Setup(const size_t nSharer, const int32_t deviceId, const size_t streamNumber, - const size_t blockSize, const size_t ioSize, const bool ioDirect, - const size_t bufferNumber, const SpaceLayout* layout, TaskSet* failureSet, - const std::string& uniqueId); + Status Setup(const int32_t deviceId, const size_t streamNumber, const size_t blockSize, + const size_t ioSize, const bool ioDirect, const size_t bufferNumber, + const SpaceLayout* layout, TaskSet* failureSet, const std::string& uniqueId); void Dispatch(TaskPtr task, WaiterPtr waiter); private: