diff --git a/backend/app/api/routes/campaigns.py b/backend/app/api/routes/campaigns.py
index d2daa70..592428e 100644
--- a/backend/app/api/routes/campaigns.py
+++ b/backend/app/api/routes/campaigns.py
@@ -6,6 +6,7 @@
 from typing import Optional, List
 from datetime import datetime, timezone
 from app.core.supabase_clients import supabase_anon
+from app.core.dependencies import get_current_brand
 from uuid import UUID
 
 router = APIRouter()
@@ -73,37 +74,17 @@ class CampaignResponse(BaseModel):
     is_featured: bool
 
 
-async def get_brand_id_from_user(user_id: str) -> str:
-    """Get brand ID from user ID."""
-    supabase = supabase_anon
-
-    try:
-        response = supabase.table("brands").select("id").eq("user_id", user_id).single().execute()
-
-        if not response.data:
-            raise HTTPException(status_code=404, detail="Brand profile not found")
-
-        return response.data["id"]
-    except HTTPException:
-        raise
-    except Exception as e:
-        if "PGRST116" in str(e):  # No rows returned
-            raise HTTPException(status_code=404, detail="Brand profile not found") from e
-        raise HTTPException(status_code=500, detail=f"Error fetching brand profile: {str(e)}") from e
-
-
 @router.post("/campaigns", response_model=CampaignResponse, status_code=201)
-async def create_campaign(campaign: CampaignCreate, user_id: str = Query(..., description="User ID from authentication")):
+async def create_campaign(campaign: CampaignCreate, brand: dict = Depends(get_current_brand)):
     """
     Create a new campaign for a brand.
 
-    - **user_id**: The authenticated user's ID (should be passed from auth middleware)
     - **campaign**: Campaign details matching the database schema
     """
     supabase = supabase_anon
 
-    # Get brand ID from user ID
-    brand_id = await get_brand_id_from_user(user_id)
+    # Get brand ID from authenticated brand profile
+    brand_id = brand['id']
 
     # Generate slug if not provided
     if not campaign.slug:
@@ -171,7 +152,7 @@
 @router.get("/campaigns", response_model=List[CampaignResponse])
 async def get_campaigns(
-    user_id: str = Query(..., description="User ID from authentication"),
+    brand: dict = Depends(get_current_brand),
     status: Optional[str] = Query(None, description="Filter by status"),
     search: Optional[str] = Query(None, description="Search by title or description"),
     platform: Optional[str] = Query(None, description="Filter by platform"),
@@ -185,7 +166,6 @@ async def get_campaigns(
     """
     Get all campaigns for a brand with optional filters.
 
-    - **user_id**: The authenticated user's ID
     - **status**: Optional filter by campaign status
     - **search**: Optional search term for title or description
     - **platform**: Optional filter by platform
@@ -198,8 +178,8 @@ async def get_campaigns(
     """
     supabase = supabase_anon
 
-    # Get brand ID from user ID
-    brand_id = await get_brand_id_from_user(user_id)
+    # Get brand ID from authenticated brand profile
+    brand_id = brand['id']
 
     try:
         # Build query
@@ -251,18 +231,17 @@
 @router.get("/campaigns/{campaign_id}", response_model=CampaignResponse)
 async def get_campaign(
     campaign_id: str,
-    user_id: str = Query(..., description="User ID from authentication")
+    brand: dict = Depends(get_current_brand)
 ):
     """
     Get a single campaign by ID.
 
     - **campaign_id**: The campaign ID
-    - **user_id**: The authenticated user's ID
     """
     supabase = supabase_anon
 
-    # Get brand ID from user ID
-    brand_id = await get_brand_id_from_user(user_id)
+    # Get brand ID from authenticated brand profile
+    brand_id = brand['id']
 
     try:
         # Fetch campaign and verify ownership
@@ -285,19 +264,18 @@
 async def update_campaign(
     campaign_id: str,
     campaign: CampaignUpdate,
-    user_id: str = Query(..., description="User ID from authentication")
+    brand: dict = Depends(get_current_brand)
 ):
     """
     Update an existing campaign.
 
     - **campaign_id**: The campaign ID
     - **campaign**: Updated campaign details
-    - **user_id**: The authenticated user's ID
     """
     supabase = supabase_anon
 
-    # Get brand ID from user ID
-    brand_id = await get_brand_id_from_user(user_id)
+    # Get brand ID from authenticated brand profile
+    brand_id = brand['id']
 
     try:
         # Verify campaign exists and belongs to this brand
@@ -346,18 +324,17 @@
 @router.delete("/campaigns/{campaign_id}", status_code=204)
 async def delete_campaign(
     campaign_id: str,
-    user_id: str = Query(..., description="User ID from authentication")
+    brand: dict = Depends(get_current_brand)
 ):
     """
     Delete a campaign.
 
     - **campaign_id**: The campaign ID
-    - **user_id**: The authenticated user's ID
     """
     supabase = supabase_anon
 
-    # Get brand ID from user ID
-    brand_id = await get_brand_id_from_user(user_id)
+    # Get brand ID from authenticated brand profile
+    brand_id = brand['id']
 
     try:
         # Verify campaign exists and belongs to this brand
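With the `user_id` query parameter replaced by the `get_current_brand` dependency, clients now authenticate by sending the Supabase access token as a Bearer header. A minimal sketch of the new calling convention (the host and token are illustrative placeholders):

```python
import requests

API_URL = "http://localhost:8000"          # illustrative host
access_token = "<supabase-access-token>"   # placeholder

# The brand is resolved server-side from the JWT; no user_id parameter is sent.
response = requests.get(
    f"{API_URL}/campaigns",
    headers={"Authorization": f"Bearer {access_token}"},
    params={"status": "active"},
    timeout=10,
)
response.raise_for_status()
campaigns = response.json()
```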
diff --git a/backend/app/api/routes/collaborations.py b/backend/app/api/routes/collaborations.py
new file mode 100644
index 0000000..c1eee0f
--- /dev/null
+++ b/backend/app/api/routes/collaborations.py
@@ -0,0 +1,630 @@
+"""
+Collaborations management routes for creator users.
+"""
+
+from fastapi import APIRouter, HTTPException, Depends, Query
+from pydantic import BaseModel, Field
+from typing import Optional, List
+from datetime import datetime, date
+import json
+from groq import Groq
+from app.core.supabase_clients import supabase_anon
+from app.core.dependencies import get_current_creator
+from app.core.config import settings
+
+router = APIRouter()
+
+
+class CollaborationResponse(BaseModel):
+    """Schema for collaboration response."""
+    id: str
+    creator1_id: str
+    creator2_id: str
+    collaboration_type: str
+    title: str
+    description: Optional[str]
+    status: str
+    match_score: Optional[float]
+    ai_suggestions: Optional[dict]
+    start_date: Optional[date]
+    end_date: Optional[date]
+    planned_deliverables: Optional[dict]
+    completed_deliverables: Optional[List[dict]]
+    initiator_id: Optional[str]
+    proposal_message: Optional[str]
+    response_message: Optional[str]
+    total_views: int
+    total_engagement: int
+    audience_growth: Optional[dict]
+    creator1_rating: Optional[int]
+    creator1_feedback: Optional[str]
+    creator2_rating: Optional[int]
+    creator2_feedback: Optional[str]
+    proposed_at: datetime
+    accepted_at: Optional[datetime]
+    completed_at: Optional[datetime]
+
+
+@router.get("/collaborations", response_model=List[CollaborationResponse])
+async def get_my_collaborations(
+    creator: dict = Depends(get_current_creator),
+    status: Optional[str] = Query(None, description="Filter by status"),
+    limit: int = Query(50, ge=1, le=100),
+    offset: int = Query(0, ge=0)
+):
+    """
+    Get all collaborations for the authenticated creator.
+
+    Returns collaborations where the creator is either creator1 or creator2.
+
+    - **status**: Optional filter by collaboration status (proposed, accepted, planning, active, completed, declined, cancelled)
+    - **limit**: Maximum number of results (default: 50, max: 100)
+    - **offset**: Number of results to skip for pagination
+    """
+    supabase = supabase_anon
+    creator_id = creator['id']
+
+    try:
+        # Get collaborations where creator is creator1
+        query1 = supabase.table("creator_collaborations").select("*").eq("creator1_id", creator_id)
+
+        # Get collaborations where creator is creator2
+        query2 = supabase.table("creator_collaborations").select("*").eq("creator2_id", creator_id)
+
+        # Apply status filter if provided
+        if status:
+            query1 = query1.eq("status", status)
+            query2 = query2.eq("status", status)
+
+        # Execute both queries
+        response1 = query1.execute()
+        response2 = query2.execute()
+
+        # Combine results
+        all_collaborations = (response1.data or []) + (response2.data or [])
+
+        # Remove duplicates (in case of any edge cases)
+        seen = set()
+        unique_collaborations = []
+        for collab in all_collaborations:
+            if collab['id'] not in seen:
+                seen.add(collab['id'])
+                unique_collaborations.append(collab)
+
+        # Sort by proposed_at descending
+        unique_collaborations.sort(key=lambda x: x.get('proposed_at', ''), reverse=True)
+
+        # Apply pagination
+        paginated = unique_collaborations[offset:offset + limit]
+
+        return paginated
+
+    except HTTPException:
+        raise
+    except Exception as e:
+        raise HTTPException(
+            status_code=500,
+            detail=f"Error fetching collaborations: {str(e)}"
+        ) from e
+
+
+ """ + supabase = supabase_anon + creator_id = creator['id'] + + try: + # Get collaborations where creator is creator1 + response1 = supabase.table("creator_collaborations") \ + .select("status") \ + .eq("creator1_id", creator_id) \ + .execute() + + # Get collaborations where creator is creator2 + response2 = supabase.table("creator_collaborations") \ + .select("status") \ + .eq("creator2_id", creator_id) \ + .execute() + + # Combine results + collaborations = (response1.data or []) + (response2.data or []) + + # Count by status + stats = { + "total": len(collaborations), + "proposed": 0, + "accepted": 0, + "planning": 0, + "active": 0, + "completed": 0, + "declined": 0, + "cancelled": 0 + } + + for collab in collaborations: + status = collab.get("status", "proposed") + if status in stats: + stats[status] += 1 + + return stats + + except Exception as e: + raise HTTPException( + status_code=500, + detail=f"Error fetching collaboration stats: {str(e)}" + ) from e + + +class CollaborationIdeasRequest(BaseModel): + """Request model for generating collaboration ideas.""" + target_creator_id: str + + +class CollaborationIdea(BaseModel): + """Schema for a single collaboration idea.""" + title: str + description: str + collaboration_type: str + why_it_works: str + + +class CollaborationIdeasResponse(BaseModel): + """Response model for collaboration ideas.""" + ideas: List[CollaborationIdea] + + +@router.post("/collaborations/generate-ideas", response_model=CollaborationIdeasResponse) +async def generate_collaboration_ideas( + request: CollaborationIdeasRequest, + creator: dict = Depends(get_current_creator) +): + """ + Generate collaboration ideas between the current creator and a target creator using AI. + + - **target_creator_id**: The ID of the creator to collaborate with + """ + supabase = supabase_anon + current_creator_id = creator['id'] + target_creator_id = request.target_creator_id + + # Prevent self-collaboration + if current_creator_id == target_creator_id: + raise HTTPException( + status_code=400, + detail="Cannot generate collaboration ideas with yourself" + ) + + try: + # Fetch both creator profiles + current_creator_response = supabase.table("creators") \ + .select("*") \ + .eq("id", current_creator_id) \ + .eq("is_active", True) \ + .single() \ + .execute() + + target_creator_response = supabase.table("creators") \ + .select("*") \ + .eq("id", target_creator_id) \ + .eq("is_active", True) \ + .single() \ + .execute() + + if not current_creator_response.data: + raise HTTPException(status_code=404, detail="Current creator profile not found") + if not target_creator_response.data: + raise HTTPException(status_code=404, detail="Target creator profile not found") + + current_creator = current_creator_response.data + target_creator = target_creator_response.data + + # Build prompt for Groq AI + prompt = f"""You are an expert at matching content creators for collaborations. Analyze the following two creator profiles and suggest 5 creative, specific collaboration ideas that would work well between them. 
+class CollaborationIdeasRequest(BaseModel):
+    """Request model for generating collaboration ideas."""
+    target_creator_id: str
+
+
+class CollaborationIdea(BaseModel):
+    """Schema for a single collaboration idea."""
+    title: str
+    description: str
+    collaboration_type: str
+    why_it_works: str
+
+
+class CollaborationIdeasResponse(BaseModel):
+    """Response model for collaboration ideas."""
+    ideas: List[CollaborationIdea]
+
+
+@router.post("/collaborations/generate-ideas", response_model=CollaborationIdeasResponse)
+async def generate_collaboration_ideas(
+    request: CollaborationIdeasRequest,
+    creator: dict = Depends(get_current_creator)
+):
+    """
+    Generate collaboration ideas between the current creator and a target creator using AI.
+
+    - **target_creator_id**: The ID of the creator to collaborate with
+    """
+    supabase = supabase_anon
+    current_creator_id = creator['id']
+    target_creator_id = request.target_creator_id
+
+    # Prevent self-collaboration
+    if current_creator_id == target_creator_id:
+        raise HTTPException(
+            status_code=400,
+            detail="Cannot generate collaboration ideas with yourself"
+        )
+
+    try:
+        # Fetch both creator profiles
+        current_creator_response = supabase.table("creators") \
+            .select("*") \
+            .eq("id", current_creator_id) \
+            .eq("is_active", True) \
+            .single() \
+            .execute()
+
+        target_creator_response = supabase.table("creators") \
+            .select("*") \
+            .eq("id", target_creator_id) \
+            .eq("is_active", True) \
+            .single() \
+            .execute()
+
+        if not current_creator_response.data:
+            raise HTTPException(status_code=404, detail="Current creator profile not found")
+        if not target_creator_response.data:
+            raise HTTPException(status_code=404, detail="Target creator profile not found")
+
+        current_creator = current_creator_response.data
+        target_creator = target_creator_response.data
+
+        # Build prompt for Groq AI
+        prompt = f"""You are an expert at matching content creators for collaborations. Analyze the following two creator profiles and suggest 5 creative, specific collaboration ideas that would work well between them.
+
+Creator 1 Profile:
+- Name: {current_creator.get('display_name', 'Unknown')}
+- Tagline: {current_creator.get('tagline', 'N/A')}
+- Bio: {current_creator.get('bio', 'N/A')}
+- Primary Niche: {current_creator.get('primary_niche', 'N/A')}
+- Secondary Niches: {', '.join(current_creator.get('secondary_niches', []) or [])}
+- Content Types: {', '.join(current_creator.get('content_types', []) or [])}
+- Collaboration Types Open To: {', '.join(current_creator.get('collaboration_types', []) or [])}
+- Total Followers: {current_creator.get('total_followers', 0)}
+- Engagement Rate: {current_creator.get('engagement_rate', 0)}%
+
+Creator 2 Profile:
+- Name: {target_creator.get('display_name', 'Unknown')}
+- Tagline: {target_creator.get('tagline', 'N/A')}
+- Bio: {target_creator.get('bio', 'N/A')}
+- Primary Niche: {target_creator.get('primary_niche', 'N/A')}
+- Secondary Niches: {', '.join(target_creator.get('secondary_niches', []) or [])}
+- Content Types: {', '.join(target_creator.get('content_types', []) or [])}
+- Collaboration Types Open To: {', '.join(target_creator.get('collaboration_types', []) or [])}
+- Total Followers: {target_creator.get('total_followers', 0)}
+- Engagement Rate: {target_creator.get('engagement_rate', 0)}%
+
+Please provide 5 collaboration ideas. For each idea, provide:
+1. A catchy title (max 60 characters)
+2. A detailed description (2-3 sentences explaining the collaboration)
+3. The collaboration type (e.g., "Video Collaboration", "Cross-Promotion", "Joint Series", "Challenge", "Podcast", etc.)
+4. Why it works (1-2 sentences explaining why these creators are a good match for this idea)
+
+Format your response as a JSON array with this exact structure:
+[
+  {{
+    "title": "Idea Title",
+    "description": "Detailed description of the collaboration idea",
+    "collaboration_type": "Type of collaboration",
+    "why_it_works": "Explanation of why this works for these creators"
+  }},
+  ...
+]
+
+Return ONLY the JSON array, no additional text or markdown formatting."""
+
+        # Call Groq API using official SDK
+        api_key = settings.groq_api_key
+        if not api_key:
+            raise HTTPException(status_code=500, detail="GROQ API key not configured")
+
+        groq_client = Groq(api_key=api_key)
+
+        try:
+            completion = groq_client.chat.completions.create(
+                model="meta-llama/llama-4-scout-17b-16e-instruct",
+                messages=[
+                    {
+                        "role": "system",
+                        "content": "You are an expert strategist who crafts detailed, actionable collaboration ideas for content creators. Always respond with valid JSON only.",
+                    },
+                    {"role": "user", "content": prompt},
+                ],
+                temperature=0.8,
+                max_completion_tokens=1200,
+                top_p=1,
+                stream=False,
+            )
+
+            content = completion.choices[0].message.content if completion.choices else ""
+
+            # Parse JSON from the response
+            content = content.strip()
+            if content.startswith("```json"):
+                content = content[7:]
+            if content.startswith("```"):
+                content = content[3:]
+            if content.endswith("```"):
+                content = content[:-3]
+            content = content.strip()
+
+            ideas_data = json.loads(content)
+
+            # Validate and convert to our model
+            ideas = []
+            for idea in ideas_data[:5]:  # Take up to 5 ideas
+                ideas.append(CollaborationIdea(
+                    title=idea.get("title", "Untitled Collaboration"),
+                    description=idea.get("description", ""),
+                    collaboration_type=idea.get("collaboration_type", "General Collaboration"),
+                    why_it_works=idea.get("why_it_works", "")
+                ))
+
+            if not ideas:
+                raise HTTPException(
+                    status_code=500,
+                    detail="Failed to generate collaboration ideas. Please try again."
+                )
+
+            return CollaborationIdeasResponse(ideas=ideas)
+
+ ) + + return CollaborationIdeasResponse(ideas=ideas) + + except json.JSONDecodeError as e: + raise HTTPException( + status_code=500, + detail=f"Failed to parse AI response: {str(e)}" + ) + except Exception as e: + raise HTTPException( + status_code=502, + detail=f"GROQ API error: {str(e)}" + ) + + except HTTPException: + raise + except Exception as e: + raise HTTPException( + status_code=500, + detail=f"Error generating collaboration ideas: {str(e)}" + ) from e + + +class RecommendCreatorRequest(BaseModel): + """Request model for recommending best creator for a collaboration idea.""" + collaboration_idea: str + candidate_creator_ids: List[str] + + +class CreatorRecommendation(BaseModel): + """Schema for a creator recommendation.""" + creator_id: str + display_name: str + profile_picture_url: Optional[str] + primary_niche: str + match_score: float + reasoning: str + + +class RecommendCreatorResponse(BaseModel): + """Response model for creator recommendation.""" + recommended_creator: CreatorRecommendation + alternatives: List[CreatorRecommendation] + + +@router.post("/collaborations/recommend-creator", response_model=RecommendCreatorResponse) +async def recommend_creator_for_idea( + request: RecommendCreatorRequest, + creator: dict = Depends(get_current_creator) +): + """ + Recommend the best creator from a list of candidates for a specific collaboration idea. + + - **collaboration_idea**: Description of the collaboration idea/content + - **candidate_creator_ids**: List of creator IDs to choose from + """ + supabase = supabase_anon + current_creator_id = creator['id'] + + if not request.candidate_creator_ids: + raise HTTPException( + status_code=400, + detail="At least one candidate creator ID is required" + ) + + try: + # Fetch current creator profile + current_creator_response = supabase.table("creators") \ + .select("*") \ + .eq("id", current_creator_id) \ + .eq("is_active", True) \ + .single() \ + .execute() + + if not current_creator_response.data: + raise HTTPException(status_code=404, detail="Current creator profile not found") + + current_creator = current_creator_response.data + + # Fetch all candidate creator profiles + candidate_creators = [] + for candidate_id in request.candidate_creator_ids: + if candidate_id == current_creator_id: + continue # Skip self + response = supabase.table("creators") \ + .select("*") \ + .eq("id", candidate_id) \ + .eq("is_active", True) \ + .single() \ + .execute() + if response.data: + candidate_creators.append(response.data) + + if not candidate_creators: + raise HTTPException( + status_code=404, + detail="No valid candidate creators found" + ) + + # Build prompt for Groq AI + candidates_info = [] + for idx, cand in enumerate(candidate_creators): + candidates_info.append(f""" +Candidate {idx + 1} (ID: {cand.get('id', 'unknown')}): +- Name: {cand.get('display_name', 'Unknown')} +- Tagline: {cand.get('tagline', 'N/A')} +- Bio: {cand.get('bio', 'N/A')} +- Primary Niche: {cand.get('primary_niche', 'N/A')} +- Secondary Niches: {', '.join(cand.get('secondary_niches', []) or [])} +- Content Types: {', '.join(cand.get('content_types', []) or [])} +- Collaboration Types Open To: {', '.join(cand.get('collaboration_types', []) or [])} +- Total Followers: {cand.get('total_followers', 0)} +- Engagement Rate: {cand.get('engagement_rate', 0)}% +- Years of Experience: {cand.get('years_of_experience', 'N/A')} +""") + + prompt = f"""You are an expert at matching content creators for collaborations. 
+class RecommendCreatorRequest(BaseModel):
+    """Request model for recommending best creator for a collaboration idea."""
+    collaboration_idea: str
+    candidate_creator_ids: List[str]
+
+
+class CreatorRecommendation(BaseModel):
+    """Schema for a creator recommendation."""
+    creator_id: str
+    display_name: str
+    profile_picture_url: Optional[str]
+    primary_niche: str
+    match_score: float
+    reasoning: str
+
+
+class RecommendCreatorResponse(BaseModel):
+    """Response model for creator recommendation."""
+    recommended_creator: CreatorRecommendation
+    alternatives: List[CreatorRecommendation]
+
+
+@router.post("/collaborations/recommend-creator", response_model=RecommendCreatorResponse)
+async def recommend_creator_for_idea(
+    request: RecommendCreatorRequest,
+    creator: dict = Depends(get_current_creator)
+):
+    """
+    Recommend the best creator from a list of candidates for a specific collaboration idea.
+
+    - **collaboration_idea**: Description of the collaboration idea/content
+    - **candidate_creator_ids**: List of creator IDs to choose from
+    """
+    supabase = supabase_anon
+    current_creator_id = creator['id']
+
+    if not request.candidate_creator_ids:
+        raise HTTPException(
+            status_code=400,
+            detail="At least one candidate creator ID is required"
+        )
+
+    try:
+        # Fetch current creator profile
+        current_creator_response = supabase.table("creators") \
+            .select("*") \
+            .eq("id", current_creator_id) \
+            .eq("is_active", True) \
+            .single() \
+            .execute()
+
+        if not current_creator_response.data:
+            raise HTTPException(status_code=404, detail="Current creator profile not found")
+
+        current_creator = current_creator_response.data
+
+        # Fetch all candidate creator profiles
+        candidate_creators = []
+        for candidate_id in request.candidate_creator_ids:
+            if candidate_id == current_creator_id:
+                continue  # Skip self
+            response = supabase.table("creators") \
+                .select("*") \
+                .eq("id", candidate_id) \
+                .eq("is_active", True) \
+                .single() \
+                .execute()
+            if response.data:
+                candidate_creators.append(response.data)
+
+        if not candidate_creators:
+            raise HTTPException(
+                status_code=404,
+                detail="No valid candidate creators found"
+            )
+
+        # Build prompt for Groq AI
+        candidates_info = []
+        for idx, cand in enumerate(candidate_creators):
+            candidates_info.append(f"""
+Candidate {idx + 1} (ID: {cand.get('id', 'unknown')}):
+- Name: {cand.get('display_name', 'Unknown')}
+- Tagline: {cand.get('tagline', 'N/A')}
+- Bio: {cand.get('bio', 'N/A')}
+- Primary Niche: {cand.get('primary_niche', 'N/A')}
+- Secondary Niches: {', '.join(cand.get('secondary_niches', []) or [])}
+- Content Types: {', '.join(cand.get('content_types', []) or [])}
+- Collaboration Types Open To: {', '.join(cand.get('collaboration_types', []) or [])}
+- Total Followers: {cand.get('total_followers', 0)}
+- Engagement Rate: {cand.get('engagement_rate', 0)}%
+- Years of Experience: {cand.get('years_of_experience', 'N/A')}
+""")
+
+        prompt = f"""You are an expert at matching content creators for collaborations. A creator wants to collaborate on the following idea:
+
+COLLABORATION IDEA:
+{request.collaboration_idea}
+
+CURRENT CREATOR PROFILE:
+- Name: {current_creator.get('display_name', 'Unknown')}
+- Tagline: {current_creator.get('tagline', 'N/A')}
+- Bio: {current_creator.get('bio', 'N/A')}
+- Primary Niche: {current_creator.get('primary_niche', 'N/A')}
+- Secondary Niches: {', '.join(current_creator.get('secondary_niches', []) or [])}
+- Content Types: {', '.join(current_creator.get('content_types', []) or [])}
+- Collaboration Types Open To: {', '.join(current_creator.get('collaboration_types', []) or [])}
+
+CANDIDATE CREATORS:
+{''.join(candidates_info)}
+
+Analyze which candidate creator would be the BEST match for this collaboration idea. Consider:
+1. Niche compatibility
+2. Content type alignment
+3. Audience synergy
+4. Collaboration type preferences
+5. How well the idea fits each candidate's style and strengths
+
+Rank all candidates from best to worst match. For each candidate, provide:
+- A match score (0-100)
+- Detailed reasoning explaining why they are or aren't a good fit
+
+Format your response as JSON with this exact structure:
+{{
+  "recommendations": [
+    {{
+      "creator_id": "candidate_id_here",
+      "match_score": 85,
+      "reasoning": "Detailed explanation of why this creator is a good/bad match for the idea"
+    }},
+    ...
+  ]
+}}
+
+Return ONLY the JSON object, no additional text or markdown formatting."""
+
+        # Call Groq API (Groq is already imported at module level)
+        api_key = settings.groq_api_key
+        if not api_key:
+            raise HTTPException(status_code=500, detail="GROQ API key not configured")
+
+        groq_client = Groq(api_key=api_key)
+
+        try:
+            completion = groq_client.chat.completions.create(
+                model="meta-llama/llama-4-scout-17b-16e-instruct",
+                messages=[
+                    {
+                        "role": "system",
+                        "content": "You are an expert strategist who analyzes creator collaborations. Always respond with valid JSON only.",
+                    },
+                    {"role": "user", "content": prompt},
+                ],
+                temperature=0.7,
+                max_completion_tokens=1500,
+                top_p=1,
+                stream=False,
+            )
+
+            content = completion.choices[0].message.content.strip()
+
+            # Clean JSON response
+            if content.startswith("```json"):
+                content = content[7:]
+            if content.startswith("```"):
+                content = content[3:]
+            if content.endswith("```"):
+                content = content[:-3]
+            content = content.strip()
+
+            result_data = json.loads(content)
+            recommendations = result_data.get("recommendations", [])
+
+            if not recommendations:
+                raise HTTPException(
+                    status_code=500,
+                    detail="Failed to get recommendations from AI"
+                )
+
+            # Map recommendations to creator data
+            creator_map = {cand['id']: cand for cand in candidate_creators}
+            ranked_creators = []
+
+            for rec in recommendations:
+                creator_id = rec.get("creator_id")
+                if creator_id in creator_map:
+                    creator_data = creator_map[creator_id]
+                    ranked_creators.append(CreatorRecommendation(
+                        creator_id=creator_id,
+                        display_name=creator_data.get('display_name', 'Unknown'),
+                        profile_picture_url=creator_data.get('profile_picture_url'),
+                        primary_niche=creator_data.get('primary_niche', ''),
+                        match_score=float(rec.get("match_score", 0)),
+                        reasoning=rec.get("reasoning", "")
+                    ))
+
+            if not ranked_creators:
+                raise HTTPException(
+                    status_code=500,
+                    detail="Failed to process recommendations"
+                )
+
+            # Return best match and alternatives
+            recommended = ranked_creators[0]
+            alternatives = ranked_creators[1:] if len(ranked_creators) > 1 else []
+
+            return RecommendCreatorResponse(
+                recommended_creator=recommended,
+                alternatives=alternatives
+            )
+
+        except json.JSONDecodeError as e:
+            raise HTTPException(
+                status_code=500,
+                detail=f"Failed to parse AI response: {str(e)}"
+            )
+        except HTTPException:
+            # Don't rewrap our own errors as a generic recommendation failure
+            raise
+        except Exception as e:
+            raise HTTPException(
+                status_code=500,
+                detail=f"Error getting recommendation: {str(e)}"
+            )
+
+    except HTTPException:
+        raise
+    except Exception as e:
+        raise HTTPException(
+            status_code=500,
+            detail=f"Error recommending creator: {str(e)}"
+        ) from e
+
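In `get_my_collaborations` above, the two-query union means filtering, sorting, and pagination all happen in Python. PostgREST can express the same "creator1 or creator2" condition in a single query, keeping ordering and paging on the database side. A minimal sketch, assuming the `or_` filter available in supabase-py/postgrest-py:

```python
# Hypothetical single-query variant of get_my_collaborations' fetch logic.
query = (
    supabase.table("creator_collaborations")
    .select("*")
    .or_(f"creator1_id.eq.{creator_id},creator2_id.eq.{creator_id}")
    .order("proposed_at", desc=True)
    .range(offset, offset + limit - 1)
)
if status:
    query = query.eq("status", status)
response = query.execute()
collaborations = response.data or []
```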
+""" + +from fastapi import APIRouter, HTTPException, Depends, Query +from pydantic import BaseModel +from typing import Optional, List, Tuple +from math import exp +from datetime import datetime +import json +from groq import Groq +from app.core.config import settings +from app.core.supabase_clients import supabase_anon +from app.core.dependencies import get_current_creator +router = APIRouter() + + +class CreatorBasicResponse(BaseModel): + """Basic creator info for card display.""" + id: str + display_name: str + tagline: Optional[str] + bio: Optional[str] + profile_picture_url: Optional[str] + primary_niche: str + secondary_niches: Optional[List[str]] + total_followers: int + engagement_rate: Optional[float] + is_verified_creator: bool + profile_completion_percentage: int + + +class CreatorFullResponse(BaseModel): + """Full creator details for expanded view.""" + id: str + user_id: str + display_name: str + tagline: Optional[str] + bio: Optional[str] + profile_picture_url: Optional[str] + cover_image_url: Optional[str] + website_url: Optional[str] + youtube_url: Optional[str] + youtube_handle: Optional[str] + youtube_subscribers: Optional[int] + instagram_url: Optional[str] + instagram_handle: Optional[str] + instagram_followers: Optional[int] + tiktok_url: Optional[str] + tiktok_handle: Optional[str] + tiktok_followers: Optional[int] + twitter_url: Optional[str] + twitter_handle: Optional[str] + twitter_followers: Optional[int] + twitch_url: Optional[str] + twitch_handle: Optional[str] + twitch_followers: Optional[int] + linkedin_url: Optional[str] + facebook_url: Optional[str] + primary_niche: str + secondary_niches: Optional[List[str]] + content_types: Optional[List[str]] + content_language: Optional[List[str]] + total_followers: int + total_reach: Optional[int] + average_views: Optional[int] + engagement_rate: Optional[float] + audience_age_primary: Optional[str] + audience_gender_split: Optional[dict] + audience_locations: Optional[dict] + audience_interests: Optional[List[str]] + average_engagement_per_post: Optional[int] + posting_frequency: Optional[str] + best_performing_content_type: Optional[str] + years_of_experience: Optional[int] + content_creation_full_time: bool + team_size: int + equipment_quality: Optional[str] + editing_software: Optional[List[str]] + collaboration_types: Optional[List[str]] + preferred_brands_style: Optional[List[str]] + rate_per_post: Optional[float] + rate_per_video: Optional[float] + rate_per_story: Optional[float] + rate_per_reel: Optional[float] + rate_negotiable: bool + accepts_product_only_deals: bool + minimum_deal_value: Optional[float] + preferred_payment_terms: Optional[str] + portfolio_links: Optional[List[str]] + past_brand_collaborations: Optional[List[str]] + case_study_links: Optional[List[str]] + media_kit_url: Optional[str] + is_verified_creator: bool + profile_completion_percentage: int + created_at: Optional[str] + last_active_at: Optional[str] + + +@router.get("/creators", response_model=List[CreatorBasicResponse]) +async def list_creators( + creator: dict = Depends(get_current_creator), + search: Optional[str] = Query(None, description="Search by name, niche, or bio"), + niche: Optional[str] = Query(None, description="Filter by primary niche"), + limit: int = Query(50, ge=1, le=100), + offset: int = Query(0, ge=0) +): + """ + List all creators (excluding the current authenticated creator). 
+
+    - **search**: Search in display_name, tagline, bio, primary_niche, secondary_niches
+    - **niche**: Filter by primary niche
+    - **limit**: Maximum number of results (default: 50, max: 100)
+    - **offset**: Number of results to skip for pagination
+    """
+    supabase = supabase_anon
+    current_creator_id = creator['id']
+
+    try:
+        # Build query - exclude current creator and only show active creators
+        query = supabase.table("creators").select("*").eq("is_active", True).neq("id", current_creator_id)
+
+        # Apply niche filter if provided
+        if niche:
+            query = query.eq("primary_niche", niche)
+
+        # Apply pagination and ordering
+        # Note: We fetch more results if search is provided, then filter in Python
+        fetch_limit = (limit * 3) if search else limit  # Fetch more for search filtering
+        query = query.order("total_followers", desc=True).range(offset, offset + fetch_limit - 1)
+
+        response = query.execute()
+
+        creators = response.data if response.data else []
+
+        # Apply search filtering if provided
+        if search:
+            search_term = search.lower()
+            filtered_creators = []
+            for c in creators:
+                # Check if search term matches any field
+                matches = (
+                    search_term in c.get("display_name", "").lower() or
+                    search_term in (c.get("tagline") or "").lower() or
+                    search_term in (c.get("bio") or "").lower() or
+                    search_term in c.get("primary_niche", "").lower() or
+                    any(search_term in (n or "").lower() for n in (c.get("secondary_niches") or []))
+                )
+                if matches:
+                    filtered_creators.append(c)
+            # Apply limit after filtering
+            creators = filtered_creators[:limit]
+
+        return creators
+
+    except HTTPException:
+        raise
+    except Exception as e:
+        raise HTTPException(
+            status_code=500,
+            detail=f"Error fetching creators: {str(e)}"
+        ) from e
+
+
+ """ + supabase = supabase_anon + current_creator_id = creator["id"] + + try: + # Fetch current creator full profile + current_resp = supabase.table("creators") \ + .select("*") \ + .eq("id", current_creator_id) \ + .single() \ + .execute() + if not current_resp.data: + raise HTTPException(status_code=404, detail="Current creator not found") + me = current_resp.data + + # Fetch candidate creators (active, not self) + candidates_resp = supabase.table("creators") \ + .select("*") \ + .eq("is_active", True) \ + .neq("id", current_creator_id) \ + .order("total_followers", desc=True) \ + .limit(200) \ + .execute() + candidates = candidates_resp.data or [] + + if not candidates: + return [] + + # Utility helpers + def list_overlap(a: Optional[List[str]], b: Optional[List[str]]) -> int: + sa = set((a or [])) + sb = set((b or [])) + return len(sa.intersection(sb)) + + def followers_proximity(a: Optional[int], b: Optional[int]) -> float: + if not a or not b or a <= 0 or b <= 0: + return 0.5 + ratio = max(a, b) / max(1, min(a, b)) + # Sigmoid-like decay with ratio; closer to 1 → closer to 1.0 score + return max(0.0, min(1.0, 1 / (1 + (ratio - 1)))) + + def normalize_percent(x: Optional[float]) -> float: + if x is None: + return 0.0 + return max(0.0, min(1.0, x / 10.0)) # treat 10% as "good" baseline + + def recency_score(last_active_at: Optional[str]) -> float: + if not last_active_at: + return 0.5 + try: + dt = datetime.fromisoformat(last_active_at.replace("Z", "+00:00")) + days = max(0.0, (datetime.now(dt.tzinfo) - dt).days) + # Decay after 30 days + return max(0.0, min(1.0, 1 / (1 + days / 30.0))) + except Exception: + return 0.5 + + me_niche = me.get("primary_niche") + me_secondary = me.get("secondary_niches") or [] + me_types = me.get("content_types") or [] + me_collab = me.get("collaboration_types") or [] + me_langs = me.get("content_language") or [] + me_followers = me.get("total_followers") or 0 + + # Score candidates + scored: List[Tuple[dict, float, str]] = [] + for c in candidates: + reason_bits = [] + score = 0.0 + + # Niche similarity (25) + niche_pts = 0.0 + if c.get("primary_niche") == me_niche: + niche_pts += 0.7 + reason_bits.append("same primary niche") + sec_overlap = list_overlap(me_secondary, c.get("secondary_niches")) + if sec_overlap > 0: + niche_pts += min(0.3, 0.15 * sec_overlap) + reason_bits.append("overlap in secondary niches") + score += niche_pts * 25 + + # Content types (15) + type_overlap = list_overlap(me_types, c.get("content_types")) + if type_overlap > 0: + score += min(1.0, type_overlap / 3.0) * 15 + reason_bits.append("compatible content types") + + # Openness alignment (10) + collab_overlap = list_overlap(me_collab, c.get("collaboration_types")) + if collab_overlap > 0: + score += min(1.0, collab_overlap / 2.0) * 10 + reason_bits.append("open to similar collaboration types") + + # Audience proximity (15) + prox = followers_proximity(me_followers, c.get("total_followers")) + score += prox * 15 + if prox > 0.7: + reason_bits.append("similar audience scale") + + # Engagement quality (15) + eng = normalize_percent(c.get("engagement_rate")) + score += eng * 10 + avg_views = c.get("average_views") or 0 + # Normalize avg_views relative to total_followers if available + if c.get("total_followers"): + view_ratio = min(1.0, (avg_views / max(1, c["total_followers"])) * 5) + score += view_ratio * 5 + if eng > 0.6: + reason_bits.append("strong engagement") + + # Recency/consistency (8) + score += recency_score(c.get("last_active_at")) * 8 + + # Experience/professionalism 
+            # Experience/professionalism (6)
+            exp = c.get("years_of_experience") or 0
+            exp_norm = min(1.0, exp / 5.0)
+            has_kit = 1.0 if c.get("media_kit_url") else 0.0
+            score += (0.7 * exp_norm + 0.3 * has_kit) * 6
+
+            # Geo/language fit (6)
+            lang_overlap = list_overlap(me_langs, c.get("content_language"))
+            score += min(1.0, lang_overlap / 2.0) * 6
+            if lang_overlap > 0:
+                reason_bits.append("language fit")
+
+            reason = ", ".join(reason_bits) or "high potential match"
+            scored.append((c, score, reason))
+
+        # Sort by score
+        scored.sort(key=lambda x: x[1], reverse=True)
+
+        # Diversity: limit to 2 per niche in the top picks
+        picks: List[Tuple[dict, float, str]] = []
+        niche_counts = {}
+        for c, s, r in scored:
+            niche = c.get("primary_niche") or "other"
+            if niche_counts.get(niche, 0) >= 2 and len(picks) >= 2:
+                continue
+            picks.append((c, s, r))
+            niche_counts[niche] = niche_counts.get(niche, 0) + 1
+            if len(picks) >= max(12, limit * 3):
+                break
+
+        # Optional Groq reranking to refine reasons and ordering
+        if use_ai and settings.groq_api_key:
+            try:
+                groq = Groq(api_key=settings.groq_api_key)
+
+                def compact_profile(p: dict) -> dict:
+                    return {
+                        "id": p.get("id"),
+                        "name": p.get("display_name"),
+                        "primary_niche": p.get("primary_niche"),
+                        "secondary_niches": p.get("secondary_niches") or [],
+                        "content_types": p.get("content_types") or [],
+                        "collaboration_types": p.get("collaboration_types") or [],
+                        "total_followers": p.get("total_followers") or 0,
+                        "engagement_rate": p.get("engagement_rate") or 0,
+                        "average_views": p.get("average_views") or 0,
+                        "languages": p.get("content_language") or [],
+                    }
+
+                payload = {
+                    "me": compact_profile(me),
+                    "candidates": [
+                        {
+                            "candidate": compact_profile(c),
+                            "rule_score": s,
+                            "rule_reason": r
+                        } for c, s, r in picks
+                    ]
+                }
+                prompt = (
+                    "You are ranking creators for collaboration potential. "
+                    "Rerank candidates considering niche fit, content compatibility, audience synergy, "
+                    "and complementary strengths. Return JSON array of top items with fields: "
+                    "[{id, reason, adjustment (number between -10 and +10)}]. "
+                    "Keep reasons concise and actionable. Only return JSON."
+ f"\nINPUT JSON:\n{payload}" + ) + completion = groq.chat.completions.create( + model="meta-llama/llama-4-scout-17b-16e-instruct", + messages=[ + {"role": "system", "content": "Return only JSON."}, + {"role": "user", "content": prompt}, + ], + temperature=0.5, + max_completion_tokens=800, + top_p=1, + stream=False, + ) + content = completion.choices[0].message.content if completion.choices else "[]" + content = content.strip() + if content.startswith("```json"): + content = content[7:] + if content.startswith("```"): + content = content[3:] + if content.endswith("```"): + content = content[:-3] + ai_items = [] + try: + ai_items = json.loads(content) + except Exception: + ai_items = [] + + # Build map from id to adjustment and reason + adj = {item.get("id"): (float(item.get("adjustment", 0)), item.get("reason", "")) for item in ai_items if item.get("id")} + new_list = [] + for c, s, r in picks: + cid = c.get("id") + if cid in adj: + add, rr = adj[cid] + new_list.append((c, s + add, rr or r)) + else: + new_list.append((c, s, r)) + picks = new_list + picks.sort(key=lambda x: x[1], reverse=True) + except Exception: + # If AI rerank fails, continue with rules-based ranking + pass + + # Finalize top 'limit' + final = picks[:limit] + results: List[CreatorRecommendation] = [] + for c, s, r in final: + platforms = [] + if c.get("youtube_handle"): platforms.append("YouTube") + if c.get("instagram_handle"): platforms.append("Instagram") + if c.get("tiktok_handle"): platforms.append("TikTok") + if c.get("twitter_handle"): platforms.append("Twitter") + results.append(CreatorRecommendation( + id=c["id"], + display_name=c.get("display_name", "Unknown"), + profile_picture_url=c.get("profile_picture_url"), + primary_niche=c.get("primary_niche"), + total_followers=c.get("total_followers"), + engagement_rate=c.get("engagement_rate"), + top_platforms=platforms[:3] or None, + match_score=round(s, 2), + reason=r + )) + return results + + except HTTPException: + raise + except Exception as e: + raise HTTPException( + status_code=500, + detail=f"Error generating recommendations: {str(e)}" + ) from e + + +@router.get("/creators/{creator_id}", response_model=CreatorFullResponse) +async def get_creator_details( + creator_id: str, + creator: dict = Depends(get_current_creator) +): + """ + Get full details of a specific creator. + + - **creator_id**: The creator ID + """ + supabase = supabase_anon + + try: + # Fetch creator details + response = supabase.table("creators") \ + .select("*") \ + .eq("id", creator_id) \ + .eq("is_active", True) \ + .single() \ + .execute() + + if not response.data: + raise HTTPException( + status_code=404, + detail="Creator not found" + ) + + return response.data + + except HTTPException: + raise + except Exception as e: + if "PGRST116" in str(e): # No rows returned + raise HTTPException( + status_code=404, + detail="Creator not found" + ) from e + raise HTTPException( + status_code=500, + detail=f"Error fetching creator: {str(e)}" + ) from e + + +@router.get("/creators/niches/list") +async def list_niches( + creator: dict = Depends(get_current_creator) +): + """ + Get list of all unique primary niches for filtering. 
+ """ + supabase = supabase_anon + + try: + # Get all unique primary niches + response = supabase.table("creators") \ + .select("primary_niche") \ + .eq("is_active", True) \ + .execute() + + creators = response.data if response.data else [] + + # Extract unique niches + niches = sorted(set(c.get("primary_niche") for c in creators if c.get("primary_niche"))) + + return {"niches": niches} + + except Exception as e: + raise HTTPException( + status_code=500, + detail=f"Error fetching niches: {str(e)}" + ) from e + diff --git a/backend/app/core/config.py b/backend/app/core/config.py index 69d5bfc..5af58d2 100644 --- a/backend/app/core/config.py +++ b/backend/app/core/config.py @@ -26,6 +26,9 @@ class Settings(BaseSettings): # Application Settings app_name: Optional[str] = None + # JWT Authentication + SUPABASE_JWT_SECRET: str # JWT Secret from Supabase Dashboard → Settings → API → JWT Settings + model_config = { "env_file": ".env" } diff --git a/backend/app/core/dependencies.py b/backend/app/core/dependencies.py new file mode 100644 index 0000000..5a53fd3 --- /dev/null +++ b/backend/app/core/dependencies.py @@ -0,0 +1,220 @@ +""" +FastAPI dependencies for authentication and authorization +Used across all protected endpoints +""" + +from typing import Optional +from fastapi import Depends, HTTPException, status +from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials +from app.core.security import jwt_handler +from app.core.supabase_clients import supabase_anon + + +# Security scheme for Swagger docs +security = HTTPBearer( + scheme_name="JWT Bearer Token", + description="Enter your Supabase JWT token" +) + +# Optional security scheme for endpoints that work with or without auth +optional_security = HTTPBearer( + scheme_name="JWT Bearer Token (Optional)", + description="Enter your Supabase JWT token (optional)", + auto_error=False +) + + +async def get_current_user( + credentials: HTTPAuthorizationCredentials = Depends(security) +) -> dict: + """ + Dependency to get current authenticated user from JWT token + + Usage: + @app.get("/protected") + async def protected_route(user = Depends(get_current_user)): + return {"user_id": user["id"]} + + Returns: + User profile dict with id, email, role + + Raises: + HTTPException 401: If token is invalid or user not found + """ + + # Extract token from Bearer scheme + token = credentials.credentials + + # Decode and validate token + try: + payload = jwt_handler.decode_token(token) + except HTTPException as e: + raise e + + # Get user ID from token + user_id = payload.get('sub') + + if not user_id: + raise HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, + detail="Invalid token: no user ID" + ) + + # Fetch user profile from database + try: + response = supabase_anon.table('profiles') \ + .select('*') \ + .eq('id', user_id) \ + .single() \ + .execute() + + if not response.data: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail="User profile not found" + ) + + user = response.data + + # Add token email if not in profile + if 'email' not in user: + user['email'] = payload.get('email') + + return user + + except Exception as e: + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail=f"Error fetching user: {str(e)}" + ) + + +async def get_current_creator( + current_user: dict = Depends(get_current_user) +) -> dict: + """ + Dependency to verify user is a creator and get creator profile + + Usage: + @app.get("/creator-only") + async def creator_route(creator = Depends(get_current_creator)): 
+ return {"creator_id": creator["id"]} + + Returns: + Creator profile dict + + Raises: + HTTPException 403: If user is not a creator + """ + + if current_user.get('role') != 'Creator': + raise HTTPException( + status_code=status.HTTP_403_FORBIDDEN, + detail="This endpoint is only accessible to creators" + ) + + # Fetch creator profile + try: + response = supabase_anon.table('creators') \ + .select('*') \ + .eq('user_id', current_user['id']) \ + .single() \ + .execute() + + if not response.data: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail="Creator profile not found. Please complete onboarding." + ) + + return response.data + except Exception as e: + # Check if it's a "not found" error from Supabase + if "PGRST116" in str(e) or "No rows" in str(e): + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail="Creator profile not found. Please complete onboarding." + ) + # Re-raise other exceptions + raise + + +async def get_current_brand( + current_user: dict = Depends(get_current_user) +) -> dict: + """ + Dependency to verify user is a brand and get brand profile + + Usage: + @app.get("/brand-only") + async def brand_route(brand = Depends(get_current_brand)): + return {"brand_id": brand["id"]} + + Returns: + Brand profile dict + + Raises: + HTTPException 403: If user is not a brand + """ + + if current_user.get('role') != 'Brand': + raise HTTPException( + status_code=status.HTTP_403_FORBIDDEN, + detail="This endpoint is only accessible to brands" + ) + + # Fetch brand profile + try: + response = supabase_anon.table('brands') \ + .select('*') \ + .eq('user_id', current_user['id']) \ + .single() \ + .execute() + + if not response.data: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail="Brand profile not found. Please complete onboarding." + ) + + return response.data + except HTTPException: + raise + except Exception as e: + # Check if it's a "not found" error from Supabase + if "PGRST116" in str(e) or "No rows" in str(e): + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail="Brand profile not found. Please complete onboarding." 
+            ) from e
+        # Re-raise other exceptions
+        raise
+
+
+async def get_optional_user(
+    credentials: Optional[HTTPAuthorizationCredentials] = Depends(optional_security)
+) -> Optional[dict]:
+    """
+    Dependency for endpoints that work with or without authentication
+
+    Usage:
+        @app.get("/public-but-personalized")
+        async def route(user = Depends(get_optional_user)):
+            if user:
+                return {"message": f"Hello {user['name']}"}
+            return {"message": "Hello guest"}
+
+    Returns:
+        User profile dict if authenticated, None otherwise
+    """
+
+    if not credentials:
+        return None
+
+    try:
+        return await get_current_user(credentials)
+    except HTTPException:
+        return None
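Because routes now consume these dependencies, tests can bypass real JWT verification with FastAPI's `dependency_overrides`. A minimal sketch, assuming the `app` object from `app.main` and an illustrative fake profile:

```python
from fastapi.testclient import TestClient

from app.main import app
from app.core.dependencies import get_current_creator

# Substitute a fake creator profile so tests need no Supabase token.
app.dependency_overrides[get_current_creator] = lambda: {"id": "test-creator-id"}

client = TestClient(app)
response = client.get("/collaborations")
assert response.status_code == 200

app.dependency_overrides.clear()  # Reset after the test
```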
diff --git a/backend/app/core/security.py b/backend/app/core/security.py
new file mode 100644
index 0000000..23dd3a7
--- /dev/null
+++ b/backend/app/core/security.py
@@ -0,0 +1,172 @@
+"""
+Security utilities for JWT authentication
+Handles token validation, creation, and user verification
+"""
+
+import jwt
+from typing import Optional, Dict, Any
+from fastapi import HTTPException, status
+from app.core.config import settings
+
+
+class JWTHandler:
+    """Handle all JWT operations with Supabase tokens"""
+
+    def __init__(self):
+        try:
+            self.secret_key = settings.SUPABASE_JWT_SECRET
+            if not self.secret_key:
+                raise ValueError("SUPABASE_JWT_SECRET is not set in environment variables")
+        except Exception as e:
+            raise ValueError(f"Failed to load JWT secret: {str(e)}. Please set SUPABASE_JWT_SECRET in your .env file.")
+        self.algorithm = "HS256"  # Supabase uses HS256 for legacy keys
+
+    def decode_token(self, token: str) -> Dict[str, Any]:
+        """
+        Decode and validate JWT token from Supabase
+
+        Args:
+            token: JWT token string (without 'Bearer ' prefix)
+
+        Returns:
+            Decoded payload with user information
+
+        Raises:
+            HTTPException: If token is invalid or expired
+        """
+        try:
+            # Decode JWT using Supabase secret
+            # Note: We don't verify audience as Supabase tokens may have different audience claims
+            # The signature verification is sufficient for security
+            payload = jwt.decode(
+                token,
+                self.secret_key,
+                algorithms=[self.algorithm],
+                options={
+                    "verify_signature": True,
+                    "verify_exp": True,
+                    "verify_iat": True,
+                    "verify_aud": False  # Disable audience verification for Supabase tokens
+                }
+            )
+
+            # Validate required fields
+            if 'sub' not in payload:
+                raise HTTPException(
+                    status_code=status.HTTP_401_UNAUTHORIZED,
+                    detail="Invalid token: missing user ID"
+                )
+
+            return payload
+
+        except jwt.ExpiredSignatureError:
+            raise HTTPException(
+                status_code=status.HTTP_401_UNAUTHORIZED,
+                detail="Token has expired"
+            )
+
+        except jwt.InvalidTokenError as e:
+            raise HTTPException(
+                status_code=status.HTTP_401_UNAUTHORIZED,
+                detail=f"Invalid token: {str(e)}"
+            )
+
+    def verify_token(self, token: str) -> bool:
+        """
+        Quick token validation without full decode
+
+        Args:
+            token: JWT token string
+
+        Returns:
+            True if valid, False otherwise
+        """
+        try:
+            jwt.decode(
+                token,
+                self.secret_key,
+                algorithms=[self.algorithm],
+                options={
+                    "verify_signature": True,
+                    "verify_exp": True,
+                    "verify_aud": False  # Disable audience verification
+                }
+            )
+            return True
+        except Exception:
+            return False
+
+    def get_user_id_from_token(self, token: str) -> str:
+        """
+        Extract user ID from token
+
+        Args:
+            token: JWT token string
+
+        Returns:
+            User UUID as string
+        """
+        payload = self.decode_token(token)
+        return payload['sub']
+
+    def get_user_email_from_token(self, token: str) -> Optional[str]:
+        """
+        Extract user email from token
+
+        Args:
+            token: JWT token string
+
+        Returns:
+            User email if present in token
+        """
+        payload = self.decode_token(token)
+        return payload.get('email')
+
+    def get_user_role_from_token(self, token: str) -> Optional[str]:
+        """
+        Extract user role from token metadata
+
+        Args:
+            token: JWT token string
+
+        Returns:
+            User role (creator/brand) if present
+        """
+        payload = self.decode_token(token)
+
+        # Supabase stores custom claims in user_metadata or app_metadata
+        user_metadata = payload.get('user_metadata', {})
+        app_metadata = payload.get('app_metadata', {})
+
+        return (
+            user_metadata.get('role') or
+            app_metadata.get('role') or
+            payload.get('role')
+        )
+
+
+# Singleton instance
+jwt_handler = JWTHandler()
+
+
+# Legacy functions for backward compatibility (optional - requires passlib)
+def verify_password(plain_password: str, hashed_password: str) -> bool:
+    """Verify a password against a hash (if needed for custom auth)"""
+    try:
+        from passlib.context import CryptContext
+        pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
+        return pwd_context.verify(plain_password, hashed_password)
+    except ImportError:
+        raise ImportError("passlib is required for password verification. Install with: pip install passlib[bcrypt]")
+
+
+def get_password_hash(password: str) -> str:
+    """Hash a password (if needed for custom auth)"""
+    try:
+        from passlib.context import CryptContext
+        pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
+        return pwd_context.hash(password)
+    except ImportError:
+        raise ImportError("passlib is required for password hashing. Install with: pip install passlib[bcrypt]")
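For local testing of `decode_token` without a live Supabase session, a token with the same claim shape can be minted against the same secret. A minimal PyJWT sketch (the secret and IDs are placeholders, never commit a real secret):

```python
import jwt
from datetime import datetime, timedelta, timezone

secret = "your-jwt-secret-from-supabase-dashboard"  # placeholder
token = jwt.encode(
    {
        "sub": "00000000-0000-0000-0000-000000000000",  # user UUID
        "email": "test@example.com",
        "iat": datetime.now(timezone.utc),
        "exp": datetime.now(timezone.utc) + timedelta(hours=1),
    },
    secret,
    algorithm="HS256",
)
# jwt_handler.decode_token(token) should return the payload above.
```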
diff --git a/backend/app/main.py b/backend/app/main.py
index d6a8068..dd950a1 100644
--- a/backend/app/main.py
+++ b/backend/app/main.py
@@ -7,6 +7,8 @@
 from app.api.routes import gemini_generate
 from app.api.routes import campaigns
 from app.api.routes import groq_generate
+from app.api.routes import collaborations
+from app.api.routes import creators
 
 app = FastAPI(title="Inpact Backend", version="0.1.0")
 # Verify Supabase client initialization on startup
@@ -34,6 +36,8 @@
 app.include_router(auth.router)
 app.include_router(campaigns.router)
 app.include_router(groq_generate.router)
+app.include_router(collaborations.router)
+app.include_router(creators.router)
 
 @app.get("/")
 def root():
diff --git a/backend/app/models/token.py b/backend/app/models/token.py
new file mode 100644
index 0000000..426b0b6
--- /dev/null
+++ b/backend/app/models/token.py
@@ -0,0 +1,26 @@
+"""
+Pydantic models for authentication tokens
+"""
+
+from pydantic import BaseModel
+from typing import Optional
+
+
+class Token(BaseModel):
+    """JWT access token response"""
+    access_token: str
+    token_type: str = "bearer"
+    expires_in: int  # seconds
+
+
+class TokenData(BaseModel):
+    """Decoded token data"""
+    user_id: str
+    email: Optional[str] = None
+    role: Optional[str] = None
+
+
+class TokenRefresh(BaseModel):
+    """Token refresh request"""
+    refresh_token: str
diff --git a/backend/env_example b/backend/env_example
index 6335d97..627a208 100644
--- a/backend/env_example
+++ b/backend/env_example
@@ -26,3 +26,9 @@ GEMINI_API_KEY=your-gemini-api-key-here
 
 # CORS Origins (comma-separated)
 ALLOWED_ORIGINS=http://localhost:3000,http://localhost:3001
+
+# JWT Secret Key from Supabase
+# Location: Dashboard → Project Settings → API → JWT Settings → JWT Secret
+# Use the JWT Secret (NOT the anon key!)
+
+SUPABASE_JWT_SECRET=your-jwt-secret-from-supabase-dashboard
diff --git a/backend/requirements.txt b/backend/requirements.txt
index 8ebaadc..f6fcc5f 100644
--- a/backend/requirements.txt
+++ b/backend/requirements.txt
@@ -23,4 +23,6 @@
 typing-inspection==0.4.2
 typing_extensions==4.15.0
 requests
+groq==0.11.0
 uvicorn==0.38.0
+pyjwt[crypto]==2.8.0
diff --git a/backend/test_jwt.py b/backend/test_jwt.py
new file mode 100644
index 0000000..ded0b39
--- /dev/null
+++ b/backend/test_jwt.py
@@ -0,0 +1,109 @@
+"""
+Test script to verify JWT authentication
+Run: python test_jwt.py
+"""
+
+import requests
+import os
+from dotenv import load_dotenv
+
+load_dotenv()
+
+# Configuration
+API_URL = "http://localhost:8000"
+SUPABASE_URL = os.getenv("SUPABASE_URL")
+SUPABASE_ANON_KEY = os.getenv("SUPABASE_KEY")
+
+# Test user credentials (update with your test account)
+TEST_EMAIL = "test@example.com"
+TEST_PASSWORD = "Test123!@#"
+
+
+def get_jwt_token():
+    """Login and get JWT token from Supabase"""
+
+    # Using Supabase Auth API directly
+    response = requests.post(
+        f"{SUPABASE_URL}/auth/v1/token?grant_type=password",
+        headers={
+            "apikey": SUPABASE_ANON_KEY,
+            "Content-Type": "application/json"
+        },
+        json={
+            "email": TEST_EMAIL,
+            "password": TEST_PASSWORD
+        },
+        timeout=10
+    )
+
+    if response.status_code != 200:
+        print(f"Login failed: {response.text}")
+        return None
+
+    data = response.json()
+    return data.get("access_token")
+
+
+def test_protected_endpoint(token):
+    """Test accessing protected endpoint with JWT"""
+
+    response = requests.get(
+        f"{API_URL}/campaigns",
+        headers={
+            "Authorization": f"Bearer {token}"
+        },
+        timeout=10
+    )
+
+    print(f"Status: {response.status_code}")
+    print(f"Response: {response.json()}")
+
+    return response.status_code == 200
+
+
+def test_invalid_token():
+    """Test with invalid token (should fail)"""
+
+    response = requests.get(
+        f"{API_URL}/campaigns",
+        headers={
+            "Authorization": "Bearer invalid_token_here"
+        },
+        timeout=10
+    )
+
+    print(f"Invalid token status: {response.status_code}")
+    print(f"Error message: {response.json()}")
+
+    return response.status_code == 401
+
+
+if __name__ == "__main__":
+    print("=== JWT Authentication Test ===\n")
+
+    # Test 1: Get JWT token
+    print("1. Getting JWT token...")
+    token = get_jwt_token()
+
+    if not token:
+        print("❌ Failed to get JWT token")
+        exit(1)
+
+    print(f"✅ Got JWT token: {token[:50]}...\n")
+
+    # Test 2: Access protected endpoint
+    print("2. Testing protected endpoint with valid token...")
+    if test_protected_endpoint(token):
+        print("✅ Successfully accessed protected endpoint\n")
+    else:
+        print("❌ Failed to access protected endpoint\n")
+
+    # Test 3: Test invalid token
+    print("3. Testing with invalid token (should fail)...")
+    if test_invalid_token():
+        print("✅ Correctly rejected invalid token\n")
+    else:
+        print("❌ Invalid token was accepted (security issue!)\n")
+
+    print("=== Tests Complete ===")
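The script above exercises a live server; the same checks can also run in-process with FastAPI's TestClient, which needs no running uvicorn. A sketch, assuming the `app` object from `app.main`:

```python
from fastapi.testclient import TestClient

from app.main import app

client = TestClient(app)

# No Authorization header: HTTPBearer rejects the request before any handler runs.
assert client.get("/campaigns").status_code == 403

# A malformed token passes the bearer scheme but fails jwt_handler.decode_token.
bad = client.get("/campaigns", headers={"Authorization": "Bearer not-a-jwt"})
assert bad.status_code == 401
```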