11import json
22import os
33import shutil
4+ import uuid
45from contextlib import asynccontextmanager
5- from datetime import datetime , timezone , timedelta
6+ from datetime import datetime , timedelta , timezone
67from pathlib import Path
7- import re
88from typing import Annotated , Any
9- import uuid
109
1110import aiofiles
1211import redis .asyncio as aioredis
2726)
2827from mxgo .models import TaskStatus
2928from mxgo .reply_generation import generate_replies
30- from mxgo .scheduling .scheduler import Scheduler , is_one_time_task
3129from mxgo .scheduling .scheduled_task_executor import execute_scheduled_task
30+ from mxgo .scheduling .scheduler import Scheduler , is_one_time_task
3231from mxgo .schemas import (
3332 CreateNewsletterRequest ,
3433 CreateNewsletterResponse ,
35- HandlerAlias ,
3634 EmailAttachment ,
3735 EmailRequest ,
3836 EmailSuggestionRequest ,
3937 EmailSuggestionResponse ,
4038 GenerateEmailReplyRequest ,
39+ HandlerAlias ,
4140 NewsletterUsageInfo ,
4241 ReplyCandidate ,
4342 UsageInfo ,
@@ -879,26 +878,9 @@ async def generate_email_replies(
879878 ) from e
880879
881880
882- @app .post ("/create-newsletter" , response_model = CreateNewsletterResponse )
883- async def create_newsletter (
884- request : CreateNewsletterRequest ,
885- current_user : Annotated [AuthInfo , Depends (get_current_user )],
886- _token : Annotated [str , Depends (bearer_auth_scheme )] = ...,
887- ) -> CreateNewsletterResponse :
888- """
889- Create and schedule a recurring newsletter task for the authenticated user.
890-
891- Args:
892- request: The email generate response request.
893- current_user: The authenticated user from JWT token.
894-
895- Returns:
896- CreateNewsletterResponse: A response object indicating success, whitelist status, and created task IDs.
897- """
898- user_email = current_user .email
899- logger .info (f"Received newsletter creation request for user: { user_email } " )
900-
901- # Combine all instructions into a single prompt
881+ # Helper functions for create_newsletter
882+ def _build_newsletter_instructions (request : CreateNewsletterRequest ) -> str :
883+ """Builds the full instruction string from the request."""
902884 full_instructions = [f"PROMPT: { request .prompt } " ]
903885 if request .estimated_read_time :
904886 full_instructions .append (f"ESTIMATED READ TIME: { request .estimated_read_time } minutes" )
@@ -908,15 +890,11 @@ async def create_newsletter(
908890 full_instructions .append (f"GEOGRAPHIC FOCUS: { ', ' .join (request .geographic_locations )} " )
909891 if request .formatting_instructions :
910892 full_instructions .append (f"FORMATTING INSTRUCTIONS: { request .formatting_instructions } " )
911- distilled_instructions = "\n \n " .join (full_instructions )
893+ return "\n \n " .join (full_instructions )
912894
913- # Convert schedule options to cron expressions
914- try :
915- cron_expressions = convert_schedule_to_cron_list (request .schedule )
916- except ValueError as e :
917- raise HTTPException (status_code = status .HTTP_400_BAD_REQUEST , detail = str (e )) from e
918895
919- # Plan and Limit Validation
896+ async def _validate_newsletter_limits (user_email : str , cron_expressions : list [str ]):
897+ """Validates the user's plan limits for newsletters."""
920898 user_plan = await user .get_user_plan (user_email )
921899 plan_limits = NEWSLETTER_LIMITS_BY_PLAN .get (user_plan , NEWSLETTER_LIMITS_BY_PLAN [UserPlan .BETA ])
922900 min_interval = timedelta (days = plan_limits ["min_interval_days" ])
@@ -933,85 +911,63 @@ async def create_newsletter(
933911 f"(max: { plan_limits ['max_tasks' ]} )." ,
934912 )
935913
936- # Cron Validitiy Check
937914 for cron_expr in cron_expressions :
938- try :
939- if not is_one_time_task (cron_expr ):
940- interval = calculate_cron_interval (cron_expr )
941- if interval < min_interval :
942- raise HTTPException (
943- status_code = status .HTTP_400_BAD_REQUEST ,
944- detail = f"Cron interval is too frequent for { user_plan .value } plan. "
945- f"Minimum interval is { plan_limits ['min_interval_days' ]} days." ,
946- )
947- except ValueError as e :
948- raise HTTPException (status_code = status .HTTP_400_BAD_REQUEST , detail = f"Invalid cron expression: { e } " ) from e
949-
950- # Whitelist Check
951- exists_in_whitelist , is_verified = await whitelist .is_email_whitelisted (user_email )
952- is_whitelisted = exists_in_whitelist and is_verified
915+ if not is_one_time_task (cron_expr ):
916+ interval = calculate_cron_interval (cron_expr )
917+ if interval < min_interval :
918+ raise HTTPException (
919+ status_code = status .HTTP_400_BAD_REQUEST ,
920+ detail = f"Cron interval is too frequent for { user_plan .value } plan. "
921+ f"Minimum interval is { plan_limits ['min_interval_days' ]} days." ,
922+ )
953923
954- # Task Creation
955- created_task_ids = []
956- scheduler = Scheduler ()
957- is_scheduled = False
958924
959- for cron_expr in cron_expressions :
960- task_id = str (uuid .uuid4 ())
961- scheduler_job_id = f"task_{ task_id } "
925+ def _create_and_schedule_task (user_email : str , cron_expr : str , distilled_instructions : str , prompt : str ) -> str :
926+ """Creates a single newsletter task and schedules it."""
927+ task_id = str (uuid .uuid4 ())
928+ scheduler_job_id = f"task_{ task_id } "
929+ email_for_task = EmailRequest (
930+ from_email = user_email ,
931+ 932+ subject = f"Newsletter: { prompt [:50 ]} ..." ,
933+ distilled_processing_instructions = distilled_instructions ,
934+ distilled_alias = HandlerAlias .ASK ,
935+ messageId = f"<newsletter-{ task_id } -{ datetime .now (timezone .utc ).isoformat ()} @mxgo.ai>" ,
936+ parent_message_id = f"<newsletter-parent-{ task_id } @mxgo.ai>" ,
937+ )
962938
963- email_for_task = EmailRequest (
964- from_email = user_email ,
965- 966- subject = f"Newsletter: { request .prompt [:50 ]} ..." ,
967- distilled_processing_instructions = distilled_instructions ,
968- distilled_alias = HandlerAlias .ASK ,
969- messageId = f"<newsletter-{ task_id } -{ datetime .now (timezone .utc ).isoformat ()} @mxgo.ai>" ,
970- parent_message_id = f"<newsletter-parent-{ task_id } @mxgo.ai>" ,
939+ db_connection = init_db_connection ()
940+ with db_connection .get_session () as session :
941+ crud .create_task (
942+ session = session ,
943+ task_id = task_id ,
944+ email_id = user_email ,
945+ cron_expression = cron_expr ,
946+ email_request = email_for_task .model_dump (by_alias = True ),
947+ scheduler_job_id = scheduler_job_id ,
948+ status = TaskStatus .INITIALISED ,
971949 )
972950
973- try :
974- with db_connection .get_session () as session :
975- crud .create_task (
976- session = session ,
977- task_id = task_id ,
978- email_id = user_email ,
979- cron_expression = cron_expr ,
980- email_request = email_for_task .model_dump (by_alias = True ),
981- scheduler_job_id = scheduler_job_id ,
982- status = TaskStatus .INITIALISED ,
983- )
984-
985- scheduler .add_job (
986- job_id = scheduler_job_id ,
987- cron_expression = cron_expr ,
988- func = execute_scheduled_task ,
989- args = [task_id ],
990- )
951+ scheduler = Scheduler ()
952+ scheduler .add_job (job_id = scheduler_job_id , cron_expression = cron_expr , func = execute_scheduled_task , args = [task_id ])
991953
992- with db_connection .get_session () as session :
993- crud .update_task_status (session , task_id , TaskStatus .ACTIVE )
954+ with db_connection .get_session () as session :
955+ crud .update_task_status (session , task_id , TaskStatus .ACTIVE )
994956
995- created_task_ids .append (task_id )
996- is_scheduled = True
997- logger .info (f"Newsletter task { task_id } for { user_email } scheduled successfully." )
957+ logger .info (f"Newsletter task { task_id } for { user_email } scheduled successfully." )
958+ return task_id
998959
999- except Exception as e :
1000- logger .error (f"Failed to schedule newsletter task with cron '{ cron_expr } ' for { user_email } : { e } " )
1001- with db_connection .get_session () as session :
1002- for tid in created_task_ids :
1003- crud .delete_task (session , tid )
1004- raise HTTPException (status_code = 500 , detail = "Failed to schedule one or more newsletter tasks." ) from e
1005960
1006- # Sample Execution / Whitelist Action
1007- sample_email_sent = False
1008- if is_whitelisted and created_task_ids [0 ]:
961+ async def _handle_post_creation_action (
962+ user_email : str , * , is_whitelisted : bool , first_task_id : str , distilled_instructions : str , prompt : str
963+ ):
964+ """Sends a sample email if the user is whitelisted, otherwise triggers verification."""
965+ if is_whitelisted :
1009966 logger .info (f"User { user_email } is whitelisted. Sending sample newsletter." )
1010- first_task_id = created_task_ids [0 ]
1011967 sample_email_request = EmailRequest (
1012968 from_email = user_email ,
10139691014- subject = f"[SAMPLE] Newsletter: { request . prompt [:40 ]} ..." ,
970+ subject = f"[SAMPLE] Newsletter: { prompt [:40 ]} ..." ,
1015971 distilled_processing_instructions = distilled_instructions ,
1016972 distilled_alias = HandlerAlias .ASK ,
1017973 messageId = f"<newsletter-sample-{ first_task_id } -{ datetime .now (timezone .utc ).isoformat ()} @mxgo.ai>" ,
@@ -1021,18 +977,69 @@ async def create_newsletter(
1021977 sample_email_request .model_dump (),
1022978 email_attachments_dir = "" ,
1023979 attachment_info = [],
1024- scheduled_task_id = task_id ,
980+ scheduled_task_id = first_task_id ,
981+ )
982+ return True
983+
984+ logger .info (f"User { user_email } is not whitelisted. Triggering verification." )
985+ try :
986+ await whitelist .trigger_automatic_verification (user_email )
987+ except Exception as e :
988+ logger .error (f"Error triggering whitelist verification for { user_email } : { e } " )
989+ return False
990+
991+
992+ @app .post ("/create-newsletter" )
993+ async def create_newsletter (
994+ request : CreateNewsletterRequest ,
995+ current_user : Annotated [AuthInfo , Depends (get_current_user )],
996+ _token : Annotated [str , Depends (bearer_auth_scheme )] = ...,
997+ ) -> CreateNewsletterResponse :
998+ """
999+ Create and schedule a recurring newsletter task for the authenticated user.
1000+ """
1001+ user_email = current_user .email
1002+ logger .info (f"Received newsletter creation request for user: { user_email } " )
1003+
1004+ distilled_instructions = _build_newsletter_instructions (request )
1005+
1006+ try :
1007+ cron_expressions = convert_schedule_to_cron_list (request .schedule )
1008+ except ValueError as e :
1009+ raise HTTPException (status_code = status .HTTP_400_BAD_REQUEST , detail = str (e )) from e
1010+
1011+ await _validate_newsletter_limits (user_email , cron_expressions )
1012+
1013+ exists_in_whitelist , is_verified = await whitelist .is_email_whitelisted (user_email )
1014+ is_whitelisted = exists_in_whitelist and is_verified
1015+
1016+ created_task_ids = []
1017+ try :
1018+ for cron_expr in cron_expressions :
1019+ task_id = _create_and_schedule_task (user_email , cron_expr , distilled_instructions , request .prompt )
1020+ created_task_ids .append (task_id )
1021+ except Exception as e :
1022+ logger .error (f"Failed to schedule one or more newsletter tasks for { user_email } : { e } " )
1023+
1024+ # Rollback created tasks if any failed
1025+ db_connection = init_db_connection ()
1026+ with db_connection .get_session () as session :
1027+ for tid in created_task_ids :
1028+ crud .delete_task (session , tid )
1029+ raise HTTPException (status_code = 500 , detail = "Failed to schedule one or more newsletter tasks." ) from e
1030+
1031+ sample_email_sent = False
1032+ if created_task_ids :
1033+ sample_email_sent = await _handle_post_creation_action (
1034+ user_email ,
1035+ is_whitelisted = is_whitelisted ,
1036+ first_task_id = created_task_ids [0 ],
1037+ distilled_instructions = distilled_instructions ,
1038+ prompt = request .prompt ,
10251039 )
1026- sample_email_sent = True
1027- elif not is_whitelisted :
1028- logger .info (f"User { user_email } is not whitelisted. Triggering verification." )
1029- try :
1030- await whitelist .trigger_automatic_verification (user_email )
1031- except Exception as e :
1032- logger .error (f"Error triggering whitelist verification for { user_email } : { e } " )
10331040
10341041 return CreateNewsletterResponse (
1035- is_scheduled = is_scheduled ,
1042+ is_scheduled = bool ( created_task_ids ) ,
10361043 is_whitelisted = is_whitelisted ,
10371044 sample_email_sent = sample_email_sent ,
10381045 scheduled_task_ids = created_task_ids ,
@@ -1084,8 +1091,7 @@ async def get_user_info(
10841091 current_newsletter_count = crud .count_active_tasks_for_user (session , current_user .email )
10851092
10861093 newsletter_usage = NewsletterUsageInfo (
1087- current_count = current_newsletter_count ,
1088- max_allowed = max_newsletters_allowed
1094+ current_count = current_newsletter_count , max_allowed = max_newsletters_allowed
10891095 )
10901096
10911097 # Get usage information
@@ -1122,7 +1128,7 @@ async def get_user_info(
11221128 subscription_info = subscription_info ,
11231129 plan_name = user_plan .value ,
11241130 usage_info = usage_info ,
1125- newsletter_usage = newsletter_usage
1131+ newsletter_usage = newsletter_usage ,
11261132 )
11271133
11281134 except Exception as e :
0 commit comments