
Commit 35b6393

Incremental flow rework (#1696)
* Rework update output structure
* Semver
* Fix unit test
* Update frequency in incremental

Co-authored-by: Alonso Guevara <[email protected]>
1 parent 5ef2399 commit 35b6393

File tree

13 files changed: +156 -140 lines
Lines changed: 4 additions & 0 deletions
@@ -0,0 +1,4 @@
+{
+    "type": "minor",
+    "description": "Rework the update output storage structure."
+}

graphrag/api/index.py

Lines changed: 1 addition & 2 deletions
@@ -26,6 +26,7 @@
 async def build_index(
     config: GraphRagConfig,
     method: IndexingMethod = IndexingMethod.Standard,
+    is_update_run: bool = False,
     memory_profile: bool = False,
     callbacks: list[WorkflowCallbacks] | None = None,
     progress_logger: ProgressLogger | None = None,
@@ -50,8 +51,6 @@ async def build_index(
     list[PipelineRunResult]
         The list of pipeline run results
     """
-    is_update_run = bool(config.update_index_output)
-
     pipeline_cache = (
         NoopPipelineCache() if config.cache.type == CacheType.none is None else None
     )
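With this change, is_update_run is an explicit argument to build_index instead of being inferred from whether update_index_output is set. A minimal call sketch, not part of this commit, assuming build_index is re-exported from graphrag.api, load_config is importable from graphrag.config.load_config (the same loader the CLI uses below), and PipelineRunResult exposes workflow and errors fields:

    import asyncio
    from pathlib import Path

    from graphrag.api import build_index  # assumed re-export of graphrag.api.index.build_index
    from graphrag.config.load_config import load_config  # assumed module path

    async def main() -> None:
        config = load_config(Path("./ragtest"))  # hypothetical project root
        # Run an incremental update; a standard build would omit the flag (defaults to False).
        results = await build_index(config=config, is_update_run=True)
        for result in results:
            print(result.workflow, result.errors)

    asyncio.run(main())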

graphrag/cli/index.py

Lines changed: 7 additions & 10 deletions
@@ -78,11 +78,13 @@ def index_cli(
     if output_dir:
         cli_overrides["output.base_dir"] = str(output_dir)
         cli_overrides["reporting.base_dir"] = str(output_dir)
+        cli_overrides["update_index_output.base_dir"] = str(output_dir)
     config = load_config(root_dir, config_filepath, cli_overrides)
 
     _run_index(
         config=config,
         method=method,
+        is_update_run=False,
         verbose=verbose,
         memprofile=memprofile,
         cache=cache,
@@ -108,21 +110,14 @@ def update_cli(
     if output_dir:
         cli_overrides["output.base_dir"] = str(output_dir)
         cli_overrides["reporting.base_dir"] = str(output_dir)
-    config = load_config(root_dir, config_filepath, cli_overrides)
-
-    # Check if update output exist, if not configure it with default values
-    if not config.update_index_output:
-        from graphrag.config.defaults import OUTPUT_TYPE, UPDATE_OUTPUT_BASE_DIR
-        from graphrag.config.models.output_config import OutputConfig
+        cli_overrides["update_index_output.base_dir"] = str(output_dir)
 
-        config.update_index_output = OutputConfig(
-            type=OUTPUT_TYPE,
-            base_dir=UPDATE_OUTPUT_BASE_DIR,
-        )
+    config = load_config(root_dir, config_filepath, cli_overrides)
 
     _run_index(
         config=config,
         method=method,
+        is_update_run=True,
         verbose=verbose,
         memprofile=memprofile,
         cache=cache,
@@ -135,6 +130,7 @@ def update_cli(
 def _run_index(
     config,
     method,
+    is_update_run,
     verbose,
     memprofile,
     cache,
@@ -176,6 +172,7 @@ def _run_index(
         api.build_index(
             config=config,
             method=method,
+            is_update_run=is_update_run,
            memory_profile=memprofile,
            progress_logger=progress_logger,
        )
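Both commands now funnel the --output override through dotted cli_overrides keys ("output.base_dir", "reporting.base_dir", "update_index_output.base_dir") that load_config applies onto the nested settings. A hypothetical, self-contained illustration of how such dotted keys map onto a nested dict (apply_override is not a graphrag function):

    def apply_override(settings: dict, dotted_key: str, value) -> None:
        """Set a nested value from a dotted key such as 'update_index_output.base_dir'."""
        *parents, leaf = dotted_key.split(".")
        node = settings
        for key in parents:
            node = node.setdefault(key, {})
        node[leaf] = value

    settings = {"output": {"base_dir": "output"}}
    apply_override(settings, "update_index_output.base_dir", "./my_run")
    # settings == {"output": {"base_dir": "output"}, "update_index_output": {"base_dir": "./my_run"}}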

graphrag/config/defaults.py

Lines changed: 1 addition & 2 deletions
@@ -128,12 +128,11 @@
 SNAPSHOTS_GRAPHML = False
 SNAPSHOTS_EMBEDDINGS = False
 OUTPUT_BASE_DIR = "output"
-OUTPUT_DEFAULT_ID = "default_output"
 OUTPUT_TYPE = OutputType.file
+UPDATE_OUTPUT_BASE_DIR = "update_output"
 SUMMARIZE_DESCRIPTIONS_MAX_LENGTH = 500
 SUMMARIZE_MODEL_ID = DEFAULT_CHAT_MODEL_ID
 UMAP_ENABLED = False
-UPDATE_OUTPUT_BASE_DIR = "update_output"
 
 # Graph Pruning
 PRUNE_MIN_NODE_FREQ = 2

graphrag/config/init_content.py

Lines changed: 0 additions & 6 deletions
@@ -91,12 +91,6 @@
   type: {defs.OUTPUT_TYPE.value} # [file, blob, cosmosdb]
   base_dir: "{defs.OUTPUT_BASE_DIR}"
 
-## only turn this on if running `graphrag index` with custom settings
-## we normally use `graphrag update` with the defaults
-update_index_output:
-  # type: {defs.OUTPUT_TYPE.value} # [file, blob, cosmosdb]
-  # base_dir: "{defs.UPDATE_OUTPUT_BASE_DIR}"
-
 ### Workflow settings ###
 
 extract_graph:

graphrag/config/models/graph_rag_config.py

Lines changed: 7 additions & 7 deletions
@@ -134,20 +134,20 @@ def _validate_multi_output_base_dirs(self) -> None:
                 (Path(self.root_dir) / output.base_dir).resolve()
             )
 
-    update_index_output: OutputConfig | None = Field(
+    update_index_output: OutputConfig = Field(
         description="The output configuration for the updated index.",
-        default=None,
+        default=OutputConfig(
+            type=defs.OUTPUT_TYPE,
+            base_dir=defs.UPDATE_OUTPUT_BASE_DIR,
+        ),
     )
     """The output configuration for the updated index."""
 
     def _validate_update_index_output_base_dir(self) -> None:
         """Validate the update index output base directory."""
-        if (
-            self.update_index_output
-            and self.update_index_output.type == defs.OutputType.file
-        ):
+        if self.update_index_output.type == defs.OutputType.file:
             if self.update_index_output.base_dir.strip() == "":
-                msg = "Update index output base directory is required for file output. Please rerun `graphrag init` and set the update index output configuration."
+                msg = "update_index_output base directory is required for file output. Please rerun `graphrag init` and set the update_index_output configuration."
                 raise ValueError(msg)
             self.update_index_output.base_dir = str(
                 (Path(self.root_dir) / self.update_index_output.base_dir).resolve()
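The field goes from optional (OutputConfig | None, default None) to always present with a concrete default, which is why the None guard disappears from the validator. A self-contained Pydantic sketch of the same pattern, using placeholder classes rather than graphrag's actual models:

    from pydantic import BaseModel, Field

    class OutputConfig(BaseModel):
        type: str = "file"
        base_dir: str = "output"

    class Config(BaseModel):
        # Before: OutputConfig | None with default=None, so every consumer had to branch on None.
        # After: a concrete default instance, so validation and path resolution can assume it exists.
        update_index_output: OutputConfig = Field(
            default=OutputConfig(type="file", base_dir="update_output"),
        )

    cfg = Config()
    assert cfg.update_index_output.base_dir == "update_output"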

graphrag/index/run/run_pipeline.py

Lines changed: 52 additions & 37 deletions
@@ -5,6 +5,7 @@
 
 import json
 import logging
+import re
 import time
 import traceback
 from collections.abc import AsyncIterable
@@ -31,7 +32,7 @@
 from graphrag.logger.progress import Progress
 from graphrag.storage.factory import StorageFactory
 from graphrag.storage.pipeline_storage import PipelineStorage
-from graphrag.utils.storage import write_table_to_storage
+from graphrag.utils.storage import load_table_from_storage, write_table_to_storage
 
 log = logging.getLogger(__name__)
 
@@ -66,45 +67,49 @@ async def run_pipeline(
     if is_update_run:
         progress_logger.info("Running incremental indexing.")
 
-        update_storage_config = config.update_index_output.model_dump()  # type: ignore
-        update_index_storage = StorageFactory().create_storage(
-            storage_type=update_storage_config["type"],  # type: ignore
-            kwargs=update_storage_config,
-        )
-
         delta_dataset = await get_delta_docs(dataset, storage)
 
-        # Fail on empty delta dataset
+        # warn on empty delta dataset
        if delta_dataset.new_inputs.empty:
-            error_msg = "Incremental Indexing Error: No new documents to process."
-            raise ValueError(error_msg)
-
-        delta_storage = update_index_storage.child("delta")
-
-        # Run the pipeline on the new documents
-        tables_dict = {}
-        async for table in _run_pipeline(
-            pipeline=pipeline,
-            config=config,
-            dataset=delta_dataset.new_inputs,
-            cache=cache,
-            storage=delta_storage,
-            callbacks=callback_chain,
-            logger=progress_logger,
-        ):
-            tables_dict[table.workflow] = table.result
-
-        progress_logger.success("Finished running workflows on new documents.")
-
-        await update_dataframe_outputs(
-            dataframe_dict=tables_dict,
-            storage=storage,
-            update_storage=update_index_storage,
-            config=config,
-            cache=cache,
-            callbacks=NoopWorkflowCallbacks(),
-            progress_logger=progress_logger,
-        )
+            warning_msg = "Incremental indexing found no new documents, exiting."
+            progress_logger.warning(warning_msg)
+        else:
+            update_storage_config = config.update_index_output.model_dump()  # type: ignore
+            update_storage = StorageFactory().create_storage(
+                storage_type=update_storage_config["type"],  # type: ignore
+                kwargs=update_storage_config,
+            )
+            # we use this to store the new subset index, and will merge its content with the previous index
+            timestamped_storage = update_storage.child(time.strftime("%Y%m%d-%H%M%S"))
+            delta_storage = timestamped_storage.child("delta")
+            # copy the previous output to a backup folder, so we can replace it with the update
+            # we'll read from this later when we merge the old and new indexes
+            previous_storage = timestamped_storage.child("previous")
+            await _copy_previous_output(storage, previous_storage)
+
+            # Run the pipeline on the new documents
+            async for table in _run_pipeline(
+                pipeline=pipeline,
+                config=config,
+                dataset=delta_dataset.new_inputs,
+                cache=cache,
+                storage=delta_storage,
+                callbacks=callback_chain,
+                logger=progress_logger,
+            ):
+                yield table
+
+            progress_logger.success("Finished running workflows on new documents.")
+
+            await update_dataframe_outputs(
+                previous_storage=previous_storage,
+                delta_storage=delta_storage,
+                output_storage=storage,
+                config=config,
+                cache=cache,
+                callbacks=NoopWorkflowCallbacks(),
+                progress_logger=progress_logger,
+            )
 
     else:
         progress_logger.info("Running standard indexing.")
@@ -172,3 +177,13 @@ async def _dump_stats(stats: PipelineRunStats, storage: PipelineStorage) -> None:
     await storage.set(
         "stats.json", json.dumps(asdict(stats), indent=4, ensure_ascii=False)
     )
+
+
+async def _copy_previous_output(
+    storage: PipelineStorage,
+    copy_storage: PipelineStorage,
+):
+    for file in storage.find(re.compile(r"\.parquet$")):
+        base_name = file[0].replace(".parquet", "")
+        table = await load_table_from_storage(base_name, storage)
+        await write_table_to_storage(table, base_name, copy_storage)
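The reworked flow keeps each incremental run self-contained: a timestamped child folder under the update output holds a "previous" copy of the prior parquet outputs and a "delta" index built from the new documents only, and the merged result is written back to the main output storage. A rough local-filesystem analogue of _copy_previous_output and the timestamped layout, for illustration only; the real code goes through graphrag's PipelineStorage abstraction:

    import re
    import shutil
    import time
    from pathlib import Path

    def copy_previous_output(output_dir: Path, update_dir: Path) -> Path:
        """Copy the prior *.parquet outputs into <update_dir>/<timestamp>/previous."""
        timestamped = update_dir / time.strftime("%Y%m%d-%H%M%S")
        previous = timestamped / "previous"
        previous.mkdir(parents=True, exist_ok=True)
        parquet = re.compile(r"\.parquet$")
        for file in output_dir.iterdir():
            if parquet.search(file.name):
                shutil.copy2(file, previous / file.name)
        return timestamped

    # The delta index would then be written under <timestamped>/delta before being
    # merged back into output_dir, mirroring the storage.child(...) calls above.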

graphrag/index/update/entities.py

Lines changed: 10 additions & 0 deletions
@@ -65,10 +65,16 @@ def _group_and_resolve_entities(
             "description": lambda x: list(x.astype(str)),  # Ensure str
             # Concatenate nd.array into a single list
             "text_unit_ids": lambda x: list(itertools.chain(*x.tolist())),
+            "degree": "first",  # todo: we could probably re-compute this with the entire new graph
+            "x": "first",
+            "y": "first",
         })
         .reset_index()
     )
 
+    # recompute frequency to include new text units
+    aggregated["frequency"] = aggregated["text_unit_ids"].apply(len)
+
     # Force the result into a DataFrame
     resolved: pd.DataFrame = pd.DataFrame(aggregated)
 
@@ -82,6 +88,10 @@ def _group_and_resolve_entities(
             "type",
             "description",
             "text_unit_ids",
+            "frequency",
+            "degree",
+            "x",
+            "y",
         ],
     ]
 