from graphrag.callbacks.workflow_callbacks import WorkflowCallbacks
from graphrag.chunking.chunker import Chunker
from graphrag.chunking.chunker_factory import create_chunker
+from graphrag.chunking.prepend_metadata import prepend_metadata as prepend_metadata_fn
from graphrag.config.models.graph_rag_config import GraphRagConfig
from graphrag.index.typing.context import PipelineRunContext
from graphrag.index.typing.workflow import WorkflowFunctionOutput
@@ -39,6 +40,7 @@ async def run_workflow(
        context.callbacks,
        tokenizer=tokenizer,
        chunker=chunker,
+       prepend_metadata=config.chunks.prepend_metadata,
    )

    await write_table_to_storage(output, "text_units", context.output_storage)
@@ -52,6 +54,7 @@ def create_base_text_units(
    callbacks: WorkflowCallbacks,
    tokenizer: Tokenizer,
    chunker: Chunker,
+   prepend_metadata: bool | None = False,
) -> pd.DataFrame:
    """All the steps to transform base text_units."""
    documents.sort_values(by=["id"], ascending=[True], inplace=True)
@@ -63,10 +66,12 @@ def create_base_text_units(
    logger.info("Starting chunking process for %d documents", total_rows)

    def chunker_with_logging(row: pd.Series, row_index: int) -> Any:
-       metadata = row.get("metadata")
-       if (metadata is not None) and isinstance(metadata, str):
-           metadata = json.loads(metadata)
-       row["chunks"] = chunker.chunk(row["text"], metadata=metadata)
+       row["chunks"] = chunker.chunk(row["text"])
+
+       metadata = row.get("metadata", None)
+       if prepend_metadata and metadata is not None:
+           metadata = json.loads(metadata) if isinstance(metadata, str) else metadata
+           row["chunks"] = [prepend_metadata_fn(chunk, metadata) for chunk in row["chunks"]]
        tick()
        logger.info("chunker progress: %d/%d", row_index + 1, total_rows)
        return row
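For context, a minimal sketch of the new flow on a single row: chunk first, then conditionally prepend metadata to each chunk. It assumes chunks are plain strings and uses a hypothetical stand-in for `prepend_metadata_fn` that joins the metadata dict as `key: value` lines; the real `graphrag.chunking.prepend_metadata` helper may format differently.

```python
import json
from typing import Any


def prepend_metadata(chunk: str, metadata: dict[str, Any]) -> str:
    """Hypothetical stand-in: prefix a chunk with its document metadata as `key: value` lines."""
    header = "".join(f"{key}: {value}\n" for key, value in metadata.items())
    return header + chunk


# Simulate one document row after chunking.
chunks = ["first chunk of text", "second chunk of text"]
metadata = json.loads('{"title": "Report 2024", "author": "Jane Doe"}')

prepend = True  # mirrors config.chunks.prepend_metadata
if prepend and metadata is not None:
    chunks = [prepend_metadata(chunk, metadata) for chunk in chunks]

print(chunks[0])
# title: Report 2024
# author: Jane Doe
# first chunk of text
```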