Skip to content
Open
Show file tree
Hide file tree
Changes from 14 commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
a2f627c
fix overflow for vertex/edge insert
zhanglei1949 Mar 6, 2026
4b84450
minor
zhanglei1949 Mar 10, 2026
bc11284
Update include/neug/utils/growth.h
zhanglei1949 Mar 10, 2026
3a96008
fix format
zhanglei1949 Mar 10, 2026
8627f3a
Merge branch 'main' into fix-overflow
zhanglei1949 Mar 10, 2026
0bf6026
reserve and dump
zhanglei1949 Mar 11, 2026
f4e93f4
remove logs
zhanglei1949 Mar 11, 2026
b137742
Merge branch 'main' into fix-overflow
zhanglei1949 Mar 11, 2026
dae1473
fix format
zhanglei1949 Mar 11, 2026
6a1623b
fix test
zhanglei1949 Mar 11, 2026
1ad875d
Merge branch 'main' into fix-overflow
zhanglei1949 Mar 11, 2026
645584c
Update src/utils/file_utils.cc
zhanglei1949 Mar 11, 2026
eb61a06
minor fix
zhanglei1949 Mar 11, 2026
f6501a8
fix string column reserve
zhanglei1949 Mar 11, 2026
40000f7
Update include/neug/utils/growth.h
zhanglei1949 Mar 11, 2026
69f43da
Update src/utils/file_utils.cc
zhanglei1949 Mar 11, 2026
9fd1bbc
fix string column stream compact
zhanglei1949 Mar 11, 2026
190ca50
diff capacity reserve for open/dump and inserting
zhanglei1949 Mar 12, 2026
b56eccc
refine ccache and try to test whether upload codecov could succeed
zhanglei1949 Mar 7, 2026
8e04f41
revert comment
zhanglei1949 Mar 10, 2026
4f900a6
fix cpp test
zhanglei1949 Mar 12, 2026
ebe5e12
fix avg width calculation
zhanglei1949 Mar 12, 2026
a9ed556
Merge branch 'fix-overflow' of https://github.com/alibaba/neug into f…
zhanglei1949 Mar 12, 2026
f9c31f7
fix: Use -I instead of -isystem for vendored third-party headers on m…
BingqingLyu Mar 11, 2026
9fcde04
fix: fix coverage report upload in workflow (#30)
lnfjpt Mar 12, 2026
e43ed4f
feat: Support `VARCHAR(max_length)` in NeuG Type System (#25)
shirly121 Mar 12, 2026
9d8aab6
test: add test for inserting string in embedding mode (#31)
zhanglei1949 Mar 12, 2026
7bdab8a
format
zhanglei1949 Mar 13, 2026
cd63a3c
merge main
zhanglei1949 Mar 13, 2026
5d6db2c
remove wrongly added file
zhanglei1949 Mar 13, 2026
f85ace2
fix
zhanglei1949 Mar 13, 2026
57f713f
remove some comments
zhanglei1949 Mar 13, 2026
43f38c4
params must be set for EnsureCapacity
zhanglei1949 Mar 13, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions include/neug/storages/csr/csr_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ enum class CsrType {

class CsrBase {
public:
static constexpr size_t INFINITE_CAPACITY =
std::numeric_limits<size_t>::max();
CsrBase() = default;
virtual ~CsrBase() = default;

Expand Down Expand Up @@ -66,6 +68,8 @@ class CsrBase {

virtual void resize(vid_t vnum) = 0;

virtual size_t capacity() const = 0;

virtual void close() = 0;

virtual void batch_sort_by_edge_data(timestamp_t ts) {
Expand Down
4 changes: 4 additions & 0 deletions include/neug/storages/csr/immutable_csr.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,8 @@ class ImmutableCsr : public TypedCsrBase<EDATA_T> {

void resize(vid_t vnum) override;

size_t capacity() const override;

void close() override;

void batch_sort_by_edge_data(timestamp_t ts) override;
Expand Down Expand Up @@ -176,6 +178,8 @@ class SingleImmutableCsr : public TypedCsrBase<EDATA_T> {

void resize(vid_t vnum) override;

size_t capacity() const override;

void close() override;

void batch_sort_by_edge_data(timestamp_t ts) override;
Expand Down
6 changes: 6 additions & 0 deletions include/neug/storages/csr/mutable_csr.h
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,8 @@ class MutableCsr : public TypedCsrBase<EDATA_T> {

void resize(vid_t vnum) override;

size_t capacity() const override;

void close() override;

void batch_sort_by_edge_data(timestamp_t ts) override;
Expand Down Expand Up @@ -252,6 +254,8 @@ class SingleMutableCsr : public TypedCsrBase<EDATA_T> {

void resize(vid_t vnum) override;

size_t capacity() const override;

void close() override;

void batch_sort_by_edge_data(timestamp_t ts) override;
Expand Down Expand Up @@ -336,6 +340,8 @@ class EmptyCsr : public TypedCsrBase<EDATA_T> {

void resize(vid_t vnum) override {}

size_t capacity() const override { return 0; }

void close() override {}

void batch_sort_by_edge_data(timestamp_t ts) override {}
Expand Down
10 changes: 10 additions & 0 deletions include/neug/storages/file_names.h
Original file line number Diff line number Diff line change
Expand Up @@ -246,4 +246,14 @@ inline std::string wal_ingest_allocator_prefix(const std::string& work_dir,
std::to_string(thread_id) + "_";
}

inline std::string statistics_file_prefix(const std::string& v_label) {
return "statistics_" + v_label;
}

inline std::string statistics_file_prefix(const std::string& src_label,
const std::string& dst_label,
const std::string& edge_label) {
return "statistics_" + src_label + "_" + edge_label + "_" + dst_label;
}

} // namespace neug
31 changes: 31 additions & 0 deletions include/neug/storages/graph/edge_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ class EdgeTable {

void Resize(vid_t src_vertex_num, vid_t dst_vertex_num);

void EnsureCapacity(size_t capacity);

size_t EdgeNum() const;

size_t PropertyNum() const;
Expand Down Expand Up @@ -128,6 +130,34 @@ class EdgeTable {

void Compact(bool compact_csr, bool sort_on_compaction, timestamp_t ts);

inline size_t Size() const {
if (meta_->is_bundled()) {
if (out_csr_) {
return out_csr_->edge_num();
} else if (in_csr_) {
return in_csr_->edge_num();
} else {
THROW_RUNTIME_ERROR("both csr are null");
}
}
// TODO(zhanglei): the size may be inaccurate if some edges are deleted but
// not compacted yet.
return table_idx_.load();
}

inline size_t Capacity() const {
if (meta_->is_bundled()) {
if (out_csr_) {
return out_csr_->capacity();
} else if (in_csr_) {
return in_csr_->capacity();
} else {
THROW_RUNTIME_ERROR("both csr are null");
}
}
return capacity_.load();
}

private:
void dropAndCreateNewBundledCSR();
void dropAndCreateNewUnbundledCSR(bool delete_property);
Expand All @@ -141,6 +171,7 @@ class EdgeTable {
std::unique_ptr<CsrBase> in_csr_;
std::unique_ptr<Table> table_;
std::atomic<uint64_t> table_idx_{0};
std::atomic<uint64_t> capacity_{0};

friend class PropertyGraph;
};
Expand Down
15 changes: 13 additions & 2 deletions include/neug/storages/graph/property_graph.h
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ class PropertyGraph {
*
* @since v0.1.0
*/
~PropertyGraph();
~PropertyGraph() = default;

/**
* @brief Open the property graph from persistent storage.
Expand All @@ -137,7 +137,13 @@ class PropertyGraph {

void Compact(bool compact_csr, float reserve_ratio, timestamp_t ts);

void Dump(bool reopen = true);
/**
* @brief Dump the current graph state to persistent storage.
* @param reopen If true, reopens the graph after dumping (default: true)
* @param reserve_space If true, reserves space for all vertex and edge
* tables.
*/
void Dump(bool reopen = true, bool ensure_capacity = true);

/**
* @brief Dump schema information to a file.
Expand Down Expand Up @@ -298,6 +304,11 @@ class PropertyGraph {

Status Reserve(label_t v_label, vid_t vertex_reserve_size);

Status EnsureCapacity(label_t v_label, size_t capacity = 0);

Status EnsureCapacity(label_t src_label, label_t dst_label,
label_t edge_label, size_t capacity = 0);

Status BatchAddVertices(label_t v_label_id,
std::shared_ptr<IRecordBatchSupplier> supplier);

Expand Down
15 changes: 9 additions & 6 deletions include/neug/storages/graph/vertex_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include "neug/storages/graph/vertex_timestamp.h"
#include "neug/storages/loader/loader_utils.h"
#include "neug/utils/arrow_utils.h"
#include "neug/utils/growth.h"
#include "neug/utils/indexers.h"
#include "neug/utils/property/table.h"

Expand Down Expand Up @@ -111,8 +112,7 @@ class VertexTable {
std::swap(work_dir_, other.work_dir_);
}

void Open(const std::string& work_dir, int memory_level,
bool build_empty_graph = false);
void Open(const std::string& work_dir, int memory_level);

void Dump(const std::string& target_dir);

Expand All @@ -122,6 +122,8 @@ class VertexTable {

void Reserve(size_t cap);

size_t EnsureCapacity(size_t capacity);

bool is_dropped() const { return table_ == nullptr; }

bool get_index(const Property& oid, vid_t& lid,
Expand All @@ -144,6 +146,8 @@ class VertexTable {
// Capacity of the vertex table
inline size_t Capacity() const { return indexer_.capacity(); }

inline size_t Size() const { return indexer_.size(); }

bool IsValidLid(vid_t lid, timestamp_t ts = MAX_TIMESTAMP) const;

IndexerType& get_indexer() { return indexer_; }
Expand Down Expand Up @@ -289,11 +293,10 @@ class VertexTable {
auto ind = std::get<2>(vertex_schema_->primary_keys[0]);
auto pk_array = columns[ind];
columns.erase(columns.begin() + ind);
auto cur_size = indexer_.capacity();
while (cur_size < indexer_.size() + pk_array->length()) {
cur_size = std::max(16, 2 * static_cast<int>(cur_size));
size_t new_size = indexer_.size() + pk_array->length();
while (new_size >= Capacity()) {
EnsureCapacity(calculate_new_capacity(new_size, true));
}
Reserve(cur_size);

auto vids = insert_primary_keys<PK_T>(pk_array);

Expand Down
12 changes: 12 additions & 0 deletions include/neug/utils/file_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,4 +38,16 @@ void copy_directory(const std::string& src, const std::string& dst,

void remove_directory(const std::string& dir_path);

void write_file(const std::string& filename, const void* buffer, size_t size,
size_t num);

void read_file(const std::string& filename, void* buffer, size_t size,
size_t num);

void write_statistic_file(const std::string& file_path, size_t capacity,
size_t size);

void read_statistic_file(const std::string& file_path, size_t& capacity,
size_t& size);

} // namespace neug
42 changes: 42 additions & 0 deletions include/neug/utils/growth.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/**
* Copyright 2020 Alibaba Group Holding Limited.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

#include <cstddef>
#include <cstdint>

namespace neug {
inline size_t calculate_new_capacity(size_t current_capacity,
bool is_vertex_table) {
if (current_capacity < 4096) {
return 4096; // Start with a reasonable default capacity.
}
static constexpr size_t MAX_CAPACITY = std::numeric_limits<size_t>::max();
if (is_vertex_table) {
// For vertex tables, we grow exponentially: double the current capacity,
// with a 4K floor.
return current_capacity <= MAX_CAPACITY / 2 ? current_capacity * 2
: MAX_CAPACITY;
} else {
// For edge tables, we grow linearly: new capacity = current capacity +
// (current capacity + 4) / 5.
return current_capacity <= MAX_CAPACITY - (current_capacity + 4) / 5
? current_capacity + (current_capacity + 4) / 5
: MAX_CAPACITY;
}
}
} // namespace neug
9 changes: 0 additions & 9 deletions include/neug/utils/id_indexer.h
Original file line number Diff line number Diff line change
Expand Up @@ -443,7 +443,6 @@ class LFIndexer {
keys_->open(name + ".keys", "", data_dir);
indices_.open(data_dir + "/" + name + ".indices", false);
size_t num_elements = num_elements_.load();
keys_->resize(num_elements + (num_elements >> 2));

indices_size_ = indices_.size();
}
Expand All @@ -462,9 +461,6 @@ class LFIndexer {
LOG(INFO) << "Open indices file in "
<< tmp_dir(work_dir) + "/" + name + ".indices";
indices_.open(tmp_dir(work_dir) + "/" + name + ".indices", true);
size_t num_elements = num_elements_.load();

keys_->resize(num_elements + (num_elements >> 2));

indices_size_ = indices_.size();
}
Expand All @@ -478,8 +474,6 @@ class LFIndexer {
keys_->open_in_memory(name + ".keys");
indices_.open(name + ".indices", false);
indices_size_ = indices_.size();
size_t num_elements = num_elements_.load();
keys_->resize(num_elements + (num_elements >> 2));
}

void open_with_hugepages(const std::string& name, bool hugepage_table) {
Expand All @@ -495,12 +489,9 @@ class LFIndexer {
indices_.open(name + ".indices", false);
}
indices_size_ = indices_.size();
size_t num_elements = num_elements_.load();
keys_->resize(num_elements + (num_elements >> 2));
}

void dump(const std::string& name, const std::string& snapshot_dir) {
keys_->resize(num_elements_.load());
keys_->dump(snapshot_dir + "/" + name + ".keys");
indices_.dump(snapshot_dir + "/" + name + ".indices");
dump_meta(snapshot_dir + "/" + name + ".meta");
Expand Down
9 changes: 6 additions & 3 deletions include/neug/utils/mmap_array.h
Original file line number Diff line number Diff line change
Expand Up @@ -564,7 +564,9 @@ class mmap_array<std::string_view> {

// Compact the data buffer by removing unused space and updating offsets
// This is an in-place operation that shifts valid string data forward
// Returns the compacted data size
// Returns the compacted data size. Note that the reserved size of data buffer
// is not changed, and new strings can still be appended after the compacted
// data.
size_t compact() {
auto plan = prepare_compaction_plan();
if (items_.size() == 0) {
Expand All @@ -577,9 +579,11 @@ class mmap_array<std::string_view> {

std::vector<char> temp_buf(plan.total_size);
size_t write_offset = 0;
size_t limit_offset = 0;
for (const auto& entry : plan.entries) {
const char* src = data_.data() + entry.offset;
char* dst = temp_buf.data() + write_offset;
limit_offset = std::max(limit_offset, entry.offset + entry.length);
memcpy(dst, src, entry.length);
items_.set(entry.index,
{static_cast<uint64_t>(write_offset), entry.length});
Expand All @@ -588,9 +592,8 @@ class mmap_array<std::string_view> {
assert(write_offset == plan.total_size);
memcpy(data_.data(), temp_buf.data(), plan.total_size);

data_.resize(plan.total_size);
VLOG(1) << "Compaction completed. New data size: " << plan.total_size
<< ", old data size: " << size_before_compact;
<< ", old data size: " << limit_offset;
return plan.total_size;
}

Expand Down
Loading
Loading