Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions src/datacell/extra_info_datacell.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,7 @@ class ExtraInfoDataCell : public ExtraInfoInterface {
this->max_capacity_ = new_capacity;
uint64_t io_size =
static_cast<uint64_t>(new_capacity) * static_cast<uint64_t>(extra_info_size_);
uint8_t end_flag =
127; // the value is meaningless, only to occupy the position for io allocate
this->io_->Write(&end_flag, 1, io_size);
this->io_->Resize(io_size);
}

void
Expand Down
4 changes: 1 addition & 3 deletions src/datacell/flatten_datacell.h
Original file line number Diff line number Diff line change
Expand Up @@ -92,9 +92,7 @@ class FlattenDataCell : public FlattenInterface {
}
this->max_capacity_ = new_capacity;
uint64_t io_size = static_cast<uint64_t>(new_capacity) * static_cast<uint64_t>(code_size_);
uint8_t end_flag =
127; // the value is meaningless, only to occupy the position for io allocate
this->io_->Write(&end_flag, 1, io_size);
this->io_->Resize(io_size);
}

void
Expand Down
4 changes: 1 addition & 3 deletions src/datacell/graph_datacell.h
Original file line number Diff line number Diff line change
Expand Up @@ -264,9 +264,7 @@ GraphDataCell<IOTmpl>::Resize(InnerIdType new_size) {
}
this->max_capacity_ = new_size;
uint64_t io_size = static_cast<uint64_t>(new_size) * static_cast<uint64_t>(code_line_size_);
uint8_t end_flag =
127; // the value is meaningless, only to occupy the position for io allocate
this->io_->Write(&end_flag, 1, io_size);
this->io_->Resize(io_size);
}

template <typename IOTmpl>
Expand Down
6 changes: 2 additions & 4 deletions src/datacell/sparse_vector_datacell.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,10 +80,8 @@ class SparseVectorDataCell : public FlattenInterface {
}
uint64_t io_size = (new_capacity - total_count_) * max_code_size_ + current_offset_;
this->max_capacity_ = new_capacity;
uint8_t end_flag =
127; // the value is meaingless, only to occupy the position for io allocate
this->io_->Write(&end_flag, 1, io_size);
this->offset_io_->Write(&end_flag, 1, new_capacity * sizeof(uint32_t));
this->io_->Resize(io_size);
this->offset_io_->Resize(static_cast<uint64_t>(new_capacity) * sizeof(uint32_t));
}

void
Expand Down
13 changes: 13 additions & 0 deletions src/io/async_io.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,19 @@ AsyncIO::WriteImpl(const uint8_t* data, uint64_t size, uint64_t offset) {
fsync(wfd_);
}

void
AsyncIO::ResizeImpl(uint64_t size) {
#ifdef __APPLE__
auto ret = ftruncate(this->wfd_, static_cast<off_t>(size));
#else
auto ret = ftruncate64(this->wfd_, static_cast<int64_t>(size));
#endif
if (ret == -1) {
throw VsagException(ErrorType::INTERNAL_ERROR, "ftruncate failed");

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The error message for ftruncate is a bit generic. Including the system error message using strerror(errno) would provide more context for debugging, which is a common practice elsewhere in this file.

Suggested change
throw VsagException(ErrorType::INTERNAL_ERROR, "ftruncate failed");
throw VsagException(ErrorType::INTERNAL_ERROR, fmt::format("ftruncate failed: {}", strerror(errno)));

}
this->size_ = size;
}

bool
AsyncIO::ReadImpl(uint64_t size, uint64_t offset, uint8_t* data) const {
bool need_release = true;
Expand Down
3 changes: 3 additions & 0 deletions src/io/async_io.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ class AsyncIO : public BasicIO<AsyncIO> {
void
WriteImpl(const uint8_t* data, uint64_t size, uint64_t offset);

void
ResizeImpl(uint64_t size);

bool
ReadImpl(uint64_t size, uint64_t offset, uint8_t* data) const;

Expand Down
19 changes: 19 additions & 0 deletions src/io/basic_io.h
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,24 @@ class BasicIO {
}
}

inline void
Resize(uint64_t size) {
if constexpr (has_ResizeImpl<IOTmpl>::value) {
return cast().ResizeImpl(size);
} else {
if (size <= this->size_) {
return;
}
ByteBuffer buffer(SERIALIZE_BUFFER_SIZE, this->allocator_);
uint64_t offset = this->size_;
while (offset < size) {
auto cur_size = std::min(SERIALIZE_BUFFER_SIZE, size - offset);
this->Write(buffer.data, cur_size, offset);
offset += cur_size;
}
}
Comment on lines +225 to +236

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The fallback implementation for Resize can be made more efficient. Instead of writing data in a loop to expand the underlying storage, a single Write call with a size of 0 at the target offset is sufficient. Most WriteImpl implementations will handle this by extending the storage to the specified offset without actually writing data, which is much faster.

        } else {
            if (size > this->size_) {
                // A single write of 0 bytes at the target size is enough to
                // trigger the size update in WriteImpl, which is more efficient
                // than writing data in a loop.
                this->Write(nullptr, 0, size);
            }
        }

}

inline int64_t
GetMemoryUsage() const {
return this->size_;
Expand Down Expand Up @@ -314,5 +332,6 @@ class BasicIO {
std::declval<uint64_t>())
GENERATE_HAS_MEMBER_FUNCTION(ReleaseImpl, void, std::declval<const uint8_t*>())
GENERATE_HAS_MEMBER_FUNCTION(InitIOImpl, void, std::declval<const IOParamPtr&>())
GENERATE_HAS_MEMBER_FUNCTION(ResizeImpl, void, std::declval<uint64_t>())
};
} // namespace vsag
13 changes: 13 additions & 0 deletions src/io/buffer_io.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,19 @@ BufferIO::WriteImpl(const uint8_t* data, uint64_t size, uint64_t offset) {
}
}

void
BufferIO::ResizeImpl(uint64_t size) {
#ifdef __APPLE__
auto ret = ftruncate(this->fd_, static_cast<off_t>(size));
#else
auto ret = ftruncate64(this->fd_, static_cast<int64_t>(size));
#endif
if (ret == -1) {
throw VsagException(ErrorType::INTERNAL_ERROR, "ftruncate failed");

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Similar to async_io.cpp, the error message for ftruncate could be improved by including the system error string via strerror(errno). This will make debugging failures easier.

Suggested change
throw VsagException(ErrorType::INTERNAL_ERROR, "ftruncate failed");
throw VsagException(ErrorType::INTERNAL_ERROR, fmt::format("ftruncate failed: {}", strerror(errno)));

}
this->size_ = size;
}

bool
BufferIO::ReadImpl(uint64_t size, uint64_t offset, uint8_t* data) const {
if (size == 0) {
Expand Down
3 changes: 3 additions & 0 deletions src/io/buffer_io.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ class BufferIO : public BasicIO<BufferIO> {
void
WriteImpl(const uint8_t* data, uint64_t size, uint64_t offset);

void
ResizeImpl(uint64_t size);

bool
ReadImpl(uint64_t size, uint64_t offset, uint8_t* data) const;

Expand Down
16 changes: 15 additions & 1 deletion src/io/memory_block_io.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -129,11 +129,25 @@ MemoryBlockIO::check_and_realloc(uint64_t size) {
auto cur_block_size = this->blocks_.size();
this->blocks_.reserve(new_block_count);
while (cur_block_size < new_block_count) {
this->blocks_.emplace_back(static_cast<uint8_t*>(this->allocator_->Allocate(block_size_)));
auto* ptr = static_cast<uint8_t*>(this->allocator_->Allocate(block_size_));
if (ptr == nullptr) {
throw VsagException(ErrorType::INTERNAL_ERROR, "MemoryBlockIO allocation failed");
}
this->blocks_.emplace_back(ptr);
++cur_block_size;
}
}

void
MemoryBlockIO::ResizeImpl(uint64_t size) {
if (size <= this->size_) {
this->size_ = size;
return;
}
check_and_realloc(size);
this->size_ = size;
}

static int
countr_zero(uint64_t x) {
if (x == 0) {
Expand Down
3 changes: 3 additions & 0 deletions src/io/memory_block_io.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ class MemoryBlockIO : public BasicIO<MemoryBlockIO> {
void
WriteImpl(const uint8_t* data, uint64_t size, uint64_t offset);

void
ResizeImpl(uint64_t size);

bool
ReadImpl(uint64_t size, uint64_t offset, uint8_t* data) const;

Expand Down
10 changes: 10 additions & 0 deletions src/io/memory_io.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,16 @@ MemoryIO::WriteImpl(const uint8_t* data, uint64_t size, uint64_t offset) {
memcpy(start_ + offset, data, size);
}

void
MemoryIO::ResizeImpl(uint64_t size) {
if (size <= this->size_) {
this->size_ = size;
return;
}
check_and_realloc(size);
this->size_ = size;
}

bool
MemoryIO::ReadImpl(uint64_t size, uint64_t offset, uint8_t* data) const {
bool ret = check_valid_offset(size + offset);
Expand Down
3 changes: 3 additions & 0 deletions src/io/memory_io.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ class MemoryIO : public BasicIO<MemoryIO> {
void
WriteImpl(const uint8_t* data, uint64_t size, uint64_t offset);

void
ResizeImpl(uint64_t size);

bool
ReadImpl(uint64_t size, uint64_t offset, uint8_t* data) const;

Expand Down
54 changes: 54 additions & 0 deletions src/io/mmap_io.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,60 @@ MMapIO::WriteImpl(const uint8_t* data, uint64_t size, uint64_t offset) {
memcpy(this->start_ + offset, data, size);
}

void
MMapIO::ResizeImpl(uint64_t size) {
auto new_size = size;
auto old_size = this->size_;
if (old_size == 0) {
old_size = DEFAULT_INIT_MMAP_SIZE;
}
if (new_size > old_size) {
auto ret =
#ifdef __APPLE__
ftruncate(this->fd_, static_cast<off_t>(new_size));
#else
ftruncate64(this->fd_, static_cast<int64_t>(new_size));
#endif
if (ret == -1) {
throw VsagException(ErrorType::INTERNAL_ERROR, "ftruncate failed");
}

#ifdef __APPLE__
munmap(this->start_, old_size);
void* addr = mmap(nullptr, new_size, PROT_READ | PROT_WRITE, MAP_SHARED, this->fd_, 0);
if (addr == MAP_FAILED) {
throw VsagException(ErrorType::INTERNAL_ERROR, "mmap remap failed");
}
this->start_ = static_cast<uint8_t*>(addr);
#else
this->start_ =
static_cast<uint8_t*>(mremap(this->start_, old_size, new_size, MREMAP_MAYMOVE));
#endif
} else if (new_size < old_size) {
#ifdef __APPLE__
munmap(this->start_, old_size);
void* addr = mmap(nullptr, new_size, PROT_READ | PROT_WRITE, MAP_SHARED, this->fd_, 0);
if (addr == MAP_FAILED) {
throw VsagException(ErrorType::INTERNAL_ERROR, "mmap remap failed");
}
this->start_ = static_cast<uint8_t*>(addr);
#else
this->start_ =
static_cast<uint8_t*>(mremap(this->start_, old_size, new_size, MREMAP_MAYMOVE));
#endif
auto ret =
#ifdef __APPLE__
ftruncate(this->fd_, static_cast<off_t>(new_size));
#else
ftruncate64(this->fd_, static_cast<int64_t>(new_size));
#endif
if (ret == -1) {
throw VsagException(ErrorType::INTERNAL_ERROR, "ftruncate failed");
}
}
this->size_ = new_size;
}
Comment on lines +109 to +160

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

This implementation of ResizeImpl has a few issues:

  1. Critical Bug: The return value of mremap is not checked. It can return MAP_FAILED on error, which would lead to undefined behavior when the invalid pointer is used. This should be checked and an exception thrown on failure.
  2. Error Reporting: The error message for ftruncate is generic. Including strerror(errno) would provide more details on failure.
  3. Code Duplication: The logic for remapping memory (mremap or munmap/mmap) is duplicated for both growing and shrinking cases. This can be refactored to improve maintainability.

I've provided a suggested refactoring that addresses all these points by extracting the remap and truncate logic into lambdas, adding error checking, and improving error messages.

void
MMapIO::ResizeImpl(uint64_t size) {
    auto new_size = size;
    if (new_size == this->size_) {
        return;
    }

    auto old_size = this->size_;
    if (old_size == 0) {
        old_size = DEFAULT_INIT_MMAP_SIZE;
    }

    auto remap_memory = [&]() {
#ifdef __APPLE__
        munmap(this->start_, old_size);
        void* addr = mmap(nullptr, new_size, PROT_READ | PROT_WRITE, MAP_SHARED, this->fd_, 0);
        if (addr == MAP_FAILED) {
            throw VsagException(ErrorType::INTERNAL_ERROR, fmt::format("mmap remap failed: {}", strerror(errno)));
        }
        this->start_ = static_cast<uint8_t*>(addr);
#else
        void* new_addr = mremap(this->start_, old_size, new_size, MREMAP_MAYMOVE);
        if (new_addr == MAP_FAILED) {
            throw VsagException(ErrorType::INTERNAL_ERROR, fmt::format("mremap failed: {}", strerror(errno)));
        }
        this->start_ = static_cast<uint8_t*>(new_addr);
#endif
    };

    auto truncate_file = [&]() {
        auto ret =
#ifdef __APPLE__
            ftruncate(this->fd_, static_cast<off_t>(new_size));
#else
            ftruncate64(this->fd_, static_cast<int64_t>(new_size));
#endif
        if (ret == -1) {
            throw VsagException(ErrorType::INTERNAL_ERROR, fmt::format("ftruncate failed: {}", strerror(errno)));
        }
    };

    if (new_size > old_size) {
        truncate_file();
        remap_memory();
    } else if (new_size < old_size) {
        remap_memory();
        truncate_file();
    }
    this->size_ = new_size;
}


bool
MMapIO::ReadImpl(uint64_t size, uint64_t offset, uint8_t* data) const {
if (offset + size > this->size_) {
Expand Down
3 changes: 3 additions & 0 deletions src/io/mmap_io.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ class MMapIO : public BasicIO<MMapIO> {
void
WriteImpl(const uint8_t* data, uint64_t size, uint64_t offset);

void
ResizeImpl(uint64_t size);

bool
ReadImpl(uint64_t size, uint64_t offset, uint8_t* data) const;

Expand Down