Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
208 changes: 118 additions & 90 deletions api/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,12 @@
import requests
from django.utils import timezone
from datetime import timedelta
import json


ZEN_AGENT_API_URL = os.environ.get("ZEN_AGENT_API_URL")
MAX_CONVERSATIONS_PER_DAY = 5
MAX_RUNS_PER_CONVERSATION_PER_DAY = 20
MAX_CONVERSATIONS_PER_DAY = 500
MAX_RUNS_PER_CONVERSATION_PER_DAY = 2000


class ConversationViewSet(viewsets.ModelViewSet):
Expand Down Expand Up @@ -204,6 +205,16 @@ def by_conversation(self, request):




from django.http import StreamingHttpResponse



MAX_RUNS_PER_CONVERSATION_PER_DAY = 50
ZEN_AGENT_API_URL = os.environ.get("ZEN_AGENT_API_URL")



class RunViewSet(viewsets.ModelViewSet):
queryset = Run.objects.all()
serializer_class = RunSerializer
Expand Down Expand Up @@ -250,11 +261,114 @@ def create(self, request, *args, **kwargs):
for file in request.FILES.getlist('files'):
RunInputFile.objects.create(run=run, file=file)

threading.Thread(target=self.simulate_status, args=(run.id,)).start()
threading.Thread(target=self._stream_agent_response, args=(run.id,)).start()

serializer = RunSerializer(run)
return Response(serializer.data, status=201)

def _stream_agent_response(self, run_id):
"""Handles streaming responses from the agent and saves artifacts via RunOutputArtifact."""
try:
run = Run.objects.get(id=run_id)
run.status = Run.RUNNING
run.save(update_fields=['status'])

with requests.post(ZEN_AGENT_API_URL, json={"query": run.user_input}, stream=True, timeout=60) as response:
if response.status_code != 200:
run.status = Run.FAILED
run.final_output = f"Agent returned status {response.status_code}"
run.save(update_fields=['status', 'final_output'])
return

final_output = None

for line in response.iter_lines(decode_unicode=True):
if not line.strip():
continue

try:
data = json.loads(line)
msg_type = data.get("type")

if msg_type == "progress":
message = data.get("message", "")
RunOutputArtifact.objects.create(
run=run,
artifact_type="progress",
data={"message": message}
)
run.final_output = f"⏳ {message}"
run.save(update_fields=['final_output'])

elif msg_type == "thinking":
content = data.get("content", "")
RunOutputArtifact.objects.create(
run=run,
artifact_type="thinking",
data={"content": content}
)
run.final_output = f" {content}"
run.save(update_fields=['final_output'])

elif msg_type == "route":
decision = data.get("decision", "")
RunOutputArtifact.objects.create(
run=run,
artifact_type="route",
data={"decision": decision}
)

elif msg_type in ["comparative_chunk", "scenario_chunk"]:
content = data.get("content", "")
RunOutputArtifact.objects.create(
run=run,
artifact_type="thinking",
data={"content": content}
)

elif msg_type == "final":
final_output = data.get("response", "")

elif msg_type in ["chart", "table", "text"]:
RunOutputArtifact.objects.create(
run=run,
artifact_type=msg_type,
data=data.get("data", {}),
title=data.get("title", "")
)

elif msg_type == "error":
error_msg = data.get("message", "Agent error")
RunOutputArtifact.objects.create(
run=run,
artifact_type="error",
data={"message": error_msg}
)
run.final_output = f" {error_msg}"
run.save(update_fields=['final_output'])

except json.JSONDecodeError:
# Silent failure or log via proper logging (e.g., logger.warning)
pass
except Exception:
pass

if final_output is not None:
run.final_output = final_output
run.status = Run.COMPLETED
run.save(update_fields=['status', 'final_output'])
else:
run.status = Run.FAILED
run.final_output = "No final response received from agent."
run.save(update_fields=['status', 'final_output'])

except Exception as e:
run = Run.objects.filter(id=run_id).first()
if run:
run.status = Run.FAILED
run.final_output = f"Agent request failed: {str(e)}"
run.save(update_fields=['status', 'final_output'])

def list(self, request, *args, **kwargs):
user = request.user
if getattr(user, 'role', '').lower() == 'admin':
Expand All @@ -274,92 +388,6 @@ def retrieve(self, request, pk=None, *args, **kwargs):
serializer = RunSerializer(run)
return Response(serializer.data)

def simulate_status(self, run_id):
try:
run = Run.objects.get(id=run_id)
run.status = Run.RUNNING
run.save(update_fields=['status'])

try:
response = requests.post(
ZEN_AGENT_API_URL,
json={"query": run.user_input},
timeout=30
)
response.raise_for_status()
result = response.json()
except Exception as e:
run.status = Run.FAILED
run.final_output = f"Agent request failed: {str(e)}"
run.save(update_fields=['status', 'final_output'])
return

query_type = result.get("type", "rag")

if query_type == "forecast":
forecast_display = result.get("forecast_display", "Forecast data unavailable")
interpretation = result.get("interpretation", "No interpretation available")
confidence_level = result.get("confidence_level", "Medium")
data_points = result.get("data_points_used", 0)

agent_response = (
f"{interpretation}\n\n"
f"FORECAST SUMMARY:\n"
f"{forecast_display}\n"
f"Confidence Level: {confidence_level} ({data_points} data points used)"
)

dual_forecast = result.get("dual_forecast", {})
if dual_forecast:
RunOutputArtifact.objects.create(
run=run,
artifact_type="json",
data={"dual_forecast": dual_forecast},
title="Detailed Forecast Data"
)

elif query_type == "scenario":
agent_response = result.get("llm_analysis", "No scenario analysis available.")

elif query_type == "comparative":
agent_response = result.get("response", "No comparative analysis available.")

else:
agent_response = result.get("response", "No response received.")

graph_url = result.get("graph_url")
thought_process = result.get("thought_process", [])
followup = result.get("followup")

if graph_url:
RunOutputArtifact.objects.create(
run=run,
artifact_type="link",
data={"url": graph_url},
title="Graph Link"
)
if thought_process:
RunOutputArtifact.objects.create(
run=run,
artifact_type="list",
data={"steps": thought_process},
title="Thought Process"
)
if followup:
RunOutputArtifact.objects.create(
run=run,
artifact_type="text",
data={"content": followup},
title="Follow-up Suggestion"
)

run.final_output = agent_response
run.status = Run.COMPLETED
run.save(update_fields=['status', 'final_output'])

except Run.DoesNotExist:
pass

def destroy(self, request, *args, **kwargs):
pk = kwargs.get('pk')
try:
Expand All @@ -373,4 +401,4 @@ def destroy(self, request, *args, **kwargs):
run.delete()
return Response({'message': 'Run deleted successfully'}, status=status.HTTP_200_OK)
else:
return Response({'error': 'You do not have permission to delete this run.'}, status=status.HTTP_403_FORBIDDEN)
return Response({'error': 'You do not have permission to delete this run.'}, status=status.HTTP_403_FORBIDDEN)
5 changes: 4 additions & 1 deletion zeno/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ def env_set(*names):

DJANGO_SECRET_KEY = os.environ.get('DJANGO_SECRET_KEY')
SECRET_KEY = DJANGO_SECRET_KEY
DEBUG = False
DEBUG = True
ALLOWED_HOSTS = ["*"]

INSTALLED_APPS = [
Expand Down Expand Up @@ -131,3 +131,6 @@ def env_set(*names):
}
CORS_ORIGIN_ALLOW_ALL = True


ZEN_AGENT_API_URL = os.getenv("ZEN_AGENT_API_URL")