From 31224f707e5c0313649f01149e4fde9c1f0611ab Mon Sep 17 00:00:00 2001 From: Pheobe-Apondi Date: Wed, 12 Nov 2025 12:17:53 +0300 Subject: [PATCH] fixes --- api/views.py | 208 +++++++++++++++++++++++++++-------------------- zeno/settings.py | 5 +- 2 files changed, 122 insertions(+), 91 deletions(-) diff --git a/api/views.py b/api/views.py index 87237a7..f9e0758 100644 --- a/api/views.py +++ b/api/views.py @@ -20,11 +20,12 @@ import requests from django.utils import timezone from datetime import timedelta +import json ZEN_AGENT_API_URL = os.environ.get("ZEN_AGENT_API_URL") -MAX_CONVERSATIONS_PER_DAY = 5 -MAX_RUNS_PER_CONVERSATION_PER_DAY = 20 +MAX_CONVERSATIONS_PER_DAY = 500 +MAX_RUNS_PER_CONVERSATION_PER_DAY = 2000 class ConversationViewSet(viewsets.ModelViewSet): @@ -204,6 +205,16 @@ def by_conversation(self, request): + +from django.http import StreamingHttpResponse + + + +MAX_RUNS_PER_CONVERSATION_PER_DAY = 50 +ZEN_AGENT_API_URL = os.environ.get("ZEN_AGENT_API_URL") + + + class RunViewSet(viewsets.ModelViewSet): queryset = Run.objects.all() serializer_class = RunSerializer @@ -250,11 +261,114 @@ def create(self, request, *args, **kwargs): for file in request.FILES.getlist('files'): RunInputFile.objects.create(run=run, file=file) - threading.Thread(target=self.simulate_status, args=(run.id,)).start() + threading.Thread(target=self._stream_agent_response, args=(run.id,)).start() serializer = RunSerializer(run) return Response(serializer.data, status=201) + def _stream_agent_response(self, run_id): + """Handles streaming responses from the agent and saves artifacts via RunOutputArtifact.""" + try: + run = Run.objects.get(id=run_id) + run.status = Run.RUNNING + run.save(update_fields=['status']) + + with requests.post(ZEN_AGENT_API_URL, json={"query": run.user_input}, stream=True, timeout=60) as response: + if response.status_code != 200: + run.status = Run.FAILED + run.final_output = f"Agent returned status {response.status_code}" + run.save(update_fields=['status', 'final_output']) + return + + final_output = None + + for line in response.iter_lines(decode_unicode=True): + if not line.strip(): + continue + + try: + data = json.loads(line) + msg_type = data.get("type") + + if msg_type == "progress": + message = data.get("message", "") + RunOutputArtifact.objects.create( + run=run, + artifact_type="progress", + data={"message": message} + ) + run.final_output = f"⏳ {message}" + run.save(update_fields=['final_output']) + + elif msg_type == "thinking": + content = data.get("content", "") + RunOutputArtifact.objects.create( + run=run, + artifact_type="thinking", + data={"content": content} + ) + run.final_output = f" {content}" + run.save(update_fields=['final_output']) + + elif msg_type == "route": + decision = data.get("decision", "") + RunOutputArtifact.objects.create( + run=run, + artifact_type="route", + data={"decision": decision} + ) + + elif msg_type in ["comparative_chunk", "scenario_chunk"]: + content = data.get("content", "") + RunOutputArtifact.objects.create( + run=run, + artifact_type="thinking", + data={"content": content} + ) + + elif msg_type == "final": + final_output = data.get("response", "") + + elif msg_type in ["chart", "table", "text"]: + RunOutputArtifact.objects.create( + run=run, + artifact_type=msg_type, + data=data.get("data", {}), + title=data.get("title", "") + ) + + elif msg_type == "error": + error_msg = data.get("message", "Agent error") + RunOutputArtifact.objects.create( + run=run, + artifact_type="error", + data={"message": error_msg} + ) + run.final_output = f" {error_msg}" + run.save(update_fields=['final_output']) + + except json.JSONDecodeError: + # Silent failure or log via proper logging (e.g., logger.warning) + pass + except Exception: + pass + + if final_output is not None: + run.final_output = final_output + run.status = Run.COMPLETED + run.save(update_fields=['status', 'final_output']) + else: + run.status = Run.FAILED + run.final_output = "No final response received from agent." + run.save(update_fields=['status', 'final_output']) + + except Exception as e: + run = Run.objects.filter(id=run_id).first() + if run: + run.status = Run.FAILED + run.final_output = f"Agent request failed: {str(e)}" + run.save(update_fields=['status', 'final_output']) + def list(self, request, *args, **kwargs): user = request.user if getattr(user, 'role', '').lower() == 'admin': @@ -274,92 +388,6 @@ def retrieve(self, request, pk=None, *args, **kwargs): serializer = RunSerializer(run) return Response(serializer.data) - def simulate_status(self, run_id): - try: - run = Run.objects.get(id=run_id) - run.status = Run.RUNNING - run.save(update_fields=['status']) - - try: - response = requests.post( - ZEN_AGENT_API_URL, - json={"query": run.user_input}, - timeout=30 - ) - response.raise_for_status() - result = response.json() - except Exception as e: - run.status = Run.FAILED - run.final_output = f"Agent request failed: {str(e)}" - run.save(update_fields=['status', 'final_output']) - return - - query_type = result.get("type", "rag") - - if query_type == "forecast": - forecast_display = result.get("forecast_display", "Forecast data unavailable") - interpretation = result.get("interpretation", "No interpretation available") - confidence_level = result.get("confidence_level", "Medium") - data_points = result.get("data_points_used", 0) - - agent_response = ( - f"{interpretation}\n\n" - f"FORECAST SUMMARY:\n" - f"{forecast_display}\n" - f"Confidence Level: {confidence_level} ({data_points} data points used)" - ) - - dual_forecast = result.get("dual_forecast", {}) - if dual_forecast: - RunOutputArtifact.objects.create( - run=run, - artifact_type="json", - data={"dual_forecast": dual_forecast}, - title="Detailed Forecast Data" - ) - - elif query_type == "scenario": - agent_response = result.get("llm_analysis", "No scenario analysis available.") - - elif query_type == "comparative": - agent_response = result.get("response", "No comparative analysis available.") - - else: - agent_response = result.get("response", "No response received.") - - graph_url = result.get("graph_url") - thought_process = result.get("thought_process", []) - followup = result.get("followup") - - if graph_url: - RunOutputArtifact.objects.create( - run=run, - artifact_type="link", - data={"url": graph_url}, - title="Graph Link" - ) - if thought_process: - RunOutputArtifact.objects.create( - run=run, - artifact_type="list", - data={"steps": thought_process}, - title="Thought Process" - ) - if followup: - RunOutputArtifact.objects.create( - run=run, - artifact_type="text", - data={"content": followup}, - title="Follow-up Suggestion" - ) - - run.final_output = agent_response - run.status = Run.COMPLETED - run.save(update_fields=['status', 'final_output']) - - except Run.DoesNotExist: - pass - def destroy(self, request, *args, **kwargs): pk = kwargs.get('pk') try: @@ -373,4 +401,4 @@ def destroy(self, request, *args, **kwargs): run.delete() return Response({'message': 'Run deleted successfully'}, status=status.HTTP_200_OK) else: - return Response({'error': 'You do not have permission to delete this run.'}, status=status.HTTP_403_FORBIDDEN) + return Response({'error': 'You do not have permission to delete this run.'}, status=status.HTTP_403_FORBIDDEN) \ No newline at end of file diff --git a/zeno/settings.py b/zeno/settings.py index 8ef1fee..bcbe39b 100644 --- a/zeno/settings.py +++ b/zeno/settings.py @@ -26,7 +26,7 @@ def env_set(*names): DJANGO_SECRET_KEY = os.environ.get('DJANGO_SECRET_KEY') SECRET_KEY = DJANGO_SECRET_KEY -DEBUG = False +DEBUG = True ALLOWED_HOSTS = ["*"] INSTALLED_APPS = [ @@ -131,3 +131,6 @@ def env_set(*names): } CORS_ORIGIN_ALLOW_ALL = True + +ZEN_AGENT_API_URL = os.getenv("ZEN_AGENT_API_URL") +