Skip to content

Commit 412168d

Browse files
Merge pull request #9 from egehanyorulmaz/dev/improve_company_metadata
Refactor: Parallelize Company Search with a Dynamic Graph
2 parents 5919dfe + 4e2956d commit 412168d

File tree

6 files changed

+100
-51
lines changed

6 files changed

+100
-51
lines changed

src/core/agents/graph_builder.py

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -26,14 +26,29 @@ def build_resume_analysis_graph():
2626

2727
# Use functools.partial to inject dependencies into the nodes
2828
workflow.add_node("parse_resume", partial(parse_resume_node, extractor=llm_extractor))
29-
workflow.add_node("company_research", partial(search_company_info_node, template_service=template_service, model_name="gpt-4o"))
29+
workflow.add_node("job_company_research", partial(search_company_info_node,
30+
template_service=template_service,
31+
branch="job_description",
32+
model_name="gpt-4.1-mini"))
33+
workflow.add_node("resume_company_research", partial(search_company_info_node,
34+
template_service=template_service,
35+
branch="resume",
36+
model_name="gpt-4.1-mini"))
37+
3038
workflow.add_node("parse_job_description", partial(parse_job_description_node, extractor=llm_extractor))
3139
workflow.add_node("experience_analyzer", partial(analyze_experience_node, extractor=llm_extractor))
32-
# Connect nodes sequentially
40+
41+
#### RESUME PATH ###
3342
workflow.add_edge(START, "parse_resume")
34-
workflow.add_edge("parse_resume", "company_research")
35-
workflow.add_edge("company_research", "parse_job_description")
36-
workflow.add_edge("parse_job_description", "experience_analyzer")
43+
workflow.add_edge("parse_resume", "resume_company_research")
44+
45+
#### JOB DESCRIPTION PATH ###
46+
workflow.add_edge(START, "parse_job_description")
47+
workflow.add_edge("parse_job_description", "job_company_research")
48+
49+
#### ANALYSIS PATH ###
50+
workflow.add_edge("resume_company_research", "experience_analyzer")
51+
workflow.add_edge("job_company_research", "experience_analyzer")
3752
workflow.add_edge("experience_analyzer", END)
3853

3954
# Compile and return the graph

src/core/agents/search_agents.py

Lines changed: 41 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,14 @@
44
from langchain_openai import ChatOpenAI
55

66
from pydantic import BaseModel, Field
7-
from typing import Type, Optional, Dict, Any, List
7+
from typing import Type, Optional, Dict, Any, List, Literal
8+
import logging
89

910
from src.core.ports.secondary.template_service import TemplateService
1011
from src.core.agents.utils.state import AgentState
11-
from src.core.domain.company_search import CompanySearchResponse
12+
from src.core.domain.company_search import CompanyInfo
1213

14+
logger = logging.getLogger("core.agents.search_agents")
1315

1416
def get_search_tool(max_results=3, time_range="month"):
1517
"""
@@ -30,7 +32,7 @@ def get_search_tool(max_results=3, time_range="month"):
3032
topic="general", # Use "general" for company information
3133
)
3234

33-
async def create_company_search_agent(response_format: Type[BaseModel],
35+
def create_company_search_agent(response_format: Type[BaseModel],
3436
template_service: TemplateService,
3537
model_name="gpt-3.5-turbo"):
3638
"""
@@ -65,25 +67,26 @@ async def create_company_search_agent(response_format: Type[BaseModel],
6567

6668
async def search_company_info(company_name: str,
6769
template_service: TemplateService,
68-
model_name="gpt-3.5-turbo") -> Optional[CompanySearchResponse]:
70+
model_name="gpt-3.5-turbo") -> Optional[CompanyInfo]:
6971
"""
7072
Search for up-to-date information about a company and return structured data.
7173
"""
74+
logger.info(f"Collecting more details on {company_name}")
7275
# TODO: Retry on OpenAI rate limit errors
7376
search_query = template_service.render_prompt(
7477
"prompts/company_search/search_query.j2",
75-
**{"company_name": company_name, "search_result_format": CompanySearchResponse.model_json_schema()}
78+
**{"company_name": company_name, "search_result_format": CompanyInfo.model_json_schema()}
7679
)
7780

7881
try:
79-
agent = await create_company_search_agent(CompanySearchResponse, template_service, model_name)
82+
agent = create_company_search_agent(CompanyInfo, template_service, model_name)
8083
response = await agent.ainvoke(
8184
{"messages": [{"role": "user", "content": search_query}]}
8285
)
8386

8487
if "structured_response" in response:
8588
result = response["structured_response"]
86-
return CompanySearchResponse.model_validate(result)
89+
return CompanyInfo.model_validate(result)
8790
else:
8891
print("No structured response found in agent output")
8992
return None
@@ -92,23 +95,39 @@ async def search_company_info(company_name: str,
9295
return None
9396

9497

95-
async def main():
98+
99+
100+
if __name__ == "__main__":
101+
import asyncio
96102
from src.core.domain.config import TemplateConfig
97103
from src.infrastructure.template.jinja_template_service import JinjaTemplateService
98104
template_config = TemplateConfig.development()
99105
template_service = JinjaTemplateService(config=template_config)
100106

101-
company_info = await search_company_info("Apple Inc.", template_service, model_name="gpt-4o")
102-
103-
if company_info:
104-
print("\nCompany Information:")
105-
print(f"Name: {company_info.company_name}")
106-
print(f"Industry: {company_info.company_industry}")
107-
print(f"Size: {company_info.company_size} employees")
108-
print(f"Revenue: {company_info.company_revenue}")
109-
print(f"Location: {company_info.company_location}")
110-
print(f"Website: {company_info.company_website}")
111-
print(f"Founded: {company_info.founded_year or 'Unknown'}")
112-
print(f"Description: {company_info.company_description}")
113-
else:
114-
print("Could not retrieve company information.")
107+
async def main():
108+
companies = ["Apple Inc.", "Microsoft", "Google", "A non-existent company",
109+
"NVIDIA", "Tesla", "Amazon", "Meta"]
110+
concurrency_limit = 5
111+
112+
tasks = [search_company_info(company_name=name, template_service=template_service,
113+
model_name="gpt-4.1-mini") for name in companies]
114+
results = await asyncio.gather(*tasks)
115+
116+
company_info_map = {info.name: info for info in results if info}
117+
118+
for company_name in companies:
119+
company_info = company_info_map.get(company_name)
120+
if company_info:
121+
print("\nCompany Information:")
122+
print(f" Name: {company_info.name}")
123+
print(f" Industry: {company_info.industry}")
124+
print(f" Size: {company_info.size} employees")
125+
print(f" Revenue: {company_info.revenue}")
126+
print(f" Location: {company_info.location}")
127+
print(f" Website: {company_info.website}")
128+
print(f" Founded: {company_info.founded_year or 'Unknown'}")
129+
print(f" Description: {company_info.description}")
130+
else:
131+
print(f"\nCould not retrieve company information for {company_name}.")
132+
133+
asyncio.run(main())

src/core/agents/utils/nodes.py

Lines changed: 23 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,19 @@
11
import asyncio
2-
from typing import Dict, Optional
2+
from typing import Dict, Optional, Literal, List
33
from langsmith import traceable
4+
import logging
5+
46
from src.core.agents.utils.state import AgentState
57
from src.core.domain.resume import Resume
68
from src.core.domain.job_description import JobDescription
7-
from src.core.domain.company_search import CompanySearchResponse
9+
from src.core.domain.company_search import CompanyInfo
810
from src.infrastructure.components import llm_extractor
911
from src.core.ports.secondary.llm_extractor import LLMExtractor
1012
from src.core.ports.secondary.template_service import TemplateService
1113
from src.core.agents.search_agents import search_company_info
1214

15+
logger = logging.getLogger("core.agents.nodes")
16+
1317
@traceable(run_type="llm")
1418
async def parse_resume_node(state: AgentState, extractor: LLMExtractor):
1519
resume = await extractor.parse_document(
@@ -33,14 +37,20 @@ async def parse_job_description_node(state: AgentState, extractor: LLMExtractor)
3337
@traceable(run_type="tool")
3438
async def search_company_info_node(state: AgentState,
3539
template_service: TemplateService,
36-
model_name="gpt-4o") -> Dict[str, Optional[CompanySearchResponse]]:
37-
"""
38-
Search for information about a company and return structured data.
39-
"""
40-
company_names = [experience.company for experience in state["resume"].experiences]
41-
42-
tasks = [search_company_info(company, template_service, model_name) for company in company_names]
43-
results = await asyncio.gather(*tasks)
44-
company_info_list = [result for result in results if result is not None]
45-
print(f"Retrieved information for {len(company_info_list)} out of {len(company_names)} companies.")
46-
return {"company_info": company_info_list}
40+
branch: Literal["resume", "job_description"] = "job_description",
41+
model_name="gpt-4.1-mini") -> Dict[str, Optional[CompanyInfo]]:
42+
if branch == "resume":
43+
companies = state["resume"].company_names
44+
elif branch == "job_description":
45+
companies = [state["job_description"].company_name]
46+
else:
47+
raise ValueError("This branch is not supported.")
48+
49+
tasks = [search_company_info(company_name=company_name,
50+
model_name=model_name,
51+
template_service=template_service) for company_name in companies]
52+
results = await asyncio.gather(*tasks, return_exceptions=True)
53+
results = {result.name: result for result in results if isinstance(result, CompanyInfo)}
54+
logger.info(f"Retrieved information for branch {branch} with length {len(results)} companies.")
55+
56+
return {f"{branch}_company_info": results}

src/core/agents/utils/state.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
from typing import List, Optional, TypedDict, Dict
22

3-
from src.core.domain.company_search import CompanySearchResponse
3+
from src.core.domain.company_search import CompanyInfo
44
from src.core.domain.resume import Resume
55
from src.core.domain.job_description import JobDescription
66
from src.core.domain.resume_match import (
@@ -25,7 +25,8 @@ class AgentState(TypedDict):
2525
job_description: JobDescription
2626

2727
# Enriched data
28-
company_info: Optional[Dict[str, CompanySearchResponse]]
28+
resume_company_info: Optional[Dict[str, CompanyInfo]]
29+
job_description_company_info: Optional[Dict[str, CompanyInfo]]
2930

3031
# Analysis results (populated by agents)
3132
skill_matches: Optional[List[SkillMatch]]

src/core/domain/company_search.py

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,17 @@
11
from pydantic import BaseModel, Field
22
from typing import Optional
33

4-
class CompanySearchResponse(BaseModel):
5-
company_name: str = Field(description="The name of the company")
6-
company_description: Optional[str] = Field(None, description="A description of the company")
7-
company_website: Optional[str] = Field(None, description="The website of the company")
8-
company_location: Optional[str] = Field(None, description="The location of the company")
9-
company_industry: Optional[str] = Field(None, description="The industry of the company")
10-
company_size: Optional[int] = Field(None, description="The approximate number of employees at the company")
11-
company_revenue: Optional[str] = Field(None, description="The annual revenue of the company")
4+
class CompanyInfo(BaseModel):
5+
name: str = Field(description="The name of the company")
6+
description: Optional[str] = Field(None, description="A description of the company")
7+
website: Optional[str] = Field(None, description="The website of the company")
8+
location: Optional[str] = Field(None, description="The location of the company")
9+
industry: Optional[str] = Field(None, description="The industry of the company")
10+
size: Optional[int] = Field(None, description="The approximate number of employees at the company")
11+
revenue: Optional[str] = Field(None, description="The annual revenue of the company")
1212
founded_year: Optional[int] = Field(None, description="The year the company was founded")
1313

1414

1515
if __name__ == "__main__":
16-
data_model = CompanySearchResponse.model_json_schema()
16+
data_model = CompanyInfo.model_json_schema()
1717
print(data_model)

src/core/domain/resume.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,3 +99,7 @@ def parse_raw_json(cls, json_str: str) -> "Resume":
9999
return cls.model_validate_json(json_str)
100100
except json.JSONDecodeError as e:
101101
raise ValueError(f"Invalid JSON string: {str(e)}")
102+
103+
@property
104+
def company_names(self) -> List[str]:
105+
return [experience.company for experience in self.experiences]

0 commit comments

Comments
 (0)