Skip to content

Commit d515fb0

Browse files
committed
♻️ Refactor: Sort backend imports (unit test) #1037
1 parent b511115 commit d515fb0

File tree

9 files changed

+567
-484
lines changed

9 files changed

+567
-484
lines changed

backend/services/agent_service.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -729,8 +729,7 @@ def _memory_token(message_text: str) -> str:
729729
):
730730
yield data_chunk
731731
except Exception as run_exc:
732-
logger.error(
733-
f"Agent run error after memory failure: {str(run_exc)}")
732+
logger.error(f"Agent run error after memory failure: {str(run_exc)}")
734733
raise AgentRunException(f"Agent run error: {str(run_exc)}")
735734
except Exception as e:
736735
logger.error(f"Generate stream with memory error: {str(e)}")

backend/services/conversation_management_service.py

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -69,8 +69,7 @@ def save_message(request: MessageRequest, authorization: Optional[str] = Header(
6969
# Validate conversation_id
7070
conversation_id = message_data.get('conversation_id')
7171
if not conversation_id:
72-
raise Exception(
73-
"conversation_id is required, please call /conversation/create to create a conversation first")
72+
raise Exception("conversation_id is required, please call /conversation/create to create a conversation first")
7473

7574
# Process different types of message units
7675
message_units = message_data['message']
@@ -303,8 +302,7 @@ def update_conversation_title(conversation_id: int, title: str, user_id: str = N
303302
"""
304303
success = rename_conversation(conversation_id, title, user_id)
305304
if not success:
306-
raise Exception(
307-
f"Conversation {conversation_id} does not exist or has been deleted")
305+
raise Exception(f"Conversation {conversation_id} does not exist or has been deleted")
308306
return success
309307

310308

@@ -357,8 +355,7 @@ def rename_conversation_service(conversation_id: int, name: str, user_id: str) -
357355
try:
358356
success = rename_conversation(conversation_id, name, user_id)
359357
if not success:
360-
raise Exception(
361-
f"Conversation {conversation_id} does not exist or has been deleted")
358+
raise Exception(f"Conversation {conversation_id} does not exist or has been deleted")
362359
return True
363360
except Exception as e:
364361
logging.error(f"Failed to rename conversation: {str(e)}")
@@ -379,8 +376,7 @@ def delete_conversation_service(conversation_id: int, user_id: str) -> bool:
379376
try:
380377
success = delete_conversation(conversation_id, user_id)
381378
if not success:
382-
raise Exception(
383-
f"Conversation {conversation_id} does not exist or has been deleted")
379+
raise Exception(f"Conversation {conversation_id} does not exist or has been deleted")
384380
return True
385381
except Exception as e:
386382
logging.error(f"Failed to delete conversation: {str(e)}")

backend/services/data_process_service.py

Lines changed: 5 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -65,8 +65,7 @@ def _init_redis_client(self):
6565
connection_pool=self.redis_pool)
6666
logger.info("Redis client initialized successfully.")
6767
else:
68-
logger.warning(
69-
"REDIS_BACKEND_URL not set, Redis client not initialized.")
68+
logger.warning("REDIS_BACKEND_URL not set, Redis client not initialized.")
7069
except Exception as e:
7170
logger.error(f"Failed to initialize Redis client: {str(e)}")
7271

@@ -83,8 +82,7 @@ def _init_clip_model(self):
8382
self.clip_available = True
8483
logger.info("CLIP model loaded successfully")
8584
except Exception as e:
86-
logger.warning(
87-
f"Failed to load CLIP model, size-only filtering will be used: {str(e)}")
85+
logger.warning(f"Failed to load CLIP model, size-only filtering will be used: {str(e)}")
8886
self.clip_available = False
8987

9088
async def start(self):
@@ -104,8 +102,7 @@ def _get_celery_inspector(self):
104102
if not celery_app.conf.broker_url or not celery_app.conf.result_backend:
105103
celery_app.conf.broker_url = REDIS_URL
106104
celery_app.conf.result_backend = REDIS_BACKEND_URL
107-
logger.warning(
108-
f"Celery broker URL is not configured properly, reconfiguring to {celery_app.conf.broker_url}")
105+
logger.warning(f"Celery broker URL is not configured properly, reconfiguring to {celery_app.conf.broker_url}")
109106
try:
110107
inspector = celery_app.control.inspect()
111108
inspector.ping()
@@ -179,10 +176,8 @@ def get_reserved():
179176
# Add to the set, duplicates will be handled
180177
task_ids.add(task_id)
181178
except Exception as redis_error:
182-
logger.warning(
183-
f"Failed to query Redis for stored task IDs: {str(redis_error)}")
184-
logger.debug(
185-
f"Total unique task IDs collected (inspector + Redis): {len(task_ids)}")
179+
logger.warning(f"Failed to query Redis for stored task IDs: {str(redis_error)}")
180+
logger.debug(f"Total unique task IDs collected (inspector + Redis): {len(task_ids)}")
186181
tasks = [get_task_info(task_id) for task_id in task_ids]
187182
all_task_infos = await asyncio.gather(*tasks, return_exceptions=True)
188183
for task_info in all_task_infos:

backend/services/model_health_service.py

Lines changed: 16 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,14 @@
1818

1919

2020
async def _embedding_dimension_check(
21-
model_name: str,
22-
model_type: str,
23-
model_base_url: str,
24-
model_api_key: str):
21+
model_name: str,
22+
model_type: str,
23+
model_base_url: str,
24+
model_api_key: str):
2525

2626
# Test connectivity based on different model types
2727
if model_type == "embedding":
28-
embedding = await OpenAICompatibleEmbedding(
28+
embedding =await OpenAICompatibleEmbedding(
2929
model_name=model_name,
3030
base_url=model_base_url,
3131
api_key=model_api_key,
@@ -34,7 +34,7 @@ async def _embedding_dimension_check(
3434
if len(embedding) > 0:
3535
return len(embedding[0])
3636
elif model_type == "multi_embedding":
37-
embedding = await JinaEmbedding(
37+
embedding =await JinaEmbedding(
3838
model_name=model_name,
3939
base_url=model_base_url,
4040
api_key=model_api_key,
@@ -65,8 +65,7 @@ async def _perform_connectivity_check(
6565
bool: Connectivity check result
6666
"""
6767
if "localhost" in model_base_url or "127.0.0.1" in model_base_url:
68-
model_base_url = model_base_url.replace(
69-
"localhost", "host.docker.internal").replace("127.0.0.1", "host.docker.internal")
68+
model_base_url = model_base_url.replace("localhost", "host.docker.internal").replace("127.0.0.1", "host.docker.internal")
7069

7170
connectivity: bool
7271

@@ -139,19 +138,16 @@ async def check_model_connectivity(display_name: str, authorization: Optional[st
139138
model_name, model_type, model_base_url, model_api_key
140139
)
141140
except Exception as e:
142-
update_data = {
143-
"connect_status": ModelConnectStatusEnum.UNAVAILABLE.value}
141+
update_data = {"connect_status": ModelConnectStatusEnum.UNAVAILABLE.value}
144142
logger.error(f"Error checking model connectivity: {str(e)}")
145143
update_model_record(model["model_id"], update_data)
146144
return ModelResponse(code=400, message=str(e),
147145
data={"connectivity": False, "connect_status": ModelConnectStatusEnum.UNAVAILABLE.value})
148146

149147
if connectivity:
150-
logger.info(
151-
f"CONNECTED: {model_name}; Base URL: {model.get('base_url')}; API Key: {model.get('api_key')}")
148+
logger.info(f"CONNECTED: {model_name}; Base URL: {model.get('base_url')}; API Key: {model.get('api_key')}")
152149
else:
153-
logger.warning(
154-
f"UNCONNECTED: {model_name}; Base URL: {model.get('base_url')}; API Key: {model.get('api_key')}")
150+
logger.warning(f"UNCONNECTED: {model_name}; Base URL: {model.get('base_url')}; API Key: {model.get('api_key')}")
155151
connect_status = ModelConnectStatusEnum.AVAILABLE.value if connectivity else ModelConnectStatusEnum.UNAVAILABLE.value
156152
update_data = {"connect_status": connect_status}
157153
update_model_record(model["model_id"], update_data)
@@ -160,8 +156,7 @@ async def check_model_connectivity(display_name: str, authorization: Optional[st
160156
except Exception as e:
161157
logger.error(f"Error checking model connectivity: {str(e)}")
162158
if 'model' in locals() and model:
163-
update_data = {
164-
"connect_status": ModelConnectStatusEnum.UNAVAILABLE.value}
159+
update_data = {"connect_status": ModelConnectStatusEnum.UNAVAILABLE.value}
165160
update_model_record(model["model_id"], update_data)
166161
return ModelResponse(code=500, message=f"Connectivity test error: {str(e)}",
167162
data={"connectivity": False, "connect_status": ModelConnectStatusEnum.UNAVAILABLE.value})
@@ -177,8 +172,7 @@ async def check_me_model_connectivity(model_name: str):
177172
result = response.json()['data']
178173

179174
# Find model
180-
model_data = next(
181-
(item for item in result if item['id'] == model_name), None)
175+
model_data = next((item for item in result if item['id'] == model_name), None)
182176
if not model_data:
183177
return ModelResponse(code=404, message="Specified model not found",
184178
data={"connectivity": False, "message": "Specified model not found", "connect_status": ""})
@@ -187,8 +181,7 @@ async def check_me_model_connectivity(model_name: str):
187181

188182
# Test model based on type
189183
if model_type == 'llm':
190-
payload = {"model": model_name, "messages": [
191-
{"role": "user", "content": "hello"}]}
184+
payload = {"model": model_name, "messages": [{"role": "user", "content": "hello"}]}
192185
api_response = await client.post(
193186
f"{MODEL_ENGINE_HOST}/open/router/v1/chat/completions",
194187
headers=headers,
@@ -238,17 +231,15 @@ async def verify_model_config_connectivity(model_config: dict):
238231
model_type = model_config["model_type"]
239232
model_base_url = model_config["base_url"]
240233
model_api_key = model_config["api_key"]
241-
embedding_dim = model_config.get(
242-
"embedding_dim", model_config.get("max_tokens", 1024))
234+
embedding_dim = model_config.get("embedding_dim", model_config.get("max_tokens", 1024))
243235

244236
try:
245237
# Use the common connectivity check function
246238
connectivity = await _perform_connectivity_check(
247239
model_name, model_type, model_base_url, model_api_key, embedding_dim
248240
)
249241
except ValueError as e:
250-
logger.warning(
251-
f"UNCONNECTED: {model_name}; Base URL: {model_base_url}; API Key: {model_api_key}; Error: {str(e)}")
242+
logger.warning(f"UNCONNECTED: {model_name}; Base URL: {model_base_url}; API Key: {model_api_key}; Error: {str(e)}")
252243
return ModelResponse(
253244
code=400,
254245
message=str(e),
@@ -275,8 +266,7 @@ async def verify_model_config_connectivity(model_config: dict):
275266
)
276267
except Exception as e:
277268
error_message = str(e)
278-
logger.warning(
279-
f"UNCONNECTED: {model_name}; Base URL: {model_base_url}; API Key: {model_api_key}; Error: {error_message}")
269+
logger.warning(f"UNCONNECTED: {model_name}; Base URL: {model_base_url}; API Key: {model_api_key}; Error: {error_message}")
280270
return ModelResponse(
281271
code=500,
282272
message="",

backend/services/northbound_service.py

Lines changed: 18 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -67,13 +67,11 @@ async def idempotency_start(key: str, ttl_seconds: Optional[int] = None) -> None
6767
async with _IDEMPOTENCY_LOCK:
6868
# purge expired
6969
now = _now_seconds()
70-
expired = [k for k, v in _IDEMPOTENCY_RUNNING.items(
71-
) if now - v > (ttl_seconds or _IDEMPOTENCY_TTL_SECONDS_DEFAULT)]
70+
expired = [k for k, v in _IDEMPOTENCY_RUNNING.items() if now - v > (ttl_seconds or _IDEMPOTENCY_TTL_SECONDS_DEFAULT)]
7271
for k in expired:
7372
_IDEMPOTENCY_RUNNING.pop(k, None)
7473
if key in _IDEMPOTENCY_RUNNING:
75-
raise LimitExceededError(
76-
"Duplicate request is still running, please wait.")
74+
raise LimitExceededError("Duplicate request is still running, please wait.")
7775
_IDEMPOTENCY_RUNNING[key] = now
7876

7977

@@ -93,8 +91,7 @@ async def check_and_consume_rate_limit(tenant_id: str) -> None:
9391
state = _RATE_STATE.setdefault(tenant_id, {})
9492
count = state.get(bucket, 0)
9593
if count >= _RATE_LIMIT_PER_MINUTE:
96-
raise LimitExceededError(
97-
"Query rate exceeded limit. Please try again later")
94+
raise LimitExceededError("Query rate exceeded limit. Please try again later")
9895
state[bucket] = count + 1
9996
# cleanup old buckets, keep only current
10097
for b in list(state.keys()):
@@ -123,20 +120,17 @@ def _build_idempotency_key(*parts: Any) -> str:
123120
async def to_external_conversation_id(internal_id: int) -> str:
124121
if not internal_id:
125122
raise Exception("invalid internal conversation id")
126-
external_id = get_external_id_by_internal(
127-
internal_id=internal_id, mapping_type="CONVERSATION")
123+
external_id = get_external_id_by_internal(internal_id=internal_id, mapping_type="CONVERSATION")
128124
if not external_id:
129-
logger.error(
130-
f"cannot find external id for conversation_id: {internal_id}")
125+
logger.error(f"cannot find external id for conversation_id: {internal_id}")
131126
raise Exception("cannot find external id")
132127
return external_id
133128

134129

135130
async def to_internal_conversation_id(external_id: str) -> int:
136131
if not external_id:
137132
raise Exception("invalid external conversation id")
138-
internal_id = get_internal_id_by_external(
139-
external_id=external_id, mapping_type="CONVERSATION")
133+
internal_id = get_internal_id_by_external(external_id=external_id, mapping_type="CONVERSATION")
140134
return internal_id
141135

142136

@@ -147,8 +141,7 @@ async def get_agent_info_by_name(agent_name: str, tenant_id: str) -> int:
147141
try:
148142
return await get_agent_id_by_name(agent_name=agent_name, tenant_id=tenant_id)
149143
except Exception as _:
150-
raise Exception(
151-
f"Failed to get agent id for agent_name: {agent_name} in tenant_id: {tenant_id}")
144+
raise Exception(f"Failed to get agent id for agent_name: {agent_name} in tenant_id: {tenant_id}")
152145

153146

154147
async def start_streaming_chat(
@@ -166,22 +159,18 @@ async def start_streaming_chat(
166159
internal_conversation_id = await to_internal_conversation_id(external_conversation_id)
167160
# Add mapping to postgres database
168161
if internal_conversation_id is None:
169-
logging.info(
170-
f"Conversation {external_conversation_id} not found, creating a new conversation")
162+
logging.info(f"Conversation {external_conversation_id} not found, creating a new conversation")
171163
# Create a new conversation and get its internal ID
172-
new_conversation = create_new_conversation(
173-
title="New Conversation", user_id=ctx.user_id)
164+
new_conversation = create_new_conversation(title="New Conversation", user_id=ctx.user_id)
174165
internal_conversation_id = new_conversation["conversation_id"]
175166
# Add the new mapping to the database
176-
add_mapping_id(internal_id=internal_conversation_id,
177-
external_id=external_conversation_id, tenant_id=ctx.tenant_id, user_id=ctx.user_id)
167+
add_mapping_id(internal_id=internal_conversation_id, external_id=external_conversation_id, tenant_id=ctx.tenant_id, user_id=ctx.user_id)
178168

179169
# Get history according to internal_conversation_id
180170
history = await get_conversation_history(ctx, external_conversation_id)
181171
agent_id = await get_agent_id_by_name(agent_name=agent_name, tenant_id=ctx.tenant_id)
182172
# Idempotency: only prevent concurrent duplicate starts
183-
composed_key = idempotency_key or _build_idempotency_key(
184-
ctx.tenant_id, external_conversation_id, agent_id, query)
173+
composed_key = idempotency_key or _build_idempotency_key(ctx.tenant_id, external_conversation_id, agent_id, query)
185174
await idempotency_start(composed_key)
186175
agent_request = AgentRequest(
187176
conversation_id=internal_conversation_id,
@@ -193,13 +182,11 @@ async def start_streaming_chat(
193182
)
194183

195184
except LimitExceededError as _:
196-
raise LimitExceededError(
197-
"Query rate exceeded limit. Please try again later.")
185+
raise LimitExceededError("Query rate exceeded limit. Please try again later.")
198186
except UnauthorizedError as _:
199187
raise UnauthorizedError("Cannot authenticate.")
200188
except Exception as e:
201-
raise Exception(
202-
f"Failed to start streaming chat for external conversation id {external_conversation_id}: {str(e)}")
189+
raise Exception(f"Failed to start streaming chat for external conversation id {external_conversation_id}: {str(e)}")
203190

204191
try:
205192
response = await run_agent_stream(
@@ -226,8 +213,7 @@ async def stop_chat(ctx: NorthboundContext, external_conversation_id: str) -> Di
226213
stop_result = stop_agent_tasks(internal_id)
227214
return {"message": stop_result.get("message", "success"), "data": external_conversation_id, "requestId": ctx.request_id}
228215
except Exception as e:
229-
raise Exception(
230-
f"Failed to stop chat for external conversation id {external_conversation_id}: {str(e)}")
216+
raise Exception(f"Failed to stop chat for external conversation id {external_conversation_id}: {str(e)}")
231217

232218

233219
async def list_conversations(ctx: NorthboundContext) -> Dict[str, Any]:
@@ -266,8 +252,7 @@ async def get_agent_info_list(ctx: NorthboundContext) -> Dict[str, Any]:
266252
agent_info.pop("agent_id", None)
267253
return {"message": "success", "data": agent_info_list, "requestId": ctx.request_id}
268254
except Exception as e:
269-
raise Exception(
270-
f"Failed to get agent info list for tenant {ctx.tenant_id}: {str(e)}")
255+
raise Exception(f"Failed to get agent info list for tenant {ctx.tenant_id}: {str(e)}")
271256

272257

273258
async def update_conversation_title(ctx: NorthboundContext, external_conversation_id: str, title: str, idempotency_key: Optional[str] = None) -> Dict[str, Any]:
@@ -276,8 +261,7 @@ async def update_conversation_title(ctx: NorthboundContext, external_conversatio
276261
internal_id = await to_internal_conversation_id(external_conversation_id)
277262

278263
# Idempotency: avoid concurrent duplicate title update for same conversation
279-
composed_key = idempotency_key or _build_idempotency_key(
280-
ctx.tenant_id, external_conversation_id, title)
264+
composed_key = idempotency_key or _build_idempotency_key(ctx.tenant_id, external_conversation_id, title)
281265
await idempotency_start(composed_key)
282266

283267
update_conversation_title_service(internal_id, title, ctx.user_id)
@@ -288,11 +272,9 @@ async def update_conversation_title(ctx: NorthboundContext, external_conversatio
288272
"idempotency_key": composed_key,
289273
}
290274
except LimitExceededError as _:
291-
raise LimitExceededError(
292-
"Duplicate request is still running, please wait.")
275+
raise LimitExceededError("Duplicate request is still running, please wait.")
293276
except Exception as e:
294-
raise Exception(
295-
f"Failed to update conversation title for external conversation id {external_conversation_id}: {str(e)}")
277+
raise Exception(f"Failed to update conversation title for external conversation id {external_conversation_id}: {str(e)}")
296278
finally:
297279
if composed_key:
298280
asyncio.create_task(_release_idempotency_after_delay(composed_key))

0 commit comments

Comments
 (0)