Skip to content

Commit 465dbc8

Browse files
committed
draft
Signed-off-by: noob <[email protected]>
1 parent 4a9b204 commit 465dbc8

26 files changed

+1177
-1191
lines changed

src/executor/operator/physical_scan/physical_knn_scan_impl.cpp

Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -958,16 +958,9 @@ void ExecuteHnswSearch(QueryContext *query_context,
958958
if (!status.ok()) {
959959
UnrecoverableError(status.message());
960960
}
961-
std::shared_ptr<HnswHandler> hnsw_handler;
961+
HnswHandler *hnsw_handler{};
962962
index_file_worker->Read(hnsw_handler);
963-
hnsw_search(hnsw_handler.get(), false);
964-
}
965-
if (mem_index) {
966-
auto memory_hnsw_index = mem_index->GetHnswIndex();
967-
if (memory_hnsw_index) {
968-
const HnswHandlerPtr hnsw_handler = memory_hnsw_index->get();
969-
hnsw_search(hnsw_handler, true);
970-
}
963+
hnsw_search(hnsw_handler, false);
971964
}
972965
}
973966

src/storage/buffer/file_worker/file_worker.cppm

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -244,12 +244,12 @@ protected:
244244

245245
virtual void Read(std::shared_ptr<EMVBIndex> &data, std::unique_ptr<LocalFileHandle> &file_handle, size_t file_size) {}
246246

247-
virtual bool
248-
Write(std::shared_ptr<HnswHandler> &data, std::unique_ptr<LocalFileHandle> &file_handle, bool &prepare_success, const FileWorkerSaveCtx &ctx) {
249-
return false;
250-
}
247+
// virtual bool
248+
// Write(std::shared_ptr<HnswHandler> &data, std::unique_ptr<LocalFileHandle> &file_handle, bool &prepare_success, const FileWorkerSaveCtx &ctx) {
249+
// return false;
250+
// }
251251

252-
virtual void Read(std::shared_ptr<HnswHandler> &data, std::unique_ptr<LocalFileHandle> &file_handle, size_t file_size) {}
252+
virtual void Read(HnswHandler *&data, std::unique_ptr<LocalFileHandle> &file_handle, size_t file_size) {}
253253

254254
virtual bool
255255
Write(std::span<IVFIndexInChunk> data, std::unique_ptr<LocalFileHandle> &file_handle, bool &prepare_success, const FileWorkerSaveCtx &ctx) {

src/storage/buffer/file_worker/hnsw_file_worker.cppm

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -42,16 +42,18 @@ public:
4242
FileWorkerType Type() const override { return FileWorkerType::kHNSWIndexFile; }
4343

4444
protected:
45-
bool Write(std::shared_ptr<HnswHandler> &data,
46-
std::unique_ptr<LocalFileHandle> &file_handle,
47-
bool &prepare_success,
48-
const FileWorkerSaveCtx &ctx) override;
45+
// bool Write(std::shared_ptr<HnswHandler> &data,
46+
// std::unique_ptr<LocalFileHandle> &file_handle,
47+
// bool &prepare_success,
48+
// const FileWorkerSaveCtx &ctx) override;
4949

50-
void Read(std::shared_ptr<HnswHandler> &data, std::unique_ptr<LocalFileHandle> &file_handle, size_t file_size) override;
50+
void Read(HnswHandler *&data, std::unique_ptr<LocalFileHandle> &file_handle, size_t file_size) override;
5151

5252
private:
5353
mutable std::mutex mutex_;
5454
size_t index_size_{};
55+
boost::interprocess::managed_mapped_file segment_;
56+
bool inited_{};
5557
};
5658

5759
} // namespace infinity

src/storage/buffer/file_worker/hnsw_file_worker_impl.cpp

Lines changed: 50 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -64,46 +64,61 @@ HnswFileWorker::~HnswFileWorker() {
6464
mmap_ = nullptr;
6565
}
6666

67-
bool HnswFileWorker::Write(std::shared_ptr<HnswHandler> &data,
68-
std::unique_ptr<LocalFileHandle> &file_handle,
69-
bool &prepare_success,
70-
const FileWorkerSaveCtx &ctx) {
71-
std::unique_lock l(mutex_);
72-
73-
auto fd = file_handle->fd();
74-
mmap_size_ = data->CalcSize();
75-
ftruncate(fd, mmap_size_);
76-
77-
mmap_ = mmap(nullptr, mmap_size_, PROT_WRITE | PROT_READ, MAP_SHARED, fd, 0);
78-
79-
size_t offset{};
80-
data->SaveToPtr(mmap_, offset);
67+
// bool HnswFileWorker::Write(std::shared_ptr<HnswHandler> &data,
68+
// std::unique_ptr<LocalFileHandle> &file_handle,
69+
// bool &prepare_success,
70+
// const FileWorkerSaveCtx &ctx) {
71+
// std::unique_lock l(mutex_);
72+
//
73+
// auto fd = file_handle->fd();
74+
// mmap_size_ = data->CalcSize();
75+
// ftruncate(fd, mmap_size_);
76+
//
77+
// mmap_ = mmap(nullptr, mmap_size_, PROT_WRITE | PROT_READ, MAP_SHARED, fd, 0);
78+
//
79+
// size_t offset{};
80+
// data->SaveToPtr(mmap_, offset);
81+
//
82+
// auto &path = *rel_file_path_;
83+
// auto &cache_manager = InfinityContext::instance().storage()->fileworker_manager()->hnsw_map_.cache_manager_;
84+
// cache_manager.Set(path, data, data->MemUsage());
85+
// prepare_success = true;
86+
// return true;
87+
// }
88+
89+
void HnswFileWorker::Read(HnswHandler *&data, std::unique_ptr<LocalFileHandle> &file_handle, size_t file_size) {
90+
// std::unique_lock l(mutex_);
91+
// auto &path = *rel_file_path_;
92+
// std::println("{}", path);
93+
//
94+
// data = HnswHandler::Make(index_base_.get(), column_def_).release();
95+
// if (!mmap_) {
96+
// mmap_size_ = file_handle->FileSize();
97+
// auto fd = file_handle->fd();
98+
// mmap_ = mmap(nullptr, mmap_size_, PROT_WRITE | PROT_READ, MAP_SHARED, fd, 0);
99+
// }
100+
// data->LoadFromPtr(mmap_, mmap_size_, file_size);
81101

82102
auto &path = *rel_file_path_;
83-
auto &cache_manager = InfinityContext::instance().storage()->fileworker_manager()->hnsw_map_.cache_manager_;
84-
cache_manager.Set(path, data, data->MemUsage());
85-
prepare_success = true;
86-
return true;
87-
}
103+
auto tmp_path = GetFilePathTemp();
104+
if (!inited_) {
105+
if (!VirtualStore::Exists("/var/infinity/tmp")) {
106+
VirtualStore::MakeDirectory("/var/infinity/tmp");
107+
}
88108

89-
void HnswFileWorker::Read(std::shared_ptr<HnswHandler> &data, std::unique_ptr<LocalFileHandle> &file_handle, size_t file_size) {
90-
std::unique_lock l(mutex_);
91-
if (!file_handle) {
92-
return;
93-
}
94-
auto &path = *rel_file_path_;
95-
auto &cache_manager = InfinityContext::instance().storage()->fileworker_manager()->hnsw_map_.cache_manager_;
96-
bool flag = cache_manager.Get(path, data);
97-
if (!flag) {
98-
data = HnswHandler::Make(index_base_.get(), column_def_);
99-
if (!mmap_) {
100-
mmap_size_ = file_handle->FileSize();
101-
auto fd = file_handle->fd();
102-
mmap_ = mmap(nullptr, mmap_size_, PROT_WRITE | PROT_READ, MAP_SHARED, fd, 0);
109+
if (!VirtualStore::Exists(tmp_path.c_str())) {
110+
auto [handle, status] = VirtualStore::Open(tmp_path, FileAccessMode::kReadWrite);
111+
close(handle->fd());
112+
VirtualStore::DeleteFile(tmp_path.c_str());
103113
}
104-
data->LoadFromPtr(mmap_, mmap_size_, file_size);
105-
cache_manager.Set(path, data, data->MemUsage());
114+
segment_ = boost::interprocess::managed_mapped_file(boost::interprocess::open_or_create_infinity, tmp_path.c_str(), 1145141919ull);
115+
auto *sm = segment_.get_segment_manager();
116+
data = segment_.find_or_construct<HnswHandler>(path.c_str())(index_base_.get(), column_def_, sm);
117+
inited_ = true;
118+
return;
106119
}
120+
auto result = segment_.find<HnswHandler>(path.c_str());
121+
data = result.first;
107122
}
108123

109124
} // namespace infinity

src/storage/catalog/mem_index.cppm

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ import row_id;
2525
namespace infinity {
2626

2727
class BaseMemIndex;
28-
class HnswIndexInMem;
28+
// class HnswIndexInMem;
2929
class IVFIndexInMem;
3030
class MemoryIndexer;
3131
class SecondaryIndexInMem;
@@ -52,8 +52,8 @@ public:
5252

5353
const BaseMemIndex *GetBaseMemIndex() const;
5454

55-
std::shared_ptr<HnswIndexInMem> GetHnswIndex();
56-
void SetHnswIndex(std::shared_ptr<HnswIndexInMem> hnsw_index);
55+
// std::shared_ptr<HnswIndexInMem> GetHnswIndex();
56+
// void SetHnswIndex(std::shared_ptr<HnswIndexInMem> hnsw_index);
5757

5858
std::shared_ptr<IVFIndexInMem> GetIVFIndex();
5959
void SetIVFIndex(std::shared_ptr<IVFIndexInMem> ivf_index);
@@ -85,7 +85,7 @@ private:
8585
bool is_dumping_{};
8686
bool is_updating_{};
8787

88-
std::shared_ptr<HnswIndexInMem> memory_hnsw_index_;
88+
// std::shared_ptr<HnswIndexInMem> memory_hnsw_index_;
8989
std::shared_ptr<IVFIndexInMem> memory_ivf_index_;
9090
std::shared_ptr<MemoryIndexer> memory_indexer_;
9191
std::shared_ptr<SecondaryIndexInMem> memory_secondary_index_;

src/storage/catalog/mem_index_impl.cpp

Lines changed: 18 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -64,15 +64,15 @@ size_t MemIndex::GetRowCount() {
6464
}
6565

6666
bool MemIndex::IsNull() const {
67-
std::unique_lock<std::mutex> lock(mtx_);
68-
return memory_hnsw_index_ == nullptr && memory_ivf_index_ == nullptr && memory_indexer_ == nullptr && memory_secondary_index_ == nullptr &&
67+
std::unique_lock lock(mtx_);
68+
return /* memory_hnsw_index_ == nullptr && */ memory_ivf_index_ == nullptr && memory_indexer_ == nullptr && memory_secondary_index_ == nullptr &&
6969
memory_emvb_index_ == nullptr && memory_bmp_index_ == nullptr && memory_dummy_index_ == nullptr;
7070
}
7171

7272
void MemIndex::ClearMemIndex() {
73-
std::unique_lock<std::mutex> lock(mtx_);
73+
std::unique_lock lock(mtx_);
7474

75-
memory_hnsw_index_.reset();
75+
// memory_hnsw_index_.reset();
7676
memory_ivf_index_.reset();
7777
memory_indexer_.reset();
7878
memory_secondary_index_.reset();
@@ -83,11 +83,12 @@ void MemIndex::ClearMemIndex() {
8383
}
8484

8585
const BaseMemIndex *MemIndex::GetBaseMemIndex() const {
86-
std::unique_lock<std::mutex> lock(mtx_);
87-
BaseMemIndex *res = nullptr;
88-
if (memory_hnsw_index_.get() != nullptr) {
89-
res = static_cast<BaseMemIndex *>(memory_hnsw_index_.get());
90-
} else if (memory_ivf_index_.get() != nullptr) {
86+
std::unique_lock lock(mtx_);
87+
BaseMemIndex *res{};
88+
// if (memory_hnsw_index_.get() != nullptr) {
89+
// res = static_cast<BaseMemIndex *>(memory_hnsw_index_.get());
90+
// } else
91+
if (memory_ivf_index_.get() != nullptr) {
9192
res = static_cast<BaseMemIndex *>(memory_ivf_index_.get());
9293
} else if (memory_indexer_.get() != nullptr) {
9394
res = static_cast<BaseMemIndex *>(memory_indexer_.get());
@@ -104,15 +105,15 @@ const BaseMemIndex *MemIndex::GetBaseMemIndex() const {
104105
return res;
105106
}
106107

107-
std::shared_ptr<HnswIndexInMem> MemIndex::GetHnswIndex() {
108-
std::unique_lock<std::mutex> lock(mtx_);
109-
return memory_hnsw_index_;
110-
}
108+
// std::shared_ptr<HnswIndexInMem> MemIndex::GetHnswIndex() {
109+
// std::unique_lock<std::mutex> lock(mtx_);
110+
// return memory_hnsw_index_;
111+
// }
111112

112-
void MemIndex::SetHnswIndex(std::shared_ptr<HnswIndexInMem> hnsw_index) {
113-
std::unique_lock<std::mutex> lock(mtx_);
114-
memory_hnsw_index_ = hnsw_index;
115-
}
113+
// void MemIndex::SetHnswIndex(std::shared_ptr<HnswIndexInMem> hnsw_index) {
114+
// std::unique_lock<std::mutex> lock(mtx_);
115+
// memory_hnsw_index_ = hnsw_index;
116+
// }
116117

117118
std::shared_ptr<IVFIndexInMem> MemIndex::GetIVFIndex() {
118119
std::unique_lock<std::mutex> lock(mtx_);

src/storage/catalog/meta/block_version.cppm

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,6 @@ struct CreateField {
3434
static CreateField LoadFromFile(LocalFileHandle *file_handle);
3535
};
3636

37-
std::atomic_int cnt{};
38-
3937
export struct BlockVersion {
4038
using segment_manager = boost::interprocess::managed_mapped_file::segment_manager;
4139
using TxnTimeStamp_allocator = boost::interprocess::allocator<TxnTimeStamp, segment_manager>;

src/storage/catalog/meta/segment_index_meta.cppm

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ public:
4747

4848
std::tuple<std::vector<ChunkID> *, Status> GetChunkIDs1();
4949

50-
Status SetChunkIDs(const std::vector<ChunkID> &chunk_ids);
50+
// Status SetChunkIDs(const std::vector<ChunkID> &chunk_ids);
5151

5252
Status RemoveChunkIDs(const std::vector<ChunkID> &chunk_ids);
5353

@@ -75,7 +75,7 @@ private:
7575

7676
Status LoadNextChunkID();
7777

78-
Status LoadFtInfo();
78+
// Status LoadFtInfo();
7979

8080
std::string GetSegmentIndexTag(const std::string &tag);
8181

src/storage/catalog/meta/segment_index_meta_impl.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ SegmentIndexMeta::SegmentIndexMeta(SegmentID segment_id, TableIndexMeta &table_i
4848
SegmentIndexMeta::~SegmentIndexMeta() = default;
4949

5050
Status SegmentIndexMeta::GetNextChunkID(ChunkID &chunk_id) {
51-
std::lock_guard<std::mutex> lock(mtx_);
51+
std::lock_guard lock(mtx_);
5252
if (!next_chunk_id_) {
5353
Status status = LoadNextChunkID();
5454
if (!status.ok()) {
@@ -126,7 +126,7 @@ Status SegmentIndexMeta::RemoveChunkIDs(const std::vector<ChunkID> &chunk_ids) {
126126
Status SegmentIndexMeta::AddChunkIndexID1(ChunkID chunk_id, NewTxn *new_txn) {
127127

128128
TableMeta &table_meta = table_index_meta_.table_meta();
129-
std::string chunk_id_key =
129+
auto chunk_id_key =
130130
KeyEncode::CatalogIdxChunkKey(table_meta.db_id_str(), table_meta.table_id_str(), table_index_meta_.index_id_str(), segment_id_, chunk_id);
131131
std::string commit_ts_str;
132132
switch (new_txn->GetTxnState()) {

src/storage/catalog/new_catalog.cppm

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -297,6 +297,8 @@ public:
297297
size_t index_size,
298298
std::optional<ChunkIndexMeta> &chunk_index_meta);
299299

300+
static Status InitHnswChunkIndex(SegmentIndexMeta &segment_index_meta, NewTxn *new_txn, std::optional<ChunkIndexMeta> &chunk_index_meta);
301+
300302
static Status RestoreNewChunkIndex1(SegmentIndexMeta &segment_index_meta,
301303
NewTxn *new_txn,
302304
ChunkID chunk_id,

0 commit comments

Comments
 (0)