Skip to content
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions .github/dependabot.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,19 @@ updates:
directory: "/" # Location of package manifests
schedule:
interval: "weekly"
groups:
python-dependencies:
# A name for the group has to be provided. The name will be used in PR titles and branch names.
patterns:
- "*"

- package-ecosystem: "github-actions"
directory: "/"
schedule:
interval: "weekly"
# Group all GitHub Actions updates into a single PR.
groups:
github-actions:
patterns:
- "*"

1 change: 1 addition & 0 deletions docker/init-db.sh
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ psql -v ON_ERROR_STOP=1 --username "$POSTGRES_USER" --dbname "$POSTGRES_DB" <<-E
CREATE EXTENSION IF NOT EXISTS "uuid-ossp";

-- You can add other initialization SQL here
CREATE DATABASE mxtoaitest;

GRANT ALL PRIVILEGES ON DATABASE $POSTGRES_DB TO $POSTGRES_USER;
EOSQL
Expand Down
3 changes: 3 additions & 0 deletions docker/worker.dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ RUN poetry config virtualenvs.create false && poetry install --no-root --no-inte
# Copy application code
COPY mxgo ./mxgo

# Copy test code into the container
COPY ./tests /app/tests

# Create directories
RUN mkdir -p /app/attachments

Expand Down
201 changes: 197 additions & 4 deletions mxgo/api.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import json
import os
import shutil
import uuid
from contextlib import asynccontextmanager
from datetime import datetime, timezone
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Annotated, Any

Expand All @@ -13,23 +14,30 @@
from fastapi.security import APIKeyHeader, HTTPBearer
from sqlalchemy import text

from mxgo import user, validators
from mxgo import crud, user, validators, whitelist
from mxgo._logging import get_logger
from mxgo.auth import AuthInfo, get_current_user
from mxgo.config import ATTACHMENTS_DIR, RATE_LIMITS_BY_PLAN, SKIP_EMAIL_DELIVERY
from mxgo.config import ATTACHMENTS_DIR, NEWSLETTER_LIMITS_BY_PLAN, RATE_LIMITS_BY_PLAN, SKIP_EMAIL_DELIVERY
from mxgo.db import init_db_connection
from mxgo.dependencies import processing_instructions_resolver
from mxgo.email_sender import (
generate_email_id,
send_email_reply,
)
from mxgo.models import TaskStatus
from mxgo.reply_generation import generate_replies
from mxgo.scheduling.scheduled_task_executor import execute_scheduled_task
from mxgo.scheduling.scheduler import Scheduler, is_one_time_task
from mxgo.schemas import (
CreateNewsletterRequest,
CreateNewsletterResponse,
EmailAttachment,
EmailRequest,
EmailSuggestionRequest,
EmailSuggestionResponse,
GenerateEmailReplyRequest,
HandlerAlias,
NewsletterUsageInfo,
ReplyCandidate,
UsageInfo,
UsagePeriod,
Expand All @@ -38,6 +46,7 @@
)
from mxgo.suggestions import generate_suggestions, get_suggestions_model
from mxgo.tasks import process_email_task, rabbitmq_broker
from mxgo.utils import calculate_cron_interval, convert_schedule_to_cron_list
from mxgo.validators import (
get_current_usage_redis,
validate_api_key,
Expand Down Expand Up @@ -869,6 +878,174 @@ async def generate_email_replies(
) from e


# Helper functions for create_newsletter
def _build_newsletter_instructions(request: CreateNewsletterRequest) -> str:
    """
    Assemble the instruction text for a newsletter from the request fields.

    The prompt is always included. The read time, sources, geographic focus and
    formatting instructions are each appended only when the corresponding
    request attribute is truthy. Sections are separated by a blank line.
    """
    sections = [f"PROMPT: {request.prompt}"]

    read_time = request.estimated_read_time
    sources = request.sources
    locations = request.geographic_locations
    formatting = request.formatting_instructions

    # Each optional section is rendered lazily so that ``join`` is never called
    # on a missing (falsy) collection.
    optional_sections = (
        f"ESTIMATED READ TIME: {read_time} minutes" if read_time else None,
        f"SOURCES: {', '.join(sources)}" if sources else None,
        f"GEOGRAPHIC FOCUS: {', '.join(locations)}" if locations else None,
        f"FORMATTING INSTRUCTIONS: {formatting}" if formatting else None,
    )
    sections.extend(section for section in optional_sections if section is not None)

    return "\n\n".join(sections)


async def _validate_newsletter_limits(user_email: str, cron_expressions: list[str]):
    """
    Enforce the per-plan newsletter limits for a user.

    Looks up the user's plan (falling back to the BETA limits when the plan has
    no entry in NEWSLETTER_LIMITS_BY_PLAN), then checks two things:

    * the user's active task count plus the number of requested cron
      expressions must not exceed the plan's ``max_tasks``;
    * every recurring (non one-time) cron expression must fire no more often
      than the plan's ``min_interval_days``.

    Raises:
        HTTPException: 403 when the task limit would be exceeded, 400 when a
            cron expression is more frequent than the plan allows.
    """
    plan = await user.get_user_plan(user_email)
    limits = NEWSLETTER_LIMITS_BY_PLAN.get(plan, NEWSLETTER_LIMITS_BY_PLAN[UserPlan.BETA])
    shortest_allowed = timedelta(days=limits["min_interval_days"])

    with init_db_connection().get_session() as session:
        existing_count = crud.count_active_tasks_for_user(session, user_email)

    requested_count = len(cron_expressions)
    if existing_count + requested_count > limits["max_tasks"]:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail=f"Newsletter limit reached for {plan.value} plan. "
            f"You have {existing_count} active tasks and are trying to add {requested_count} more "
            f"(max: {limits['max_tasks']}).",
        )

    for expression in cron_expressions:
        # One-time tasks have no repeat interval, so only recurring ones are checked.
        if is_one_time_task(expression):
            continue
        if calculate_cron_interval(expression) < shortest_allowed:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail=f"Cron interval is too frequent for {plan.value} plan. "
                f"Minimum interval is {limits['min_interval_days']} days.",
            )


def _create_and_schedule_task(user_email: str, cron_expr: str, distilled_instructions: str, prompt: str) -> str:
    """
    Create a single newsletter task row and register its scheduler job.

    The task is first stored with status INITIALISED, then added to the
    scheduler, and finally marked ACTIVE.

    If registering the scheduler job fails, the freshly created task row is
    deleted again before the error is re-raised. The caller only learns the
    task id on success, so it cannot roll back a task that failed half-way;
    without this cleanup the INITIALISED row would be left behind.

    Args:
        user_email: Owner of the newsletter; stored as the task's ``email_id``
            and used as the sender of the synthetic email request.
        cron_expr: Cron expression the job is scheduled with.
        distilled_instructions: Instruction text attached to the email request.
        prompt: Original user prompt; its first 50 characters form the subject.

    Returns:
        The id of the newly created task.

    Raises:
        Exception: Whatever the task creation, the scheduler or the status
            update raises is propagated unchanged.
    """
    task_id = str(uuid.uuid4())
    scheduler_job_id = f"task_{task_id}"
    email_for_task = EmailRequest(
        from_email=user_email,
        # NOTE(review): this recipient literal looks like an e-mail obfuscation
        # placeholder rather than a real address - confirm the intended value.
        to="[email protected]",
        subject=f"Newsletter: {prompt[:50]}...",
        distilled_processing_instructions=distilled_instructions,
        distilled_alias=HandlerAlias.ASK,
        messageId=f"<newsletter-{task_id}-{datetime.now(timezone.utc).isoformat()}@mxgo.ai>",
        parent_message_id=f"<newsletter-parent-{task_id}@mxgo.ai>",
    )

    db_connection = init_db_connection()
    with db_connection.get_session() as session:
        crud.create_task(
            session=session,
            task_id=task_id,
            email_id=user_email,
            cron_expression=cron_expr,
            email_request=email_for_task.model_dump(by_alias=True),
            scheduler_job_id=scheduler_job_id,
            status=TaskStatus.INITIALISED,
        )

    try:
        scheduler = Scheduler()
        scheduler.add_job(
            job_id=scheduler_job_id, cron_expression=cron_expr, func=execute_scheduled_task, args=[task_id]
        )
    except Exception:
        # No job was registered, so the row created above would be orphaned.
        # Cleanup is best-effort: a failure here is logged and must not mask
        # the original scheduling error.
        try:
            with db_connection.get_session() as session:
                crud.delete_task(session, task_id)
        except Exception as cleanup_error:
            logger.error(
                f"Failed to remove unscheduled newsletter task {task_id} for {user_email}: {cleanup_error}"
            )
        raise

    # NOTE(review): if this status update fails, the scheduler job registered
    # above stays in place; removing it would need a scheduler API that is not
    # visible from this function.
    with db_connection.get_session() as session:
        crud.update_task_status(session, task_id, TaskStatus.ACTIVE)

    logger.info(f"Newsletter task {task_id} for {user_email} scheduled successfully.")
    return task_id


async def _handle_post_creation_action(
    user_email: str, *, is_whitelisted: bool, first_task_id: str, distilled_instructions: str, prompt: str
):
    """
    Run the follow-up step after newsletter tasks have been created.

    Whitelisted users get a sample newsletter queued through
    ``process_email_task`` and the function returns True. For everyone else an
    automatic whitelist verification is triggered; errors from that call are
    logged and swallowed, and the function returns False.
    """
    if not is_whitelisted:
        logger.info(f"User {user_email} is not whitelisted. Triggering verification.")
        try:
            await whitelist.trigger_automatic_verification(user_email)
        except Exception as e:
            # Best-effort: a failed verification trigger must not fail the request.
            logger.error(f"Error triggering whitelist verification for {user_email}: {e}")
        return False

    logger.info(f"User {user_email} is whitelisted. Sending sample newsletter.")
    subject_line = f"[SAMPLE] Newsletter: {prompt[:40]}..."
    sent_at = datetime.now(timezone.utc).isoformat()
    sample_request = EmailRequest(
        from_email=user_email,
        # NOTE(review): this recipient literal looks like an e-mail obfuscation
        # placeholder rather than a real address - confirm the intended value.
        to="[email protected]",
        subject=subject_line,
        distilled_processing_instructions=distilled_instructions,
        distilled_alias=HandlerAlias.ASK,
        messageId=f"<newsletter-sample-{first_task_id}-{sent_at}@mxgo.ai>",
        parent_message_id=f"<newsletter-parent-{first_task_id}@mxgo.ai>",
    )
    payload = sample_request.model_dump()
    process_email_task.send(
        payload,
        email_attachments_dir="",
        attachment_info=[],
        scheduled_task_id=first_task_id,
    )
    return True


@app.post("/create-newsletter")
async def create_newsletter(
    request: CreateNewsletterRequest,
    current_user: Annotated[AuthInfo, Depends(get_current_user)],
    _token: Annotated[str, Depends(bearer_auth_scheme)] = ...,
) -> CreateNewsletterResponse:
    """
    Schedule newsletter tasks for the authenticated user.

    The request schedule is converted to a list of cron expressions (a
    ValueError from the conversion becomes a 400), the user's plan limits are
    validated, and one task is created per cron expression. If any creation
    fails, the tasks created so far are deleted and a 500 is returned.
    Afterwards either a sample newsletter is sent (whitelisted and verified
    users) or whitelist verification is triggered.
    """
    user_email = current_user.email
    logger.info(f"Received newsletter creation request for user: {user_email}")

    distilled_instructions = _build_newsletter_instructions(request)

    try:
        cron_expressions = convert_schedule_to_cron_list(request.schedule)
    except ValueError as e:
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e)) from e

    await _validate_newsletter_limits(user_email, cron_expressions)

    exists_in_whitelist, is_verified = await whitelist.is_email_whitelisted(user_email)
    is_whitelisted = exists_in_whitelist and is_verified

    scheduled_ids = []
    try:
        for expression in cron_expressions:
            new_task_id = _create_and_schedule_task(user_email, expression, distilled_instructions, request.prompt)
            scheduled_ids.append(new_task_id)
    except Exception as e:
        logger.error(f"Failed to schedule one or more newsletter tasks for {user_email}: {e}")

        # Undo the tasks that were already created before the failure.
        with init_db_connection().get_session() as session:
            for stale_id in scheduled_ids:
                crud.delete_task(session, stale_id)
        raise HTTPException(status_code=500, detail="Failed to schedule one or more newsletter tasks.") from e

    if not scheduled_ids:
        # Nothing was scheduled, so there is no task to attach a sample to.
        sample_email_sent = False
    else:
        sample_email_sent = await _handle_post_creation_action(
            user_email,
            is_whitelisted=is_whitelisted,
            first_task_id=scheduled_ids[0],
            distilled_instructions=distilled_instructions,
            prompt=request.prompt,
        )

    return CreateNewsletterResponse(
        is_scheduled=bool(scheduled_ids),
        is_whitelisted=is_whitelisted,
        sample_email_sent=sample_email_sent,
        scheduled_task_ids=scheduled_ids,
    )


@app.get("/user")
async def get_user_info(
current_user: Annotated[AuthInfo, Depends(get_current_user)] = ...,
Expand Down Expand Up @@ -906,6 +1083,17 @@ async def get_user_info(
else:
logger.info(f"No customer ID found for email {current_user.email}")

# Get newsletter limits and current usage
newsletter_limits_config = NEWSLETTER_LIMITS_BY_PLAN.get(user_plan, NEWSLETTER_LIMITS_BY_PLAN[UserPlan.BETA])
max_newsletters_allowed = newsletter_limits_config["max_tasks"]

with init_db_connection().get_session() as session:
current_newsletter_count = crud.count_active_tasks_for_user(session, current_user.email)

newsletter_usage = NewsletterUsageInfo(
current_count=current_newsletter_count, max_allowed=max_newsletters_allowed
)

# Get usage information
normalized_user_email = user.normalize_email(current_user.email)
current_dt = datetime.now(timezone.utc)
Expand Down Expand Up @@ -936,7 +1124,12 @@ async def get_user_info(

logger.info(f"Successfully retrieved user info for {current_user.email}")

return UserInfoResponse(subscription_info=subscription_info, plan_name=user_plan.value, usage_info=usage_info)
return UserInfoResponse(
subscription_info=subscription_info,
plan_name=user_plan.value,
usage_info=usage_info,
newsletter_usage=newsletter_usage,
)

except Exception as e:
logger.error(f"Error retrieving user info for {current_user.email}: {e}")
Expand Down
5 changes: 3 additions & 2 deletions mxgo/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,9 @@ def validate_jwt_token(token: str) -> AuthInfo:

try:
# Decode and validate the JWT token
payload = jwt.decode(token, JWT_SECRET, algorithms=[JWT_ALGORITHM],
options={"verify_exp": True}, audience="authenticated")
payload = jwt.decode(
token, JWT_SECRET, algorithms=[JWT_ALGORITHM], options={"verify_exp": True}, audience="authenticated"
)

# Extract required fields
user_id = payload.get("sub")
Expand Down
12 changes: 12 additions & 0 deletions mxgo/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,18 @@
# Scheduled tasks configuration
SCHEDULED_TASKS_MINIMUM_INTERVAL_HOURS = 1
SCHEDULED_TASKS_MAX_PER_EMAIL = 5

# Per-plan limits for newsletter tasks, keyed by UserPlan.
# - "max_tasks": upper bound that mxgo/api.py compares against the user's
#   active task count plus the number of newly requested cron expressions.
# - "min_interval_days": converted to a timedelta in mxgo/api.py and used as
#   the smallest allowed interval for recurring (non one-time) cron expressions.
# The BETA entry is also the fallback mxgo/api.py uses for plans not listed here.
NEWSLETTER_LIMITS_BY_PLAN = {
UserPlan.BETA: {
"max_tasks": 3,
"min_interval_days": 7,
},
UserPlan.PRO: {
"max_tasks": 20,
"min_interval_days": 1,
},
}

RATE_LIMITS_BY_PLAN = {
UserPlan.BETA: {
"hour": {"limit": 10, "period_seconds": 3600, "expiry_seconds": 3600 * 2}, # 2hr expiry for 1hr window
Expand Down
7 changes: 4 additions & 3 deletions mxgo/email_handles.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@
aliases=["deep-research"],
process_attachments=True,
deep_research_mandatory=True,
allowed_tools=COMMON_TOOLS + SEARCH_TOOLS + RESEARCH_TOOLS,
allowed_tools=COMMON_TOOLS + SEARCH_TOOLS + RESEARCH_TOOLS + NEWS_TOOLS,
add_summary=True,
target_model="gpt-4",
task_template=template_prompts.RESEARCH_TEMPLATE,
Expand All @@ -70,6 +70,7 @@
deep_research_mandatory=False,
allowed_tools=COMMON_TOOLS
+ SEARCH_TOOLS
+ NEWS_TOOLS
+ RESEARCH_TOOLS
+ [ToolName.MEETING_CREATOR, ToolName.SCHEDULED_TASKS],
target_model="gpt-4",
Expand Down Expand Up @@ -140,7 +141,7 @@
],
process_attachments=True,
deep_research_mandatory=False,
allowed_tools=[*COMMON_TOOLS, ToolName.SCHEDULED_TASKS],
allowed_tools=[*COMMON_TOOLS, ToolName.SCHEDULED_TASKS, *NEWS_TOOLS],
target_model="gpt-4",
task_template=template_prompts.FUTURE_TEMPLATE,
output_template=output_prompts.FUTURE_OUTPUT_GUIDELINES,
Expand Down Expand Up @@ -171,7 +172,7 @@
aliases=["breaking-news", "latest-news", "news-update", "current-events"],
process_attachments=True,
deep_research_mandatory=False,
allowed_tools=COMMON_TOOLS + NEWS_TOOLS + SEARCH_TOOLS,
allowed_tools=COMMON_TOOLS + NEWS_TOOLS + SEARCH_TOOLS + [ToolName.SCHEDULED_TASKS],
target_model="gpt-4",
task_template=template_prompts.NEWS_TEMPLATE,
output_template=output_prompts.NEWS_OUTPUT_GUIDELINES,
Expand Down
Loading
Loading