Skip to content

Commit 39f55cd

Browse files
tosin2013 and claude committed
feat(orchestrator): Add intent parser fast path for deterministic routing
Integrate the intent parser as a fast path at the top of /orchestrator/chat so straightforward operations (deploy FreeIPA, list VMs, trigger DAGs) are handled deterministically without requiring the full PydanticAI agent pipeline or RAG documents.

Key changes:
- Add _try_intent_fast_path() in main.py that classifies and executes high-confidence requests via the intent parser, skipping Manager/Developer agents entirely
- Add dynamic DAG registry (dag_registry.py) that scans airflow/dags/ at startup to build service-keyword-to-DAG mappings automatically — no code changes needed when new DAGs are added
- Add HTTP API fallback in DAG handler for triggering DAGs when running outside the Airflow container (Airflow REST API at localhost:8888)
- Unwrap @mcp.tool() FunctionTool wrappers in all 6 handler modules so backend functions are callable from the intent parser context
- Mount intent_parser and mcp_server_fastmcp.py into the AI assistant container via deploy-qubinode.sh volume mounts
- Create localhost_ssh Airflow connection during deployment for SSHOperator DAGs
- Tune classifier confidence normalization (max_possible 10→5) so real matches reach actionable thresholds

Co-Authored-By: Claude Opus 4.5 <[email protected]>
1 parent 4a02fda commit 39f55cd

File tree

13 files changed

+555
-44
lines changed

13 files changed

+555
-44
lines changed

ai-assistant/src/main.py

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -560,6 +560,80 @@ class OrchestratorResponse(BaseModel):
560560
code_changes: List[dict] = []
561561

562562

563+
async def _try_intent_fast_path(message: str) -> Optional[OrchestratorResponse]:
    """
    Try to handle the request via the deterministic intent parser.

    The message is classified first. Only when the classifier reports a
    confidence of at least 0.5 and a category in the fast-path allow-list is
    the request executed through ``IntentParser.process``.

    Args:
        message: Raw user message taken from the orchestrator chat request.

    Returns:
        OrchestratorResponse if handled, None to fall through to agents
        (intent parser not importable, confidence too low, category not in
        the allow-list, or an unexpected error while classifying/executing).
    """
    # The import gets its own try block so that only a genuinely missing
    # intent_parser package is reported as "not available". An ImportError
    # raised later, while a handler runs, is a real failure and must be
    # logged as one by the generic handler below.
    try:
        import sys as _sys

        if "/app" not in _sys.path:
            _sys.path.insert(0, "/app")
        from intent_parser import IntentParser, IntentCategory
    except ImportError:
        logger.debug("Intent parser not available, falling through to agents")
        return None

    try:
        parser = IntentParser()
        parsed = parser.classify(message)

        # Only fast-path for high-confidence, actionable intents
        fast_path_categories = {
            IntentCategory.DAG_TRIGGER,
            IntentCategory.DAG_LIST,
            IntentCategory.DAG_INFO,
            IntentCategory.VM_LIST,
            IntentCategory.VM_INFO,
            IntentCategory.VM_CREATE,
            IntentCategory.VM_DELETE,
            IntentCategory.VM_PREFLIGHT,
            IntentCategory.SYSTEM_STATUS,
            IntentCategory.SYSTEM_INFO,
            IntentCategory.RAG_QUERY,
            IntentCategory.RAG_STATS,
        }

        if parsed.confidence < 0.5 or parsed.category not in fast_path_categories:
            return None  # Fall through to full orchestrator

        # Execute via intent parser
        result = await parser.process(message)
        succeeded = result.success
        task_label = f"Execute {parsed.category.value}: {message}"

        return OrchestratorResponse(
            session_id=str(uuid.uuid4()),
            model_used="intent-parser (deterministic)",
            plan={
                "session_id": "intent-fast-path",
                "user_intent": message,
                "planned_tasks": [task_label],
                "estimated_confidence": parsed.confidence,
                "requires_external_docs": False,
                "required_providers": [],
                "escalation_triggers": [],
            },
            response_text=result.output if succeeded else f"Error: {result.error}",
            confidence=parsed.confidence,
            # A failed deterministic execution is surfaced as an escalation
            escalation_needed=not succeeded,
            escalation_reason=result.error if not succeeded else None,
            planned_tasks=[task_label],
            required_providers=[],
            timestamp=time.time(),
            execution_performed=True,
            execution_success=succeeded,
            # The fast path always runs exactly one task
            tasks_executed=1,
            tasks_succeeded=1 if succeeded else 0,
            tasks_failed=0 if succeeded else 1,
            tasks_escalated=0,
            execution_confidence=parsed.confidence,
            execution_log_file=None,
            execution_summary=result.output,
            code_changes=[],
        )
    except Exception as e:
        # Best effort: any failure here must not break the chat endpoint, so
        # log it (with traceback for diagnosis) and let the agents handle it.
        logger.warning(
            f"Intent parser fast path failed: {e}, falling through to agents",
            exc_info=True,
        )
        return None
635+
636+
563637
@app.post("/orchestrator/chat", response_model=OrchestratorResponse)
564638
async def orchestrator_chat(request: OrchestratorRequest):
565639
"""
@@ -610,6 +684,11 @@ async def orchestrator_chat(request: OrchestratorRequest):
610684
if not any([has_gemini, has_openrouter, has_anthropic, has_openai, has_ollama]):
611685
raise HTTPException(status_code=503, detail="No LLM API key configured. Set one of: GEMINI_API_KEY, OPENROUTER_API_KEY, ANTHROPIC_API_KEY, OPENAI_API_KEY, or OLLAMA_BASE_URL")
612686

687+
# Fast path: try intent parser for deterministic routing
688+
intent_result = await _try_intent_fast_path(request.message)
689+
if intent_result is not None:
690+
return intent_result
691+
613692
try:
614693
# Create or use existing session
615694
session_id = request.session_id or str(uuid.uuid4())

airflow/dags/freeipa_deployment.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -188,7 +188,7 @@ def decide_action(**context):
188188
-P numcpus=2 \
189189
-P disks=[50] \
190190
-P nets=[default] \
191-
--wait || {{
191+
--wait < /dev/null || {{
192192
echo "[ERROR] Failed to create VM"
193193
exit 1
194194
}}

intent_parser/classifier.py

Lines changed: 33 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -149,19 +149,38 @@ def _build_rules() -> dict:
149149
"boost": 1,
150150
}
151151

152+
# Build DAG_TRIGGER keywords dynamically from discovered DAGs
153+
from .dag_registry import get_deploy_keywords
154+
_deploy_kws = get_deploy_keywords()
155+
156+
dag_trigger_keywords = [
157+
_kw("trigger", "dag"),
158+
_kw("run", "dag"),
159+
_kw("execute", "dag"),
160+
_kw("start", "dag"),
161+
_kw("trigger", "workflow"),
162+
_kw("run", "workflow"),
163+
]
164+
# Add "deploy <service>" keywords for every discovered DAG service
165+
for svc_kw in _deploy_kws:
166+
dag_trigger_keywords.append(_kw("deploy", svc_kw))
167+
168+
# Build pattern alternation from service keywords for regex matching
169+
# Only use keywords >= 4 chars to avoid false positives
170+
_svc_names = "|".join(re.escape(k) for k in _deploy_kws if len(k) >= 4)
171+
172+
dag_trigger_patterns = [
173+
re.compile(r"\b(?:trigger|run|execute|start)\s+(?:the\s+)?(?:dag|workflow)\s+\w+", re.I),
174+
re.compile(r"\b(?:trigger|run|execute|start)\s+(?:the\s+)?\w+\s+(?:dag|workflow)\b", re.I),
175+
]
176+
if _svc_names:
177+
dag_trigger_patterns.append(
178+
re.compile(rf"\bdeploy\s+(?:a\s+)?(?:new\s+)?(?:{_svc_names})\b", re.I)
179+
)
180+
152181
rules[IntentCategory.DAG_TRIGGER] = {
153-
"keywords": [
154-
_kw("trigger", "dag"),
155-
_kw("run", "dag"),
156-
_kw("execute", "dag"),
157-
_kw("start", "dag"),
158-
_kw("trigger", "workflow"),
159-
_kw("run", "workflow"),
160-
],
161-
"patterns": [
162-
re.compile(r"\b(?:trigger|run|execute|start)\s+(?:the\s+)?(?:dag|workflow)\s+\w+", re.I),
163-
re.compile(r"\b(?:trigger|run|execute|start)\s+(?:the\s+)?\w+\s+(?:dag|workflow)\b", re.I),
164-
],
182+
"keywords": dag_trigger_keywords,
183+
"patterns": dag_trigger_patterns,
165184
"boost": 0,
166185
}
167186

@@ -409,7 +428,8 @@ def classify(text: str) -> ParsedIntent:
409428
)
410429

411430
# Normalize confidence: map score to 0-1 range
412-
max_possible = 10.0
431+
# A strong match typically scores 3-5 points (keyword=1 + pattern=2 + boost)
432+
max_possible = 5.0
413433
confidence = min(best_score / max_possible, 1.0)
414434

415435
# Boost confidence if clear winner (big gap to second)

intent_parser/dag_registry.py

Lines changed: 202 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,202 @@
1+
"""
2+
Dynamic DAG registry for intent classification and entity extraction.
3+
4+
Scans airflow/dags/ at import time and builds:
5+
1. Service keywords for the classifier (deploy <service> -> DAG_TRIGGER)
6+
2. Service-to-DAG-ID mapping for the entity extractor
7+
8+
This means adding a new DAG file with proper tags/description automatically
9+
makes it discoverable by the intent parser — no code changes needed.
10+
"""
11+
12+
import logging
13+
import os
14+
import re
15+
from pathlib import Path
16+
from typing import Dict, List, Set, Tuple
17+
18+
logger = logging.getLogger("intent-parser.dag-registry")
19+
20+
# Words that are too generic to use as service keywords
21+
_GENERIC_WORDS = frozenset({
22+
"qubinode", "infrastructure", "deployment", "deploy", "kcli-pipelines",
23+
"ocp4-disconnected-helper", "disconnected", "utility", "workflow",
24+
"master", "ci", "openlineage", "adr-0054", "adr-0055", "adr-0049",
25+
"enterprise", "kcli", "pipelines",
26+
})
27+
28+
# Minimum tag/keyword length to avoid noise
29+
_MIN_KEYWORD_LEN = 2
30+
31+
32+
def _find_dags_path() -> Path:
33+
"""Locate the airflow/dags directory."""
34+
# Try multiple known locations
35+
candidates = [
36+
os.getenv("AIRFLOW_DAGS_PATH", ""),
37+
"/app/airflow/dags", # Inside container
38+
str(Path(__file__).parent.parent / "airflow" / "dags"), # Relative to repo root
39+
"/opt/qubinode_navigator/airflow/dags",
40+
]
41+
for c in candidates:
42+
if c and Path(c).is_dir():
43+
return Path(c)
44+
return Path("/app/airflow/dags") # Default, may not exist
45+
46+
47+
def _parse_dag_metadata(file_path: Path) -> dict:
48+
"""Parse a DAG file and extract dag_id, tags, and description."""
49+
try:
50+
content = file_path.read_text()
51+
except Exception:
52+
return {}
53+
54+
dag_id_m = re.search(r'DAG\s*\(\s*["\']([^"\']+)["\']', content)
55+
if not dag_id_m:
56+
dag_id_m = re.search(r'dag_id\s*=\s*["\']([^"\']+)["\']', content)
57+
if not dag_id_m:
58+
return {}
59+
60+
dag_id = dag_id_m.group(1)
61+
62+
tags_m = re.search(r"tags\s*=\s*\[([^\]]+)\]", content)
63+
tags = []
64+
if tags_m:
65+
tags = re.findall(r'["\']([^"\']+)["\']', tags_m.group(1))
66+
67+
desc_m = re.search(r'description\s*=\s*["\']([^"\']+)["\']', content)
68+
description = desc_m.group(1) if desc_m else ""
69+
70+
return {
71+
"dag_id": dag_id,
72+
"tags": tags,
73+
"description": description,
74+
}
75+
76+
77+
def scan_dags() -> List[dict]:
    """Collect the metadata of every DAG file in the DAGs directory.

    Files whose name starts with an underscore and the known helper modules
    are skipped, as are files for which no metadata could be parsed. Files
    are visited in sorted path order.

    Returns:
        One metadata dict per recognised DAG file; an empty list when the
        DAGs directory does not exist.
    """
    dags_dir = _find_dags_path()
    if not dags_dir.exists():
        logger.warning(f"DAGs path not found: {dags_dir}")
        return []

    # Support modules that live next to the DAGs but are not DAGs themselves
    helper_modules = {"dag_factory.py", "dag_helpers.py", "dag_loader.py", "dag_logging_mixin.py"}

    candidates = (
        path
        for path in sorted(dags_dir.glob("*.py"))
        if not path.name.startswith("_") and path.name not in helper_modules
    )
    found = [meta for meta in map(_parse_dag_metadata, candidates) if meta]

    logger.info(f"DAG registry scanned {len(found)} DAGs from {dags_dir}")
    return found
96+
97+
98+
def _extract_service_keywords(dag: dict) -> Set[str]:
    """
    Collect the service keywords of one DAG from its tags and its dag_id.

    A keyword is meant to be specific enough to identify the DAG when a user
    says "deploy <keyword>".

    Args:
        dag: Metadata dict; the "tags" and "dag_id" entries are read and
            both are optional.

    Returns:
        Lower-cased keywords: non-generic tags of at least _MIN_KEYWORD_LEN
        characters, plus non-generic underscore-separated dag_id parts of at
        least 3 characters.
    """
    # Tags: normalise first, then drop generic or too-short entries
    normalized_tags = (raw.lower().strip() for raw in dag.get("tags", []))
    from_tags = {
        tag
        for tag in normalized_tags
        if tag not in _GENERIC_WORDS and len(tag) >= _MIN_KEYWORD_LEN
    }

    # dag_id: split on "_" and keep only the meaningful parts
    stop_words = {"the", "and", "for"}
    from_dag_id = {
        piece
        for piece in dag.get("dag_id", "").lower().split("_")
        if len(piece) >= 3 and piece not in _GENERIC_WORDS and piece not in stop_words
    }

    return from_tags | from_dag_id
120+
121+
122+
def build_service_dag_map() -> Dict[str, str]:
    """
    Build a keyword -> dag_id mapping from all discovered DAGs.

    Example output:
        {
            "freeipa": "freeipa_deployment",
            "identity": "freeipa_deployment",
            "harbor": "harbor_deployment",
            "vyos": "vyos_router_deployment",
            "router": "vyos_router_deployment",
            ...
        }

    Keywords taken from a dag_id win over keywords that only appear as a
    tag, so "freeipa" resolves to freeipa_deployment rather than to a DAG
    that merely carries "freeipa" as a tag. Within each pass, DAGs are
    processed in dag_id order and the first DAG to claim a keyword keeps it.
    """
    ordered = sorted(scan_dags(), key=lambda d: d["dag_id"])
    stop_words = {"the", "and", "for"}
    keyword_to_dag: Dict[str, str] = {}

    # Pass 1: parts of the dag_id itself (strong signal)
    for entry in ordered:
        owner = entry["dag_id"]
        for piece in set(owner.lower().split("_")):
            if piece in _GENERIC_WORDS or len(piece) < 3 or piece in stop_words:
                continue
            keyword_to_dag.setdefault(piece, owner)

    # Pass 2: tags, only for keywords no dag_id has claimed yet
    for entry in ordered:
        owner = entry["dag_id"]
        for raw_tag in entry.get("tags", []):
            tag = raw_tag.lower().strip()
            if tag in _GENERIC_WORDS or len(tag) < _MIN_KEYWORD_LEN:
                continue
            keyword_to_dag.setdefault(tag, owner)

    logger.info(f"DAG registry built {len(keyword_to_dag)} service keyword mappings")
    return keyword_to_dag
167+
168+
169+
def build_deploy_keywords() -> List[str]:
    """
    List the service keywords that should mean DAG_TRIGGER after "deploy".

    The classifier turns each returned keyword into a dynamic rule such as:
        _kw("deploy", "freeipa")
        _kw("deploy", "harbor")
        _kw("deploy", "vyos")

    Returns:
        The keys of the service -> dag_id mapping in sorted order; several
        keywords may refer to the same DAG.
    """
    keywords = list(build_service_dag_map())
    keywords.sort()
    return keywords
182+
183+
184+
# Module-level cache (built lazily on first access, then reused)
_service_dag_map: Dict[str, str] = {}
_deploy_keywords: List[str] = []

# Explicit "already built" flags. Emptiness alone cannot be used as the
# "not built yet" signal: when no DAGs are found the result is legitimately
# empty, and every call would otherwise rescan the filesystem.
_service_dag_map_loaded = False
_deploy_keywords_loaded = False


def get_service_dag_map() -> Dict[str, str]:
    """Get the cached service-to-DAG mapping.

    The mapping is built on the first call via ``build_service_dag_map`` and
    the same dict is returned afterwards, even when it is empty.
    """
    global _service_dag_map, _service_dag_map_loaded
    # A non-empty map counts as built, so a pre-populated cache is respected.
    if not (_service_dag_map or _service_dag_map_loaded):
        _service_dag_map = build_service_dag_map()
        _service_dag_map_loaded = True
    return _service_dag_map


def get_deploy_keywords() -> List[str]:
    """Get the cached list of deploy keywords.

    The list holds the sorted keys of the cached service-to-DAG mapping, so
    building it does not trigger a second scan of the DAG files.
    """
    global _deploy_keywords, _deploy_keywords_loaded
    if not (_deploy_keywords or _deploy_keywords_loaded):
        _deploy_keywords = sorted(get_service_dag_map())
        _deploy_keywords_loaded = True
    return _deploy_keywords

0 commit comments

Comments
 (0)