Skip to content

Commit d736062

Browse files
fix: Address all points raised in PR review
1 parent e185e5c commit d736062

File tree

7 files changed

+161
-44
lines changed

7 files changed

+161
-44
lines changed

docker/init-db.sh

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ psql -v ON_ERROR_STOP=1 --username "$POSTGRES_USER" --dbname "$POSTGRES_DB" <<-E
99
CREATE EXTENSION IF NOT EXISTS "uuid-ossp";
1010
1111
-- You can add other initialization SQL here
12-
CREATE DATABASE mxtoaitest;
1312
1413
GRANT ALL PRIVILEGES ON DATABASE $POSTGRES_DB TO $POSTGRES_USER;
1514
EOSQL

mxgo/api.py

Lines changed: 47 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -895,10 +895,12 @@ def _build_newsletter_instructions(request: CreateNewsletterRequest) -> str:
895895

896896
async def _validate_newsletter_limits(user_email: str, cron_expressions: list[str]):
897897
"""Validates the user's plan limits for newsletters."""
898+
# Get user plan and corresponding limits from config
898899
user_plan = await user.get_user_plan(user_email)
899900
plan_limits = NEWSLETTER_LIMITS_BY_PLAN.get(user_plan, NEWSLETTER_LIMITS_BY_PLAN[UserPlan.BETA])
900901
min_interval = timedelta(days=plan_limits["min_interval_days"])
901902

903+
# Check total task count against the plan's max tasks
902904
db_connection = init_db_connection()
903905
with db_connection.get_session() as session:
904906
active_task_count = crud.count_active_tasks_for_user(session, user_email)
@@ -911,7 +913,9 @@ async def _validate_newsletter_limits(user_email: str, cron_expressions: list[st
911913
f"(max: {plan_limits['max_tasks']}).",
912914
)
913915

916+
# Loop through each cron expression to validate its frequency
914917
for cron_expr in cron_expressions:
918+
# One-time tasks don't have a recurring interval, so we skip them
915919
if not is_one_time_task(cron_expr):
916920
interval = calculate_cron_interval(cron_expr)
917921
if interval < min_interval:
@@ -922,14 +926,14 @@ async def _validate_newsletter_limits(user_email: str, cron_expressions: list[st
922926
)
923927

924928

925-
def _create_and_schedule_task(user_email: str, cron_expr: str, distilled_instructions: str, prompt: str) -> str:
929+
def _create_and_schedule_task(user_email: str, cron_expr: str, distilled_instructions: str, prompt: str) -> str: # noqa: ARG001
926930
"""Creates a single newsletter task and schedules it."""
927931
task_id = str(uuid.uuid4())
928932
scheduler_job_id = f"task_{task_id}"
929933
email_for_task = EmailRequest(
930934
from_email=user_email,
931935
932-
subject=f"Newsletter: {prompt[:50]}...",
936+
subject="Generate Newsletter as per following Instructions",
933937
distilled_processing_instructions=distilled_instructions,
934938
distilled_alias=HandlerAlias.ASK,
935939
messageId=f"<newsletter-{task_id}-{datetime.now(timezone.utc).isoformat()}@mxgo.ai>",
@@ -959,15 +963,20 @@ def _create_and_schedule_task(user_email: str, cron_expr: str, distilled_instruc
959963

960964

961965
async def _handle_post_creation_action(
962-
user_email: str, *, is_whitelisted: bool, first_task_id: str, distilled_instructions: str, prompt: str
966+
user_email: str,
967+
*,
968+
is_whitelisted: bool,
969+
first_task_id: str,
970+
distilled_instructions: str,
971+
prompt: str, # noqa: ARG001
963972
):
964973
"""Sends a sample email if the user is whitelisted, otherwise triggers verification."""
965974
if is_whitelisted:
966975
logger.info(f"User {user_email} is whitelisted. Sending sample newsletter.")
967976
sample_email_request = EmailRequest(
968977
from_email=user_email,
969978
970-
subject=f"[SAMPLE] Newsletter: {prompt[:40]}...",
979+
subject="[SAMPLE] Generate Newsletter as per following Instructions",
971980
distilled_processing_instructions=distilled_instructions,
972981
distilled_alias=HandlerAlias.ASK,
973982
messageId=f"<newsletter-sample-{first_task_id}-{datetime.now(timezone.utc).isoformat()}@mxgo.ai>",
@@ -1001,6 +1010,27 @@ async def create_newsletter(
10011010
user_email = current_user.email
10021011
logger.info(f"Received newsletter creation request for user: {user_email}")
10031012

1013+
if validators.redis_client:
1014+
redis_key = f"newsletter_request:{request.request_id}"
1015+
existing_task_ids_json = await validators.redis_client.get(redis_key)
1016+
if existing_task_ids_json:
1017+
logger.info(
1018+
f"Duplicate request_id {request.request_id} detected for {user_email}, returning existing tasks from Redis"
1019+
)
1020+
existing_task_ids = json.loads(existing_task_ids_json)
1021+
return Response(
1022+
content=json.dumps(
1023+
{
1024+
"message": "This request has already been processed.",
1025+
"status": "duplicate",
1026+
"request_id": request.request_id,
1027+
"scheduled_task_ids": existing_task_ids,
1028+
}
1029+
),
1030+
status_code=status.HTTP_409_CONFLICT,
1031+
media_type="application/json",
1032+
)
1033+
10041034
distilled_instructions = _build_newsletter_instructions(request)
10051035

10061036
try:
@@ -1022,10 +1052,18 @@ async def create_newsletter(
10221052
logger.error(f"Failed to schedule one or more newsletter tasks for {user_email}: {e}")
10231053

10241054
# Rollback created tasks if any failed
1055+
scheduler = Scheduler()
10251056
db_connection = init_db_connection()
10261057
with db_connection.get_session() as session:
10271058
for tid in created_task_ids:
10281059
crud.delete_task(session, tid)
1060+
1061+
try:
1062+
scheduler.remove_job(f"task_{tid}")
1063+
logger.info(f"Removed scheduler job for rolled-back task {tid}")
1064+
except Exception as scheduler_e:
1065+
logger.error(f"Failed to remove scheduler job for task {tid}: {scheduler_e}")
1066+
10291067
raise HTTPException(status_code=500, detail="Failed to schedule one or more newsletter tasks.") from e
10301068

10311069
sample_email_sent = False
@@ -1038,6 +1076,11 @@ async def create_newsletter(
10381076
prompt=request.prompt,
10391077
)
10401078

1079+
if validators.redis_client and created_task_ids:
1080+
redis_key = f"newsletter_request:{request.request_id}"
1081+
# Store for 24 hours
1082+
await validators.redis_client.setex(redis_key, 86400, json.dumps(created_task_ids))
1083+
10411084
return CreateNewsletterResponse(
10421085
is_scheduled=bool(created_task_ids),
10431086
is_whitelisted=is_whitelisted,

mxgo/prompts/template_prompts.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1177,6 +1177,14 @@
11771177
- **distilled_future_task_instructions**: Clear, detailed instructions about how the task should be processed when executed in the future. This should include the processing approach, any specific requirements, and what the expected outcome should be. **CRITICAL: If the original email contains attachments, you MUST include detailed context about the attachments in these instructions since attachments will not be available during scheduled execution. Include attachment names, types, sizes, and any relevant content or context from the attachments.**
11781178
- **start_time**: (Optional) Start time for the task in ISO 8601 format - task will not execute before this time (e.g., "2024-09-01T00:00:00Z")
11791179
- **end_time**: (Optional) End time for the task in ISO 8601 format - task will not execute after this time (e.g., "2024-12-31T23:59:59Z")
1180+
- **future_handle_alias**: The specific email handle to use for the future task. Select one of the following based on the user's instructions:
1181+
- **`ask`**: (Default) For general questions, custom tasks, or when no other handle fits.
1182+
- **`news`**: For fetching the latest news on a specific topic (e.g., "send me the news on AI every Friday").
1183+
- **`research`**: For conducting in-depth research on a topic at a later time.
1184+
- **`summarize`**: For summarizing the content of the email at a future date.
1185+
- **`fact-check`**: For verifying claims in the email content later.
1186+
- **`simplify`**: For explaining the content of the email in simple terms at a scheduled time.
1187+
- **`translate`**: For translating the email content at a future time.
11801188
11811189
**Response (Successful Scheduling):**
11821190
1. Confirmation message with:

mxgo/schemas.py

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -503,18 +503,21 @@ class ScheduleType(str, Enum):
503503

504504

505505
class RecurringWeekly(BaseModel):
506-
days: list[str] = Field(..., description="List of weekdays, e.g., ['monday', 'wednesday', 'friday']")
506+
days: list[int] = Field(..., description="List of weekdays as integers (0=Sunday, 1=Monday, ..., 6=Saturday)")
507507
time: str = Field(..., description="Time in HH:MM format, e.g., '09:30'")
508508

509509
@field_validator("days")
510510
@classmethod
511511
def validate_days(cls, v):
512-
valid_days = {"monday", "tuesday", "wednesday", "thursday", "friday", "saturday", "sunday"}
512+
if len(v) > 7: # noqa: PLR2004
513+
msg = "Cannot specify more than 7 days"
514+
raise ValueError(msg)
515+
513516
for day in v:
514-
if day.lower() not in valid_days:
515-
msg = f"Invalid day: {day}. Must be one of {valid_days}"
517+
if not 0 <= day <= 6: # noqa: PLR2004
518+
msg = f"Invalid day: {day}. Must be an integer between 0 and 6."
516519
raise ValueError(msg)
517-
return [day.lower() for day in v]
520+
return v
518521

519522
@field_validator("time")
520523
@classmethod
@@ -527,15 +530,16 @@ def validate_time(cls, v):
527530

528531
class ScheduleOptions(BaseModel):
529532
type: ScheduleType
530-
specific_dates: list[str] | None = Field(
531-
None, description="List of ISO 8601 datetime strings for specific, non-recurring schedules."
533+
specific_datetime: str | None = Field(
534+
None, description="List of ISO 8601 datetime string for a specific, non-recurring schedules."
532535
)
533-
recurring_weekly: RecurringWeekly | None = Field(None, description="Configuration for a recurring weekly schedule.")
536+
weekly_schedule: RecurringWeekly | None = Field(None, description="Configuration for a recurring weekly schedule.")
534537

535538

536539
class CreateNewsletterRequest(BaseModel):
537540
"""Request model for creating a newsletter based on the new UI mock."""
538541

542+
request_id: str = Field(..., description="Unique request identifier for idempotency")
539543
prompt: str = Field(..., description="The main instructions for the newsletter content.")
540544
estimated_read_time: int | None = Field(None, description="Estimated read time in minutes for the newsletter.")
541545
sources: list[str] | None = Field(None, description="A list of source names or URLs to use.")

mxgo/tools/scheduled_tasks_tool.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -260,6 +260,9 @@ def forward(
260260
logger.info(f"Resolved future handle alias '{future_handle_alias}' to '{resolved_handle_alias}'")
261261
except Exception:
262262
logger.warning(f"Could not resolve handle alias '{future_handle_alias}'. Validation might fail.")
263+
resolved_handle_alias = "ask"
264+
else:
265+
resolved_handle_alias = "ask"
263266

264267
# Validate input using Pydantic
265268
input_data = ScheduledTaskInput(

mxgo/utils.py

Lines changed: 79 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
from datetime import datetime, timedelta, timezone
22

3+
from croniter import croniter
4+
35
from mxgo.schemas import ScheduleOptions, ScheduleType
46

57

@@ -107,11 +109,28 @@ def calculate_cron_interval(cron_expression: str) -> timedelta: # noqa: PLR0912
107109
# Daily pattern (specific hour and minute, every day)
108110
elif day == "*" and month == "*" and (weekday in {"*", "?"}):
109111
interval = timedelta(days=1)
110-
# Weekly pattern (specific weekday)
112+
# Weekly pattern with multiple days
111113
elif day == "*" and month == "*" and weekday not in {"*", "?"}:
112-
# If multiple days are specified (e.g., "1-5" or "1,3,5"), the minimum interval is 1 day.
113-
# Otherwise, it's a single day of the week, so the interval is 7 days.
114-
interval = timedelta(days=1) if "," in weekday or "-" in weekday else timedelta(weeks=1)
114+
# This handles cases like "1,3,5" (Mon, Wed, Fri)
115+
if "," in weekday:
116+
days_of_week = sorted([int(d) for d in weekday.split(",")])
117+
if len(days_of_week) <= 1:
118+
return timedelta(weeks=1)
119+
120+
min_diff = 7
121+
# Calculate interval between consecutive days
122+
for i in range(1, len(days_of_week)):
123+
diff = days_of_week[i] - days_of_week[i - 1]
124+
min_diff = min(min_diff, diff)
125+
126+
# Calculate wrap-around interval (e.g., from Friday to Monday)
127+
wrap_around_diff = (days_of_week[0] + 7) - days_of_week[-1]
128+
min_diff = min(min_diff, wrap_around_diff)
129+
130+
return timedelta(days=min_diff)
131+
132+
# Single day of the week
133+
return timedelta(weeks=1)
115134

116135
# Monthly pattern (specific day of month)
117136
elif day != "*" and month == "*":
@@ -131,34 +150,67 @@ def calculate_cron_interval(cron_expression: str) -> timedelta: # noqa: PLR0912
131150

132151

133152
def convert_schedule_to_cron_list(schedule: ScheduleOptions) -> list[str]:
134-
"""Converts schedule options from the newsletter request into a list of cron expressions."""
153+
"""
154+
Converts schedule options from the newsletter request into a list of valid cron expressions.
155+
156+
Raises:
157+
ValueError: If the schedule configuration is invalid or results in an invalid cron expression.
158+
159+
"""
160+
cron_expressions = []
161+
135162
if schedule.type == ScheduleType.IMMEDIATE:
136-
# Schedule for 1 minute in the future to be executed ASAP
163+
# Schedule for 1 minute in the future to be executed as soon as possible.
137164
now = datetime.now(timezone.utc) + timedelta(minutes=1)
138-
return [f"{now.minute} {now.hour} {now.day} {now.month} *"]
165+
cron_str = f"{now.minute} {now.hour} {now.day} {now.month} *"
166+
cron_expressions.append(cron_str)
139167

140-
if schedule.type == ScheduleType.SPECIFIC_DATES:
141-
cron_list = []
142-
if not schedule.specific_dates:
143-
msg = "specific_dates must be provided for SPECIFIC_DATES schedule type."
168+
elif schedule.type == ScheduleType.SPECIFIC_DATES:
169+
if not schedule.specific_datetime:
170+
msg = "specific_datetime must be provided for SPECIFIC_DATES schedule type."
144171
raise ValueError(msg)
145-
for dt_str in schedule.specific_dates:
146-
dt = datetime.fromisoformat(dt_str)
147-
if dt.tzinfo is None:
148-
dt = dt.replace(tzinfo=timezone.utc) # Assume UTC if naive
149-
cron_list.append(f"{dt.minute} {dt.hour} {dt.day} {dt.month} *")
150-
return cron_list
151-
152-
if schedule.type == ScheduleType.RECURRING_WEEKLY:
153-
if not schedule.recurring_weekly or not schedule.recurring_weekly.days:
154-
msg = "recurring_weekly with at least one day must be provided for RECURRING_WEEKLY schedule type."
172+
173+
dt = datetime.fromisoformat(schedule.specific_datetime)
174+
if dt.tzinfo is None:
175+
dt = dt.replace(tzinfo=timezone.utc) # Assume UTC if timezone is not specified
176+
177+
# Ensure the scheduled time is not in the past
178+
if dt <= datetime.now(timezone.utc):
179+
msg = "Specific datetime for a one-time schedule must be in the future."
180+
raise ValueError(msg)
181+
182+
cron_str = f"{dt.minute} {dt.hour} {dt.day} {dt.month} *"
183+
cron_expressions.append(cron_str)
184+
185+
elif schedule.type == ScheduleType.RECURRING_WEEKLY:
186+
if not schedule.weekly_schedule or not schedule.weekly_schedule.days:
187+
msg = "weekly_schedule with at least one day must be provided for RECURRING_WEEKLY type."
155188
raise ValueError(msg)
156189

157-
day_map = {"monday": 1, "tuesday": 2, "wednesday": 3, "thursday": 4, "friday": 5, "saturday": 6, "sunday": 0}
158-
days_of_week = ",".join(str(day_map[day]) for day in schedule.recurring_weekly.days)
159-
hour, minute = schedule.recurring_weekly.time.split(":")
190+
# Days are now expected to be integers (0=Sunday, 1=Monday, ..., 6=Saturday)
191+
days_of_week = ",".join(map(str, sorted(schedule.weekly_schedule.days)))
160192

161-
return [f"{minute} {hour} * * {days_of_week}"]
193+
try:
194+
hour, minute = schedule.weekly_schedule.time.split(":")
195+
# Basic validation for time format
196+
if not (0 <= int(hour) <= 23 and 0 <= int(minute) <= 59): # noqa: PLR2004
197+
msg = "Invalid time format."
198+
raise ValueError(msg)
199+
except ValueError as e:
200+
msg = f"Invalid time format in weekly_schedule: {schedule.weekly_schedule.time}"
201+
raise ValueError(msg) from e
202+
203+
cron_str = f"{minute} {hour} * * {days_of_week}"
204+
cron_expressions.append(cron_str)
205+
206+
else:
207+
msg = f"Unsupported schedule type: {schedule.type}"
208+
raise ValueError(msg)
209+
210+
# Validate all generated cron expressions before returning
211+
for cron_expr in cron_expressions:
212+
if not croniter.is_valid(cron_expr):
213+
msg = f"Generated an invalid cron expression: '{cron_expr}'"
214+
raise ValueError(msg)
162215

163-
msg = f"Unsupported schedule type: {schedule.type}"
164-
raise ValueError(msg)
216+
return cron_expressions

0 commit comments

Comments
 (0)