2828from mxgo .models import TaskStatus
2929from mxgo .prompts .template_prompts import NEWSLETTER_TEMPLATE
3030from mxgo .reply_generation import generate_replies
31+ from mxgo .routed_litellm_model import RoutedLiteLLMModel
3132from mxgo .scheduling .scheduled_task_executor import execute_scheduled_task
3233from mxgo .scheduling .scheduler import Scheduler , is_one_time_task
3334from mxgo .schemas import (
4142 HandlerAlias ,
4243 NewsletterUsageInfo ,
4344 ReplyCandidate ,
45+ ScheduleType ,
4446 UsageInfo ,
4547 UsagePeriod ,
4648 UserInfoResponse ,
5052from mxgo .tasks import process_email_task , rabbitmq_broker
5153from mxgo .utils import calculate_cron_interval , convert_schedule_to_cron_list
5254from mxgo .validators import (
55+ check_rate_limit_redis ,
5356 get_current_usage_redis ,
5457 validate_api_key ,
5558 validate_attachments ,
@@ -905,9 +908,53 @@ async def generate_email_replies(
905908
906909
907910# Helper functions for create_newsletter
911+ async def _generate_newsletter_subject (prompt : str , * , is_sample : bool = False ) -> str :
912+ """
913+ Generate a descriptive email subject for the newsletter using LLM.
914+
915+ Args:
916+ prompt: The user's newsletter prompt/instructions
917+ is_sample: If True, prepend [SAMPLE] to the subject
918+
919+ Returns:
920+ A descriptive email subject for the newsletter
921+
922+ """
923+ try :
924+
925+ model = RoutedLiteLLMModel (
926+ target_model = os .getenv ("LITELLM_SUGGESTIONS_MODEL_GROUP" , "gpt-4" ),
927+ flatten_messages_as_text = False ,
928+ )
929+
930+ response = model (
931+ messages = [{"role" : "user" , "content" : f'Generate a concise, single line email subject prefixed with "Newsletter:" for a newsletter based on these instructions: { prompt } ' }],
932+ temperature = 0.3 ,
933+ )
934+ subject = response .content
935+ except Exception as e :
936+ logger .warning (f"Failed to generate newsletter subject via LLM: { e } " )
937+ # Fallback to generic prompt
938+ fallback = "Generate newsletter as per the following instructions"
939+ if is_sample :
940+ return f"[SAMPLE] Newsletter: { fallback } "
941+ return f"Newsletter: { fallback } "
942+ else :
943+ if is_sample :
944+ return f"[SAMPLE] { subject } "
945+ return subject
946+
947+
908948def _build_newsletter_instructions (request : CreateNewsletterRequest ) -> str :
909949 """Builds the full instruction string from the request using the NEWSLETTER template."""
910950 user_instructions = []
951+
952+ # Add current date context for time-sensitive queries
953+ today = datetime .now (timezone .utc ).strftime ("%B %d, %Y" )
954+ user_instructions .append (
955+ f"- **Current Date**: Today is { today } . When the user mentions 'latest', 'recent', 'this year', etc., focus on the most current information available."
956+ )
957+
911958 if request .estimated_read_time :
912959 user_instructions .append (
913960 f"- **Target Read Time**: The newsletter should be concise enough to be read in approximately { request .estimated_read_time } minutes."
@@ -920,6 +967,10 @@ def _build_newsletter_instructions(request: CreateNewsletterRequest) -> str:
920967 user_instructions .append (
921968 f"- **Geographic Focus**: The content should be primarily relevant to the following locations: { ', ' .join (request .geographic_locations )} ."
922969 )
970+ if request .language :
971+ user_instructions .append (
972+ f"- **Language**: Write the newsletter in { request .language } ."
973+ )
923974 if request .formatting_instructions :
924975 user_instructions .append (
925976 f"- **Formatting Rules**: Strictly follow these formatting instructions: { request .formatting_instructions } ."
@@ -936,24 +987,52 @@ def _build_newsletter_instructions(request: CreateNewsletterRequest) -> str:
936987 return NEWSLETTER_TEMPLATE .format (prompt = request .prompt , user_instructions_section = user_instructions_section )
937988
938989
939- async def _validate_newsletter_limits (user_email : str , cron_expressions : list [str ]):
940- """Validates the user's plan limits for newsletters."""
941- # Get user plan and corresponding limits from config
990+ async def _validate_newsletter_limits (
991+ user_email : str ,
992+ schedule_type : ScheduleType ,
993+ cron_expressions : list [str ],
994+ ):
995+ """
996+ Validates the user's plan limits for newsletters.
997+ - IMMEDIATE: Checks global rate limits (hourly/daily/monthly email quota)
998+ - RECURRING/SPECIFIC_DATES: Checks newsletter-specific limits (max_tasks, min_interval)
999+ """
9421000 user_plan = await user .get_user_plan (user_email )
1001+
1002+ if schedule_type == ScheduleType .IMMEDIATE :
1003+ # For immediate newsletters, check global rate limits
1004+ plan_limits = RATE_LIMITS_BY_PLAN .get (user_plan , RATE_LIMITS_BY_PLAN [UserPlan .BETA ])
1005+ rate_limit_exceeded = await check_rate_limit_redis (
1006+ key_type = "email" ,
1007+ identifier = user .normalize_email (user_email ),
1008+ plan_or_domain_limits = plan_limits ,
1009+ current_dt = datetime .now (timezone .utc ),
1010+ plan_name_for_key = user_plan .value ,
1011+ )
1012+ if rate_limit_exceeded :
1013+ raise HTTPException (
1014+ status_code = status .HTTP_429_TOO_MANY_REQUESTS ,
1015+ detail = f"Rate limit exceeded ({ rate_limit_exceeded } ). Please try again later." ,
1016+ )
1017+ return
1018+
1019+ # For RECURRING/SPECIFIC_DATES, check newsletter-specific limits
9431020 plan_limits = NEWSLETTER_LIMITS_BY_PLAN .get (user_plan , NEWSLETTER_LIMITS_BY_PLAN [UserPlan .BETA ])
9441021 min_interval = timedelta (days = plan_limits ["min_interval_days" ])
9451022
946- # Check total task count against the plan's max tasks
1023+ # Count only recurring tasks for max_tasks limit (one-time tasks don't count)
1024+ recurring_cron_count = sum (1 for expr in cron_expressions if not is_one_time_task (expr ))
1025+
9471026 db_connection = init_db_connection ()
9481027 with db_connection .get_session () as session :
949- active_task_count = crud .count_active_tasks_for_user (session , user_email )
1028+ recurring_task_count = crud .count_recurring_tasks_for_user (session , user_email )
9501029
951- if (active_task_count + len ( cron_expressions ) ) > plan_limits ["max_tasks" ]:
1030+ if (recurring_task_count + recurring_cron_count ) > plan_limits ["max_tasks" ]:
9521031 raise HTTPException (
9531032 status_code = status .HTTP_403_FORBIDDEN ,
9541033 detail = f"Newsletter limit reached for { user_plan .value } plan. "
955- f"You have { active_task_count } active tasks and are trying to add { len ( cron_expressions ) } more "
956- f"(max: { plan_limits ['max_tasks' ]} )." ,
1034+ f"You have { recurring_task_count } recurring tasks and are trying to add { recurring_cron_count } more "
1035+ f"(max: { plan_limits ['max_tasks' ]} )."
9571036 )
9581037
9591038 # Loop through each cron expression to validate its frequency
@@ -969,14 +1048,14 @@ async def _validate_newsletter_limits(user_email: str, cron_expressions: list[st
9691048 )
9701049
9711050
972- def _create_and_schedule_task (user_email : str , cron_expr : str , distilled_instructions : str , prompt : str ) -> str : # noqa: ARG001
1051+ def _create_and_schedule_task (user_email : str , cron_expr : str , distilled_instructions : str , subject : str ) -> str :
9731052 """Creates a single newsletter task and schedules it."""
9741053 task_id = str (uuid .uuid4 ())
9751054 scheduler_job_id = f"task_{ task_id } "
9761055 email_for_task = EmailRequest (
9771056 from_email = user_email ,
9781057 to = "ask@mxgo.ai" ,
979- subject = "Generate Newsletter as per following Instructions" ,
1058+ subject = subject ,
9801059 distilled_processing_instructions = distilled_instructions ,
9811060 distilled_alias = HandlerAlias .ASK ,
9821061 messageId = f"<newsletter-{ task_id } -{ datetime .now (timezone .utc ).isoformat ()} @mxgo.ai>" ,
@@ -1014,30 +1093,49 @@ def _create_and_schedule_task(user_email: str, cron_expr: str, distilled_instruc
10141093 return task_id
10151094
10161095
1096+ def _send_newsletter_email (
1097+ user_email : str ,
1098+ subject : str ,
1099+ distilled_instructions : str ,
1100+ task_id : str ,
1101+ message_id_prefix : str = "newsletter" ,
1102+ scheduled_task_id : str | None = None ,
1103+ ) -> None :
1104+ """Sends a newsletter email via the task queue."""
1105+ email_request = EmailRequest (
1106+ from_email = user_email ,
1107+ to = "ask@mxgo.ai" ,
1108+ subject = subject ,
1109+ distilled_processing_instructions = distilled_instructions ,
1110+ distilled_alias = HandlerAlias .ASK ,
1111+ messageId = f"<{ message_id_prefix } -{ task_id } -{ datetime .now (timezone .utc ).isoformat ()} @mxgo.ai>" ,
1112+ parent_message_id = f"<newsletter-parent-{ task_id } @mxgo.ai>" ,
1113+ )
1114+ process_email_task .send (
1115+ email_request .model_dump (by_alias = True ),
1116+ email_attachments_dir = "" ,
1117+ attachment_info = [],
1118+ scheduled_task_id = scheduled_task_id ,
1119+ )
1120+
1121+
10171122async def _handle_post_creation_action (
10181123 user_email : str ,
10191124 * ,
10201125 is_whitelisted : bool ,
10211126 first_task_id : str ,
10221127 distilled_instructions : str ,
1023- prompt : str , # noqa: ARG001
1128+ subject : str ,
10241129):
10251130 """Sends a sample email if the user is whitelisted, otherwise triggers verification."""
10261131 if is_whitelisted :
10271132 logger .info (f"User { user_email } is whitelisted. Sending sample newsletter." )
1028- sample_email_request = EmailRequest (
1029- from_email = user_email ,
1030- to = "ask@mxgo.ai" ,
1031- subject = "[SAMPLE] Generate Newsletter as per following Instructions" ,
1032- distilled_processing_instructions = distilled_instructions ,
1033- distilled_alias = HandlerAlias .ASK ,
1034- messageId = f"<newsletter-sample-{ first_task_id } -{ datetime .now (timezone .utc ).isoformat ()} @mxgo.ai>" ,
1035- parent_message_id = f"<newsletter-parent-{ first_task_id } @mxgo.ai>" ,
1036- )
1037- process_email_task .send (
1038- sample_email_request .model_dump (by_alias = True ),
1039- email_attachments_dir = "" ,
1040- attachment_info = [],
1133+ _send_newsletter_email (
1134+ user_email = user_email ,
1135+ subject = f"[SAMPLE] { subject } " ,
1136+ distilled_instructions = distilled_instructions ,
1137+ task_id = first_task_id ,
1138+ message_id_prefix = "newsletter-sample" ,
10411139 scheduled_task_id = first_task_id ,
10421140 )
10431141 return True
@@ -1062,23 +1160,26 @@ async def create_newsletter(
10621160 user_email = current_user .email
10631161 logger .info (f"Received newsletter creation request for user: { user_email } " )
10641162
1065- if validators .redis_client :
1066- redis_key = f"newsletter_request:{ request .request_id } "
1067- existing_task_ids_json = await validators .redis_client .get (redis_key )
1068- if existing_task_ids_json :
1069- logger .info (
1070- f"Duplicate request_id { request .request_id } detected for { user_email } , returning existing tasks from Redis"
1071- )
1072- existing_task_ids = json .loads (existing_task_ids_json )
1073- raise HTTPException (
1074- status_code = status .HTTP_409_CONFLICT ,
1075- detail = {
1076- "message" : "This request has already been processed." ,
1077- "status" : "duplicate" ,
1078- "request_id" : request .request_id ,
1079- "scheduled_task_ids" : existing_task_ids ,
1080- },
1081- )
1163+ async def check_duplicate ():
1164+ if validators .redis_client :
1165+ redis_key = f"newsletter_request:{ request .request_id } "
1166+ existing_task_ids_json = await validators .redis_client .get (redis_key )
1167+ if existing_task_ids_json :
1168+ logger .info (
1169+ f"Duplicate request_id { request .request_id } detected for { user_email } , returning existing tasks from Redis"
1170+ )
1171+ existing_task_ids = json .loads (existing_task_ids_json )
1172+ raise HTTPException (
1173+ status_code = status .HTTP_409_CONFLICT ,
1174+ detail = {
1175+ "message" : "This request has already been processed." ,
1176+ "status" : "duplicate" ,
1177+ "request_id" : request .request_id ,
1178+ "scheduled_task_ids" : existing_task_ids ,
1179+ },
1180+ )
1181+
1182+ await check_duplicate ()
10821183
10831184 distilled_instructions = _build_newsletter_instructions (request )
10841185
@@ -1087,36 +1188,82 @@ async def create_newsletter(
10871188 except ValueError as e :
10881189 raise HTTPException (status_code = status .HTTP_400_BAD_REQUEST , detail = str (e )) from e
10891190
1090- await _validate_newsletter_limits (user_email , cron_expressions )
1191+ await _validate_newsletter_limits (user_email , request . schedule . type , cron_expressions )
10911192
10921193 exists_in_whitelist , is_verified = await whitelist .is_email_whitelisted (user_email )
10931194 is_whitelisted = exists_in_whitelist and is_verified
10941195
1095- created_task_ids = []
1096- try :
1097- for cron_expr in cron_expressions :
1098- task_id = _create_and_schedule_task (user_email , cron_expr , distilled_instructions , request .prompt )
1099- created_task_ids .append (task_id )
1100- except Exception as e :
1101- logger .error (f"Failed to schedule one or more newsletter tasks for { user_email } : { e } " )
1196+ # Handle IMMEDIATE schedule type separately - send directly without scheduling
1197+ if request .schedule .type == ScheduleType .IMMEDIATE :
1198+ if not is_whitelisted :
1199+ # Trigger verification for non-whitelisted users
1200+ try :
1201+ await whitelist .trigger_newsletter_verification (user_email )
1202+ except Exception as e :
1203+ logger .error (f"Error triggering whitelist verification for { user_email } : { e } " )
1204+ return CreateNewsletterResponse (
1205+ is_scheduled = False ,
1206+ is_whitelisted = False ,
1207+ sample_email_sent = False ,
1208+ scheduled_task_ids = [],
1209+ )
11021210
1103- # Rollback created tasks if any failed
1104- scheduler = Scheduler ()
1105- db_connection = init_db_connection ()
1106- with db_connection .get_session () as session :
1107- for tid in created_task_ids :
1108- crud .delete_task (session , tid )
1211+ # For whitelisted users, send immediately without creating a scheduled task
1212+ task_id = str (uuid .uuid4 ())
1213+ subject = await _generate_newsletter_subject (request .prompt , is_sample = False )
1214+ _send_newsletter_email (
1215+ user_email = user_email ,
1216+ subject = subject ,
1217+ distilled_instructions = distilled_instructions ,
1218+ task_id = task_id ,
1219+ message_id_prefix = "newsletter-immediate" ,
1220+ )
1221+ logger .info (f"Immediate newsletter sent for { user_email } " )
11091222
1110- try :
1111- scheduler .remove_job (f"task_{ tid } " )
1112- logger .info (f"Removed scheduler job for rolled-back task { tid } " )
1113- except Exception as scheduler_e :
1114- logger .error (f"Failed to remove scheduler job for task { tid } : { scheduler_e } " )
1223+ if validators .redis_client :
1224+ redis_key = f"newsletter_request:{ request .request_id } "
1225+ await validators .redis_client .setex (redis_key , 86400 , json .dumps ([task_id ]))
11151226
1116- raise HTTPException (
1117- status_code = status .HTTP_500_INTERNAL_SERVER_ERROR ,
1118- detail = "Failed to schedule one or more newsletter tasks." ,
1119- ) from e
1227+ return CreateNewsletterResponse (
1228+ is_scheduled = False ,
1229+ is_whitelisted = True ,
1230+ sample_email_sent = True ,
1231+ scheduled_task_ids = [task_id ],
1232+ )
1233+
1234+ # For SPECIFIC_DATES and RECURRING_WEEKLY, proceed with scheduling
1235+ subject = await _generate_newsletter_subject (request .prompt , is_sample = False )
1236+
1237+ def schedule_tasks ():
1238+ created_task_ids = []
1239+ try :
1240+ for cron_expr in cron_expressions :
1241+ task_id = _create_and_schedule_task (user_email , cron_expr , distilled_instructions , subject )
1242+ created_task_ids .append (task_id )
1243+ except Exception as e :
1244+ logger .error (f"Failed to schedule one or more newsletter tasks for { user_email } : { e } " )
1245+
1246+ # Rollback created tasks if any failed
1247+ scheduler = Scheduler ()
1248+ db_connection = init_db_connection ()
1249+ with db_connection .get_session () as session :
1250+ for tid in created_task_ids :
1251+ crud .delete_task (session , tid )
1252+
1253+ try :
1254+ scheduler .remove_job (f"task_{ tid } " )
1255+ logger .info (f"Removed scheduler job for rolled-back task { tid } " )
1256+ except Exception as scheduler_e :
1257+ logger .error (f"Failed to remove scheduler job for task { tid } : { scheduler_e } " )
1258+
1259+ raise HTTPException (
1260+ status_code = status .HTTP_500_INTERNAL_SERVER_ERROR ,
1261+ detail = "Failed to schedule one or more newsletter tasks." ,
1262+ ) from e
1263+ else :
1264+ return created_task_ids
1265+
1266+ created_task_ids = schedule_tasks ()
11201267
11211268 sample_email_sent = False
11221269 if created_task_ids :
@@ -1125,7 +1272,7 @@ async def create_newsletter(
11251272 is_whitelisted = is_whitelisted ,
11261273 first_task_id = created_task_ids [0 ],
11271274 distilled_instructions = distilled_instructions ,
1128- prompt = request . prompt ,
1275+ subject = subject ,
11291276 )
11301277
11311278 if validators .redis_client and created_task_ids :
@@ -1183,7 +1330,7 @@ async def get_user_info(
11831330 max_newsletters_allowed = newsletter_limits_config ["max_tasks" ]
11841331
11851332 with init_db_connection ().get_session () as session :
1186- current_newsletter_count = crud .count_active_tasks_for_user (session , current_user .email )
1333+ current_newsletter_count = crud .count_recurring_tasks_for_user (session , current_user .email )
11871334
11881335 newsletter_usage = NewsletterUsageInfo (
11891336 current_count = current_newsletter_count , max_allowed = max_newsletters_allowed
0 commit comments