
Commit 0bcdd3d

Merge pull request #104 from open-sciencelab/feature/optimize-pipeline
refactor: delete meta storage
2 parents 48d44c5 + b9901ad commit 0bcdd3d

File tree

4 files changed: +8 -43 lines changed

    graphgen/graphgen.py
    graphgen/models/__init__.py
    graphgen/models/storage/__init__.py
    graphgen/models/storage/json_storage.py

graphgen/graphgen.py

Lines changed: 6 additions & 21 deletions
@@ -9,7 +9,6 @@
 from graphgen.models import (
     JsonKVStorage,
     JsonListStorage,
-    MetaJsonKVStorage,
     NetworkXStorage,
     OpenAIClient,
     Tokenizer,
@@ -54,9 +53,6 @@ def __init__(
         )
         self.trainee_llm_client: BaseLLMWrapper = trainee_llm_client
 
-        self.meta_storage: MetaJsonKVStorage = MetaJsonKVStorage(
-            self.working_dir, namespace="_meta"
-        )
         self.full_docs_storage: JsonKVStorage = JsonKVStorage(
             self.working_dir, namespace="full_docs"
         )
@@ -98,11 +94,7 @@ async def read(self, read_config: Dict):
         batch = {}
         for doc in doc_stream:
             doc_id = compute_mm_hash(doc, prefix="doc-")
-
             batch[doc_id] = doc
-        if batch:
-            self.full_docs_storage.upsert(batch)
-            self.full_docs_storage.index_done_callback()
 
         # TODO: configurable whether to use coreference resolution
 
@@ -120,7 +112,7 @@ async def chunk(self, chunk_config: Dict):
         chunk documents into smaller pieces from full_docs_storage if not already present
         """
 
-        new_docs = self.meta_storage.get_new_data(self.full_docs_storage)
+        new_docs = self.full_docs_storage.get_all()
         if len(new_docs) == 0:
             logger.warning("All documents are already in the storage")
             return
@@ -143,16 +135,15 @@ async def chunk(self, chunk_config: Dict):
 
         self.chunks_storage.upsert(inserting_chunks)
         self.chunks_storage.index_done_callback()
-        self.meta_storage.mark_done(self.full_docs_storage)
-        self.meta_storage.index_done_callback()
 
     @async_to_sync_method
     async def build_kg(self):
         """
         build knowledge graph from text chunks
         """
-        # Step 1: get new chunks according to meta and chunks storage
-        inserting_chunks = self.meta_storage.get_new_data(self.chunks_storage)
+        # Step 1: get new chunks
+        inserting_chunks = self.chunks_storage.get_all()
+
         if len(inserting_chunks) == 0:
             logger.warning("All chunks are already in the storage")
             return
@@ -169,18 +160,16 @@ async def build_kg(self):
             logger.warning("No entities or relations extracted from text chunks")
             return
 
-        # Step 3: mark meta
+        # Step 3: upsert new entities and relations to the graph storage
         self.graph_storage.index_done_callback()
-        self.meta_storage.mark_done(self.chunks_storage)
-        self.meta_storage.index_done_callback()
 
         return _add_entities_and_relations
 
     @async_to_sync_method
     async def search(self, search_config: Dict):
         logger.info("[Search] %s ...", ", ".join(search_config["data_sources"]))
 
-        seeds = self.meta_storage.get_new_data(self.full_docs_storage)
+        seeds = self.full_docs_storage.get_all()
         if len(seeds) == 0:
             logger.warning("All documents are already been searched")
             return
@@ -198,8 +187,6 @@ async def search(self, search_config: Dict):
             return
         self.search_storage.upsert(search_results)
         self.search_storage.index_done_callback()
-        self.meta_storage.mark_done(self.full_docs_storage)
-        self.meta_storage.index_done_callback()
 
     @async_to_sync_method
     async def quiz_and_judge(self, quiz_and_judge_config: Dict):
@@ -268,8 +255,6 @@ async def extract(self, extract_config: Dict):
 
         self.extract_storage.upsert(results)
         self.extract_storage.index_done_callback()
-        self.meta_storage.mark_done(self.chunks_storage)
-        self.meta_storage.index_done_callback()
 
     @async_to_sync_method
     async def generate(self, generate_config: Dict):
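
Every stage previously filtered its input through the meta storage (get_new_data) and recorded progress afterwards (mark_done); after this commit each stage takes everything from its source storage via get_all(), so a rerun reprocesses all stored items rather than only unseen ones. get_all() itself is not shown in this diff; presumably it just hands back the storage's backing dict, roughly:

    # Assumed shape of JsonKVStorage.get_all() (not shown in this diff):
    def get_all(self) -> dict:
        # return a copy of everything currently stored
        return dict(self._data)

On this reading, warnings like "All documents are already in the storage" now fire only when the source storage is empty.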

graphgen/models/__init__.py

Lines changed: 1 addition & 1 deletion
@@ -31,5 +31,5 @@
 from .searcher.web.bing_search import BingSearch
 from .searcher.web.google_search import GoogleSearch
 from .splitter import ChineseRecursiveTextSplitter, RecursiveCharacterSplitter
-from .storage import JsonKVStorage, JsonListStorage, MetaJsonKVStorage, NetworkXStorage
+from .storage import JsonKVStorage, JsonListStorage, NetworkXStorage
 from .tokenizer import Tokenizer
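
Because the class is dropped from the package exports here (and in the storage package below), any downstream code that still imports it now fails at import time:

    # After this commit, this import raises ImportError:
    from graphgen.models import MetaJsonKVStorage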

graphgen/models/storage/__init__.py

Lines changed: 1 addition & 1 deletion

@@ -1,2 +1,2 @@
-from .json_storage import JsonKVStorage, JsonListStorage, MetaJsonKVStorage
+from .json_storage import JsonKVStorage, JsonListStorage
 from .networkx_storage import NetworkXStorage

graphgen/models/storage/json_storage.py

Lines changed: 0 additions & 20 deletions
@@ -92,23 +92,3 @@ def upsert(self, data: list):
 
     def drop(self):
         self._data = []
-
-
-@dataclass
-class MetaJsonKVStorage(JsonKVStorage):
-    def __post_init__(self):
-        self._file_name = os.path.join(self.working_dir, f"{self.namespace}.json")
-        self._data = load_json(self._file_name) or {}
-        logger.info("Load KV %s with %d data", self.namespace, len(self._data))
-
-    def get_new_data(self, storage_instance: "JsonKVStorage") -> dict:
-        new_data = {}
-        for k, v in storage_instance.data.items():
-            if k not in self._data:
-                new_data[k] = v
-        return new_data
-
-    def mark_done(self, storage_instance: "JsonKVStorage"):
-        new_data = self.get_new_data(storage_instance)
-        if new_data:
-            self._data.update(new_data)
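
The deleted class was the pipeline's only incremental-processing bookkeeping: get_new_data() returned the keys a storage held that the meta record had not yet seen, and mark_done() absorbed them. A caller who still wants skip-already-done behavior after this commit could keep that bookkeeping outside the library; a minimal sketch (the ProcessedKeys helper is hypothetical, not part of this repo):

    import json
    import os


    class ProcessedKeys:
        """Hypothetical stand-in for the removed meta storage:
        remembers which keys a stage has already handled."""

        def __init__(self, path: str):
            self.path = path
            if os.path.exists(path):
                with open(path, encoding="utf-8") as f:
                    self._seen = set(json.load(f))
            else:
                self._seen = set()

        def new_items(self, data: dict) -> dict:
            # same filtering the deleted get_new_data() performed
            return {k: v for k, v in data.items() if k not in self._seen}

        def mark_done(self, data: dict):
            # same update the deleted mark_done() performed, persisted to disk
            self._seen.update(data)
            with open(self.path, "w", encoding="utf-8") as f:
                json.dump(sorted(self._seen), f)

Filtering the result of get_all() through new_items() before upserting, and calling mark_done() once a stage succeeds, would restore the old incremental behavior.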
