1- """
2- BaseGraph Module
3- """
4-
51import time
62import warnings
73from langchain_community .callbacks import get_openai_callback
84from typing import Tuple
95
6+ # Import telemetry functions
7+ from ..telemetry import log_graph_execution , log_event
108
119class BaseGraph :
1210 """
@@ -46,12 +44,12 @@ class BaseGraph:
4644 ... )
4745 """
4846
49- def __init__ (self , nodes : list , edges : list , entry_point : str , use_burr : bool = False , burr_config : dict = None ):
50-
47+ def __init__ (self , nodes : list , edges : list , entry_point : str , use_burr : bool = False , burr_config : dict = None , graph_name : str = "Custom" ):
5148 self .nodes = nodes
5249 self .raw_edges = edges
5350 self .edges = self ._create_edges ({e for e in edges })
5451 self .entry_point = entry_point .node_name
52+ self .graph_name = graph_name
5553 self .initial_state = {}
5654
5755 if nodes [0 ].node_name != entry_point .node_name :
@@ -103,12 +101,46 @@ def _execute_standard(self, initial_state: dict) -> Tuple[dict, list]:
103101 "total_cost_USD" : 0.0 ,
104102 }
105103
104+ start_time = time .time ()
105+ error_node = None
106+ source_type = None
107+ llm_model = None
108+ embedder_model = None
109+
106110 while current_node_name :
107111 curr_time = time .time ()
108112 current_node = next (node for node in self .nodes if node .node_name == current_node_name )
109113
114+ # check if there is a "source" key in the node config
115+ if current_node .__class__ .__name__ == "FetchNode" :
116+ # get the second key name of the state dictionary
117+ source_type = list (state .keys ())[1 ]
118+ # quick fix for local_dir source type
119+ if source_type == "local_dir" :
120+ source_type = "html_dir"
121+
122+ # check if there is an "llm_model" variable in the class
123+ if hasattr (current_node , "llm_model" ) and llm_model is None :
124+ llm_model = current_node .llm_model
125+ if hasattr (llm_model , "model_name" ):
126+ llm_model = llm_model .model_name
127+ elif hasattr (llm_model , "model" ):
128+ llm_model = llm_model .model
129+
130+ # check if there is an "embedder_model" variable in the class
131+ if hasattr (current_node , "embedder_model" ) and embedder_model is None :
132+ embedder_model = current_node .embedder_model
133+ if hasattr (embedder_model , "model_name" ):
134+ embedder_model = embedder_model .model_name
135+ elif hasattr (embedder_model , "model" ):
136+ embedder_model = embedder_model .model
137+
110138 with get_openai_callback () as cb :
111- result = current_node .execute (state )
139+ try :
140+ result = current_node .execute (state )
141+ except Exception as e :
142+ error_node = current_node .node_name
143+ raise e
112144 node_exec_time = time .time () - curr_time
113145 total_exec_time += node_exec_time
114146
@@ -147,6 +179,17 @@ def _execute_standard(self, initial_state: dict) -> Tuple[dict, list]:
147179 "exec_time" : total_exec_time ,
148180 })
149181
182+ # Log the graph execution telemetry
183+ graph_execution_time = time .time () - start_time
184+ log_graph_execution (
185+ graph_name = self .graph_name ,
186+ llm_model = llm_model ,
187+ embedder_model = embedder_model ,
188+ source_type = source_type ,
189+ execution_time = graph_execution_time ,
190+ error_node = error_node
191+ )
192+
150193 return state , exec_info
151194
152195 def execute (self , initial_state : dict ) -> Tuple [dict , list ]:
@@ -162,7 +205,6 @@ def execute(self, initial_state: dict) -> Tuple[dict, list]:
162205
163206 self .initial_state = initial_state
164207 if self .use_burr :
165-
166208 from ..integrations import BurrBridge
167209
168210 bridge = BurrBridge (self , self .burr_config )
@@ -190,4 +232,4 @@ def append_node(self, node):
190232 # add the node to the list of nodes
191233 self .nodes .append (node )
192234 # update the edges connecting the last node to the new node
193- self .edges = self ._create_edges ({e for e in self .raw_edges })
235+ self .edges = self ._create_edges ({e for e in self .raw_edges })
0 commit comments