1+ import logging
2+ from http import HTTPStatus
3+ from http .client import HTTPException
4+ from typing import Optional , Dict
5+ import uuid
6+
7+ from fastapi import APIRouter , Body , Header , Request
8+ from fastapi .responses import JSONResponse
9+
10+ from consts .exceptions import UnauthorizedError , LimitExceededError , SignatureValidationError
11+ from services .northbound_service import (
12+ NorthboundContext ,
13+ get_conversation_history ,
14+ list_conversations ,
15+ start_streaming_chat ,
16+ stop_chat ,
17+ get_agent_info_list ,
18+ update_conversation_title
19+ )
20+
21+ from utils .auth_utils import get_current_user_id , validate_aksk_authentication
22+
23+
24+ router = APIRouter (prefix = "/nb/v1" , tags = ["northbound" ])
25+
26+
27+ def _get_header (headers : Dict [str , str ], name : str ) -> Optional [str ]:
28+ for k , v in headers .items ():
29+ if k .lower () == name .lower ():
30+ return v
31+ return None
32+
33+
34+ async def _parse_northbound_context (request : Request ) -> NorthboundContext :
35+ """
36+ Build northbound context from headers.
37+
38+ - X-Access-Key: Access key for AK/SK authentication
39+ - X-Timestamp: Timestamp for signature validation
40+ - X-Signature: HMAC-SHA256 signature signed with secret key
41+ - Authorization: Bearer <jwt>, jwt contains sub (user_id)
42+ - X-Request-Id: optional, generated if not provided
43+ """
44+ # 1. Verify AK/SK signature
45+ try :
46+ # Get request body for signature verification
47+ request_body = ""
48+ if request .method in ["POST" , "PUT" , "PATCH" ]:
49+ try :
50+ body_bytes = await request .body ()
51+ request_body = body_bytes .decode ('utf-8' ) if body_bytes else ""
52+ except Exception as e :
53+ logging .warning (f"Cannot read request body for signature verification: { e } " )
54+ request_body = ""
55+
56+ validate_aksk_authentication (request .headers , request_body )
57+ except (UnauthorizedError , LimitExceededError , SignatureValidationError ) as e :
58+ raise e
59+ except Exception :
60+ raise HTTPException (status_code = HTTPStatus .INTERNAL_SERVER_ERROR , detail = "Internal Server Error: cannot parse northbound context" )
61+
62+
63+ # 2. Parse JWT token
64+ auth_header = _get_header (request .headers , "Authorization" )
65+ if not auth_header :
66+ raise HTTPException (status_code = HTTPStatus .UNAUTHORIZED , detail = "Unauthorized: No authorization header found" )
67+
68+ # Use auth_utils to parse JWT token
69+ try :
70+ user_id , tenant_id = get_current_user_id (auth_header )
71+
72+ if not user_id :
73+ raise HTTPException (status_code = HTTPStatus .UNAUTHORIZED , detail = "Unauthorized: missing user_id in JWT token" )
74+ if not tenant_id :
75+ raise HTTPException (status_code = HTTPStatus .UNAUTHORIZED , detail = "Unauthorized: unregistered user_id in JWT token" )
76+
77+ except Exception :
78+ raise HTTPException (status_code = HTTPStatus .INTERNAL_SERVER_ERROR , detail = "Internal Server Error: cannot parse JWT token" )
79+
80+ request_id = _get_header (request .headers , "X-Request-Id" ) or str (uuid .uuid4 ())
81+
82+ return NorthboundContext (
83+ request_id = request_id ,
84+ tenant_id = tenant_id ,
85+ user_id = str (user_id ),
86+ authorization = auth_header ,
87+ )
88+
89+
90+ @router .get ("/health" )
91+ async def health_check ():
92+ return {"status" : "healthy" , "service" : "northbound-api" }
93+
94+
95+ @router .post ("/chat/run" )
96+ async def run_chat (
97+ request : Request ,
98+ conversation_id : str = Body (..., embed = True ),
99+ agent_name : str = Body (..., embed = True ),
100+ query : str = Body (..., embed = True ),
101+ idempotency_key : Optional [str ] = Header (None , alias = "Idempotency-Key" ),
102+ ):
103+ try :
104+ ctx : NorthboundContext = await _parse_northbound_context (request )
105+ return await start_streaming_chat (
106+ ctx = ctx ,
107+ external_conversation_id = conversation_id ,
108+ agent_name = agent_name ,
109+ query = query ,
110+ idempotency_key = idempotency_key ,
111+ )
112+ except UnauthorizedError :
113+ raise HTTPException (status_code = HTTPStatus .UNAUTHORIZED , detail = "Unauthorized: AK/SK authentication failed" )
114+ except LimitExceededError :
115+ raise HTTPException (status_code = HTTPStatus .TOO_MANY_REQUESTS , detail = "Too Many Requests: rate limit exceeded" )
116+ except SignatureValidationError :
117+ raise HTTPException (status_code = HTTPStatus .UNAUTHORIZED , detail = "Unauthorized: invalid signature" )
118+ except Exception :
119+ raise HTTPException (status_code = HTTPStatus .INTERNAL_SERVER_ERROR , detail = "Internal Server Error" )
120+
121+
122+
123+ @router .get ("/chat/stop/{conversation_id}" )
124+ async def stop_chat_stream (request : Request , conversation_id : str ):
125+ try :
126+ ctx : NorthboundContext = await _parse_northbound_context (request )
127+ return await stop_chat (ctx = ctx , external_conversation_id = conversation_id )
128+ except UnauthorizedError :
129+ raise HTTPException (status_code = HTTPStatus .UNAUTHORIZED , detail = "Unauthorized: AK/SK authentication failed" )
130+ except LimitExceededError :
131+ raise HTTPException (status_code = HTTPStatus .TOO_MANY_REQUESTS , detail = "Too Many Requests: rate limit exceeded" )
132+ except SignatureValidationError :
133+ raise HTTPException (status_code = HTTPStatus .UNAUTHORIZED , detail = "Unauthorized: invalid signature" )
134+ except Exception :
135+ raise HTTPException (status_code = HTTPStatus .INTERNAL_SERVER_ERROR , detail = "Internal Server Error" )
136+
137+
138+ @router .get ("/conversations/{conversation_id}" )
139+ async def get_history (request : Request , conversation_id : str ):
140+ try :
141+ ctx : NorthboundContext = await _parse_northbound_context (request )
142+ return await get_conversation_history (ctx = ctx , external_conversation_id = conversation_id )
143+ except UnauthorizedError :
144+ raise HTTPException (status_code = HTTPStatus .UNAUTHORIZED , detail = "Unauthorized: AK/SK authentication failed" )
145+ except LimitExceededError :
146+ raise HTTPException (status_code = HTTPStatus .TOO_MANY_REQUESTS , detail = "Too Many Requests: rate limit exceeded" )
147+ except SignatureValidationError :
148+ raise HTTPException (status_code = HTTPStatus .UNAUTHORIZED , detail = "Unauthorized: invalid signature" )
149+ except Exception :
150+ raise HTTPException (status_code = HTTPStatus .INTERNAL_SERVER_ERROR , detail = "Internal Server Error" )
151+
152+
153+ @router .get ("/agents" )
154+ async def list_agents (request : Request ):
155+ try :
156+ ctx : NorthboundContext = await _parse_northbound_context (request )
157+ return await get_agent_info_list (ctx = ctx )
158+ except UnauthorizedError :
159+ raise HTTPException (status_code = HTTPStatus .UNAUTHORIZED , detail = "Unauthorized: AK/SK authentication failed" )
160+ except LimitExceededError :
161+ raise HTTPException (status_code = HTTPStatus .TOO_MANY_REQUESTS , detail = "Too Many Requests: rate limit exceeded" )
162+ except SignatureValidationError :
163+ raise HTTPException (status_code = HTTPStatus .UNAUTHORIZED , detail = "Unauthorized: invalid signature" )
164+ except Exception :
165+ raise HTTPException (status_code = HTTPStatus .INTERNAL_SERVER_ERROR , detail = "Internal Server Error" )
166+
167+
168+ @router .get ("/conversations" )
169+ async def list_convs (request : Request ):
170+ try :
171+ ctx : NorthboundContext = await _parse_northbound_context (request )
172+ return await list_conversations (ctx = ctx )
173+ except UnauthorizedError :
174+ raise HTTPException (status_code = HTTPStatus .UNAUTHORIZED , detail = "Unauthorized: AK/SK authentication failed" )
175+ except LimitExceededError :
176+ raise HTTPException (status_code = HTTPStatus .TOO_MANY_REQUESTS , detail = "Too Many Requests: rate limit exceeded" )
177+ except SignatureValidationError :
178+ raise HTTPException (status_code = HTTPStatus .UNAUTHORIZED , detail = "Unauthorized: invalid signature" )
179+ except Exception :
180+ raise HTTPException (status_code = HTTPStatus .INTERNAL_SERVER_ERROR , detail = "Internal Server Error" )
181+
182+
183+ @router .put ("/conversations/{conversation_id}/title" )
184+ async def update_convs_title (
185+ request : Request ,
186+ conversation_id : str ,
187+ title : str ,
188+ idempotency_key : Optional [str ] = Header (None , alias = "Idempotency-Key" ),
189+ ):
190+ try :
191+ ctx : NorthboundContext = await _parse_northbound_context (request )
192+ result = await update_conversation_title (
193+ ctx = ctx ,
194+ external_conversation_id = conversation_id ,
195+ title = title ,
196+ idempotency_key = idempotency_key ,
197+ )
198+ headers_out = {"Idempotency-Key" : result .get ("idempotency_key" , "" ), "X-Request-Id" : ctx .request_id }
199+ return JSONResponse (content = result , headers = headers_out )
200+
201+ except UnauthorizedError :
202+ raise HTTPException (status_code = HTTPStatus .UNAUTHORIZED , detail = "Unauthorized: AK/SK authentication failed" )
203+ except LimitExceededError :
204+ raise HTTPException (status_code = HTTPStatus .TOO_MANY_REQUESTS , detail = "Too Many Requests: rate limit exceeded" )
205+ except SignatureValidationError :
206+ raise HTTPException (status_code = HTTPStatus .UNAUTHORIZED , detail = "Unauthorized: invalid signature" )
207+ except Exception :
208+ raise HTTPException (status_code = HTTPStatus .INTERNAL_SERVER_ERROR , detail = "Internal Server Error" )
0 commit comments