@@ -10,13 +10,7 @@ import {
1010 authorizeWebsiteAccess ,
1111} from "../utils/auth" ;
1212
13- if ( ! process . env . UPSTASH_QSTASH_TOKEN ) {
14- logger . error ( "UPSTASH_QSTASH_TOKEN environment variable is required" ) ;
15- }
16-
17- const client = new Client ( {
18- token : process . env . UPSTASH_QSTASH_TOKEN ,
19- } ) ;
13+ const client = new Client ( { token : process . env . UPSTASH_QSTASH_TOKEN } ) ;
2014
2115const CRON_GRANULARITIES = {
2216 minute : "* * * * *" ,
@@ -40,85 +34,60 @@ const granularityEnum = z.enum([
4034
4135const UPTIME_URL_GROUP = process . env . UPTIME_URL_GROUP || "uptime" ;
4236
43- async function findScheduleById ( scheduleId : string ) {
37+ async function getScheduleAndAuthorize (
38+ scheduleId : string ,
39+ context : Parameters < typeof authorizeUptimeScheduleAccess > [ 0 ]
40+ ) {
4441 const schedule = await db . query . uptimeSchedules . findFirst ( {
4542 where : eq ( uptimeSchedules . id , scheduleId ) ,
4643 } ) ;
47- return schedule ;
48- }
4944
50- async function findScheduleByWebsiteId ( websiteId : string ) {
51- const schedule = await db . query . uptimeSchedules . findFirst ( {
52- where : eq ( uptimeSchedules . websiteId , websiteId ) ,
53- orderBy : ( table , { desc } ) => [ desc ( table . createdAt ) ] ,
45+ if ( ! schedule ) {
46+ throw new ORPCError ( "NOT_FOUND" , { message : "Schedule not found" } ) ;
47+ }
48+
49+ await authorizeUptimeScheduleAccess ( context , {
50+ websiteId : schedule . websiteId ,
51+ userId : schedule . userId ,
5452 } ) ;
53+
5554 return schedule ;
5655}
5756
5857async function createQStashSchedule (
5958 scheduleId : string ,
6059 granularity : z . infer < typeof granularityEnum >
6160) {
62- const result = await client . schedules . create ( {
61+ await client . schedules . create ( {
6362 scheduleId,
6463 destination : UPTIME_URL_GROUP ,
6564 cron : CRON_GRANULARITIES [ granularity ] ,
66- headers : {
67- "Content-Type" : "application/json" ,
68- "X-Schedule-Id" : scheduleId ,
69- } ,
65+ headers : { "Content-Type" : "application/json" , "X-Schedule-Id" : scheduleId } ,
7066 } ) ;
71-
72- if ( ! result . scheduleId ) {
73- throw new ORPCError ( "INTERNAL_SERVER_ERROR" , {
74- message : "Failed to create uptime schedule" ,
75- } ) ;
76- }
77-
78- return result . scheduleId ;
7967}
8068
81- async function ensureNoDuplicateSchedule (
82- url : string ,
83- userId : string ,
84- websiteId ?: string | null
85- ) {
86- const conditions = [
87- eq ( uptimeSchedules . url , url ) ,
88- eq ( uptimeSchedules . userId , userId ) ,
89- ] ;
90-
91- if ( websiteId ) {
92- conditions . push ( eq ( uptimeSchedules . websiteId , websiteId ) ) ;
93- }
94-
95- const existing = await db . query . uptimeSchedules . findFirst ( {
96- where : and ( ...conditions ) ,
97- } ) ;
98-
99- if ( existing ) {
100- throw new ORPCError ( "CONFLICT" , {
101- message : websiteId
102- ? "Monitor already exists for this website"
103- : "Monitor already exists for this URL" ,
104- } ) ;
105- }
69+ function triggerInitialCheck ( scheduleId : string ) {
70+ client
71+ . publish ( {
72+ urlGroup : UPTIME_URL_GROUP ,
73+ headers : { "Content-Type" : "application/json" , "X-Schedule-Id" : scheduleId } ,
74+ } )
75+ . catch ( ( error ) => logger . error ( { scheduleId, error } , "Initial check failed" ) ) ;
10676}
10777
10878export const uptimeRouter = {
10979 getScheduleByWebsiteId : protectedProcedure
11080 . input ( z . object ( { websiteId : z . string ( ) } ) )
11181 . handler ( async ( { context, input } ) => {
11282 await authorizeWebsiteAccess ( context , input . websiteId , "read" ) ;
113- return await findScheduleByWebsiteId ( input . websiteId ) ;
83+ return await db . query . uptimeSchedules . findFirst ( {
84+ where : eq ( uptimeSchedules . websiteId , input . websiteId ) ,
85+ orderBy : ( table , { desc } ) => [ desc ( table . createdAt ) ] ,
86+ } ) ;
11487 } ) ,
11588
11689 listSchedules : protectedProcedure
117- . input (
118- z . object ( {
119- websiteId : z . string ( ) . optional ( ) ,
120- } )
121- )
90+ . input ( z . object ( { websiteId : z . string ( ) . optional ( ) } ) )
12291 . handler ( async ( { context, input } ) => {
12392 const conditions = [ eq ( uptimeSchedules . userId , context . user . id ) ] ;
12493
@@ -127,19 +96,19 @@ export const uptimeRouter = {
12796 conditions . push ( eq ( uptimeSchedules . websiteId , input . websiteId ) ) ;
12897 }
12998
130- const schedules = await db . query . uptimeSchedules . findMany ( {
99+ return await db . query . uptimeSchedules . findMany ( {
131100 where : and ( ...conditions ) ,
132101 orderBy : ( table , { desc } ) => [ desc ( table . createdAt ) ] ,
133102 } ) ;
134-
135- return schedules ;
136103 } ) ,
137104
138105 getSchedule : protectedProcedure
139106 . input ( z . object ( { scheduleId : z . string ( ) } ) )
140107 . handler ( async ( { context, input } ) => {
141108 const [ dbSchedule , qstashSchedule ] = await Promise . all ( [
142- findScheduleById ( input . scheduleId ) ,
109+ db . query . uptimeSchedules . findFirst ( {
110+ where : eq ( uptimeSchedules . id , input . scheduleId ) ,
111+ } ) ,
143112 client . schedules . get ( input . scheduleId ) . catch ( ( ) => null ) ,
144113 ] ) ;
145114
@@ -161,7 +130,7 @@ export const uptimeRouter = {
161130 createSchedule : protectedProcedure
162131 . input (
163132 z . object ( {
164- url : z . string ( ) . url ( "Valid URL is required" ) ,
133+ url : z . string ( ) . url ( ) ,
165134 name : z . string ( ) . optional ( ) ,
166135 websiteId : z . string ( ) . optional ( ) ,
167136 granularity : granularityEnum ,
@@ -172,15 +141,26 @@ export const uptimeRouter = {
172141 await authorizeWebsiteAccess ( context , input . websiteId , "update" ) ;
173142 }
174143
175- await ensureNoDuplicateSchedule (
176- input . url ,
177- context . user . id ,
178- input . websiteId ?? null
179- ) ;
144+ const existing = await db . query . uptimeSchedules . findFirst ( {
145+ where : and (
146+ eq ( uptimeSchedules . url , input . url ) ,
147+ eq ( uptimeSchedules . userId , context . user . id ) ,
148+ ...( input . websiteId
149+ ? [ eq ( uptimeSchedules . websiteId , input . websiteId ) ]
150+ : [ ] )
151+ ) ,
152+ } ) ;
153+
154+ if ( existing ) {
155+ throw new ORPCError ( "CONFLICT" , {
156+ message : input . websiteId
157+ ? "Monitor already exists for this website"
158+ : "Monitor already exists for this URL" ,
159+ } ) ;
160+ }
180161
181162 const scheduleId = input . websiteId || nanoid ( 10 ) ;
182163
183- // Insert to DB first
184164 await db . insert ( uptimeSchedules ) . values ( {
185165 id : scheduleId ,
186166 websiteId : input . websiteId ?? null ,
@@ -192,30 +172,17 @@ export const uptimeRouter = {
192172 isPaused : false ,
193173 } ) ;
194174
195- // Then create QStash schedule, rollback DB if it fails
196175 try {
197176 await createQStashSchedule ( scheduleId , input . granularity ) ;
198177 } catch ( error ) {
199178 await db . delete ( uptimeSchedules ) . where ( eq ( uptimeSchedules . id , scheduleId ) ) ;
200- logger . error ( { scheduleId, error } , "QStash creation failed, rolled back" ) ;
179+ logger . error ( { scheduleId, error } , "QStash failed, rolled back" ) ;
201180 throw new ORPCError ( "INTERNAL_SERVER_ERROR" , {
202181 message : "Failed to create monitor" ,
203182 } ) ;
204183 }
205184
206- // Trigger initial check (fire-and-forget)
207- client
208- . publish ( {
209- urlGroup : UPTIME_URL_GROUP ,
210- headers : {
211- "Content-Type" : "application/json" ,
212- "X-Schedule-Id" : scheduleId ,
213- } ,
214- } )
215- . catch ( ( error ) =>
216- logger . error ( { scheduleId, error } , "Failed to trigger initial check" )
217- ) ;
218-
185+ triggerInitialCheck ( scheduleId ) ;
219186 logger . info ( { scheduleId, url : input . url } , "Schedule created" ) ;
220187
221188 return {
@@ -230,15 +197,7 @@ export const uptimeRouter = {
230197 updateSchedule : protectedProcedure
231198 . input ( z . object ( { scheduleId : z . string ( ) , granularity : granularityEnum } ) )
232199 . handler ( async ( { context, input } ) => {
233- const schedule = await findScheduleById ( input . scheduleId ) ;
234- if ( ! schedule ) {
235- throw new ORPCError ( "NOT_FOUND" , { message : "Schedule not found" } ) ;
236- }
237-
238- await authorizeUptimeScheduleAccess ( context , {
239- websiteId : schedule . websiteId ,
240- userId : schedule . userId ,
241- } ) ;
200+ await getScheduleAndAuthorize ( input . scheduleId , context ) ;
242201
243202 await client . schedules . delete ( input . scheduleId ) ;
244203 await createQStashSchedule ( input . scheduleId , input . granularity ) ;
@@ -264,38 +223,53 @@ export const uptimeRouter = {
264223 deleteSchedule : protectedProcedure
265224 . input ( z . object ( { scheduleId : z . string ( ) } ) )
266225 . handler ( async ( { context, input } ) => {
267- const schedule = await findScheduleById ( input . scheduleId ) ;
268- if ( ! schedule ) {
269- throw new ORPCError ( "NOT_FOUND" , { message : "Schedule not found" } ) ;
270- }
271-
272- await authorizeUptimeScheduleAccess ( context , {
273- websiteId : schedule . websiteId ,
274- userId : schedule . userId ,
275- } ) ;
226+ await getScheduleAndAuthorize ( input . scheduleId , context ) ;
276227
277228 await Promise . all ( [
278229 client . schedules . delete ( input . scheduleId ) ,
279230 db . delete ( uptimeSchedules ) . where ( eq ( uptimeSchedules . id , input . scheduleId ) ) ,
280231 ] ) ;
281232
282233 logger . info ( { scheduleId : input . scheduleId } , "Schedule deleted" ) ;
234+ return { success : true } ;
235+ } ) ,
236+
237+ togglePause : protectedProcedure
238+ . input ( z . object ( { scheduleId : z . string ( ) , pause : z . boolean ( ) } ) )
239+ . handler ( async ( { context, input } ) => {
240+ const schedule = await getScheduleAndAuthorize ( input . scheduleId , context ) ;
241+
242+ if ( schedule . isPaused === input . pause ) {
243+ throw new ORPCError ( "BAD_REQUEST" , {
244+ message : input . pause
245+ ? "Schedule is already paused"
246+ : "Schedule is not paused" ,
247+ } ) ;
248+ }
249+
250+ await Promise . all ( [
251+ input . pause
252+ ? client . schedules . pause ( { schedule : input . scheduleId } )
253+ : client . schedules . resume ( { schedule : input . scheduleId } ) ,
254+ db
255+ . update ( uptimeSchedules )
256+ . set ( { isPaused : input . pause , updatedAt : new Date ( ) } )
257+ . where ( eq ( uptimeSchedules . id , input . scheduleId ) ) ,
258+ ] ) ;
283259
284- return { success : true , scheduleId : input . scheduleId } ;
260+ logger . info (
261+ { scheduleId : input . scheduleId , paused : input . pause } ,
262+ "Schedule toggled"
263+ ) ;
264+
265+ return { success : true , isPaused : input . pause } ;
285266 } ) ,
286267
268+ // Legacy endpoints for backwards compatibility
287269 pauseSchedule : protectedProcedure
288270 . input ( z . object ( { scheduleId : z . string ( ) } ) )
289271 . handler ( async ( { context, input } ) => {
290- const schedule = await findScheduleById ( input . scheduleId ) ;
291- if ( ! schedule ) {
292- throw new ORPCError ( "NOT_FOUND" , { message : "Schedule not found" } ) ;
293- }
294-
295- await authorizeUptimeScheduleAccess ( context , {
296- websiteId : schedule . websiteId ,
297- userId : schedule . userId ,
298- } ) ;
272+ const schedule = await getScheduleAndAuthorize ( input . scheduleId , context ) ;
299273
300274 if ( schedule . isPaused ) {
301275 throw new ORPCError ( "BAD_REQUEST" , {
@@ -312,22 +286,13 @@ export const uptimeRouter = {
312286 ] ) ;
313287
314288 logger . info ( { scheduleId : input . scheduleId } , "Schedule paused" ) ;
315-
316- return { success : true , scheduleId : input . scheduleId , isPaused : true } ;
289+ return { success : true , isPaused : true } ;
317290 } ) ,
318291
319292 resumeSchedule : protectedProcedure
320293 . input ( z . object ( { scheduleId : z . string ( ) } ) )
321294 . handler ( async ( { context, input } ) => {
322- const schedule = await findScheduleById ( input . scheduleId ) ;
323- if ( ! schedule ) {
324- throw new ORPCError ( "NOT_FOUND" , { message : "Schedule not found" } ) ;
325- }
326-
327- await authorizeUptimeScheduleAccess ( context , {
328- websiteId : schedule . websiteId ,
329- userId : schedule . userId ,
330- } ) ;
295+ const schedule = await getScheduleAndAuthorize ( input . scheduleId , context ) ;
331296
332297 if ( ! schedule . isPaused ) {
333298 throw new ORPCError ( "BAD_REQUEST" , {
@@ -344,7 +309,6 @@ export const uptimeRouter = {
344309 ] ) ;
345310
346311 logger . info ( { scheduleId : input . scheduleId } , "Schedule resumed" ) ;
347-
348- return { success : true , scheduleId : input . scheduleId , isPaused : false } ;
312+ return { success : true , isPaused : false } ;
349313 } ) ,
350314} ;
0 commit comments