Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
162 changes: 110 additions & 52 deletions ucm/store/pcstore/cc/domain/trans/share_buffer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,12 @@
#include <unistd.h>
#include "file/file.h"
#include "logger/logger.h"
#include "trans/buffer.h"
#include "trans/device.h"

namespace UC {

static constexpr int32_t SHARE_BUFFER_MAGIC = (('S' << 16) | ('b' << 8) | 1);
static constexpr size_t INVALID_POSITION = size_t(-1);

struct ShareMutex {
pthread_mutex_t mutex;
Expand Down Expand Up @@ -87,12 +88,24 @@ struct ShareBlockHeader {
ShareBlockStatus status;
size_t offset;
void* Data() { return reinterpret_cast<char*>(this) + offset; }
/**
 * @brief Take one more reference on this block header.
 *
 * If the header currently has no references (ref == 0) and its status is not
 * LOADED, the status is reset to INIT before the count is incremented; a
 * LOADED status is left as it is.
 * No locking is done here; the call site in AcquireBlock wraps this call in
 * the header's mutex.
 */
void Refer()
{
    const bool unreferenced = (this->ref == 0);
    if (unreferenced && this->status != ShareBlockStatus::LOADED) {
        this->status = ShareBlockStatus::INIT;
    }
    ++this->ref;
}
void Occupy(const std::string& block)
{
this->id.Set(block);
this->ref = 1;
this->status = ShareBlockStatus::INIT;
}
};

struct ShareBufferHeader {
ShareMutex mutex;
std::atomic<int32_t> magic;
int32_t ref;
size_t blockSize;
size_t blockNumber;
ShareBlockHeader headers[0];
Expand Down Expand Up @@ -128,13 +141,14 @@ void CleanUpShmFileExceptMe(const std::string& me)
}

Status ShareBuffer::Setup(const size_t blockSize, const size_t blockNumber, const bool ioDirect,
const size_t nSharer, const std::string& uniqueId)
const std::string& uniqueId)
{
this->blockSize_ = blockSize;
this->blockNumber_ = blockNumber;
this->ioDirect_ = ioDirect;
this->nSharer_ = nSharer;
this->addr_ = nullptr;
tmpBufMaker_ = Trans::Device{}.MakeBuffer();
if (!tmpBufMaker_) { return Status::OutOfMemory(); }
this->shmName_ = ShmPrefix() + uniqueId;
CleanUpShmFileExceptMe(this->shmName_);
auto file = File::Make(this->shmName_);
Expand All @@ -149,31 +163,19 @@ Status ShareBuffer::Setup(const size_t blockSize, const size_t blockNumber, cons
ShareBuffer::~ShareBuffer()
{
if (!this->addr_) { return; }
auto bufferHeader = (ShareBufferHeader*)this->addr_;
bufferHeader->mutex.Lock();
auto ref = (--bufferHeader->ref);
bufferHeader->mutex.Unlock();
void* dataAddr = static_cast<char*>(this->addr_) + this->DataOffset();
Trans::Buffer::UnregisterHostBuffer(dataAddr);
const auto shmSize = this->ShmSize();
File::MUnmap(this->addr_, shmSize);
if (ref == 0) { File::ShmUnlink(this->shmName_); }
File::ShmUnlink(this->shmName_);
}

std::shared_ptr<ShareBuffer::Reader> ShareBuffer::MakeReader(const std::string& block,
const std::string& path)
{
auto index = this->AcquireBlock(block);
try {
void* addr = this->BlockAt(index);
return std::shared_ptr<Reader>(
new Reader{block, path, blockSize_, ioDirect_, nSharer_, addr},
[this, index](auto) { this->ReleaseBlock(index); });
} catch (...) {
this->ReleaseBlock(index);
UC_ERROR("Failed to create reader.");
return nullptr;
}
auto pos = this->AcquireBlock(block);
if (pos != INVALID_POSITION) { return MakeSharedReader(block, path, pos); }
return MakeLocalReader(block, path);
}

size_t ShareBuffer::DataOffset() const
Expand All @@ -198,7 +200,6 @@ Status ShareBuffer::InitShmBuffer(IFile* file)
auto bufferHeader = (ShareBufferHeader*)this->addr_;
bufferHeader->magic = 1;
bufferHeader->mutex.Init();
bufferHeader->ref = this->nSharer_;
bufferHeader->blockSize = this->blockSize_;
bufferHeader->blockNumber = this->blockNumber_;
const auto dataOffset = this->DataOffset();
Expand All @@ -215,7 +216,7 @@ Status ShareBuffer::InitShmBuffer(IFile* file)
auto dataSize = shmSize - dataOffset;
auto status = Trans::Buffer::RegisterHostBuffer(dataAddr, dataSize);
if (status.Success()) { return Status::OK(); }
UC_ERROR("Failed({}) to regitster host buffer({}).", status.ToString(), dataSize);
UC_ERROR("Failed({}) to register host buffer({}).", status.ToString(), dataSize);
return Status::Error();
}

Expand Down Expand Up @@ -246,7 +247,7 @@ Status ShareBuffer::LoadShmBuffer(IFile* file)
auto dataSize = shmSize - dataOffset;
auto status = Trans::Buffer::RegisterHostBuffer(dataAddr, dataSize);
if (status.Success()) { return Status::OK(); }
UC_ERROR("Failed({}) to regitster host buffer({}).", status.ToString(), dataSize);
UC_ERROR("Failed({}) to register host buffer({}).", status.ToString(), dataSize);
return Status::Error();
}

Expand All @@ -255,36 +256,39 @@ size_t ShareBuffer::AcquireBlock(const std::string& block)
static std::hash<std::string> hasher{};
auto pos = hasher(block) % this->blockNumber_;
auto bufferHeader = (ShareBufferHeader*)this->addr_;
auto reusedIdx = this->blockNumber_;
auto reusedPos = INVALID_POSITION;
bufferHeader->mutex.Lock();
for (size_t i = 0;; i++) {
if (!bufferHeader->headers[pos].id.Used()) {
if (reusedIdx == this->blockNumber_) { reusedIdx = pos; }
break;
}
if (bufferHeader->headers[pos].id == block) {
reusedIdx = pos;
break;
for (size_t i = 0; i < this->blockNumber_; i++) {
auto header = bufferHeader->headers + pos;
header->mutex.Lock();
if (header->id == block) {
header->Refer();
header->mutex.Unlock();
bufferHeader->mutex.Unlock();
return pos;
}
if (bufferHeader->headers[pos].ref <= 0) {
if (reusedIdx == this->blockNumber_) { reusedIdx = pos; }
if (!header->id.Used()) {
if (reusedPos != INVALID_POSITION) {
header->mutex.Unlock();
break;
}
header->Occupy(block);
header->mutex.Unlock();
bufferHeader->mutex.Unlock();
return pos;
}
if (header->ref <= 0 && reusedPos == INVALID_POSITION) { reusedPos = pos; }
header->mutex.Unlock();
pos = (pos + 1) % this->blockNumber_;
if (i == this->blockNumber_) {
UC_WARN("Buffer({}) used out.", this->blockNumber_);
i = 0;
}
}
auto blockHeader = bufferHeader->headers + reusedIdx;
blockHeader->mutex.Lock();
if (blockHeader->ref <= 0) {
blockHeader->id.Set(block);
blockHeader->ref = this->nSharer_;
blockHeader->status = ShareBlockStatus::INIT;
if (reusedPos != INVALID_POSITION) {
auto header = bufferHeader->headers + reusedPos;
header->mutex.Lock();
header->Occupy(block);
header->mutex.Unlock();
}
blockHeader->mutex.Unlock();
bufferHeader->mutex.Unlock();
return reusedIdx;
return reusedPos;
}

void ShareBuffer::ReleaseBlock(const size_t index)
Expand All @@ -301,7 +305,67 @@ void* ShareBuffer::BlockAt(const size_t index)
return bufferHeader->headers + index;
}

/**
 * @brief Create a reader that reads into a private host buffer rather than a
 *        block of the shared-memory region.
 *
 * @param block Block identifier forwarded to the Reader.
 * @param path  File path forwarded to the Reader.
 * @return The reader, or nullptr when the host buffer cannot be obtained or
 *         the reader cannot be constructed (an error is logged in both cases).
 */
std::shared_ptr<ShareBuffer::Reader> ShareBuffer::MakeLocalReader(const std::string& block,
                                                                  const std::string& path)
{
    auto addr = tmpBufMaker_->MakeHostBuffer(blockSize_);
    if (!addr) [[unlikely]] {
        UC_ERROR("Failed to make buffer({}) on host.", blockSize_);
        return nullptr;
    }
    try {
        // If the Reader constructor throws, the new-expression frees the storage itself.
        auto* reader = new Reader{block, path, blockSize_, ioDirect_, false, addr.get()};
        // The deleter takes over the buffer handle, so the handle lives exactly as long
        // as the reader that points into it.
        // No manual cleanup is needed on failure: when the (pointer, deleter) constructor
        // of std::shared_ptr throws, it has already invoked the deleter on `reader`.
        // Deleting it again in the handler would be a double delete.
        return std::shared_ptr<Reader>(reader,
                                       [addr = std::move(addr)](Reader* r) { delete r; });
    } catch (const std::exception& e) {
        UC_ERROR("Failed({}) to create reader.", e.what());
        return nullptr;
    }
}

/**
 * @brief Create a reader bound to an already acquired block of the shared buffer.
 *
 * @param block    Block identifier forwarded to the Reader.
 * @param path     File path forwarded to the Reader.
 * @param position Index of the acquired block; it is handed back through
 *                 ReleaseBlock() exactly once, either when the returned reader
 *                 is destroyed or when creation fails.
 * @return The reader, or nullptr if it could not be created (an error is logged).
 */
std::shared_ptr<ShareBuffer::Reader> ShareBuffer::MakeSharedReader(const std::string& block,
                                                                   const std::string& path,
                                                                   size_t position)
{
    void* addr = this->BlockAt(position);
    Reader* reader = nullptr;
    try {
        reader = new Reader{block, path, blockSize_, ioDirect_, true, addr};
    } catch (...) {
        // Nothing was allocated (the new-expression cleans up after a throwing
        // constructor), so only the block reference has to be given back.
        this->ReleaseBlock(position);
        UC_ERROR("Failed to create reader.");
        return nullptr;
    }
    try {
        return std::shared_ptr<Reader>(reader, [this, position](Reader* r) {
            delete r;
            this->ReleaseBlock(position);
        });
    } catch (...) {
        // When the (pointer, deleter) constructor of std::shared_ptr throws, it has
        // already run the deleter: the reader is deleted and the block released.
        // Repeating either step here would double-free / double-release.
        UC_ERROR("Failed to create reader.");
        return nullptr;
    }
}

/**
 * @brief Prepare the reader's data, dispatching on how the reader is backed.
 * @return The status of Ready4ReadOnSharedBuffer() when the reader was created
 *         as shared, otherwise the status of Ready4ReadOnLocalBuffer().
 */
Status ShareBuffer::Reader::Ready4Read()
{
    return shared_ ? Ready4ReadOnSharedBuffer() : Ready4ReadOnLocalBuffer();
}

uintptr_t ShareBuffer::Reader::GetData()
{
if (shared_) {
auto header = (ShareBlockHeader*)this->addr_;
return (uintptr_t)header->Data();
}
return (uintptr_t)this->addr_;
}

/**
 * @brief Fill the reader's buffer by reading the backing file.
 * @return The status returned by File::Read, called with path_, offset 0,
 *         length_, the address from GetData(), and the ioDirect_ flag.
 */
Status ShareBuffer::Reader::Ready4ReadOnLocalBuffer()
{
    return File::Read(path_, 0, length_, GetData(), ioDirect_);
}

Status ShareBuffer::Reader::Ready4ReadOnSharedBuffer()
{
auto header = (ShareBlockHeader*)this->addr_;
if (header->status == ShareBlockStatus::LOADED) { return Status::OK(); }
Expand All @@ -324,10 +388,4 @@ Status ShareBuffer::Reader::Ready4Read()
return s;
}

uintptr_t ShareBuffer::Reader::GetData()
{
auto header = (ShareBlockHeader*)this->addr_;
return (uintptr_t)header->Data();
}

} // namespace UC
18 changes: 12 additions & 6 deletions ucm/store/pcstore/cc/domain/trans/share_buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include <string>
#include "file/ifile.h"
#include "status/status.h"
#include "trans/buffer.h"

namespace UC {

Expand All @@ -39,7 +40,7 @@ class ShareBuffer {
std::string path_;
size_t length_;
bool ioDirect_;
size_t nSharer_;
bool shared_;
void* addr_;

public:
Expand All @@ -48,21 +49,23 @@ class ShareBuffer {

private:
Reader(const std::string& block, const std::string& path, const size_t length,
const bool ioDirect, const size_t nSharer, void* addr)
const bool ioDirect, const bool shared, void* addr)
: block_{block},
path_{path},
length_{length},
ioDirect_{ioDirect},
nSharer_{nSharer},
shared_{shared},
addr_{addr}
{
}
friend class ShareBuffer;
Status Ready4ReadOnLocalBuffer();
Status Ready4ReadOnSharedBuffer();
};

public:
Status Setup(const size_t blockSize, const size_t blockNumber, const bool ioDirect,
const size_t nSharer, const std::string& uniqueId);
const std::string& uniqueId);
~ShareBuffer();
std::shared_ptr<Reader> MakeReader(const std::string& block, const std::string& path);

Expand All @@ -74,14 +77,17 @@ class ShareBuffer {
size_t AcquireBlock(const std::string& block);
void ReleaseBlock(const size_t index);
void* BlockAt(const size_t index);
std::shared_ptr<Reader> MakeLocalReader(const std::string& block, const std::string& path);
std::shared_ptr<Reader> MakeSharedReader(const std::string& block, const std::string& path,
size_t position);

private:
size_t blockSize_;
size_t blockNumber_;
bool ioDirect_;
size_t nSharer_;
std::string shmName_;
void* addr_;
void* addr_{nullptr};
std::unique_ptr<Trans::Buffer> tmpBufMaker_{nullptr};
};

} // namespace UC
Expand Down
2 changes: 1 addition & 1 deletion ucm/store/pcstore/cc/domain/trans/trans_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ Status TransManager::Setup(const size_t rankSize, const int32_t deviceId, const
{
auto s = Status::OK();
if (rankSize > 1) {
s = this->shareQueue_.Setup(rankSize, deviceId, streamNumber, blockSize, ioSize, ioDirect,
s = this->shareQueue_.Setup(deviceId, streamNumber, blockSize, ioSize, ioDirect,
bufferNumber, layout, &this->failureSet_, uniqueId);
if (s.Failure()) { return s; }
}
Expand Down
19 changes: 10 additions & 9 deletions ucm/store/pcstore/cc/domain/trans/trans_share_queue.cc
Original file line number Diff line number Diff line change
Expand Up @@ -39,18 +39,17 @@ TransShareQueue::~TransShareQueue()
}
}

Status TransShareQueue::Setup(const size_t nSharer, const int32_t deviceId,
const size_t streamNumber, const size_t blockSize,
const size_t ioSize, const bool ioDirect, const size_t bufferNumber,
const SpaceLayout* layout, TaskSet* failureSet,
const std::string& uniqueId)
Status TransShareQueue::Setup(const int32_t deviceId, const size_t streamNumber,
const size_t blockSize, const size_t ioSize, const bool ioDirect,
const size_t bufferNumber, const SpaceLayout* layout,
TaskSet* failureSet, const std::string& uniqueId)
{
this->deviceId_ = deviceId;
this->streamNumber_ = streamNumber;
this->ioSize_ = ioSize;
this->layout_ = layout;
this->failureSet_ = failureSet;
auto status = this->buffer_.Setup(blockSize, bufferNumber, ioDirect, nSharer, uniqueId);
auto status = this->buffer_.Setup(blockSize, bufferNumber, ioDirect, uniqueId);
if (status.Failure()) { return status; }
std::list<std::promise<Status>> start(streamNumber);
std::list<std::future<Status>> fut;
Expand All @@ -67,9 +66,9 @@ Status TransShareQueue::Setup(const size_t nSharer, const int32_t deviceId,

void TransShareQueue::Dispatch(TaskPtr task, WaiterPtr waiter)
{
std::lock_guard<std::mutex> lg(this->mutex_);
std::list<BlockTask> blkTasks;
task->ForEachGroup(
[task, waiter, this](const std::string& block, std::vector<uintptr_t>& shards) {
[task, waiter, this, &blkTasks](const std::string& block, std::vector<uintptr_t>& shards) {
BlockTask blockTask;
blockTask.reader =
this->buffer_.MakeReader(block, this->layout_->DataFilePath(block, false));
Expand All @@ -82,8 +81,10 @@ void TransShareQueue::Dispatch(TaskPtr task, WaiterPtr waiter)
waiter->Done([task, ioSize] { UC_DEBUG("{}", task->Epilog(ioSize)); });
}
};
this->wait_.push_back(blockTask);
blkTasks.push_back(std::move(blockTask));
});
std::lock_guard<std::mutex> lg(this->mutex_);
this->wait_.splice(this->wait_.end(), blkTasks);
this->cv_.notify_all();
}

Expand Down
7 changes: 3 additions & 4 deletions ucm/store/pcstore/cc/domain/trans/trans_share_queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,9 @@ class TransShareQueue {

public:
~TransShareQueue();
Status Setup(const size_t nSharer, const int32_t deviceId, const size_t streamNumber,
const size_t blockSize, const size_t ioSize, const bool ioDirect,
const size_t bufferNumber, const SpaceLayout* layout, TaskSet* failureSet,
const std::string& uniqueId);
Status Setup(const int32_t deviceId, const size_t streamNumber, const size_t blockSize,
const size_t ioSize, const bool ioDirect, const size_t bufferNumber,
const SpaceLayout* layout, TaskSet* failureSet, const std::string& uniqueId);
void Dispatch(TaskPtr task, WaiterPtr waiter);

private:
Expand Down