Skip to content

Commit a9087fd

Browse files
Merge pull request #1258 from mykonos-ibiza/move-to-supabase-cron
refactor: replace QStash with Supabase Cron for background job processing
2 parents a1ef96b + 2cba539 commit a9087fd

File tree

14 files changed

+1648
-1681
lines changed

14 files changed

+1648
-1681
lines changed

.cursor/rules/suna-project.mdc

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,6 @@ project/
7070
- **LLM Integration**: LiteLLM for multi-provider support, structured prompts
7171
- **Tool System**: Dual schema decorators (OpenAPI + XML), consistent ToolResult
7272
- **Real-time**: Supabase subscriptions for live updates
73-
- **Background Jobs**: Dramatiq for async processing, QStash for scheduling
7473

7574
## Key Technologies
7675

CONTRIBUTING.md

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,6 @@ Before contributing, ensure you have access to:
4141
- Daytona account (for agent execution)
4242
- Tavily API key (for search)
4343
- Firecrawl API key (for web scraping)
44-
- QStash account (for background jobs)
4544

4645
**Optional:**
4746

README.md

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,6 @@ The setup process includes:
9999
- Setting up Daytona for secure agent execution
100100
- Integrating with LLM providers (Anthropic, OpenAI, OpenRouter, etc.)
101101
- Configuring web search and scraping capabilities (Tavily, Firecrawl)
102-
- Setting up QStash for background job processing and workflows
103102
- Configuring webhook handling for automated tasks
104103
- Optional integrations (RapidAPI for data providers)
105104

@@ -147,14 +146,13 @@ We welcome contributions from the community! Please see our [Contributing Guide]
147146
### Technologies
148147

149148
- [Daytona](https://daytona.io/) - Secure agent execution environment
150-
- [Supabase](https://supabase.com/) - Database and authentication
149+
- [Supabase](https://supabase.com/) - Database, Cron, and Authentication
151150
- [Playwright](https://playwright.dev/) - Browser automation
152151
- [OpenAI](https://openai.com/) - LLM provider
153152
- [Anthropic](https://www.anthropic.com/) - LLM provider
154153
- [Morph](https://morphllm.com/) - For AI-powered code editing
155154
- [Tavily](https://tavily.com/) - Search capabilities
156155
- [Firecrawl](https://firecrawl.dev/) - Web scraping capabilities
157-
- [QStash](https://upstash.com/qstash) - Background job processing and workflows
158156
- [RapidAPI](https://rapidapi.com/) - API services
159157
- Custom MCP servers - Extend functionality with custom tools
160158

backend/.env.example

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -54,11 +54,6 @@ SMITHERY_API_KEY=
5454

5555
MCP_CREDENTIAL_ENCRYPTION_KEY=
5656

57-
QSTASH_URL="https://qstash.upstash.io"
58-
QSTASH_TOKEN=""
59-
QSTASH_CURRENT_SIGNING_KEY=""
60-
QSTASH_NEXT_SIGNING_KEY=""
61-
6257
WEBHOOK_BASE_URL=""
6358

6459
# Optional

backend/README.md

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -96,11 +96,6 @@ DAYTONA_API_KEY=your-daytona-key
9696
DAYTONA_SERVER_URL=https://app.daytona.io/api
9797
DAYTONA_TARGET=us
9898

99-
# Background Job Processing (Required)
100-
QSTASH_URL=https://qstash.upstash.io
101-
QSTASH_TOKEN=your-qstash-token
102-
QSTASH_CURRENT_SIGNING_KEY=your-current-signing-key
103-
QSTASH_NEXT_SIGNING_KEY=your-next-signing-key
10499
WEBHOOK_BASE_URL=https://yourdomain.com
105100

106101
# MCP Configuration

backend/pyproject.toml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,6 @@ dependencies = [
5858
"cryptography>=41.0.0",
5959
"apscheduler>=3.10.0",
6060
"croniter>=1.4.0",
61-
"qstash>=2.0.0",
6261
"structlog==25.4.0",
6362
"PyPDF2==3.0.1",
6463
"python-docx==1.1.0",
backend/supabase/migrations/&lt;new migration&gt;.sql (filename missing from capture — presumably the Supabase Cron migration added by this PR)

Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
-- Enable Supabase Cron and HTTP extensions and provide helper RPCs
-- This migration replaces QStash-based scheduling with Supabase Cron

BEGIN;

-- Enable required extensions if not already enabled
CREATE EXTENSION IF NOT EXISTS pg_cron;
CREATE EXTENSION IF NOT EXISTS pg_net;

-- Helper function to schedule an HTTP POST via Supabase Cron.
-- Unschedules any existing job with the same name first (idempotent
-- re-schedule) and returns the new pg_cron job id.
--
-- Parameters:
--   job_name   stable, unique name for the cron job
--   schedule   cron expression understood by pg_cron
--   url        destination for the HTTP POST
--   headers    JSON object of request headers (JSON strings are unwrapped)
--   body       JSON request body (JSON strings are unwrapped)
--   timeout_ms request timeout passed to net.http_post
CREATE OR REPLACE FUNCTION public.schedule_trigger_http(
    job_name text,
    schedule text,
    url text,
    headers jsonb DEFAULT '{}'::jsonb,
    body jsonb DEFAULT '{}'::jsonb,
    timeout_ms integer DEFAULT 8000
) RETURNS bigint
LANGUAGE plpgsql
SECURITY DEFINER
-- Pin the search path: a SECURITY DEFINER function with a mutable
-- search_path is vulnerable to object shadowing. Every reference below is
-- schema-qualified (cron.*, net.*, pg_catalog builtins), so '' is safe.
SET search_path = ''
AS $$
DECLARE
    job_id bigint;
    sql_text text;
    headers_fixed jsonb;
    body_fixed jsonb;
BEGIN
    -- Unschedule any existing jobs with the same name
    PERFORM cron.unschedule(j.jobid)
    FROM cron.job j
    WHERE j.jobname = job_name;

    -- Normalize headers/body in case callers pass JSON strings instead of
    -- objects. `value #>> '{}'` extracts a JSON string's contents with all
    -- escapes resolved — more robust than hand-rolled replace() chains.
    headers_fixed := COALESCE(
        CASE
            WHEN headers IS NULL THEN '{}'::jsonb
            WHEN jsonb_typeof(headers) = 'object' THEN headers
            WHEN jsonb_typeof(headers) = 'string' THEN (headers #>> '{}')::jsonb
            ELSE '{}'::jsonb
        END,
        '{}'::jsonb
    );

    body_fixed := COALESCE(
        CASE
            WHEN body IS NULL THEN '{}'::jsonb
            WHEN jsonb_typeof(body) = 'object' THEN body
            WHEN jsonb_typeof(body) = 'string' THEN (body #>> '{}')::jsonb
            -- Unlike headers, a non-object/non-string body (e.g. an array)
            -- is passed through unchanged.
            ELSE body
        END,
        '{}'::jsonb
    );

    -- Build the SQL snippet to be executed by pg_cron.
    -- %L literal-quotes each value, so url/headers/body cannot break out of
    -- the generated statement; %s is safe for the integer timeout.
    sql_text := format(
        $sql$select net.http_post(
            url := %L,
            headers := %L::jsonb,
            body := %L::jsonb,
            timeout_milliseconds := %s
        );$sql$,
        url,
        headers_fixed::text,
        body_fixed::text,
        timeout_ms
    );

    job_id := cron.schedule(job_name, schedule, sql_text);
    RETURN job_id;
END;
$$;

-- Helper to unschedule by job name; a no-op when no such job exists.
CREATE OR REPLACE FUNCTION public.unschedule_job_by_name(job_name text)
RETURNS void
LANGUAGE plpgsql
SECURITY DEFINER
SET search_path = ''
AS $$
BEGIN
    PERFORM cron.unschedule(j.jobid)
    FROM cron.job j
    WHERE j.jobname = job_name;
END;
$$;

-- Lock down execution: functions are executable by PUBLIC by default, so
-- revoke that first, then grant only to the backend's service role
-- (the backend connects with the service role key).
REVOKE ALL ON FUNCTION public.schedule_trigger_http(text, text, text, jsonb, jsonb, integer) FROM PUBLIC;
REVOKE ALL ON FUNCTION public.unschedule_job_by_name(text) FROM PUBLIC;
GRANT EXECUTE ON FUNCTION public.schedule_trigger_http(text, text, text, jsonb, jsonb, integer) TO service_role;
GRANT EXECUTE ON FUNCTION public.unschedule_job_by_name(text) TO service_role;

COMMIT;

backend/triggers/api.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import uuid
77
from datetime import datetime, timezone
88
import json
9+
import hmac
910

1011
from services.supabase import DBConnection
1112
from utils.auth_utils import get_current_user_id_from_jwt
@@ -452,6 +453,18 @@ async def trigger_webhook(
452453
raise HTTPException(status_code=403, detail="Agent triggers are not enabled")
453454

454455
try:
456+
# Simple header-based auth using a shared secret
457+
# Configure the secret via environment variable: TRIGGER_WEBHOOK_SECRET
458+
secret = os.getenv("TRIGGER_WEBHOOK_SECRET")
459+
if not secret:
460+
logger.error("TRIGGER_WEBHOOK_SECRET is not configured")
461+
raise HTTPException(status_code=500, detail="Webhook secret not configured")
462+
463+
incoming_secret = request.headers.get("x-trigger-secret", "")
464+
if not hmac.compare_digest(incoming_secret, secret):
465+
logger.warning(f"Invalid webhook secret for trigger {trigger_id}")
466+
raise HTTPException(status_code=401, detail="Unauthorized")
467+
455468
# Get raw data from request
456469
raw_data = {}
457470
try:

backend/triggers/provider_service.py

Lines changed: 53 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77

88
import croniter
99
import pytz
10-
from qstash.client import QStash
10+
from services.supabase import DBConnection
1111

1212
from services.supabase import DBConnection
1313
from utils.logger import logger
@@ -41,19 +41,11 @@ async def process_event(self, trigger: Trigger, event: TriggerEvent) -> TriggerR
4141
class ScheduleProvider(TriggerProvider):
4242
def __init__(self):
4343
super().__init__("schedule", TriggerType.SCHEDULE)
44-
self._qstash_token = os.getenv("QSTASH_TOKEN")
45-
self._webhook_base_url = os.getenv("WEBHOOK_BASE_URL", "http://localhost:3000")
46-
47-
if not self._qstash_token:
48-
logger.warning("QSTASH_TOKEN not found. Schedule provider will not work without it.")
49-
self._qstash = None
50-
else:
51-
self._qstash = QStash(token=self._qstash_token)
44+
# This should point to your backend base URL since Supabase Cron will POST to backend
45+
self._webhook_base_url = os.getenv("WEBHOOK_BASE_URL", "http://localhost:8000")
46+
self._db = DBConnection()
5247

5348
async def validate_config(self, config: Dict[str, Any]) -> Dict[str, Any]:
54-
if not self._qstash:
55-
raise ValueError("QSTASH_TOKEN environment variable is required for scheduled triggers")
56-
5749
if 'cron_expression' not in config:
5850
raise ValueError("cron_expression is required for scheduled triggers")
5951

@@ -81,10 +73,6 @@ async def validate_config(self, config: Dict[str, Any]) -> Dict[str, Any]:
8173
return config
8274

8375
async def setup_trigger(self, trigger: Trigger) -> bool:
84-
if not self._qstash:
85-
logger.error("QStash client not available")
86-
return False
87-
8876
try:
8977
webhook_url = f"{self._webhook_base_url}/api/triggers/{trigger.trigger_id}/webhook"
9078
cron_expression = trigger.config['cron_expression']
@@ -104,63 +92,71 @@ async def setup_trigger(self, trigger: Trigger) -> bool:
10492
"timestamp": datetime.now(timezone.utc).isoformat()
10593
}
10694

107-
headers = {
95+
headers: Dict[str, Any] = {
10896
"Content-Type": "application/json",
10997
"X-Trigger-Source": "schedule"
11098
}
111-
99+
100+
# Include simple shared secret header for backend auth
101+
secret = os.getenv("TRIGGER_WEBHOOK_SECRET")
102+
if secret:
103+
headers["X-Trigger-Secret"] = secret
112104
if config.ENV_MODE == EnvMode.STAGING:
113105
vercel_bypass_key = os.getenv("VERCEL_PROTECTION_BYPASS_KEY", "")
114106
if vercel_bypass_key:
115107
headers["X-Vercel-Protection-Bypass"] = vercel_bypass_key
116-
117-
schedule_id = await asyncio.to_thread(
118-
self._qstash.schedule.create,
119-
destination=webhook_url,
120-
cron=cron_expression,
121-
body=json.dumps(payload),
122-
headers=headers,
123-
retries=3,
124-
delay="5s"
125-
)
126-
127-
trigger.config['qstash_schedule_id'] = schedule_id
128-
logger.info(f"Created QStash schedule {schedule_id} for trigger {trigger.trigger_id}")
108+
109+
# Supabase Cron job names are case-sensitive; we keep a stable name per trigger
110+
job_name = f"trigger_{trigger.trigger_id}"
111+
112+
# Schedule via Supabase Cron RPC helper
113+
client = await self._db.client
114+
try:
115+
result = await client.rpc(
116+
"schedule_trigger_http",
117+
{
118+
"job_name": job_name,
119+
"schedule": cron_expression,
120+
"url": webhook_url,
121+
"headers": headers,
122+
"body": payload,
123+
"timeout_ms": 8000,
124+
},
125+
).execute()
126+
except Exception as rpc_err:
127+
logger.error(f"Failed to schedule Supabase Cron job via RPC: {rpc_err}")
128+
return False
129+
130+
trigger.config['cron_job_name'] = job_name
131+
try:
132+
trigger.config['cron_job_id'] = result.data
133+
except Exception:
134+
trigger.config['cron_job_id'] = None
135+
logger.info(f"Created Supabase Cron job '{job_name}' for trigger {trigger.trigger_id}")
129136
return True
130137

131138
except Exception as e:
132-
logger.error(f"Failed to setup QStash schedule for trigger {trigger.trigger_id}: {e}")
139+
logger.error(f"Failed to setup Supabase Cron schedule for trigger {trigger.trigger_id}: {e}")
133140
return False
134141

135142
async def teardown_trigger(self, trigger: Trigger) -> bool:
136-
if not self._qstash:
137-
logger.warning("QStash client not available, skipping teardown")
138-
return True
139-
140143
try:
141-
schedule_id = trigger.config.get('qstash_schedule_id')
142-
if schedule_id:
143-
try:
144-
await asyncio.to_thread(self._qstash.schedule.delete, schedule_id)
145-
logger.info(f"Deleted QStash schedule {schedule_id} for trigger {trigger.trigger_id}")
146-
return True
147-
except Exception as e:
148-
logger.warning(f"Failed to delete QStash schedule {schedule_id}: {e}")
149-
150-
schedules = await asyncio.to_thread(self._qstash.schedule.list)
151-
webhook_url = f"{self._webhook_base_url}/api/triggers/{trigger.trigger_id}/webhook"
152-
153-
for schedule in schedules:
154-
if schedule.get('destination') == webhook_url:
155-
await asyncio.to_thread(self._qstash.schedule.delete, schedule['scheduleId'])
156-
logger.info(f"Deleted QStash schedule {schedule['scheduleId']} for trigger {trigger.trigger_id}")
157-
return True
158-
159-
logger.warning(f"No QStash schedule found for trigger {trigger.trigger_id}")
160-
return True
144+
job_name = trigger.config.get('cron_job_name') or f"trigger_{trigger.trigger_id}"
145+
client = await self._db.client
146+
147+
try:
148+
await client.rpc(
149+
"unschedule_job_by_name",
150+
{"job_name": job_name},
151+
).execute()
152+
logger.info(f"Unschedule requested for Supabase Cron job '{job_name}' (trigger {trigger.trigger_id})")
153+
return True
154+
except Exception as rpc_err:
155+
logger.warning(f"Failed to unschedule job '{job_name}' via RPC: {rpc_err}")
156+
return False
161157

162158
except Exception as e:
163-
logger.error(f"Failed to teardown QStash schedule for trigger {trigger.trigger_id}: {e}")
159+
logger.error(f"Failed to teardown Supabase Cron schedule for trigger {trigger.trigger_id}: {e}")
164160
return False
165161

166162
async def process_event(self, trigger: Trigger, event: TriggerEvent) -> TriggerResult:

backend/triggers/trigger_service.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,8 @@ async def update_trigger(
135135

136136
trigger.updated_at = datetime.now(timezone.utc)
137137

138-
if config is not None or (is_active is True and not trigger.is_active):
138+
# Reconcile provider scheduling when config changes or activation state toggles
139+
if (config is not None) or (is_active is not None):
139140
from .provider_service import get_provider_service
140141
provider_service = get_provider_service(self._db)
141142

0 commit comments

Comments
 (0)