Skip to content

Commit 95ca472

Browse files
authored
Merge pull request #4 from Dembrane/feature/echo-407-lets-retry-with-backoff-for-network-requests-in-the-views
ECHO-407-lets-retry-with-backoff-for-network-requests-in-the-views
2 parents 850346d + b32e934 commit 95ca472

File tree

7 files changed: +335 additions, -102 deletions

core/topic_modeling.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,14 @@
11
import os
22
from typing import List, Optional
3+
34
import torch
4-
import pandas as pd
55
from umap import UMAP
6+
from runpod import RunPodLogger
67
from hdbscan import HDBSCAN
78
from bertopic import BERTopic
89
from bertopic.vectorizers import ClassTfidfTransformer
910
from sentence_transformers import SentenceTransformer
1011
from sklearn.feature_extraction.text import CountVectorizer
11-
from runpod import RunPodLogger
1212

1313
logger = RunPodLogger()
1414

integrations/azure_client.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,10 @@
22
import json
33
import asyncio
44
from typing import Dict, List
5-
from pydantic import BaseModel
6-
from litellm import completion
5+
76
from runpod import RunPodLogger
7+
from litellm import completion
8+
from pydantic import BaseModel
89

910
logger = RunPodLogger()
1011

integrations/rag_client.py

Lines changed: 83 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,45 @@
11
import os
22
from typing import List, Optional
3-
import requests
3+
44
import aiohttp
5+
import requests
56
from runpod import RunPodLogger
7+
from utils.retry import retry_with_backoff, async_retry_with_backoff
8+
69
from integrations.directus_client import get_directus_token
710

811
logger = RunPodLogger()
912

1013

14+
def _make_rag_request(url: str, payload: dict, headers: dict) -> str:
15+
"""
16+
Helper function to make the actual RAG API request.
17+
18+
Args:
19+
url: The API endpoint URL
20+
payload: The request payload
21+
headers: The request headers
22+
23+
Returns:
24+
str: RAG prompt string from the server
25+
26+
Raises:
27+
Exception: If the API call fails
28+
"""
29+
logger.debug(f"Making RAG API request to {url}")
30+
response = requests.post(url, json=payload, headers=headers, timeout=120)
31+
response.raise_for_status()
32+
33+
result = response.text
34+
logger.debug("Successfully retrieved RAG prompt")
35+
return result
36+
37+
1138
def get_rag_prompt(
1239
query: str, segment_ids: Optional[List[str]] = None, rag_server_url: Optional[str] = None
1340
) -> str:
1441
"""
15-
Retrieve RAG prompt by calling the external RAG server API.
42+
Retrieve RAG prompt by calling the external RAG server API with retry logic.
1643
1744
Args:
1845
query: The query string to send to the RAG server
@@ -24,7 +51,7 @@ def get_rag_prompt(
2451
2552
Raises:
2653
ValueError: If RAG_SERVER_URL is not set and no URL is provided
27-
Exception: If the API call fails
54+
Exception: If the API call fails after all retries
2855
"""
2956
if rag_server_url is None:
3057
rag_server_url = os.getenv("RAG_SERVER_URL")
@@ -48,28 +75,58 @@ def get_rag_prompt(
4875

4976
headers = {"Content-Type": "application/json"}
5077
headers["Authorization"] = f"Bearer {get_directus_token()}"
51-
try:
52-
logger.debug(f"Making RAG API request to {url}")
53-
response = requests.post(url, json=payload, headers=headers, timeout=120)
54-
response.raise_for_status()
55-
56-
result = response.text
57-
logger.debug("Successfully retrieved RAG prompt")
58-
return result
5978

60-
except requests.exceptions.RequestException as e:
61-
logger.error(f"Error calling API: {e}")
79+
try:
80+
return retry_with_backoff(
81+
_make_rag_request,
82+
max_retries=3,
83+
initial_delay=2,
84+
backoff_factor=2,
85+
jitter=0.5,
86+
logger=logger,
87+
url=url,
88+
payload=payload,
89+
headers=headers,
90+
)
91+
except Exception as e:
92+
logger.error(f"Error calling API after all retries: {e}")
6293
if hasattr(e, "response") and e.response is not None:
6394
logger.error(f"Response status: {e.response.status_code}")
6495
logger.error(f"Response text: {e.response.text}")
6596
raise Exception(f"Failed to get RAG prompt from server: {str(e)}") from e
6697

6798

99+
async def _make_rag_request_async(url: str, payload: dict, headers: dict) -> str:
100+
"""
101+
Helper function to make the actual async RAG API request.
102+
103+
Args:
104+
url: The API endpoint URL
105+
payload: The request payload
106+
headers: The request headers
107+
108+
Returns:
109+
str: RAG prompt string from the server
110+
111+
Raises:
112+
Exception: If the API call fails
113+
"""
114+
logger.debug(f"Making async RAG API request to {url}")
115+
async with aiohttp.ClientSession() as session:
116+
async with session.post(
117+
url, json=payload, headers=headers, timeout=aiohttp.ClientTimeout(total=120)
118+
) as response:
119+
response.raise_for_status()
120+
result = await response.text()
121+
logger.debug("Successfully retrieved RAG prompt")
122+
return result
123+
124+
68125
async def get_rag_prompt_async(
69126
query: str, segment_ids: Optional[List[str]] = None, rag_server_url: Optional[str] = None
70127
) -> str:
71128
"""
72-
Async version of get_rag_prompt for parallel processing.
129+
Async version of get_rag_prompt for parallel processing with retry logic.
73130
"""
74131
if rag_server_url is None:
75132
rag_server_url = os.getenv("RAG_SERVER_URL")
@@ -95,16 +152,17 @@ async def get_rag_prompt_async(
95152
headers["Authorization"] = f"Bearer {get_directus_token()}"
96153

97154
try:
98-
logger.debug(f"Making async RAG API request to {url}")
99-
async with aiohttp.ClientSession() as session:
100-
async with session.post(
101-
url, json=payload, headers=headers, timeout=aiohttp.ClientTimeout(total=120)
102-
) as response:
103-
response.raise_for_status()
104-
result = await response.text()
105-
logger.debug("Successfully retrieved RAG prompt")
106-
return result
107-
155+
return await async_retry_with_backoff(
156+
_make_rag_request_async,
157+
max_retries=3,
158+
initial_delay=2,
159+
backoff_factor=2,
160+
jitter=0.5,
161+
logger=logger,
162+
url=url,
163+
payload=payload,
164+
headers=headers,
165+
)
108166
except Exception as e:
109-
logger.error(f"Error calling API: {e}")
167+
logger.error(f"Error calling API after all retries: {e}")
110168
raise Exception(f"Failed to get RAG prompt from server: {str(e)}") from e

services/aspect_processor.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,18 @@
11
from typing import Dict, List
2-
from tqdm.asyncio import tqdm
2+
33
from runpod import RunPodLogger
4-
from data_model import Aspect
54
from prompts import (
65
rag_user_prompt,
76
rag_system_prompt,
87
initial_rag_prompt,
98
fallback_get_aspect_response_list_user_prompt,
109
fallback_get_aspect_response_list_system_prompt,
1110
)
11+
from data_model import Aspect
12+
from tqdm.asyncio import tqdm
1213
from integrations.rag_client import get_rag_prompt_async
1314
from integrations.azure_client import run_formated_llm_call_async
15+
1416
from services.image_generator import get_image_url_async
1517

1618
logger = RunPodLogger()

0 commit comments

Comments (0)