Skip to content

Commit 6c7318f

Browse files
committed
fix(tasks): replace cron tasks on reload
1 parent 5d9e3de commit 6c7318f

File tree

6 files changed

+102
-19
lines changed

6 files changed

+102
-19
lines changed

apps/test-bot/commandkit.config.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,9 @@ export default defineConfig({
1313
devtools(),
1414
cache(),
1515
ai(),
16-
tasks(),
16+
tasks({
17+
initializeDefaultDriver: true,
18+
sqliteDriverDatabasePath: './tasks.db',
19+
}),
1720
],
1821
});

apps/test-bot/src/app.ts

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,5 @@
11
import { Client, Partials } from 'discord.js';
22
import { Logger, commandkit } from 'commandkit';
3-
import { setDriver } from '@commandkit/tasks';
4-
import { SQLiteDriver } from '@commandkit/tasks/sqlite';
53
import config from './config.json' with { type: 'json' };
64

75
const client = new Client({
@@ -16,8 +14,6 @@ const client = new Client({
1614
partials: [Partials.Channel, Partials.Message, Partials.User],
1715
});
1816

19-
setDriver(new SQLiteDriver('./tasks.db'));
20-
2117
Logger.log('Application bootstrapped successfully!');
2218

2319
commandkit.setPrefixResolver((message) => {
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
import { task } from '@commandkit/tasks';
2+
import { Logger } from 'commandkit';
3+
4+
export default task({
5+
name: 'current-time',
6+
immediate: true,
7+
schedule: '*/10 * * * * *', // every 10 seconds
8+
async execute() {
9+
Logger.info(`The current time is ${new Date().toLocaleString()}`);
10+
},
11+
});

packages/tasks/src/drivers/sqlite.ts

Lines changed: 50 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ import { TaskDriver, TaskRunner } from '../driver';
22
import { TaskData } from '../types';
33
import { DatabaseSync, StatementSync } from 'node:sqlite';
44
import cronParser from 'cron-parser';
5+
import { defer } from 'commandkit';
56

67
/**
78
* SQLite-based persistent job queue manager for CommandKit tasks.
@@ -28,17 +29,39 @@ export class SQLiteDriver implements TaskDriver {
2829
delete: StatementSync;
2930
updateNextRun: StatementSync;
3031
updateCompleted: StatementSync;
32+
findCronByName: StatementSync;
33+
deleteByName: StatementSync;
3134
};
3235

3336
/**
3437
* Create a new SQLiteDriver instance.
3538
* @param dbPath Path to the SQLite database file (default: './commandkit-tasks.db'). Use `:memory:` for an in-memory database.
39+
* @param pollingInterval The interval in milliseconds to poll for jobs (default: 5_000).
3640
*/
37-
constructor(dbPath = './commandkit-tasks.db') {
41+
constructor(
42+
dbPath = './commandkit-tasks.db',
43+
private pollingInterval = 5_000,
44+
) {
3845
this.db = new DatabaseSync(dbPath, { open: true });
3946
this.init();
4047
}
4148

49+
/**
50+
* Get the polling interval.
51+
* @returns The polling interval in milliseconds.
52+
*/
53+
public getPollingInterval() {
54+
return this.pollingInterval;
55+
}
56+
57+
/**
58+
* Set the polling interval.
59+
* @param pollingInterval The interval in milliseconds to poll for jobs.
60+
*/
61+
public setPollingInterval(pollingInterval: number) {
62+
this.pollingInterval = pollingInterval;
63+
}
64+
4265
/**
4366
* Destroy the SQLite driver and stop the polling loop.
4467
*/
@@ -81,6 +104,12 @@ export class SQLiteDriver implements TaskDriver {
81104
updateCompleted: this.db.prepare(
82105
/* sql */ `UPDATE jobs SET status = 'completed', last_run = ? WHERE id = ?`,
83106
),
107+
findCronByName: this.db.prepare(
108+
/* sql */ `SELECT id FROM jobs WHERE name = ? AND schedule_type = 'cron' AND status = 'pending'`,
109+
),
110+
deleteByName: this.db.prepare(
111+
/* sql */ `DELETE FROM jobs WHERE name = ? AND schedule_type = 'cron'`,
112+
),
84113
};
85114

86115
this.startPolling();
@@ -110,6 +139,15 @@ export class SQLiteDriver implements TaskDriver {
110139
nextRun = typeof schedule === 'number' ? schedule : schedule.getTime();
111140
}
112141

142+
if (scheduleType === 'cron') {
143+
const existingTask = this.statements.findCronByName.get(name) as
144+
| { id: number }
145+
| undefined;
146+
if (existingTask) {
147+
this.statements.deleteByName.run(name);
148+
}
149+
}
150+
113151
const result = this.statements.insert.run(
114152
name,
115153
JSON.stringify(data ?? {}),
@@ -120,11 +158,13 @@ export class SQLiteDriver implements TaskDriver {
120158
Date.now(),
121159
);
122160

123-
if (task.immediate) {
124-
await this.runner?.({
125-
name,
126-
data,
127-
timestamp: Date.now(),
161+
if (task.immediate && scheduleType === 'cron') {
162+
defer(() => {
163+
return this.runner?.({
164+
name,
165+
data,
166+
timestamp: Date.now(),
167+
});
128168
});
129169
}
130170

@@ -153,7 +193,10 @@ export class SQLiteDriver implements TaskDriver {
153193
*/
154194
private startPolling() {
155195
if (this.interval) clearInterval(this.interval);
156-
this.interval = setInterval(() => this.pollJobs(), 1000).unref();
196+
this.interval = setInterval(
197+
() => this.pollJobs(),
198+
this.pollingInterval,
199+
).unref();
157200
// Run immediately on startup
158201
this.pollJobs();
159202
}

packages/tasks/src/plugin.ts

Lines changed: 32 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,18 @@ export interface TasksPluginOptions {
3131
* @default true
3232
*/
3333
initializeDefaultDriver?: boolean;
34+
/**
35+
* The polling interval for the default sqlite driver.
36+
* Default is 5_000.
37+
* @default 5_000
38+
*/
39+
sqliteDriverPollingInterval?: number;
40+
/**
41+
* The path to the sqlite database file for the default sqlite driver.
42+
* Default is './commandkit-tasks.db' but `:memory:` can be used for an in-memory database.
43+
* @default './commandkit-tasks.db'
44+
*/
45+
sqliteDriverDatabasePath?: string;
3446
}
3547

3648
/**
@@ -74,7 +86,12 @@ export class TasksPlugin extends RuntimePlugin<TasksPluginOptions> {
7486
const { SQLiteDriver } =
7587
require('./drivers/sqlite') as typeof import('./drivers/sqlite');
7688

77-
taskDriverManager.setDriver(new SQLiteDriver());
89+
taskDriverManager.setDriver(
90+
new SQLiteDriver(
91+
this.options.sqliteDriverDatabasePath ?? './commandkit-tasks.db',
92+
this.options.sqliteDriverPollingInterval ?? 5_000,
93+
),
94+
);
7895
} catch (e: any) {
7996
Logger.error(
8097
`Failed to initialize the default driver for tasks plugin: ${e?.stack || e}`,
@@ -182,6 +199,8 @@ export class TasksPlugin extends RuntimePlugin<TasksPluginOptions> {
182199
name: task.name,
183200
data: {},
184201
schedule: task.schedule,
202+
immediate: task.immediate,
203+
timezone: task.timezone,
185204
});
186205
}
187206

@@ -225,14 +244,22 @@ export class TasksPlugin extends RuntimePlugin<TasksPluginOptions> {
225244
if (!taskData || !(taskData instanceof Task)) return;
226245

227246
if (this.tasks.has(taskData.name)) {
228-
Logger.info(`Reloading task: ${taskData.name}`);
229-
await taskDriverManager.deleteTask(taskData.name);
247+
if (taskData.isCron()) {
248+
Logger.info(`Replacing cron task: ${taskData.name}`);
249+
// For cron tasks, the SQLiteDriver.create() method will handle the replacement
250+
// No need to manually delete the existing task
251+
} else {
252+
Logger.info(`Reloading task: ${taskData.name}`);
253+
await taskDriverManager.deleteTask(taskData.name);
254+
}
230255
this.tasks.set(taskData.name, taskData);
231256
if (taskData.schedule) {
232257
await taskDriverManager.createTask({
233258
name: taskData.name,
234259
data: {},
235260
schedule: taskData.schedule,
261+
immediate: taskData.immediate,
262+
timezone: taskData.timezone,
236263
});
237264
}
238265
} else {
@@ -243,6 +270,8 @@ export class TasksPlugin extends RuntimePlugin<TasksPluginOptions> {
243270
name: taskData.name,
244271
data: {},
245272
schedule: taskData.schedule,
273+
immediate: taskData.immediate,
274+
timezone: taskData.timezone,
246275
});
247276
}
248277
}

packages/tasks/src/task.ts

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import { TaskContext } from './context';
2-
import { TaskDefinition, TaskSchedule } from './types';
2+
import { TaskData, TaskDefinition, TaskSchedule } from './types';
33

44
/**
55
* Represents a task instance with execution logic and metadata.
@@ -14,7 +14,7 @@ import { TaskDefinition, TaskSchedule } from './types';
1414
*
1515
* export default task({
1616
* name: 'cleanup-old-data',
17-
* schedule: { type: 'cron', value: '0 2 * * *' }, // Daily at 2 AM
17+
* schedule: '0 2 * * *', // Daily at 2 AM
1818
* async prepare(ctx) {
1919
* // Only run if there's old data to clean
2020
* return await hasOldData();
@@ -40,7 +40,8 @@ export class Task<T extends Record<string, any> = Record<string, any>> {
4040
* Only applicable to cron tasks, defaults to false.
4141
*/
4242
public get immediate(): boolean {
43-
return this.data.immediate ?? false;
43+
if (this.isCron()) return !!this.data.immediate;
44+
return false;
4445
}
4546

4647
/**
@@ -126,7 +127,7 @@ export class Task<T extends Record<string, any> = Record<string, any>> {
126127
* // Simple scheduled task
127128
* export default task({
128129
* name: 'daily-backup',
129-
* schedule: { type: 'cron', value: '0 0 * * *' },
130+
* schedule: '0 0 * * *',
130131
* async execute(ctx) {
131132
* await performBackup();
132133
* },

0 commit comments

Comments
 (0)