Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
279 changes: 213 additions & 66 deletions mxgo/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
from mxgo.models import TaskStatus
from mxgo.prompts.template_prompts import NEWSLETTER_TEMPLATE
from mxgo.reply_generation import generate_replies
from mxgo.routed_litellm_model import RoutedLiteLLMModel
from mxgo.scheduling.scheduled_task_executor import execute_scheduled_task
from mxgo.scheduling.scheduler import Scheduler, is_one_time_task
from mxgo.schemas import (
Expand All @@ -41,6 +42,7 @@
HandlerAlias,
NewsletterUsageInfo,
ReplyCandidate,
ScheduleType,
UsageInfo,
UsagePeriod,
UserInfoResponse,
Expand All @@ -50,6 +52,7 @@
from mxgo.tasks import process_email_task, rabbitmq_broker
from mxgo.utils import calculate_cron_interval, convert_schedule_to_cron_list
from mxgo.validators import (
check_rate_limit_redis,
get_current_usage_redis,
validate_api_key,
validate_attachments,
Expand Down Expand Up @@ -905,9 +908,53 @@ async def generate_email_replies(


# Helper functions for create_newsletter
async def _generate_newsletter_subject(prompt: str, *, is_sample: bool = False) -> str:
    """
    Generate a descriptive email subject for the newsletter using LLM.

    The model's answer is collapsed onto a single line (leading/trailing
    whitespace removed, internal runs of whitespace and line breaks replaced
    by one space) so it is safe to use as an email subject. If the LLM call
    raises, or the answer is missing/blank, a generic subject is used instead.

    Args:
        prompt: The user's newsletter prompt/instructions
        is_sample: If True, prepend [SAMPLE] to the subject

    Returns:
        A non-empty, single-line email subject for the newsletter

    """
    subject = ""
    try:
        model = RoutedLiteLLMModel(
            target_model=os.getenv("LITELLM_SUGGESTIONS_MODEL_GROUP", "gpt-4"),
            flatten_messages_as_text=False,
        )

        # NOTE(review): this appears to be a synchronous call made from a
        # coroutine; if it blocks on network I/O it will stall the event loop.
        # Confirm against RoutedLiteLLMModel before moving it off-thread.
        response = model(
            messages=[{"role": "user", "content": f'Generate a concise, single line email subject prefixed with "Newsletter:" for a newsletter based on these instructions: {prompt}'}],
            temperature=0.3,
        )

        raw_subject = response.content
        if isinstance(raw_subject, str):
            # str.split() with no arguments splits on any whitespace run
            # (including newlines), so re-joining yields exactly one line.
            subject = " ".join(raw_subject.split())
        if not subject:
            logger.warning("LLM returned an empty newsletter subject; using fallback")
    except Exception as e:
        logger.warning(f"Failed to generate newsletter subject via LLM: {e}")

    if not subject:
        # Fallback to generic prompt
        fallback = "Generate newsletter as per the following instructions"
        subject = f"Newsletter: {fallback}"

    if is_sample:
        return f"[SAMPLE] {subject}"
    return subject


def _build_newsletter_instructions(request: CreateNewsletterRequest) -> str:
"""Builds the full instruction string from the request using the NEWSLETTER template."""
user_instructions = []

# Add current date context for time-sensitive queries
today = datetime.now(timezone.utc).strftime("%B %d, %Y")
user_instructions.append(
f"- **Current Date**: Today is {today}. When the user mentions 'latest', 'recent', 'this year', etc., focus on the most current information available."
)

if request.estimated_read_time:
user_instructions.append(
f"- **Target Read Time**: The newsletter should be concise enough to be read in approximately {request.estimated_read_time} minutes."
Expand All @@ -920,6 +967,10 @@ def _build_newsletter_instructions(request: CreateNewsletterRequest) -> str:
user_instructions.append(
f"- **Geographic Focus**: The content should be primarily relevant to the following locations: {', '.join(request.geographic_locations)}."
)
if request.language:
user_instructions.append(
f"- **Language**: Write the newsletter in {request.language}."
)
if request.formatting_instructions:
user_instructions.append(
f"- **Formatting Rules**: Strictly follow these formatting instructions: {request.formatting_instructions}."
Expand All @@ -936,24 +987,52 @@ def _build_newsletter_instructions(request: CreateNewsletterRequest) -> str:
return NEWSLETTER_TEMPLATE.format(prompt=request.prompt, user_instructions_section=user_instructions_section)


async def _validate_newsletter_limits(user_email: str, cron_expressions: list[str]):
"""Validates the user's plan limits for newsletters."""
# Get user plan and corresponding limits from config
async def _validate_newsletter_limits(
user_email: str,
schedule_type: ScheduleType,
cron_expressions: list[str],
):
"""
Validates the user's plan limits for newsletters.
- IMMEDIATE: Checks global rate limits (hourly/daily/monthly email quota)
- RECURRING/SPECIFIC_DATES: Checks newsletter-specific limits (max_tasks, min_interval)
"""
user_plan = await user.get_user_plan(user_email)

if schedule_type == ScheduleType.IMMEDIATE:
# For immediate newsletters, check global rate limits
plan_limits = RATE_LIMITS_BY_PLAN.get(user_plan, RATE_LIMITS_BY_PLAN[UserPlan.BETA])
rate_limit_exceeded = await check_rate_limit_redis(
key_type="email",
identifier=user.normalize_email(user_email),
plan_or_domain_limits=plan_limits,
current_dt=datetime.now(timezone.utc),
plan_name_for_key=user_plan.value,
)
if rate_limit_exceeded:
raise HTTPException(
status_code=status.HTTP_429_TOO_MANY_REQUESTS,
detail=f"Rate limit exceeded ({rate_limit_exceeded}). Please try again later.",
)
return

# For RECURRING/SPECIFIC_DATES, check newsletter-specific limits
plan_limits = NEWSLETTER_LIMITS_BY_PLAN.get(user_plan, NEWSLETTER_LIMITS_BY_PLAN[UserPlan.BETA])
min_interval = timedelta(days=plan_limits["min_interval_days"])

# Check total task count against the plan's max tasks
# Count only recurring tasks for max_tasks limit (one-time tasks don't count)
recurring_cron_count = sum(1 for expr in cron_expressions if not is_one_time_task(expr))

db_connection = init_db_connection()
with db_connection.get_session() as session:
active_task_count = crud.count_active_tasks_for_user(session, user_email)
recurring_task_count = crud.count_recurring_tasks_for_user(session, user_email)

if (active_task_count + len(cron_expressions)) > plan_limits["max_tasks"]:
if (recurring_task_count + recurring_cron_count) > plan_limits["max_tasks"]:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=f"Newsletter limit reached for {user_plan.value} plan. "
f"You have {active_task_count} active tasks and are trying to add {len(cron_expressions)} more "
f"(max: {plan_limits['max_tasks']}).",
f"You have {recurring_task_count} recurring tasks and are trying to add {recurring_cron_count} more "
f"(max: {plan_limits['max_tasks']})."
)

# Loop through each cron expression to validate its frequency
Expand All @@ -969,14 +1048,14 @@ async def _validate_newsletter_limits(user_email: str, cron_expressions: list[st
)


def _create_and_schedule_task(user_email: str, cron_expr: str, distilled_instructions: str, prompt: str) -> str: # noqa: ARG001
def _create_and_schedule_task(user_email: str, cron_expr: str, distilled_instructions: str, subject: str) -> str:
"""Creates a single newsletter task and schedules it."""
task_id = str(uuid.uuid4())
scheduler_job_id = f"task_{task_id}"
email_for_task = EmailRequest(
from_email=user_email,
to="ask@mxgo.ai",
subject="Generate Newsletter as per following Instructions",
subject=subject,
distilled_processing_instructions=distilled_instructions,
distilled_alias=HandlerAlias.ASK,
messageId=f"<newsletter-{task_id}-{datetime.now(timezone.utc).isoformat()}@mxgo.ai>",
Expand Down Expand Up @@ -1014,30 +1093,49 @@ def _create_and_schedule_task(user_email: str, cron_expr: str, distilled_instruc
return task_id


def _send_newsletter_email(
    user_email: str,
    subject: str,
    distilled_instructions: str,
    task_id: str,
    message_id_prefix: str = "newsletter",
    scheduled_task_id: str | None = None,
) -> None:
    """
    Build a newsletter EmailRequest and hand it to process_email_task.

    Args:
        user_email: Address used as the request's from_email
        subject: Subject line placed on the request
        distilled_instructions: Instructions forwarded as distilled_processing_instructions
        task_id: Identifier embedded in both the message id and the parent message id
        message_id_prefix: Leading token of the generated message id
        scheduled_task_id: Passed through unchanged to process_email_task.send

    """
    # The message id embeds the current UTC timestamp; the parent id does not,
    # so it is the same for every email sent with a given task_id.
    sent_at = datetime.now(timezone.utc).isoformat()
    message_id = f"<{message_id_prefix}-{task_id}-{sent_at}@mxgo.ai>"
    parent_id = f"<newsletter-parent-{task_id}@mxgo.ai>"

    outgoing = EmailRequest(
        from_email=user_email,
        to="ask@mxgo.ai",
        subject=subject,
        distilled_processing_instructions=distilled_instructions,
        distilled_alias=HandlerAlias.ASK,
        messageId=message_id,
        parent_message_id=parent_id,
    )

    # Newsletters carry no attachments, hence the empty dir and list.
    payload = outgoing.model_dump(by_alias=True)
    process_email_task.send(
        payload,
        email_attachments_dir="",
        attachment_info=[],
        scheduled_task_id=scheduled_task_id,
    )


async def _handle_post_creation_action(
user_email: str,
*,
is_whitelisted: bool,
first_task_id: str,
distilled_instructions: str,
prompt: str, # noqa: ARG001
subject: str,
):
"""Sends a sample email if the user is whitelisted, otherwise triggers verification."""
if is_whitelisted:
logger.info(f"User {user_email} is whitelisted. Sending sample newsletter.")
sample_email_request = EmailRequest(
from_email=user_email,
to="ask@mxgo.ai",
subject="[SAMPLE] Generate Newsletter as per following Instructions",
distilled_processing_instructions=distilled_instructions,
distilled_alias=HandlerAlias.ASK,
messageId=f"<newsletter-sample-{first_task_id}-{datetime.now(timezone.utc).isoformat()}@mxgo.ai>",
parent_message_id=f"<newsletter-parent-{first_task_id}@mxgo.ai>",
)
process_email_task.send(
sample_email_request.model_dump(by_alias=True),
email_attachments_dir="",
attachment_info=[],
_send_newsletter_email(
user_email=user_email,
subject=f"[SAMPLE] {subject}",
distilled_instructions=distilled_instructions,
task_id=first_task_id,
message_id_prefix="newsletter-sample",
scheduled_task_id=first_task_id,
)
return True
Expand All @@ -1062,23 +1160,26 @@ async def create_newsletter(
user_email = current_user.email
logger.info(f"Received newsletter creation request for user: {user_email}")

if validators.redis_client:
redis_key = f"newsletter_request:{request.request_id}"
existing_task_ids_json = await validators.redis_client.get(redis_key)
if existing_task_ids_json:
logger.info(
f"Duplicate request_id {request.request_id} detected for {user_email}, returning existing tasks from Redis"
)
existing_task_ids = json.loads(existing_task_ids_json)
raise HTTPException(
status_code=status.HTTP_409_CONFLICT,
detail={
"message": "This request has already been processed.",
"status": "duplicate",
"request_id": request.request_id,
"scheduled_task_ids": existing_task_ids,
},
)
async def check_duplicate():
if validators.redis_client:
redis_key = f"newsletter_request:{request.request_id}"
existing_task_ids_json = await validators.redis_client.get(redis_key)
if existing_task_ids_json:
logger.info(
f"Duplicate request_id {request.request_id} detected for {user_email}, returning existing tasks from Redis"
)
existing_task_ids = json.loads(existing_task_ids_json)
raise HTTPException(
status_code=status.HTTP_409_CONFLICT,
detail={
"message": "This request has already been processed.",
"status": "duplicate",
"request_id": request.request_id,
"scheduled_task_ids": existing_task_ids,
},
)

await check_duplicate()

distilled_instructions = _build_newsletter_instructions(request)

Expand All @@ -1087,36 +1188,82 @@ async def create_newsletter(
except ValueError as e:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e)) from e

await _validate_newsletter_limits(user_email, cron_expressions)
await _validate_newsletter_limits(user_email, request.schedule.type, cron_expressions)

exists_in_whitelist, is_verified = await whitelist.is_email_whitelisted(user_email)
is_whitelisted = exists_in_whitelist and is_verified

created_task_ids = []
try:
for cron_expr in cron_expressions:
task_id = _create_and_schedule_task(user_email, cron_expr, distilled_instructions, request.prompt)
created_task_ids.append(task_id)
except Exception as e:
logger.error(f"Failed to schedule one or more newsletter tasks for {user_email}: {e}")
# Handle IMMEDIATE schedule type separately - send directly without scheduling
if request.schedule.type == ScheduleType.IMMEDIATE:
if not is_whitelisted:
# Trigger verification for non-whitelisted users
try:
await whitelist.trigger_newsletter_verification(user_email)
except Exception as e:
logger.error(f"Error triggering whitelist verification for {user_email}: {e}")
return CreateNewsletterResponse(
is_scheduled=False,
is_whitelisted=False,
sample_email_sent=False,
scheduled_task_ids=[],
)

# Rollback created tasks if any failed
scheduler = Scheduler()
db_connection = init_db_connection()
with db_connection.get_session() as session:
for tid in created_task_ids:
crud.delete_task(session, tid)
# For whitelisted users, send immediately without creating a scheduled task
task_id = str(uuid.uuid4())
subject = await _generate_newsletter_subject(request.prompt, is_sample=False)
_send_newsletter_email(
user_email=user_email,
subject=subject,
distilled_instructions=distilled_instructions,
task_id=task_id,
message_id_prefix="newsletter-immediate",
)
logger.info(f"Immediate newsletter sent for {user_email}")

try:
scheduler.remove_job(f"task_{tid}")
logger.info(f"Removed scheduler job for rolled-back task {tid}")
except Exception as scheduler_e:
logger.error(f"Failed to remove scheduler job for task {tid}: {scheduler_e}")
if validators.redis_client:
redis_key = f"newsletter_request:{request.request_id}"
await validators.redis_client.setex(redis_key, 86400, json.dumps([task_id]))

raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Failed to schedule one or more newsletter tasks.",
) from e
return CreateNewsletterResponse(
is_scheduled=False,
is_whitelisted=True,
sample_email_sent=True,
scheduled_task_ids=[task_id],
)

# For SPECIFIC_DATES and RECURRING_WEEKLY, proceed with scheduling
subject = await _generate_newsletter_subject(request.prompt, is_sample=False)

def schedule_tasks():
created_task_ids = []
try:
for cron_expr in cron_expressions:
task_id = _create_and_schedule_task(user_email, cron_expr, distilled_instructions, subject)
created_task_ids.append(task_id)
except Exception as e:
logger.error(f"Failed to schedule one or more newsletter tasks for {user_email}: {e}")

# Rollback created tasks if any failed
scheduler = Scheduler()
db_connection = init_db_connection()
with db_connection.get_session() as session:
for tid in created_task_ids:
crud.delete_task(session, tid)

try:
scheduler.remove_job(f"task_{tid}")
logger.info(f"Removed scheduler job for rolled-back task {tid}")
except Exception as scheduler_e:
logger.error(f"Failed to remove scheduler job for task {tid}: {scheduler_e}")

raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Failed to schedule one or more newsletter tasks.",
) from e
else:
return created_task_ids

created_task_ids = schedule_tasks()

sample_email_sent = False
if created_task_ids:
Expand All @@ -1125,7 +1272,7 @@ async def create_newsletter(
is_whitelisted=is_whitelisted,
first_task_id=created_task_ids[0],
distilled_instructions=distilled_instructions,
prompt=request.prompt,
subject=subject,
)

if validators.redis_client and created_task_ids:
Expand Down Expand Up @@ -1183,7 +1330,7 @@ async def get_user_info(
max_newsletters_allowed = newsletter_limits_config["max_tasks"]

with init_db_connection().get_session() as session:
current_newsletter_count = crud.count_active_tasks_for_user(session, current_user.email)
current_newsletter_count = crud.count_recurring_tasks_for_user(session, current_user.email)

newsletter_usage = NewsletterUsageInfo(
current_count=current_newsletter_count, max_allowed=max_newsletters_allowed
Expand Down
Loading