Skip to content

Commit 4512ce0

Browse files
authored
Empty graph guards (#2126)
* Remove networkx from graph_extractor and clean out redundancy * Bubble pipeline error to console
1 parent e0cce31 commit 4512ce0

File tree

13 files changed

+160
-433
lines changed

13 files changed

+160
-433
lines changed

packages/graphrag/graphrag/api/index.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,8 +86,9 @@ async def build_index(
8686
input_documents=input_documents,
8787
):
8888
outputs.append(output)
89-
if output.errors and len(output.errors) > 0:
89+
if output.error is not None:
9090
logger.error("Workflow %s completed with errors", output.workflow)
91+
workflow_callbacks.pipeline_error(output.error)
9192
else:
9293
logger.info("Workflow %s completed successfully", output.workflow)
9394
logger.debug(str(output.result))

packages/graphrag/graphrag/callbacks/console_workflow_callbacks.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,10 @@ def workflow_end(self, name: str, instance: object) -> None:
3737
if self._verbose:
3838
print(instance)
3939

40+
def pipeline_error(self, error: BaseException) -> None:
41+
"""Execute this callback when an error occurs in the pipeline."""
42+
print(f"Pipeline error: {error}")
43+
4044
def progress(self, progress: Progress) -> None:
4145
"""Handle when progress occurs."""
4246
complete = progress.completed_items or 0

packages/graphrag/graphrag/callbacks/noop_workflow_callbacks.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,3 +25,6 @@ def workflow_end(self, name: str, instance: object) -> None:
2525

2626
def progress(self, progress: Progress) -> None:
2727
"""Handle when progress occurs."""
28+
29+
def pipeline_error(self, error: BaseException) -> None:
30+
"""Execute this callback when an error occurs in the pipeline."""

packages/graphrag/graphrag/callbacks/workflow_callbacks.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,3 +35,7 @@ def workflow_end(self, name: str, instance: object) -> None:
3535
def progress(self, progress: Progress) -> None:
3636
"""Handle when progress occurs."""
3737
...
38+
39+
def pipeline_error(self, error: BaseException) -> None:
40+
"""Execute this callback when an error occurs in the pipeline."""
41+
...

packages/graphrag/graphrag/callbacks/workflow_callbacks_manager.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,3 +50,9 @@ def progress(self, progress: Progress) -> None:
5050
for callback in self._callbacks:
5151
if hasattr(callback, "progress"):
5252
callback.progress(progress)
53+
54+
def pipeline_error(self, error: BaseException) -> None:
55+
"""Execute this callback when an error occurs in the pipeline."""
56+
for callback in self._callbacks:
57+
if hasattr(callback, "pipeline_error"):
58+
callback.pipeline_error(error)

packages/graphrag/graphrag/cli/index.py

Lines changed: 1 addition & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -134,15 +134,6 @@ def _run_index(
134134
verbose=verbose,
135135
)
136136
)
137-
encountered_errors = any(
138-
output.errors and len(output.errors) > 0 for output in outputs
139-
)
140-
141-
if encountered_errors:
142-
logger.error(
143-
"Errors occurred during the pipeline run, see logs for more details."
144-
)
145-
else:
146-
logger.info("All workflows completed successfully.")
137+
encountered_errors = any(output.error is not None for output in outputs)
147138

148139
sys.exit(1 if encountered_errors else 0)

packages/graphrag/graphrag/index/operations/extract_graph/extract_graph.py

Lines changed: 18 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -5,17 +5,11 @@
55

66
import logging
77

8-
import networkx as nx
98
import pandas as pd
109

1110
from graphrag.callbacks.workflow_callbacks import WorkflowCallbacks
1211
from graphrag.config.enums import AsyncType
1312
from graphrag.index.operations.extract_graph.graph_extractor import GraphExtractor
14-
from graphrag.index.operations.extract_graph.typing import (
15-
Document,
16-
EntityExtractionResult,
17-
EntityTypes,
18-
)
1913
from graphrag.index.utils.derive_from_rows import derive_from_rows
2014
from graphrag.language_model.protocol.base import ChatModel
2115

@@ -42,14 +36,15 @@ async def run_strategy(row):
4236
text = row[text_column]
4337
id = row[id_column]
4438
result = await run_extract_graph(
45-
[Document(text=text, id=id)],
46-
entity_types,
47-
model,
48-
prompt,
49-
max_gleanings,
39+
text=text,
40+
source_id=id,
41+
entity_types=entity_types,
42+
model=model,
43+
prompt=prompt,
44+
max_gleanings=max_gleanings,
5045
)
5146
num_started += 1
52-
return [result.entities, result.relationships, result.graph]
47+
return result
5348

5449
results = await derive_from_rows(
5550
text_units,
@@ -64,8 +59,8 @@ async def run_strategy(row):
6459
relationship_dfs = []
6560
for result in results:
6661
if result:
67-
entity_dfs.append(pd.DataFrame(result[0]))
68-
relationship_dfs.append(pd.DataFrame(result[1]))
62+
entity_dfs.append(result[0])
63+
relationship_dfs.append(result[1])
6964

7065
entities = _merge_entities(entity_dfs)
7166
relationships = _merge_relationships(relationship_dfs)
@@ -74,12 +69,13 @@ async def run_strategy(row):
7469

7570

7671
async def run_extract_graph(
77-
docs: list[Document],
78-
entity_types: EntityTypes,
72+
text: str,
73+
source_id: str,
74+
entity_types: list[str],
7975
model: ChatModel,
8076
prompt: str,
8177
max_gleanings: int,
82-
) -> EntityExtractionResult:
78+
) -> tuple[pd.DataFrame, pd.DataFrame]:
8379
"""Run the graph intelligence entity extraction strategy."""
8480
extractor = GraphExtractor(
8581
model=model,
@@ -89,36 +85,15 @@ async def run_extract_graph(
8985
"Entity Extraction Error", exc_info=e, extra={"stack": s, "details": d}
9086
),
9187
)
92-
text_list = [doc.text.strip() for doc in docs]
88+
text = text.strip()
9389

94-
results = await extractor(
95-
list(text_list),
90+
entities_df, relationships_df = await extractor(
91+
text,
9692
entity_types=entity_types,
93+
source_id=source_id,
9794
)
9895

99-
graph = results.output
100-
# Map the "source_id" back to the "id" field
101-
for _, node in graph.nodes(data=True): # type: ignore
102-
if node is not None:
103-
node["source_id"] = ",".join(
104-
docs[int(id)].id for id in node["source_id"].split(",")
105-
)
106-
107-
for _, _, edge in graph.edges(data=True): # type: ignore
108-
if edge is not None:
109-
edge["source_id"] = ",".join(
110-
docs[int(id)].id for id in edge["source_id"].split(",")
111-
)
112-
113-
entities = [
114-
({"title": item[0], **(item[1] or {})})
115-
for item in graph.nodes(data=True)
116-
if item is not None
117-
]
118-
119-
relationships = nx.to_pandas_edgelist(graph)
120-
121-
return EntityExtractionResult(entities, relationships, graph)
96+
return (entities_df, relationships_df)
12297

12398

12499
def _merge_entities(entity_dfs) -> pd.DataFrame:

0 commit comments

Comments
 (0)