11"""
2- FetchNodelevelK Module
2+ FetchNodeLevelK Module
33"""
44from typing import List , Optional
55from .base_node import BaseNode
6+ from ..docloaders import ChromiumLoader
7+ from ..utils .cleanup_html import cleanup_html
8+ from ..utils .convert_to_md import convert_to_md
9+ from langchain_core .documents import Document
10+ from bs4 import BeautifulSoup
11+ from urllib .parse import quote , urljoin
612
7- class FetchNodelevelK (BaseNode ):
13+ class FetchNodeLevelK (BaseNode ):
814 """
9- A node responsible for compressing the input tokens and storing the document
10- in a vector database for retrieval. Relevant chunks are stored in the state.
11-
12- It allows scraping of big documents without exceeding the token limit of the language model .
+    A node responsible for fetching the HTML content of a specified URL and all its sub-links,
+    following hyperlinks recursively up to a configurable depth. The fetched content is then
+    used to update the graph's state. It uses ChromiumLoader to fetch the content of a web
+    page asynchronously (with proxy protection).
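+
+    Example (illustrative only; the exact input and output keys depend on the graph):
+        >>> node = FetchNodeLevelK("url", ["docs"], {"depth": 2, "only_inside_links": True})
+        >>> state = node.execute({"url": "https://example.com"})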

    Attributes:
        llm_model: An instance of a language model client, configured for generating answers.
@@ -27,16 +33,158 @@ def __init__(
        input: str,
        output: List[str],
        node_config: Optional[dict] = None,
-        node_name: str = "RAG",
+        node_name: str = "FetchLevelK",
    ):
        super().__init__(node_name, "node", input, output, 2, node_config)
-
-        self.llm_model = node_config["llm_model"]
+
        self.embedder_model = node_config.get("embedder_model", None)
+
        self.verbose = (
            False if node_config is None else node_config.get("verbose", False)
        )
+
        self.cache_path = node_config.get("cache_path", False)
+
+        self.headless = (
+            True if node_config is None else node_config.get("headless", True)
+        )
+
+        self.loader_kwargs = (
+            {} if node_config is None else node_config.get("loader_kwargs", {})
+        )
+
+        self.browser_base = (
+            None if node_config is None else node_config.get("browser_base", None)
+        )
+
+        self.depth = (
+            1 if node_config is None else node_config.get("depth", 1)
+        )
+
+        self.only_inside_links = (
+            False if node_config is None else node_config.get("only_inside_links", False)
+        )
+
+        self.min_input_len = 1

    def execute(self, state: dict) -> dict:
-        pass
71+ """
72+ Executes the node's logic to fetch the HTML content of a specified URL and all its sub-links
73+ and update the graph's state with the content.
74+
75+ Args:
76+ state (dict): The current state of the graph. The input keys will be used
77+ to fetch the correct data types from the state.
78+
79+ Returns:
80+ dict: The updated state with a new output key containing the fetched HTML content.
81+
82+ Raises:
83+ KeyError: If the input key is not found in the state, indicating that the
84+ necessary information to perform the operation is missing.
85+ """
+
+        self.logger.info(f"--- Executing {self.node_name} Node ---")
+
+        # Interpret input keys based on the provided input expression
+        input_keys = self.get_input_keys(state)
+        # Fetch data from the state based on the input keys
+        input_data = [state[key] for key in input_keys]
+
+        source = input_data[0]
+
+        documents = [{"source": source}]
+
+        loader_kwargs = {}
+
+        if self.node_config is not None:
+            loader_kwargs = self.node_config.get("loader_kwargs", {})
+
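+        # Each pass over obtain_content fetches the sources queued during the
+        # previous pass, so self.depth passes crawl self.depth levels of links.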
+        for _ in range(self.depth):
+            documents = self.obtain_content(documents, loader_kwargs)
+
+        filtered_documents = [doc for doc in documents if 'document' in doc]
+
+        state.update({self.output[0]: filtered_documents})
+
+        return state
+
+    def fetch_content(self, source: str, loader_kwargs) -> list:
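+        """
+        Fetches the content of a single source URL, using browserbase when a
+        browser_base configuration is provided and ChromiumLoader otherwise.
+
+        Args:
+            source (str): The URL to fetch.
+            loader_kwargs (dict): Additional keyword arguments passed to ChromiumLoader.
+
+        Returns:
+            list: The fetched content as a list of Document objects.
+        """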
+        self.logger.info(f"--- (Fetching HTML from: {source}) ---")
+
+        if self.browser_base is not None:
+            try:
+                from ..docloaders.browser_base import browser_base_fetch
+            except ImportError:
+                raise ImportError("""The browserbase module is not installed.
+                                  Please install it using `pip install browserbase`.""")
+
+            data = browser_base_fetch(self.browser_base.get("api_key"),
+                                      self.browser_base.get("project_id"), [source])
+
+            document = [Document(page_content=content,
+                                 metadata={"source": source}) for content in data]
+
+        else:
+            loader = ChromiumLoader([source], headless=self.headless, **loader_kwargs)
+
+            document = loader.load()
+
+        return document
+
+    def extract_links(self, html_content: str) -> list:
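+        """
+        Extracts the href value of every anchor tag in the given HTML.
+
+        Args:
+            html_content (str): The HTML to parse.
+
+        Returns:
+            list: The extracted links, possibly relative.
+        """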
+        soup = BeautifulSoup(html_content, 'html.parser')
+        links = [link['href'] for link in soup.find_all('a', href=True)]
+        self.logger.info(f"Extracted {len(links)} links.")
+        return links
+
+    def get_full_links(self, base_url: str, links: list) -> list:
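+        """
+        Resolves relative links against the base URL, skipping absolute (external)
+        links when only_inside_links is enabled.
+
+        Args:
+            base_url (str): The URL the links were extracted from.
+            links (list): The raw links, absolute or relative.
+
+        Returns:
+            list: The absolute URLs to crawl next.
+        """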
+        full_links = []
+        for link in links:
+            if self.only_inside_links and link.startswith("http"):
+                continue
+            full_link = link if link.startswith("http") else urljoin(base_url, link)
+            full_links.append(full_link)
+        return full_links
+
+    def obtain_content(self, documents: List, loader_kwargs) -> List:
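+        """
+        Fetches the content of every document that has not been loaded yet and
+        queues the links found on each fetched page as new documents to crawl.
+
+        Args:
+            documents (List): Dicts holding a "source" key and, once fetched, a "document" key.
+            loader_kwargs (dict): Additional keyword arguments passed to the loader.
+
+        Returns:
+            List: The input documents plus one new entry per newly discovered link.
+        """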
+        new_documents = []
+        # Iterate over a copy so that documents whose fetch fails can be removed safely
+        for doc in list(documents):
+            source = doc['source']
+            if 'document' not in doc:
+                document = self.fetch_content(source, loader_kwargs)
+
+                if not document or not document[0].page_content.strip():
+                    self.logger.warning(f"Failed to fetch content for {source}")
+                    documents.remove(doc)
+                    continue
+
+                # doc['document'] = document[0].page_content
+                doc['document'] = document
+
+                links = self.extract_links(doc['document'][0].page_content)
+                full_links = self.get_full_links(source, links)
+
+                # Check if the links are already present in other documents
+                for link in full_links:
+                    # Check if any document is from the same link
+                    if (not any(d.get('source', '') == link for d in documents)
+                            and not any(d.get('source', '') == link for d in new_documents)):
+                        # Add the document
+                        new_documents.append({"source": link})
+
+        documents.extend(new_documents)
+        return documents
+
+    def process_links(self, base_url: str, links: list, loader_kwargs, depth: int, current_depth: int = 1) -> dict:
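+        """
+        Recursively fetches the given links and the links found on each fetched page,
+        until depth levels have been processed (starting from current_depth).
+
+        Args:
+            base_url (str): The URL used to resolve relative links.
+            links (list): The links to process at this level.
+            loader_kwargs (dict): Additional keyword arguments passed to the loader.
+            depth (int): The maximum depth to follow links.
+            current_depth (int): The depth of the links currently being processed.
+
+        Returns:
+            dict: The fetched content keyed by absolute link.
+        """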
+        content_dict = {}
+        for idx, link in enumerate(links, start=1):
+            full_link = link if link.startswith("http") else urljoin(base_url, link)
+            self.logger.info(f"Processing link {idx}: {full_link}")
+            link_content = self.fetch_content(full_link, loader_kwargs)
+
+            if link_content and link_content[0].page_content.strip():
+                content_dict[full_link] = link_content
+
+                if current_depth < depth:
+                    new_links = self.extract_links(link_content[0].page_content)
+                    content_dict.update(self.process_links(full_link, new_links,
+                                                           loader_kwargs, depth, current_depth + 1))
+            else:
+                self.logger.warning(f"Failed to fetch content for {full_link}")
+        return content_dict