Skip to content

Commit ed6c060

Browse files
committed
add `createQueueJobsTable´ helper to the kysely adapter
1 parent b9fb66c commit ed6c060

File tree

4 files changed

+78
-53
lines changed

4 files changed

+78
-53
lines changed

packages/adapter-kysely/package.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,8 @@
3737
"default": "./dist/index.js"
3838
},
3939
"./migrations": {
40-
"default": "./src/migrations.js"
40+
"default": "./dist/migrations.js",
41+
"types": "./dist/migrations/queue_table.d.ts"
4142
},
4243
"./postgres-adapter": {
4344
"types": "./dist/postgres-adapter.d.ts",
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
import type { Kysely } from "kysely"
2+
import { sql } from "kysely"
3+
4+
export function createQueueJobsTable(tableName: string, schemaName?: string) {
5+
return {
6+
up: async (db: Kysely<unknown>) => {
7+
await generateUp({ schemaName, tableName, db })
8+
},
9+
down: async (db: Kysely<unknown>) => {
10+
const schema = schemaName ? db.schema.withSchema(schemaName) : db.schema
11+
await schema.dropTable(tableName).execute()
12+
if (schemaName) {
13+
await schema.dropSchema(schemaName).execute()
14+
}
15+
},
16+
}
17+
}
18+
19+
async function generateUp({
20+
schemaName,
21+
tableName,
22+
db,
23+
}: {
24+
schemaName?: string
25+
tableName: string
26+
db: Kysely<unknown>
27+
}) {
28+
if (schemaName) {
29+
await db.schema.createSchema("custom_schema").ifNotExists().execute()
30+
}
31+
32+
const schema = schemaName ? db.schema.withSchema(schemaName) : db.schema
33+
34+
await schema
35+
.createTable(tableName)
36+
.addColumn("id", "uuid", (col) => col.defaultTo(sql`gen_random_uuid()`).notNull())
37+
.addColumn("queue_name", "varchar(255)", (col) => col.notNull())
38+
.addColumn("name", "varchar(255)", (col) => col.notNull())
39+
.addColumn("payload", "jsonb", (col) => col.notNull())
40+
.addColumn("status", "varchar(50)", (col) => col.notNull())
41+
.addColumn("priority", "int4", (col) => col.notNull())
42+
.addColumn("attempts", "int4", (col) => col.defaultTo(0).notNull())
43+
.addColumn("max_attempts", "int4", (col) => col.notNull())
44+
.addColumn("timeout", "jsonb")
45+
.addColumn("cron", "varchar(255)")
46+
.addColumn("created_at", "timestamptz", (col) =>
47+
col.defaultTo(sql`timezone('utc'::text, now())`).notNull(),
48+
)
49+
.addColumn("process_at", "timestamptz", (col) => col.notNull())
50+
.addColumn("processed_at", "timestamptz")
51+
.addColumn("completed_at", "timestamptz")
52+
.addColumn("failed_at", "timestamptz")
53+
.addColumn("error", "jsonb")
54+
.addColumn("result", "jsonb")
55+
.addColumn("progress", "int4")
56+
.addColumn("repeat_every", "int4")
57+
.addColumn("repeat_limit", "int4")
58+
.addColumn("repeat_count", "int4")
59+
.execute()
60+
61+
await schema
62+
.createIndex(`idx_${tableName}_status_priority`)
63+
.on(tableName)
64+
.columns(["queue_name", "status", "priority", "created_at"])
65+
.execute()
66+
67+
await schema
68+
.createIndex(`idx_${tableName}_process_at`)
69+
.on(tableName)
70+
.column("process_at")
71+
.execute()
72+
}

packages/adapter-kysely/src/index.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,3 +3,5 @@ export {
33
PostgresQueueAdapter,
44
PostgresQueueAdapter as PostgresKyselyQueueAdapter,
55
} from "./postgres-adapter"
6+
7+
export { createQueueJobsTable } from "./helpers"
Lines changed: 2 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -1,53 +1,3 @@
1-
import type { Kysely } from "kysely"
2-
import { sql } from "kysely"
1+
import { createQueueJobsTable } from "../helpers"
32

4-
export async function up(db: Kysely<unknown>) {
5-
await db.schema.createSchema("custom_schema").ifNotExists().execute()
6-
7-
await db.schema
8-
.withSchema("custom_schema")
9-
.createTable("custom_queue_jobs")
10-
.addColumn("id", "uuid", (col) => col.defaultTo(sql`gen_random_uuid()`).notNull())
11-
.addColumn("queue_name", "varchar(255)", (col) => col.notNull())
12-
.addColumn("name", "varchar(255)", (col) => col.notNull())
13-
.addColumn("payload", "jsonb", (col) => col.notNull())
14-
.addColumn("status", "varchar(50)", (col) => col.notNull())
15-
.addColumn("priority", "int4", (col) => col.notNull())
16-
.addColumn("attempts", "int4", (col) => col.defaultTo(0).notNull())
17-
.addColumn("max_attempts", "int4", (col) => col.notNull())
18-
.addColumn("timeout", "jsonb")
19-
.addColumn("cron", "varchar(255)")
20-
.addColumn("created_at", "timestamptz", (col) =>
21-
col.defaultTo(sql`timezone('utc'::text, now())`).notNull(),
22-
)
23-
.addColumn("process_at", "timestamptz", (col) => col.notNull())
24-
.addColumn("processed_at", "timestamptz")
25-
.addColumn("completed_at", "timestamptz")
26-
.addColumn("failed_at", "timestamptz")
27-
.addColumn("error", "jsonb")
28-
.addColumn("result", "jsonb")
29-
.addColumn("progress", "int4")
30-
.addColumn("repeat_every", "int4")
31-
.addColumn("repeat_limit", "int4")
32-
.addColumn("repeat_count", "int4")
33-
.execute()
34-
35-
await db.schema
36-
.withSchema("custom_schema")
37-
.createIndex("idx_custom_queue_jobs_status_priority")
38-
.on("custom_queue_jobs")
39-
.columns(["queue_name", "status", "priority", "created_at"])
40-
.execute()
41-
42-
await db.schema
43-
.withSchema("custom_schema")
44-
.createIndex("idx_custom_queue_jobs_process_at")
45-
.on("custom_queue_jobs")
46-
.column("process_at")
47-
.execute()
48-
}
49-
50-
export async function down(db: Kysely<unknown>) {
51-
await db.schema.dropSchema("custom_schema").execute()
52-
await db.schema.withSchema("custom_schema").dropTable("custom_queue_jobs").execute()
53-
}
3+
export const { up, down } = createQueueJobsTable("custom_queue_jobs", "custom_schema")

0 commit comments

Comments
 (0)