Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 13 additions & 5 deletions backend/app/core/orchestration/queue_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ async def connect(self):
await self.channel.declare_queue(queue_name, durable=True)
logger.info("Successfully connected to RabbitMQ")
except Exception as e:
logger.error(f"Failed to connect to RabbitMQ: {e}")
logger.error(f"Failed to connect to RabbitMQ: {e}", exc_info=True)
raise

async def start(self, num_workers: int = 3):
Expand Down Expand Up @@ -79,14 +79,22 @@ async def enqueue(self,
if delay > 0:
await asyncio.sleep(delay)

# Guard against publishing while disconnected
if not self.channel or self.channel.is_closed:
raise RuntimeError("Queue channel is not connected. Call start() before enqueue().")

queue_item = {
"id": message.get("id", f"msg_{datetime.now().timestamp()}"),
"priority": priority,
"priority": priority.value, # Serialize enum to string
"data": message
}
json_message = json.dumps(queue_item).encode()
await self.channel.default_exchange.publish(
aio_pika.Message(body=json_message),
aio_pika.Message(
body=json_message,
delivery_mode=aio_pika.DeliveryMode.PERSISTENT,
content_type="application/json"
),
routing_key=self.queues[priority]
)
logger.info(f"Enqueued message {queue_item['id']} with priority {priority}")
Expand Down Expand Up @@ -114,13 +122,13 @@ async def _worker(self, worker_name: str):
await self._process_item(item, worker_name)
await message.ack()
except Exception as e:
logger.error(f"Error processing message: {e}")
logger.error("Error processing message: %s", e, exc_info=True)
await message.nack(requeue=False)
except asyncio.CancelledError:
logger.info(f"Worker {worker_name} cancelled")
return
except Exception as e:
logger.error(f"Worker {worker_name} error: {e}")
logger.error("Worker %s error: %s", worker_name, e, exc_info=True)
await asyncio.sleep(0.1)

async def _process_item(self, item: Dict[str, Any], worker_name: str):
Expand Down
17 changes: 6 additions & 11 deletions backend/app/database/weaviate/client.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,14 @@
import weaviate
import weaviate.exceptions
from contextlib import asynccontextmanager
from typing import AsyncGenerator
import logging

logger = logging.getLogger(__name__)

_client = None


def get_client():
"""Get or create the global Weaviate client instance."""
global _client
if _client is None:
_client = weaviate.use_async_with_local()
return _client
"""Create a new async Weaviate client instance per context use."""
return weaviate.use_async_with_local()

@asynccontextmanager
async def get_weaviate_client() -> AsyncGenerator[weaviate.WeaviateClient, None]:
Expand All @@ -22,11 +17,11 @@ async def get_weaviate_client() -> AsyncGenerator[weaviate.WeaviateClient, None]
try:
await client.connect()
yield client
except Exception as e:
logger.error(f"Weaviate client error: {str(e)}")
except weaviate.exceptions.WeaviateBaseError as e:
logger.error("Weaviate client error: %s", e, exc_info=True)
raise
finally:
try:
await client.close()
except Exception as e:
logger.warning(f"Error closing Weaviate client: {str(e)}")
logger.warning("Error closing Weaviate client: %s", e, exc_info=True)
2 changes: 0 additions & 2 deletions frontend/src/App.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ function App() {
supabase.auth.getSession().then(({ data, error }) => {
if (error) {
toast.error('User Login Failed');
console.error('Error checking session:', error);
return;
}
setIsAuthenticated(!!data.session);
Expand Down Expand Up @@ -93,7 +92,6 @@ function App() {
const { error } = await supabase.auth.signOut();
if (error) {
toast.error('Logout failed');
console.error('Error during logout:', error);
return;
}
toast.success('Signed out!');
Expand Down
8 changes: 7 additions & 1 deletion frontend/src/components/integration/BotIntegration.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,13 @@ const BotIntegration: React.FC<BotIntegrationProps> = ({
setIntegration(null);
}
} catch (error) {
console.error('Error loading integration status:', error);
// Handle integration status loading errors
setIsConnected(false);
setIntegration(null);
// Log in development for debugging
if (import.meta.env.DEV) {
console.error('Failed to load integration status:', error);
}
}
};

Expand Down
19 changes: 16 additions & 3 deletions frontend/src/components/pages/LoginPage.tsx
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { useState, ReactNode, FormEvent } from "react";
import { motion } from "framer-motion";
import { useNavigate } from 'react-router-dom';
import { useNavigate, useSearchParams } from 'react-router-dom';
import { toast } from "react-hot-toast";
import { supabase } from "../../lib/supabaseClient";
import {
Expand Down Expand Up @@ -49,6 +49,7 @@ const InputField = ({ icon: Icon, ...props }: InputFieldProps) => (

export default function LoginPage({ onLogin }: LoginPageProps) {
const navigate = useNavigate();
const [searchParams] = useSearchParams();
const [isLoading, setIsLoading] = useState<boolean>(false);

const handleLogin = async (e: FormEvent<HTMLFormElement>) => {
Expand All @@ -64,8 +65,20 @@ export default function LoginPage({ onLogin }: LoginPageProps) {
setIsLoading(false);
if(data && !error){
toast.success('Successfully logged in!');
onLogin();
navigate('/');
onLogin();
const rawReturn = searchParams.get('returnUrl');
try {
const decoded = rawReturn ? decodeURIComponent(rawReturn) : null;
// Protect against open redirects by only allowing internal paths
if (decoded && decoded.startsWith('/')) {
navigate(decoded, { replace: true });
} else {
navigate('/', { replace: true });
}
Comment on lines +69 to +77
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Open redirect vulnerability: startsWith('/') is insufficient.

The check decoded.startsWith('/') can be bypassed with protocol-relative URLs like //evil.com or backslash variants like /\evil.com, which browsers may interpret as external redirects.

Proposed fix
       const rawReturn = searchParams.get('returnUrl');
       try {
         const decoded = rawReturn ? decodeURIComponent(rawReturn) : null;
         // Protect against open redirects by only allowing internal paths
-        if (decoded && decoded.startsWith('/')) {
+        if (decoded && decoded.startsWith('/') && !decoded.startsWith('//') && !decoded.startsWith('/\\')) {
           navigate(decoded, { replace: true });
         } else {
           navigate('/', { replace: true });
         }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
const rawReturn = searchParams.get('returnUrl');
try {
const decoded = rawReturn ? decodeURIComponent(rawReturn) : null;
// Protect against open redirects by only allowing internal paths
if (decoded && decoded.startsWith('/')) {
navigate(decoded, { replace: true });
} else {
navigate('/', { replace: true });
}
const rawReturn = searchParams.get('returnUrl');
try {
const decoded = rawReturn ? decodeURIComponent(rawReturn) : null;
// Protect against open redirects by only allowing internal paths
if (decoded && decoded.startsWith('/') && !decoded.startsWith('//') && !decoded.startsWith('/\\')) {
navigate(decoded, { replace: true });
} else {
navigate('/', { replace: true });
}
🤖 Prompt for AI Agents
In frontend/src/components/pages/LoginPage.tsx around lines 69 to 77, the
current check decoded.startsWith('/') permits protocol-relative URLs (e.g.
//evil.com) and backslash variants; update validation to only allow internal
absolute-paths by ensuring the decoded path starts with a single forward slash
not followed by another slash or backslash (e.g. use a regex like ^\/(?![\/\\])
and also reject any input that contains backslashes or a colon), or
alternatively construct new URL(decoded, window.location.origin) and only accept
it if its origin equals window.location.origin and its pathname begins with '/';
then navigate to the validated pathname (or fall back to '/').

} catch (e) {
// If decoding fails, fallback to home
navigate('/', { replace: true });
}
}
else
{
Expand Down
47 changes: 43 additions & 4 deletions frontend/src/lib/api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,18 @@
import axios, { AxiosInstance } from 'axios';
import { supabase } from './supabaseClient';

// Prevents multiple simultaneous 401 handlers from racing and triggering
// sign-out/redirect more than once.
let isHandlingUnauthorized = false;
Comment on lines +7 to +9
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

Race condition guard never resets — breaks subsequent 401 handling.

isHandlingUnauthorized is set to true on Line 110 but never reset to false. After the first 401 triggers a redirect, this flag stays true for the lifetime of the page. If the user logs back in (or if the page is cached/restored), any future 401 responses will be silently ignored, leaving users stuck with broken API calls.

Proposed fix

Since window.location.href navigates away and reloads the page (resetting module state), this works for the happy path. However, if navigation fails or is blocked, the flag stays set. Consider resetting in a finally block or after a timeout as a safety net:

                     if (!isHandlingUnauthorized) {
                         isHandlingUnauthorized = true;
                         try {
                             await supabase.auth.signOut();
                         } catch (e) {
                             // Log but continue to redirect; signing out isn't critical here
                             console.error(
                                 'Error signing out on 401 handler',
                                 e
                             );
                         }
 
                         const returnUrl = encodeURIComponent(
                             pathname + window.location.search
                         );
                         window.location.href = `/login?returnUrl=${returnUrl}`;
+                        // Reset after a delay in case navigation is blocked (e.g., beforeunload handler)
+                        setTimeout(() => {
+                            isHandlingUnauthorized = false;
+                        }, 2000);
                     }

Also applies to: 109-125

🤖 Prompt for AI Agents
In frontend/src/lib/api.ts around lines 7-9 (and related handling at 109-125),
the module-level guard isHandlingUnauthorized is set true when a 401 is handled
but never reset, causing all subsequent 401s to be ignored; change the handler
so isHandlingUnauthorized is always reset (e.g., clear it in a finally block
after the redirect attempt and/or set a short fallback timeout to reset the flag
if navigation fails) so that the guard prevents concurrent handling but does not
permanently block future 401 handling.


// Public routes where we should NOT force a redirect to login
const PUBLIC_PATHS = [
'/login',
'/signup',
'/forgot-password',
'/reset-password',
];

// Backend API base URL
const API_BASE_URL =
import.meta.env.VITE_BACKEND_URL || 'http://localhost:8000';
Expand Down Expand Up @@ -83,10 +95,34 @@ class ApiClient {
// Add response interceptor for error handling
this.client.interceptors.response.use(
(response) => response,
(error) => {
async (error) => {
if (error.response?.status === 401) {
// Handle unauthorized - could redirect to login
console.error('Unauthorized request');
const pathname = window.location.pathname || '/';

// If we're already on a public/auth page (login, signup, reset, etc.)
// don't yank the user away mid-flow.
if (PUBLIC_PATHS.some((p) => pathname.startsWith(p))) {
return Promise.reject(error);
}

// Deduplicate concurrent 401 handlers to avoid multiple sign-outs / redirects
if (!isHandlingUnauthorized) {
isHandlingUnauthorized = true;
try {
await supabase.auth.signOut();
} catch (e) {
// Log but continue to redirect; signing out isn't critical here
console.error(
'Error signing out on 401 handler',
e
);
}

const returnUrl = encodeURIComponent(
pathname + window.location.search
);
window.location.href = `/login?returnUrl=${returnUrl}`;
}
}
return Promise.reject(error);
}
Expand Down Expand Up @@ -166,7 +202,10 @@ class ApiClient {
const response = await this.client.get('/v1/health');
return response.status === 200;
} catch (error) {
console.error('Backend health check failed:', error);
// Log in development for debugging
if (import.meta.env.DEV) {
console.error('Health check failed:', error);
}
return false;
}
}
Expand Down