Skip to content

Commit ce981c4

Browse files
authored
[bugfix] share buffer used out (cherry-picked from #592) (#598)
[bugfix] share buffer used out (#592) When the reserved shared memory is exhausted, temporarily allocate host memory for the current transfer task. (cherry picked from commit b9bce8a)
1 parent c24be03 commit ce981c4

File tree

5 files changed

+136
-72
lines changed

5 files changed

+136
-72
lines changed

ucm/store/pcstore/cc/domain/trans/share_buffer.cc

Lines changed: 110 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -29,11 +29,12 @@
2929
#include <unistd.h>
3030
#include "file/file.h"
3131
#include "logger/logger.h"
32-
#include "trans/buffer.h"
32+
#include "trans/device.h"
3333

3434
namespace UC {
3535

3636
static constexpr int32_t SHARE_BUFFER_MAGIC = (('S' << 16) | ('b' << 8) | 1);
37+
static constexpr size_t INVALID_POSITION = size_t(-1);
3738

3839
struct ShareMutex {
3940
pthread_mutex_t mutex;
@@ -87,12 +88,24 @@ struct ShareBlockHeader {
8788
ShareBlockStatus status;
8889
size_t offset;
8990
void* Data() { return reinterpret_cast<char*>(this) + offset; }
91+
void Refer()
92+
{
93+
if (this->ref == 0 && this->status != ShareBlockStatus::LOADED) {
94+
this->status = ShareBlockStatus::INIT;
95+
}
96+
this->ref++;
97+
}
98+
void Occupy(const std::string& block)
99+
{
100+
this->id.Set(block);
101+
this->ref = 1;
102+
this->status = ShareBlockStatus::INIT;
103+
}
90104
};
91105

92106
struct ShareBufferHeader {
93107
ShareMutex mutex;
94108
std::atomic<int32_t> magic;
95-
int32_t ref;
96109
size_t blockSize;
97110
size_t blockNumber;
98111
ShareBlockHeader headers[0];
@@ -128,13 +141,14 @@ void CleanUpShmFileExceptMe(const std::string& me)
128141
}
129142

130143
Status ShareBuffer::Setup(const size_t blockSize, const size_t blockNumber, const bool ioDirect,
131-
const size_t nSharer, const std::string& uniqueId)
144+
const std::string& uniqueId)
132145
{
133146
this->blockSize_ = blockSize;
134147
this->blockNumber_ = blockNumber;
135148
this->ioDirect_ = ioDirect;
136-
this->nSharer_ = nSharer;
137149
this->addr_ = nullptr;
150+
tmpBufMaker_ = Trans::Device{}.MakeBuffer();
151+
if (!tmpBufMaker_) { return Status::OutOfMemory(); }
138152
this->shmName_ = ShmPrefix() + uniqueId;
139153
CleanUpShmFileExceptMe(this->shmName_);
140154
auto file = File::Make(this->shmName_);
@@ -149,31 +163,19 @@ Status ShareBuffer::Setup(const size_t blockSize, const size_t blockNumber, cons
149163
ShareBuffer::~ShareBuffer()
150164
{
151165
if (!this->addr_) { return; }
152-
auto bufferHeader = (ShareBufferHeader*)this->addr_;
153-
bufferHeader->mutex.Lock();
154-
auto ref = (--bufferHeader->ref);
155-
bufferHeader->mutex.Unlock();
156166
void* dataAddr = static_cast<char*>(this->addr_) + this->DataOffset();
157167
Trans::Buffer::UnregisterHostBuffer(dataAddr);
158168
const auto shmSize = this->ShmSize();
159169
File::MUnmap(this->addr_, shmSize);
160-
if (ref == 0) { File::ShmUnlink(this->shmName_); }
170+
File::ShmUnlink(this->shmName_);
161171
}
162172

163173
std::shared_ptr<ShareBuffer::Reader> ShareBuffer::MakeReader(const std::string& block,
164174
const std::string& path)
165175
{
166-
auto index = this->AcquireBlock(block);
167-
try {
168-
void* addr = this->BlockAt(index);
169-
return std::shared_ptr<Reader>(
170-
new Reader{block, path, blockSize_, ioDirect_, nSharer_, addr},
171-
[this, index](auto) { this->ReleaseBlock(index); });
172-
} catch (...) {
173-
this->ReleaseBlock(index);
174-
UC_ERROR("Failed to create reader.");
175-
return nullptr;
176-
}
176+
auto pos = this->AcquireBlock(block);
177+
if (pos != INVALID_POSITION) { return MakeSharedReader(block, path, pos); }
178+
return MakeLocalReader(block, path);
177179
}
178180

179181
size_t ShareBuffer::DataOffset() const
@@ -198,7 +200,6 @@ Status ShareBuffer::InitShmBuffer(IFile* file)
198200
auto bufferHeader = (ShareBufferHeader*)this->addr_;
199201
bufferHeader->magic = 1;
200202
bufferHeader->mutex.Init();
201-
bufferHeader->ref = this->nSharer_;
202203
bufferHeader->blockSize = this->blockSize_;
203204
bufferHeader->blockNumber = this->blockNumber_;
204205
const auto dataOffset = this->DataOffset();
@@ -215,7 +216,7 @@ Status ShareBuffer::InitShmBuffer(IFile* file)
215216
auto dataSize = shmSize - dataOffset;
216217
auto status = Trans::Buffer::RegisterHostBuffer(dataAddr, dataSize);
217218
if (status.Success()) { return Status::OK(); }
218-
UC_ERROR("Failed({}) to regitster host buffer({}).", status.ToString(), dataSize);
219+
UC_ERROR("Failed({}) to register host buffer({}).", status.ToString(), dataSize);
219220
return Status::Error();
220221
}
221222

@@ -246,7 +247,7 @@ Status ShareBuffer::LoadShmBuffer(IFile* file)
246247
auto dataSize = shmSize - dataOffset;
247248
auto status = Trans::Buffer::RegisterHostBuffer(dataAddr, dataSize);
248249
if (status.Success()) { return Status::OK(); }
249-
UC_ERROR("Failed({}) to regitster host buffer({}).", status.ToString(), dataSize);
250+
UC_ERROR("Failed({}) to register host buffer({}).", status.ToString(), dataSize);
250251
return Status::Error();
251252
}
252253

@@ -255,36 +256,39 @@ size_t ShareBuffer::AcquireBlock(const std::string& block)
255256
static std::hash<std::string> hasher{};
256257
auto pos = hasher(block) % this->blockNumber_;
257258
auto bufferHeader = (ShareBufferHeader*)this->addr_;
258-
auto reusedIdx = this->blockNumber_;
259+
auto reusedPos = INVALID_POSITION;
259260
bufferHeader->mutex.Lock();
260-
for (size_t i = 0;; i++) {
261-
if (!bufferHeader->headers[pos].id.Used()) {
262-
if (reusedIdx == this->blockNumber_) { reusedIdx = pos; }
263-
break;
264-
}
265-
if (bufferHeader->headers[pos].id == block) {
266-
reusedIdx = pos;
267-
break;
261+
for (size_t i = 0; i < this->blockNumber_; i++) {
262+
auto header = bufferHeader->headers + pos;
263+
header->mutex.Lock();
264+
if (header->id == block) {
265+
header->Refer();
266+
header->mutex.Unlock();
267+
bufferHeader->mutex.Unlock();
268+
return pos;
268269
}
269-
if (bufferHeader->headers[pos].ref <= 0) {
270-
if (reusedIdx == this->blockNumber_) { reusedIdx = pos; }
270+
if (!header->id.Used()) {
271+
if (reusedPos != INVALID_POSITION) {
272+
header->mutex.Unlock();
273+
break;
274+
}
275+
header->Occupy(block);
276+
header->mutex.Unlock();
277+
bufferHeader->mutex.Unlock();
278+
return pos;
271279
}
280+
if (header->ref <= 0 && reusedPos == INVALID_POSITION) { reusedPos = pos; }
281+
header->mutex.Unlock();
272282
pos = (pos + 1) % this->blockNumber_;
273-
if (i == this->blockNumber_) {
274-
UC_WARN("Buffer({}) used out.", this->blockNumber_);
275-
i = 0;
276-
}
277283
}
278-
auto blockHeader = bufferHeader->headers + reusedIdx;
279-
blockHeader->mutex.Lock();
280-
if (blockHeader->ref <= 0) {
281-
blockHeader->id.Set(block);
282-
blockHeader->ref = this->nSharer_;
283-
blockHeader->status = ShareBlockStatus::INIT;
284+
if (reusedPos != INVALID_POSITION) {
285+
auto header = bufferHeader->headers + reusedPos;
286+
header->mutex.Lock();
287+
header->Occupy(block);
288+
header->mutex.Unlock();
284289
}
285-
blockHeader->mutex.Unlock();
286290
bufferHeader->mutex.Unlock();
287-
return reusedIdx;
291+
return reusedPos;
288292
}
289293

290294
void ShareBuffer::ReleaseBlock(const size_t index)
@@ -301,7 +305,67 @@ void* ShareBuffer::BlockAt(const size_t index)
301305
return bufferHeader->headers + index;
302306
}
303307

308+
std::shared_ptr<ShareBuffer::Reader> ShareBuffer::MakeLocalReader(const std::string& block,
309+
const std::string& path)
310+
{
311+
auto addr = tmpBufMaker_->MakeHostBuffer(blockSize_);
312+
if (!addr) [[unlikely]] {
313+
UC_ERROR("Failed to make buffer({}) on host.", blockSize_);
314+
return nullptr;
315+
}
316+
Reader* reader = nullptr;
317+
try {
318+
reader = new Reader{block, path, blockSize_, ioDirect_, false, addr.get()};
319+
return std::shared_ptr<Reader>(reader,
320+
[addr = std::move(addr)](Reader* reader) { delete reader; });
321+
} catch (const std::exception& e) {
322+
if (reader) { delete reader; }
323+
UC_ERROR("Failed({}) to create reader.", e.what());
324+
return nullptr;
325+
}
326+
}
327+
328+
std::shared_ptr<ShareBuffer::Reader> ShareBuffer::MakeSharedReader(const std::string& block,
329+
const std::string& path,
330+
size_t position)
331+
{
332+
void* addr = this->BlockAt(position);
333+
Reader* reader = nullptr;
334+
try {
335+
reader = new Reader{block, path, blockSize_, ioDirect_, true, addr};
336+
return std::shared_ptr<Reader>(reader, [this, position](Reader* reader) {
337+
delete reader;
338+
this->ReleaseBlock(position);
339+
});
340+
} catch (...) {
341+
this->ReleaseBlock(position);
342+
if (reader) { delete reader; }
343+
UC_ERROR("Failed to create reader.");
344+
return nullptr;
345+
}
346+
}
347+
304348
Status ShareBuffer::Reader::Ready4Read()
349+
{
350+
if (shared_) { return Ready4ReadOnSharedBuffer(); }
351+
return Ready4ReadOnLocalBuffer();
352+
}
353+
354+
uintptr_t ShareBuffer::Reader::GetData()
355+
{
356+
if (shared_) {
357+
auto header = (ShareBlockHeader*)this->addr_;
358+
return (uintptr_t)header->Data();
359+
}
360+
return (uintptr_t)this->addr_;
361+
}
362+
363+
Status ShareBuffer::Reader::Ready4ReadOnLocalBuffer()
364+
{
365+
return File::Read(this->path_, 0, this->length_, this->GetData(), this->ioDirect_);
366+
}
367+
368+
Status ShareBuffer::Reader::Ready4ReadOnSharedBuffer()
305369
{
306370
auto header = (ShareBlockHeader*)this->addr_;
307371
if (header->status == ShareBlockStatus::LOADED) { return Status::OK(); }
@@ -324,10 +388,4 @@ Status ShareBuffer::Reader::Ready4Read()
324388
return s;
325389
}
326390

327-
uintptr_t ShareBuffer::Reader::GetData()
328-
{
329-
auto header = (ShareBlockHeader*)this->addr_;
330-
return (uintptr_t)header->Data();
331-
}
332-
333391
} // namespace UC

ucm/store/pcstore/cc/domain/trans/share_buffer.h

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
#include <string>
3030
#include "file/ifile.h"
3131
#include "status/status.h"
32+
#include "trans/buffer.h"
3233

3334
namespace UC {
3435

@@ -39,7 +40,7 @@ class ShareBuffer {
3940
std::string path_;
4041
size_t length_;
4142
bool ioDirect_;
42-
size_t nSharer_;
43+
bool shared_;
4344
void* addr_;
4445

4546
public:
@@ -48,21 +49,23 @@ class ShareBuffer {
4849

4950
private:
5051
Reader(const std::string& block, const std::string& path, const size_t length,
51-
const bool ioDirect, const size_t nSharer, void* addr)
52+
const bool ioDirect, const bool shared, void* addr)
5253
: block_{block},
5354
path_{path},
5455
length_{length},
5556
ioDirect_{ioDirect},
56-
nSharer_{nSharer},
57+
shared_{shared},
5758
addr_{addr}
5859
{
5960
}
6061
friend class ShareBuffer;
62+
Status Ready4ReadOnLocalBuffer();
63+
Status Ready4ReadOnSharedBuffer();
6164
};
6265

6366
public:
6467
Status Setup(const size_t blockSize, const size_t blockNumber, const bool ioDirect,
65-
const size_t nSharer, const std::string& uniqueId);
68+
const std::string& uniqueId);
6669
~ShareBuffer();
6770
std::shared_ptr<Reader> MakeReader(const std::string& block, const std::string& path);
6871

@@ -74,14 +77,17 @@ class ShareBuffer {
7477
size_t AcquireBlock(const std::string& block);
7578
void ReleaseBlock(const size_t index);
7679
void* BlockAt(const size_t index);
80+
std::shared_ptr<Reader> MakeLocalReader(const std::string& block, const std::string& path);
81+
std::shared_ptr<Reader> MakeSharedReader(const std::string& block, const std::string& path,
82+
size_t position);
7783

7884
private:
7985
size_t blockSize_;
8086
size_t blockNumber_;
8187
bool ioDirect_;
82-
size_t nSharer_;
8388
std::string shmName_;
84-
void* addr_;
89+
void* addr_{nullptr};
90+
std::unique_ptr<Trans::Buffer> tmpBufMaker_{nullptr};
8591
};
8692

8793
} // namespace UC

ucm/store/pcstore/cc/domain/trans/trans_manager.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ Status TransManager::Setup(const size_t rankSize, const int32_t deviceId, const
3434
{
3535
auto s = Status::OK();
3636
if (rankSize > 1) {
37-
s = this->shareQueue_.Setup(rankSize, deviceId, streamNumber, blockSize, ioSize, ioDirect,
37+
s = this->shareQueue_.Setup(deviceId, streamNumber, blockSize, ioSize, ioDirect,
3838
bufferNumber, layout, &this->failureSet_, uniqueId);
3939
if (s.Failure()) { return s; }
4040
}

ucm/store/pcstore/cc/domain/trans/trans_share_queue.cc

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -39,18 +39,17 @@ TransShareQueue::~TransShareQueue()
3939
}
4040
}
4141

42-
Status TransShareQueue::Setup(const size_t nSharer, const int32_t deviceId,
43-
const size_t streamNumber, const size_t blockSize,
44-
const size_t ioSize, const bool ioDirect, const size_t bufferNumber,
45-
const SpaceLayout* layout, TaskSet* failureSet,
46-
const std::string& uniqueId)
42+
Status TransShareQueue::Setup(const int32_t deviceId, const size_t streamNumber,
43+
const size_t blockSize, const size_t ioSize, const bool ioDirect,
44+
const size_t bufferNumber, const SpaceLayout* layout,
45+
TaskSet* failureSet, const std::string& uniqueId)
4746
{
4847
this->deviceId_ = deviceId;
4948
this->streamNumber_ = streamNumber;
5049
this->ioSize_ = ioSize;
5150
this->layout_ = layout;
5251
this->failureSet_ = failureSet;
53-
auto status = this->buffer_.Setup(blockSize, bufferNumber, ioDirect, nSharer, uniqueId);
52+
auto status = this->buffer_.Setup(blockSize, bufferNumber, ioDirect, uniqueId);
5453
if (status.Failure()) { return status; }
5554
std::list<std::promise<Status>> start(streamNumber);
5655
std::list<std::future<Status>> fut;
@@ -67,9 +66,9 @@ Status TransShareQueue::Setup(const size_t nSharer, const int32_t deviceId,
6766

6867
void TransShareQueue::Dispatch(TaskPtr task, WaiterPtr waiter)
6968
{
70-
std::lock_guard<std::mutex> lg(this->mutex_);
69+
std::list<BlockTask> blkTasks;
7170
task->ForEachGroup(
72-
[task, waiter, this](const std::string& block, std::vector<uintptr_t>& shards) {
71+
[task, waiter, this, &blkTasks](const std::string& block, std::vector<uintptr_t>& shards) {
7372
BlockTask blockTask;
7473
blockTask.reader =
7574
this->buffer_.MakeReader(block, this->layout_->DataFilePath(block, false));
@@ -82,8 +81,10 @@ void TransShareQueue::Dispatch(TaskPtr task, WaiterPtr waiter)
8281
waiter->Done([task, ioSize] { UC_DEBUG("{}", task->Epilog(ioSize)); });
8382
}
8483
};
85-
this->wait_.push_back(blockTask);
84+
blkTasks.push_back(std::move(blockTask));
8685
});
86+
std::lock_guard<std::mutex> lg(this->mutex_);
87+
this->wait_.splice(this->wait_.end(), blkTasks);
8788
this->cv_.notify_all();
8889
}
8990

ucm/store/pcstore/cc/domain/trans/trans_share_queue.h

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -61,10 +61,9 @@ class TransShareQueue {
6161

6262
public:
6363
~TransShareQueue();
64-
Status Setup(const size_t nSharer, const int32_t deviceId, const size_t streamNumber,
65-
const size_t blockSize, const size_t ioSize, const bool ioDirect,
66-
const size_t bufferNumber, const SpaceLayout* layout, TaskSet* failureSet,
67-
const std::string& uniqueId);
64+
Status Setup(const int32_t deviceId, const size_t streamNumber, const size_t blockSize,
65+
const size_t ioSize, const bool ioDirect, const size_t bufferNumber,
66+
const SpaceLayout* layout, TaskSet* failureSet, const std::string& uniqueId);
6867
void Dispatch(TaskPtr task, WaiterPtr waiter);
6968

7069
private:

0 commit comments

Comments
 (0)