Skip to content
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions .github/dependabot.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,19 @@ updates:
directory: "/" # Location of package manifests
schedule:
interval: "weekly"
groups:
python-dependencies:
# A name for the group has to be provided. The name will be used in PR titles and branch names.
patterns:
- "*"

- package-ecosystem: "github-actions"
directory: "/"
schedule:
interval: "weekly"
# Group all GitHub Actions updates into a single PR.
groups:
github-actions:
patterns:
- "*"

271 changes: 267 additions & 4 deletions mxgo/api.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import json
import os
import shutil
import uuid
from contextlib import asynccontextmanager
from datetime import datetime, timezone
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Annotated, Any

Expand All @@ -13,23 +14,31 @@
from fastapi.security import APIKeyHeader, HTTPBearer
from sqlalchemy import text

from mxgo import user, validators
from mxgo import crud, user, validators, whitelist
from mxgo._logging import get_logger
from mxgo.auth import AuthInfo, get_current_user
from mxgo.config import ATTACHMENTS_DIR, RATE_LIMITS_BY_PLAN, SKIP_EMAIL_DELIVERY
from mxgo.config import ATTACHMENTS_DIR, NEWSLETTER_LIMITS_BY_PLAN, RATE_LIMITS_BY_PLAN, SKIP_EMAIL_DELIVERY
from mxgo.db import init_db_connection
from mxgo.dependencies import processing_instructions_resolver
from mxgo.email_sender import (
generate_email_id,
send_email_reply,
)
from mxgo.models import TaskStatus
from mxgo.prompts.template_prompts import NEWSLETTER_TEMPLATE
from mxgo.reply_generation import generate_replies
from mxgo.scheduling.scheduled_task_executor import execute_scheduled_task
from mxgo.scheduling.scheduler import Scheduler, is_one_time_task
from mxgo.schemas import (
CreateNewsletterRequest,
CreateNewsletterResponse,
EmailAttachment,
EmailRequest,
EmailSuggestionRequest,
EmailSuggestionResponse,
GenerateEmailReplyRequest,
HandlerAlias,
NewsletterUsageInfo,
ReplyCandidate,
UsageInfo,
UsagePeriod,
Expand All @@ -38,6 +47,7 @@
)
from mxgo.suggestions import generate_suggestions, get_suggestions_model
from mxgo.tasks import process_email_task, rabbitmq_broker
from mxgo.utils import calculate_cron_interval, convert_schedule_to_cron_list
from mxgo.validators import (
get_current_usage_redis,
validate_api_key,
Expand Down Expand Up @@ -869,6 +879,243 @@ async def generate_email_replies(
) from e


# Helper functions for create_newsletter
def _build_newsletter_instructions(request: CreateNewsletterRequest) -> str:
"""Builds the full instruction string from the request using the NEWSLETTER template."""
user_instructions = []
if request.estimated_read_time:
user_instructions.append(
f"- **Target Read Time**: The newsletter should be concise enough to be read in approximately {request.estimated_read_time} minutes."
)
if request.sources:
user_instructions.append(
f"- **Prioritize Sources**: When researching, give priority to information from the following sources: {', '.join(request.sources)}."
)
if request.geographic_locations:
user_instructions.append(
f"- **Geographic Focus**: The content should be primarily relevant to the following locations: {', '.join(request.geographic_locations)}."
)
if request.formatting_instructions:
user_instructions.append(
f"- **Formatting Rules**: Strictly follow these formatting instructions: {request.formatting_instructions}."
)

if user_instructions:
user_instructions_section = "\n".join(user_instructions)
else:
user_instructions_section = (
"No specific user instructions were provided. Use your best judgment to create a high-quality newsletter."
)

# This becomes the detailed, distilled instructions for the agent.
return NEWSLETTER_TEMPLATE.format(prompt=request.prompt, user_instructions_section=user_instructions_section)


async def _validate_newsletter_limits(user_email: str, cron_expressions: list[str]):
"""Validates the user's plan limits for newsletters."""
# Get user plan and corresponding limits from config
user_plan = await user.get_user_plan(user_email)
plan_limits = NEWSLETTER_LIMITS_BY_PLAN.get(user_plan, NEWSLETTER_LIMITS_BY_PLAN[UserPlan.BETA])
min_interval = timedelta(days=plan_limits["min_interval_days"])

# Check total task count against the plan's max tasks
db_connection = init_db_connection()
with db_connection.get_session() as session:
active_task_count = crud.count_active_tasks_for_user(session, user_email)

if (active_task_count + len(cron_expressions)) > plan_limits["max_tasks"]:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=f"Newsletter limit reached for {user_plan.value} plan. "
f"You have {active_task_count} active tasks and are trying to add {len(cron_expressions)} more "
f"(max: {plan_limits['max_tasks']}).",
)

# Loop through each cron expression to validate its frequency
for cron_expr in cron_expressions:
# One-time tasks don't have a recurring interval, so we skip them
if not is_one_time_task(cron_expr):
interval = calculate_cron_interval(cron_expr)
if interval < min_interval:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"Cron interval is too frequent for {user_plan.value} plan. "
f"Minimum interval is {plan_limits['min_interval_days']} days.",
)


def _create_and_schedule_task(user_email: str, cron_expr: str, distilled_instructions: str, prompt: str) -> str: # noqa: ARG001
"""Creates a single newsletter task and schedules it."""
task_id = str(uuid.uuid4())
scheduler_job_id = f"task_{task_id}"
email_for_task = EmailRequest(
from_email=user_email,
to="[email protected]",
subject="Generate Newsletter as per following Instructions",
distilled_processing_instructions=distilled_instructions,
distilled_alias=HandlerAlias.ASK,
messageId=f"<newsletter-{task_id}-{datetime.now(timezone.utc).isoformat()}@mxgo.ai>",
parent_message_id=f"<newsletter-parent-{task_id}@mxgo.ai>",
)

db_connection = init_db_connection()
with db_connection.get_session() as session:
crud.create_task(
session=session,
task_id=task_id,
email_id=user_email,
cron_expression=cron_expr,
email_request=email_for_task.model_dump(by_alias=True),
scheduler_job_id=scheduler_job_id,
status=TaskStatus.INITIALISED,
)

scheduler = Scheduler()
try:
scheduler.add_job(
job_id=scheduler_job_id, cron_expression=cron_expr, func=execute_scheduled_task, args=[task_id]
)
except Exception as e:
logger.error(f"Failed to schedule task {task_id}: {e}")

with db_connection.get_session() as session:
crud.delete_task(session, task_id)
raise

with db_connection.get_session() as session:
crud.update_task_status(session, task_id, TaskStatus.ACTIVE)

logger.info(f"Newsletter task {task_id} for {user_email} scheduled successfully.")
return task_id


async def _handle_post_creation_action(
user_email: str,
*,
is_whitelisted: bool,
first_task_id: str,
distilled_instructions: str,
prompt: str, # noqa: ARG001
):
"""Sends a sample email if the user is whitelisted, otherwise triggers verification."""
if is_whitelisted:
logger.info(f"User {user_email} is whitelisted. Sending sample newsletter.")
sample_email_request = EmailRequest(
from_email=user_email,
to="[email protected]",
subject="[SAMPLE] Generate Newsletter as per following Instructions",
distilled_processing_instructions=distilled_instructions,
distilled_alias=HandlerAlias.ASK,
messageId=f"<newsletter-sample-{first_task_id}-{datetime.now(timezone.utc).isoformat()}@mxgo.ai>",
parent_message_id=f"<newsletter-parent-{first_task_id}@mxgo.ai>",
)
process_email_task.send(
sample_email_request.model_dump(by_alias=True),
email_attachments_dir="",
attachment_info=[],
scheduled_task_id=first_task_id,
)
return True

logger.info(f"User {user_email} is not whitelisted. Triggering verification.")
try:
await whitelist.trigger_automatic_verification(user_email)
except Exception as e:
logger.error(f"Error triggering whitelist verification for {user_email}: {e}")
return False


@app.post("/create-newsletter")
async def create_newsletter(
request: CreateNewsletterRequest,
current_user: Annotated[AuthInfo, Depends(get_current_user)],
_token: Annotated[str, Depends(bearer_auth_scheme)] = ...,
) -> CreateNewsletterResponse:
"""
Create and schedule a recurring newsletter task for the authenticated user.
"""
user_email = current_user.email
logger.info(f"Received newsletter creation request for user: {user_email}")

if validators.redis_client:
redis_key = f"newsletter_request:{request.request_id}"
existing_task_ids_json = await validators.redis_client.get(redis_key)
if existing_task_ids_json:
logger.info(
f"Duplicate request_id {request.request_id} detected for {user_email}, returning existing tasks from Redis"
)
existing_task_ids = json.loads(existing_task_ids_json)
raise HTTPException(
status_code=status.HTTP_409_CONFLICT,
detail={
"message": "This request has already been processed.",
"status": "duplicate",
"request_id": request.request_id,
"scheduled_task_ids": existing_task_ids,
},
)

distilled_instructions = _build_newsletter_instructions(request)

try:
cron_expressions = convert_schedule_to_cron_list(request.schedule)
except ValueError as e:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e)) from e

await _validate_newsletter_limits(user_email, cron_expressions)

exists_in_whitelist, is_verified = await whitelist.is_email_whitelisted(user_email)
is_whitelisted = exists_in_whitelist and is_verified

created_task_ids = []
try:
for cron_expr in cron_expressions:
task_id = _create_and_schedule_task(user_email, cron_expr, distilled_instructions, request.prompt)
created_task_ids.append(task_id)
except Exception as e:
logger.error(f"Failed to schedule one or more newsletter tasks for {user_email}: {e}")

# Rollback created tasks if any failed
scheduler = Scheduler()
db_connection = init_db_connection()
with db_connection.get_session() as session:
for tid in created_task_ids:
crud.delete_task(session, tid)

try:
scheduler.remove_job(f"task_{tid}")
logger.info(f"Removed scheduler job for rolled-back task {tid}")
except Exception as scheduler_e:
logger.error(f"Failed to remove scheduler job for task {tid}: {scheduler_e}")

raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Failed to schedule one or more newsletter tasks.",
) from e

sample_email_sent = False
if created_task_ids:
sample_email_sent = await _handle_post_creation_action(
user_email,
is_whitelisted=is_whitelisted,
first_task_id=created_task_ids[0],
distilled_instructions=distilled_instructions,
prompt=request.prompt,
)

if validators.redis_client and created_task_ids:
redis_key = f"newsletter_request:{request.request_id}"
# Store for 24 hours
await validators.redis_client.setex(redis_key, 86400, json.dumps(created_task_ids))

return CreateNewsletterResponse(
is_scheduled=bool(created_task_ids),
is_whitelisted=is_whitelisted,
sample_email_sent=sample_email_sent,
scheduled_task_ids=created_task_ids,
)


@app.get("/user")
async def get_user_info(
current_user: Annotated[AuthInfo, Depends(get_current_user)] = ...,
Expand Down Expand Up @@ -906,6 +1153,17 @@ async def get_user_info(
else:
logger.info(f"No customer ID found for email {current_user.email}")

# Get newsletter limits and current usage
newsletter_limits_config = NEWSLETTER_LIMITS_BY_PLAN.get(user_plan, NEWSLETTER_LIMITS_BY_PLAN[UserPlan.BETA])
max_newsletters_allowed = newsletter_limits_config["max_tasks"]

with init_db_connection().get_session() as session:
current_newsletter_count = crud.count_active_tasks_for_user(session, current_user.email)

newsletter_usage = NewsletterUsageInfo(
current_count=current_newsletter_count, max_allowed=max_newsletters_allowed
)

# Get usage information
normalized_user_email = user.normalize_email(current_user.email)
current_dt = datetime.now(timezone.utc)
Expand Down Expand Up @@ -936,7 +1194,12 @@ async def get_user_info(

logger.info(f"Successfully retrieved user info for {current_user.email}")

return UserInfoResponse(subscription_info=subscription_info, plan_name=user_plan.value, usage_info=usage_info)
return UserInfoResponse(
subscription_info=subscription_info,
plan_name=user_plan.value,
usage_info=usage_info,
newsletter_usage=newsletter_usage,
)

except Exception as e:
logger.error(f"Error retrieving user info for {current_user.email}: {e}")
Expand Down
5 changes: 3 additions & 2 deletions mxgo/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,9 @@ def validate_jwt_token(token: str) -> AuthInfo:

try:
# Decode and validate the JWT token
payload = jwt.decode(token, JWT_SECRET, algorithms=[JWT_ALGORITHM],
options={"verify_exp": True}, audience="authenticated")
payload = jwt.decode(
token, JWT_SECRET, algorithms=[JWT_ALGORITHM], options={"verify_exp": True}, audience="authenticated"
)

# Extract required fields
user_id = payload.get("sub")
Expand Down
Loading
Loading