@@ -2,6 +2,7 @@ import { TaskDriver, TaskRunner } from '../driver';
22import { TaskData } from '../types' ;
33import { DatabaseSync , StatementSync } from 'node:sqlite' ;
44import cronParser from 'cron-parser' ;
5+ import { defer } from 'commandkit' ;
56
67/**
78 * SQLite-based persistent job queue manager for CommandKit tasks.
@@ -28,17 +29,39 @@ export class SQLiteDriver implements TaskDriver {
2829 delete : StatementSync ;
2930 updateNextRun : StatementSync ;
3031 updateCompleted : StatementSync ;
32+ findCronByName : StatementSync ;
33+ deleteByName : StatementSync ;
3134 } ;
3235
3336 /**
3437 * Create a new SQLiteDriver instance.
3538 * @param dbPath Path to the SQLite database file (default: './commandkit-tasks.db'). Use `:memory:` for an in-memory database.
39+ * @param pollingInterval The interval in milliseconds to poll for jobs (default: 5_000).
3640 */
37- constructor ( dbPath = './commandkit-tasks.db' ) {
41+ constructor (
42+ dbPath = './commandkit-tasks.db' ,
43+ private pollingInterval = 5_000 ,
44+ ) {
3845 this . db = new DatabaseSync ( dbPath , { open : true } ) ;
3946 this . init ( ) ;
4047 }
4148
49+ /**
50+ * Get the polling interval.
51+ * @returns The polling interval in milliseconds.
52+ */
53+ public getPollingInterval ( ) {
54+ return this . pollingInterval ;
55+ }
56+
57+ /**
58+ * Set the polling interval.
59+ * @param pollingInterval The interval in milliseconds to poll for jobs.
60+ */
61+ public setPollingInterval ( pollingInterval : number ) {
62+ this . pollingInterval = pollingInterval ;
63+ }
64+
4265 /**
4366 * Destroy the SQLite driver and stop the polling loop.
4467 */
@@ -81,6 +104,12 @@ export class SQLiteDriver implements TaskDriver {
81104 updateCompleted : this . db . prepare (
82105 /* sql */ `UPDATE jobs SET status = 'completed', last_run = ? WHERE id = ?` ,
83106 ) ,
107+ findCronByName : this . db . prepare (
108+ /* sql */ `SELECT id FROM jobs WHERE name = ? AND schedule_type = 'cron' AND status = 'pending'` ,
109+ ) ,
110+ deleteByName : this . db . prepare (
111+ /* sql */ `DELETE FROM jobs WHERE name = ? AND schedule_type = 'cron'` ,
112+ ) ,
84113 } ;
85114
86115 this . startPolling ( ) ;
@@ -110,6 +139,15 @@ export class SQLiteDriver implements TaskDriver {
110139 nextRun = typeof schedule === 'number' ? schedule : schedule . getTime ( ) ;
111140 }
112141
142+ if ( scheduleType === 'cron' ) {
143+ const existingTask = this . statements . findCronByName . get ( name ) as
144+ | { id : number }
145+ | undefined ;
146+ if ( existingTask ) {
147+ this . statements . deleteByName . run ( name ) ;
148+ }
149+ }
150+
113151 const result = this . statements . insert . run (
114152 name ,
115153 JSON . stringify ( data ?? { } ) ,
@@ -120,11 +158,13 @@ export class SQLiteDriver implements TaskDriver {
120158 Date . now ( ) ,
121159 ) ;
122160
123- if ( task . immediate ) {
124- await this . runner ?.( {
125- name,
126- data,
127- timestamp : Date . now ( ) ,
161+ if ( task . immediate && scheduleType === 'cron' ) {
162+ defer ( ( ) => {
163+ return this . runner ?.( {
164+ name,
165+ data,
166+ timestamp : Date . now ( ) ,
167+ } ) ;
128168 } ) ;
129169 }
130170
@@ -153,7 +193,10 @@ export class SQLiteDriver implements TaskDriver {
153193 */
154194 private startPolling ( ) {
155195 if ( this . interval ) clearInterval ( this . interval ) ;
156- this . interval = setInterval ( ( ) => this . pollJobs ( ) , 1000 ) . unref ( ) ;
196+ this . interval = setInterval (
197+ ( ) => this . pollJobs ( ) ,
198+ this . pollingInterval ,
199+ ) . unref ( ) ;
157200 // Run immediately on startup
158201 this . pollJobs ( ) ;
159202 }
0 commit comments