Skip to content

Commit 7e3f077

Browse files
committed
fix build issues, add missing exports
1 parent 9fa5e1e commit 7e3f077

File tree

4 files changed

+49
-21
lines changed

4 files changed

+49
-21
lines changed

packages/adapter-drizzle/src/postgres-adapter.ts

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ import type { PgliteDatabase } from "drizzle-orm/pglite"
33
import type { PostgresJsDatabase } from "drizzle-orm/postgres-js"
44
import { and, asc, count, eq, lte, sql } from "drizzle-orm"
55

6-
import type { BaseJob, JobStatus, QueueStats, SerializedError } from "@vorsteh-queue/core"
6+
import type { BaseJob, BatchJob, JobStatus, QueueStats, SerializedError } from "@vorsteh-queue/core"
77
import { asUtc, BaseQueueAdapter, serializeError } from "@vorsteh-queue/core"
88

99
import * as schema from "./postgres-schema"
@@ -77,8 +77,8 @@ export class PostgresQueueAdapter extends BaseQueueAdapter {
7777
}
7878

7979
async addJobs<TJobPayload, TJobResult = unknown>(
80-
jobs: readonly Omit<BaseJob<TJobPayload, TJobResult>, "id" | "createdAt">[],
81-
): Promise<readonly BaseJob<TJobPayload, TJobResult>[]> {
80+
jobs: Omit<BatchJob<TJobPayload, TJobResult>, "id" | "createdAt">[],
81+
): Promise<BatchJob<TJobPayload, TJobResult>[]> {
8282
if (!jobs.length) return []
8383
const values = jobs.map((job) => ({
8484
queueName: this.queueName,
@@ -88,18 +88,18 @@ export class PostgresQueueAdapter extends BaseQueueAdapter {
8888
priority: job.priority,
8989
attempts: job.attempts,
9090
maxAttempts: job.maxAttempts,
91-
processAt: sql`${job.processAt.toISOString()}::timestamptz`,
92-
cron: job.cron,
93-
repeatEvery: job.repeatEvery,
94-
repeatLimit: job.repeatLimit,
95-
repeatCount: job.repeatCount,
9691
timeout: job.timeout,
92+
processAt: sql`${new Date().toISOString()}::timestamptz`,
93+
cron: null,
94+
repeatEvery: null,
95+
repeatLimit: null,
96+
repeatCount: 0,
9797
}))
9898
const results = await this.db.insert(schema.queueJobs).values(values).returning()
9999
if (!results.length) {
100100
throw new Error("Failed to create jobs")
101101
}
102-
return results.map((row) => this.transformJob(row) as BaseJob<TJobPayload, TJobResult>)
102+
return results.map((row) => this.transformJob(row) as BatchJob<TJobPayload, TJobResult>)
103103
}
104104

105105
async updateJobStatus(

packages/adapter-kysely/src/postgres-adapter.ts

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import type { Kysely } from "kysely"
22
import { sql } from "kysely"
33

4-
import type { BaseJob, JobStatus, QueueStats, SerializedError } from "@vorsteh-queue/core"
4+
import type { BaseJob, BatchJob, JobStatus, QueueStats, SerializedError } from "@vorsteh-queue/core"
55
import { asUtc, BaseQueueAdapter, serializeError } from "@vorsteh-queue/core"
66

77
import type { DB, InsertQueueJobValue, QueueJob } from "./types"
@@ -104,8 +104,8 @@ export class PostgresQueueAdapter extends BaseQueueAdapter {
104104
}
105105

106106
async addJobs<TJobPayload, TJobResult = unknown>(
107-
jobs: readonly Omit<BaseJob<TJobPayload, TJobResult>, "id" | "createdAt">[],
108-
): Promise<readonly BaseJob<TJobPayload, TJobResult>[]> {
107+
jobs: Omit<BatchJob<TJobPayload, TJobResult>, "id" | "createdAt">[],
108+
): Promise<BatchJob<TJobPayload, TJobResult>[]> {
109109
if (!jobs.length) return []
110110

111111
const values: InsertQueueJobValue[] = jobs.map((job) => ({
@@ -116,11 +116,11 @@ export class PostgresQueueAdapter extends BaseQueueAdapter {
116116
priority: job.priority,
117117
attempts: job.attempts,
118118
max_attempts: job.maxAttempts,
119-
process_at: sql`${job.processAt.toISOString()}::timestamptz`,
120-
cron: job.cron,
121-
repeat_every: job.repeatEvery,
122-
repeat_limit: job.repeatLimit,
123-
repeat_count: job.repeatCount,
119+
process_at: sql`${asUtc(new Date()).toISOString()}::timestamptz`,
120+
cron: null,
121+
repeat_every: null,
122+
repeat_limit: null,
123+
repeat_count: 0,
124124
timeout: job.timeout,
125125
}))
126126

@@ -134,7 +134,7 @@ export class PostgresQueueAdapter extends BaseQueueAdapter {
134134
throw new Error("Failed to create jobs")
135135
}
136136

137-
return results.map((row) => this.transformJob(row) as BaseJob<TJobPayload, TJobResult>)
137+
return results.map((row) => this.transformJob(row) as BatchJob<TJobPayload, TJobResult>)
138138
}
139139

140140
async updateJobStatus(
@@ -272,9 +272,10 @@ export class PostgresQueueAdapter extends BaseQueueAdapter {
272272
// BatchJob omits scheduling fields, so we strip them
273273
return jobs.map((job) => {
274274
// eslint-disable-next-line @typescript-eslint/no-unused-vars
275-
const { cron, repeat_every, repeat_limit, repeat_count, process_at, ...rest } = job
275+
const { cron, repeat_every, repeat_limit, repeat_count, process_at, status, ...rest } = job
276276
return {
277277
...rest,
278+
status: status as JobStatus,
278279
maxAttempts: job.max_attempts,
279280
createdAt: job.created_at,
280281
processAt: job.process_at,
@@ -284,7 +285,7 @@ export class PostgresQueueAdapter extends BaseQueueAdapter {
284285
error: job.error as SerializedError | undefined,
285286
result: job.result,
286287
progress: job.progress ?? 0,
287-
// Remove scheduling fields for BatchJob
288+
timeout: job.timeout ?? undefined,
288289
}
289290
})
290291
}

packages/adapter-prisma/src/postgres-adapter.ts

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import type { BaseJob, JobStatus, QueueStats, SerializedError } from "@vorsteh-queue/core"
1+
import type { BaseJob, BatchJob, JobStatus, QueueStats, SerializedError } from "@vorsteh-queue/core"
22
import { BaseQueueAdapter, serializeError } from "@vorsteh-queue/core"
33

44
import type { PrismaClient, PrismaClientInternal } from "../types"
@@ -69,6 +69,32 @@ export class PostgresPrismaQueueAdapter extends BaseQueueAdapter {
6969
return this.transformJob(result) as BaseJob<TJobPayload, TJobResult>
7070
}
7171

72+
async addJobs<TJobPayload, TJobResult = unknown>(
73+
jobs: Omit<BatchJob<TJobPayload, TJobResult>, "id" | "createdAt">[],
74+
): Promise<BatchJob<TJobPayload, TJobResult>[]> {
75+
if (!jobs.length) return []
76+
const created: BatchJob<TJobPayload, TJobResult>[] = []
77+
for (const job of jobs) {
78+
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
79+
const result = (await this.db[this.modelName]!.create({
80+
data: {
81+
queueName: this.queueName,
82+
name: job.name,
83+
payload: JSON.stringify(job.payload),
84+
status: job.status,
85+
priority: job.priority,
86+
attempts: job.attempts,
87+
maxAttempts: job.maxAttempts,
88+
progress: job.progress ?? 0,
89+
timeout: job.timeout,
90+
},
91+
})) as QueueJob
92+
93+
created.push(this.transformJob(result) as BatchJob<TJobPayload, TJobResult>)
94+
}
95+
return created
96+
}
97+
7298
async updateJobStatus(
7399
id: string,
74100
status: JobStatus,

packages/core/src/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ export { serializeError } from "./utils/error"
99

1010
export type {
1111
BaseJob,
12+
BatchJob,
1213
JobHandler,
1314
JobOptions,
1415
JobPriority,

0 commit comments

Comments
 (0)