
Commit a6a78d5

Nlp cache (microsoft#1689)
* Add cache to build_noun_graph
* Semver
1 parent c02ab09 commit a6a78d5

File tree: 10 files changed, +68 -9 lines changed
Lines changed: 4 additions & 0 deletions

@@ -0,0 +1,4 @@
+{
+    "type": "patch",
+    "description": "Add caching to NLP extractor."
+}

graphrag/config/models/extract_graph_nlp_config.py

Lines changed: 4 additions & 0 deletions
@@ -64,3 +64,7 @@ class ExtractGraphNLPConfig(BaseModel):
     text_analyzer: TextAnalyzerConfig = Field(
         description="The text analyzer configuration.", default=TextAnalyzerConfig()
     )
+    parallelization_num_threads: int = Field(
+        description="The number of threads to use for the extraction process.",
+        default=defs.PARALLELIZATION_NUM_THREADS,
+    )
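
The new setting sits alongside the existing text_analyzer field and is consumed by build_noun_graph below. A minimal sketch of overriding it when constructing the pydantic model directly (in a normal pipeline run the value is populated from settings; this direct construction is only illustrative):

from graphrag.config.models.extract_graph_nlp_config import ExtractGraphNLPConfig

# Unset fields keep their declared defaults (e.g. text_analyzer).
config = ExtractGraphNLPConfig(parallelization_num_threads=8)
print(config.parallelization_num_threads)  # 8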

graphrag/index/flows/extract_graph_nlp.py

Lines changed: 6 additions & 2 deletions
@@ -5,6 +5,7 @@
 
 import pandas as pd
 
+from graphrag.cache.pipeline_cache import PipelineCache
 from graphrag.callbacks.workflow_callbacks import WorkflowCallbacks
 from graphrag.config.models.embed_graph_config import EmbedGraphConfig
 from graphrag.config.models.extract_graph_nlp_config import ExtractGraphNLPConfig
@@ -20,9 +21,10 @@
 from graphrag.index.operations.prune_graph import prune_graph
 
 
-def extract_graph_nlp(
+async def extract_graph_nlp(
     text_units: pd.DataFrame,
     callbacks: WorkflowCallbacks,
+    cache: PipelineCache,
     extraction_config: ExtractGraphNLPConfig,
     pruning_config: PruneGraphConfig,
     embed_config: EmbedGraphConfig | None = None,
@@ -31,10 +33,12 @@ def extract_graph_nlp(
     """All the steps to create the base entity graph."""
     text_analyzer_config = extraction_config.text_analyzer
     text_analyzer = create_noun_phrase_extractor(text_analyzer_config)
-    extracted_nodes, extracted_edges = build_noun_graph(
+    extracted_nodes, extracted_edges = await build_noun_graph(
         text_units,
         text_analyzer=text_analyzer,
         normalize_edge_weights=extraction_config.normalize_edge_weights,
+        num_threads=extraction_config.parallelization_num_threads,
+        cache=cache,
     )
 
     # create a temporary graph to prune, then turn it back into dataframes
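
Because the flow is now a coroutine that requires a cache, direct callers must await it. A hedged sketch of a standalone invocation; the config-model import paths are assumptions following the naming convention visible in this diff, and NoopPipelineCache simply disables memoization, matching the pre-PR behavior:

import asyncio

import pandas as pd

from graphrag.cache.noop_pipeline_cache import NoopPipelineCache
from graphrag.callbacks.noop_workflow_callbacks import NoopWorkflowCallbacks
from graphrag.config.models.extract_graph_nlp_config import ExtractGraphNLPConfig
from graphrag.config.models.prune_graph_config import PruneGraphConfig
from graphrag.index.flows.extract_graph_nlp import extract_graph_nlp

text_units = pd.DataFrame(
    {"id": ["tu-1"], "text": ["GraphRAG builds knowledge graphs from text."]}
)

# The flow is now async: pass a cache and await the result.
entities, relationships = asyncio.run(
    extract_graph_nlp(
        text_units,
        NoopWorkflowCallbacks(),
        NoopPipelineCache(),  # swap in a real PipelineCache to reuse results across runs
        extraction_config=ExtractGraphNLPConfig(),
        pruning_config=PruneGraphConfig(),
    )
)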

graphrag/index/operations/build_noun_graph/build_noun_graph.py

Lines changed: 33 additions & 5 deletions
@@ -7,37 +7,65 @@
 
 import pandas as pd
 
+from graphrag.cache.noop_pipeline_cache import NoopPipelineCache
+from graphrag.cache.pipeline_cache import PipelineCache
+from graphrag.config.enums import AsyncType
 from graphrag.index.operations.build_noun_graph.np_extractors.base import (
     BaseNounPhraseExtractor,
 )
+from graphrag.index.run.derive_from_rows import derive_from_rows
+from graphrag.index.utils.hashing import gen_sha512_hash
 
 
-def build_noun_graph(
+async def build_noun_graph(
     text_unit_df: pd.DataFrame,
     text_analyzer: BaseNounPhraseExtractor,
     normalize_edge_weights: bool,
+    num_threads: int = 4,
+    cache: PipelineCache | None = None,
 ) -> tuple[pd.DataFrame, pd.DataFrame]:
     """Build a noun graph from text units."""
     text_units = text_unit_df.loc[:, ["id", "text"]]
-    nodes_df = _extract_nodes(text_units, text_analyzer)
+    nodes_df = await _extract_nodes(
+        text_units, text_analyzer, num_threads=num_threads, cache=cache
+    )
     edges_df = _extract_edges(nodes_df, normalize_edge_weights=normalize_edge_weights)
 
     return (nodes_df, edges_df)
 
 
-def _extract_nodes(
+async def _extract_nodes(
     text_unit_df: pd.DataFrame,
     text_analyzer: BaseNounPhraseExtractor,
+    num_threads: int = 4,
+    cache: PipelineCache | None = None,
 ) -> pd.DataFrame:
     """
     Extract initial nodes and edges from text units.
 
     Input: text unit df with schema [id, text, document_id]
     Returns a dataframe with schema [id, title, freq, text_unit_ids].
     """
-    text_unit_df["noun_phrases"] = text_unit_df["text"].apply(
-        lambda text: text_analyzer.extract(text)
+    cache = cache or NoopPipelineCache()
+    cache = cache.child("extract_noun_phrases")
+
+    async def extract(row):
+        text = row["text"]
+        attrs = {"text": text, "analyzer": str(text_analyzer)}
+        key = gen_sha512_hash(attrs, attrs.keys())
+        result = await cache.get(key)
+        if not result:
+            result = text_analyzer.extract(text)
+            await cache.set(key, result)
+        return result
+
+    text_unit_df["noun_phrases"] = await derive_from_rows(
+        text_unit_df,
+        extract,
+        num_threads=num_threads,
+        async_type=AsyncType.Threaded,
     )
+
     noun_node_df = text_unit_df.explode("noun_phrases")
     noun_node_df = noun_node_df.rename(
         columns={"noun_phrases": "title", "id": "text_unit_id"}
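
The caching pattern here is a content-addressed memo: the key is a hash over everything that affects the output, i.e. the input text plus the extractor's configuration via str(text_analyzer), so changing either one invalidates the entry. A self-contained sketch of the same idea with a plain dict standing in for PipelineCache (the names below are hypothetical; the real code uses gen_sha512_hash and an async cache as shown above):

import hashlib
import json

_memo: dict[str, list[str]] = {}

def cached_extract(text: str, analyzer_repr: str, extract_fn) -> list[str]:
    # The key covers both the input and the analyzer config, so changing
    # either one produces a fresh cache entry.
    attrs = {"text": text, "analyzer": analyzer_repr}
    key = hashlib.sha512(json.dumps(attrs, sort_keys=True).encode()).hexdigest()
    if key not in _memo:
        _memo[key] = extract_fn(text)
    return _memo[key]

# The second call with identical text + analyzer config is served from the memo.
calls: list[str] = []
def toy_extract(t: str) -> list[str]:
    calls.append(t)
    return t.split()

cached_extract("big red dog", "regex_en_v1", toy_extract)
cached_extract("big red dog", "regex_en_v1", toy_extract)
assert len(calls) == 1

One quirk visible in the diff: the real code tests "if not result:", so an empty noun-phrase list is treated like a miss and re-extracted on every run rather than cached.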

graphrag/index/operations/build_noun_graph/np_extractors/base.py

Lines changed: 4 additions & 0 deletions
@@ -33,3 +33,7 @@ def extract(self, text: str) -> list[str]:
 
         Returns: List of noun phrases.
         """
+
+    @abstractmethod
+    def __str__(self) -> str:
+        """Return string representation of the extractor, used for cache key generation."""

graphrag/index/operations/build_noun_graph/np_extractors/cfg_extractor.py

Lines changed: 4 additions & 0 deletions
@@ -172,3 +172,7 @@ def _tag_noun_phrases(
                 cleaned_tokens, self.max_word_length
             ),
         }
+
+    def __str__(self) -> str:
+        """Return string representation of the extractor, used for cache key generation."""
+        return f"cfg_{self.model_name}_{self.max_word_length}_{self.include_named_entities}_{self.exclude_entity_tags}_{self.exclude_pos_tags}_{self.exclude_nouns}_{self.word_delimiter}_{self.noun_phrase_grammars}_{self.noun_phrase_tags}"

graphrag/index/operations/build_noun_graph/np_extractors/regex_extractor.py

Lines changed: 4 additions & 0 deletions
@@ -117,3 +117,7 @@ def _tag_noun_phrases(
             "has_compound_words": has_compound_words,
             "has_valid_tokens": has_valid_tokens,
         }
+
+    def __str__(self) -> str:
+        """Return string representation of the extractor, used for cache key generation."""
+        return f"regex_en_{self.exclude_nouns}_{self.max_word_length}_{self.word_delimiter}"

graphrag/index/operations/build_noun_graph/np_extractors/syntactic_parsing_extractor.py

Lines changed: 4 additions & 0 deletions
@@ -157,3 +157,7 @@ def _tag_noun_phrases(
                 cleaned_token_texts, self.max_word_length
             ),
         }
+
+    def __str__(self) -> str:
+        """Return string representation of the extractor, used for cache key generation."""
+        return f"syntactic_{self.model_name}_{self.max_word_length}_{self.include_named_entities}_{self.exclude_entity_tags}_{self.exclude_pos_tags}_{self.exclude_nouns}_{self.word_delimiter}"

graphrag/index/run/derive_from_rows.py

Lines changed: 3 additions & 1 deletion
@@ -12,6 +12,7 @@
 
 import pandas as pd
 
+from graphrag.callbacks.noop_workflow_callbacks import NoopWorkflowCallbacks
 from graphrag.callbacks.workflow_callbacks import WorkflowCallbacks
 from graphrag.config.enums import AsyncType
 from graphrag.logger.progress import progress_ticker
@@ -33,11 +34,12 @@ def __init__(self, num_errors: int, example: str | None = None):
 async def derive_from_rows(
     input: pd.DataFrame,
     transform: Callable[[pd.Series], Awaitable[ItemType]],
-    callbacks: WorkflowCallbacks,
+    callbacks: WorkflowCallbacks | None = None,
     num_threads: int = 4,
     async_type: AsyncType = AsyncType.AsyncIO,
 ) -> list[ItemType | None]:
     """Apply a generic transform function to each row. Any errors will be reported and thrown."""
+    callbacks = callbacks or NoopWorkflowCallbacks()
     match async_type:
         case AsyncType.AsyncIO:
             return await derive_from_rows_asyncio(
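
Making callbacks optional is what lets build_noun_graph call this helper without wiring up progress reporting. A hedged usage sketch (the DataFrame contents are illustrative; AsyncType.Threaded mirrors how build_noun_graph invokes it for the CPU-bound extractor):

import asyncio

import pandas as pd

from graphrag.config.enums import AsyncType
from graphrag.index.run.derive_from_rows import derive_from_rows

async def main() -> None:
    df = pd.DataFrame({"text": ["alpha beta", "gamma"]})

    async def tokenize(row: pd.Series) -> list[str]:
        return row["text"].split()

    # callbacks omitted: a NoopWorkflowCallbacks is now supplied internally.
    results = await derive_from_rows(
        df, tokenize, num_threads=2, async_type=AsyncType.Threaded
    )
    print(results)  # [['alpha', 'beta'], ['gamma']]

asyncio.run(main())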

graphrag/index/workflows/extract_graph_nlp.py

Lines changed: 2 additions & 1 deletion
@@ -26,9 +26,10 @@ async def run_workflow(
     """All the steps to create the base entity graph."""
     text_units = await load_table_from_storage("text_units", context.storage)
 
-    entities, relationships = extract_graph_nlp(
+    entities, relationships = await extract_graph_nlp(
         text_units,
         callbacks,
+        context.cache,
         extraction_config=config.extract_graph_nlp,
         pruning_config=config.prune_graph,
         embed_config=config.embed_graph,
