Skip to content

Commit 34edf23

Browse files
committed
feat: Improves driver types and error reporting
Adds additional error classifications for drivers and allows clean typing for the schema and table objects. Moved some properties to private that had clear accessors in the BaseDriver.
1 parent 30c30ba commit 34edf23

File tree

7 files changed

+137
-108
lines changed

7 files changed

+137
-108
lines changed

src/driver/base.ts

Lines changed: 27 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -8,22 +8,24 @@ import {
88
type QueueDoc,
99
type Driver,
1010
} from "../types.js";
11+
import { DriverNotImplementedError } from "../error.js";
1112

13+
/** asynced is a helper method that accepts any number of unknown arguments, and returns a Promise<unknown> */
1214
const asynced = (...args: unknown[]) =>
1315
new Promise<unknown>((r) => {
1416
r(args);
1517
});
1618

17-
export class BaseDriver implements Driver {
19+
export class BaseDriver<Schema = unknown, Table = unknown>
20+
implements Driver<Schema, Table>
21+
{
1822
events: DriverEmitter;
19-
protected connection: unknown;
20-
protected options: DriverOptions | undefined;
21-
protected schema: string;
22-
protected table: string;
23-
protected init: Promise<boolean>;
23+
private conn: unknown;
24+
private schema: string;
25+
private table: string;
26+
private init: Promise<boolean>;
2427
constructor(connection: unknown, options?: DriverOptions) {
25-
this.connection = connection;
26-
this.options = options;
28+
this.conn = connection;
2729
this.events = new EventEmitter() as DriverEmitter;
2830
this.schema = options?.schema ?? "docmq";
2931
this.table = options?.table ?? "jobs";
@@ -43,9 +45,9 @@ export class BaseDriver implements Driver {
4345
}
4446

4547
/** Gets the schema object or name */
46-
async getSchema(): Promise<unknown> {
48+
async getSchema(): Promise<Schema> {
4749
await asynced();
48-
throw new Error("Not implemented");
50+
throw new DriverNotImplementedError();
4951
}
5052

5153
/** Gets the schema name */
@@ -54,9 +56,9 @@ export class BaseDriver implements Driver {
5456
}
5557

5658
/** Get the table object or name */
57-
async getTable(): Promise<unknown> {
59+
async getTable(): Promise<Table> {
5860
await asynced();
59-
throw new Error("Not implemented");
61+
throw new DriverNotImplementedError();
6062
}
6163

6264
/** Gets the table name */
@@ -67,73 +69,73 @@ export class BaseDriver implements Driver {
6769
/** Bookend a transaction with driver specific handling */
6870
async transaction(body: () => Promise<unknown>) {
6971
await asynced(body);
70-
throw new Error("Not implemented");
72+
throw new DriverNotImplementedError();
7173
}
7274

7375
/** Take N items from the queue for processing */
7476
async take(visibility: number, limit = 1): Promise<QueueDoc[]> {
7577
await asynced(visibility, limit);
76-
throw new Error("Not implemented");
78+
throw new DriverNotImplementedError();
7779
}
7880

7981
/** Ack a job, removing it from the queue */
8082
async ack(ack: string) {
8183
await asynced(ack);
82-
throw new Error("Not implemented");
84+
throw new DriverNotImplementedError();
8385
}
8486

8587
/** Promote a job, making it immediately available for running */
8688
async promote(ref: string) {
8789
await asynced(ref);
88-
throw new Error("Not implemented");
90+
throw new DriverNotImplementedError();
8991
}
9092

9193
/** Delay a job, pushing its visibility window out */
9294
async delay(ref: string, delayBy: number) {
9395
await asynced(ref, delayBy);
94-
throw new Error("Not implemented");
96+
throw new DriverNotImplementedError();
9597
}
9698

9799
/** Replay a job, copying and inserting a new job to run immediately */
98100
async replay(ref: string) {
99101
await asynced(ref);
100-
throw new Error("Not implemented");
102+
throw new DriverNotImplementedError();
101103
}
102104

103105
/** Fail a job, shifting the next run ahead to a retry time */
104106
async fail(ack: string, retryIn: number, attempt: number) {
105107
await asynced(ack, retryIn, attempt);
106-
throw new Error("Not implemented");
108+
throw new DriverNotImplementedError();
107109
}
108110

109111
/** Place an item into the dead letter queue and ack it */
110112
async dead(doc: QueueDoc) {
111113
await asynced(doc);
112-
throw new Error("Not implemented");
114+
throw new DriverNotImplementedError();
113115
}
114116

115117
/** Extend the runtime of a job */
116118
async ping(ack: string, extendBy = 15) {
117119
await asynced(ack, extendBy);
118-
throw new Error("Not implemented");
120+
throw new DriverNotImplementedError();
119121
}
120122

121123
/** Remove any jobs that are before a certain date */
122124
async clean(before: Date) {
123125
await asynced(before);
124-
throw new Error("Not implemented");
126+
throw new DriverNotImplementedError();
125127
}
126128

127129
/** Replace any upcoming instances of a doc with new data */
128130
async replaceUpcoming(doc: QueueDoc): Promise<QueueDoc> {
129131
await asynced(doc);
130-
throw new Error("Not implemented");
132+
throw new DriverNotImplementedError();
131133
}
132134

133135
/** Remove all upcoming instances of a job by its ref */
134136
async removeUpcoming(ref: string) {
135137
await asynced(ref);
136-
throw new Error("Not implemented");
138+
throw new DriverNotImplementedError();
137139
}
138140

139141
/** Finds the next occurence of a job, either through a cron or duration */
@@ -181,7 +183,7 @@ export class BaseDriver implements Driver {
181183
/** Create the next instance of a job */
182184
async createNext(doc: QueueDoc) {
183185
await asynced(doc);
184-
throw new Error("Not implemented");
186+
throw new DriverNotImplementedError();
185187
}
186188

187189
/** Begin listening for changes on the data source. Should operate idempotently */

src/driver/loki.ts

Lines changed: 31 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,13 @@
11
import { DateTime } from "luxon";
22
import Loki, { type Collection } from "lokijs";
33
import { v4 } from "uuid";
4-
import { DriverError, MaxAttemptsExceededError } from "../error.js";
4+
import {
5+
DriverError,
6+
DriverInitializationError,
7+
DriverNoMatchingAckError,
8+
DriverNoMatchingRefError,
9+
MaxAttemptsExceededError,
10+
} from "../error.js";
511
import { QueueDoc } from "../types.js";
612
import { BaseDriver } from "./base.js";
713
import { loadModule } from "@brillout/load-module";
@@ -97,15 +103,15 @@ export const getClient = (identifier: string) => {
97103
* LokiJS Driver Class. Creates a connection that allows DocMQ to talk to
98104
* an in-memory LokiJS instance
99105
*/
100-
export class LokiDriver extends BaseDriver {
106+
export class LokiDriver extends BaseDriver<Loki, Collection<LokiDoc>> {
101107
protected _db: Loki | undefined;
102108
protected _jobs: Collection<LokiDoc> | undefined;
103109

104110
/** Get the Parent Schema object associated with the job list */
105111
async getSchema() {
106112
await this.ready();
107113
if (!this._db) {
108-
throw new Error("init");
114+
throw new DriverInitializationError();
109115
}
110116
return Promise.resolve(this._db);
111117
}
@@ -114,7 +120,7 @@ export class LokiDriver extends BaseDriver {
114120
async getTable() {
115121
await this.ready();
116122
if (!this._jobs) {
117-
throw new Error("init");
123+
throw new DriverInitializationError();
118124
}
119125
return Promise.resolve(this._jobs);
120126
}
@@ -158,7 +164,7 @@ export class LokiDriver extends BaseDriver {
158164
await this.ready();
159165

160166
if (!this._jobs) {
161-
throw new Error("init");
167+
throw new DriverInitializationError();
162168
}
163169

164170
this._jobs.startTransaction();
@@ -170,7 +176,7 @@ export class LokiDriver extends BaseDriver {
170176
await this.ready();
171177

172178
if (!this._jobs) {
173-
throw new Error("init");
179+
throw new DriverInitializationError();
174180
}
175181

176182
const now = DateTime.now();
@@ -204,7 +210,7 @@ export class LokiDriver extends BaseDriver {
204210
await this.ready();
205211

206212
if (!this._jobs) {
207-
throw new Error("init");
213+
throw new DriverInitializationError();
208214
}
209215

210216
if (ack === null) {
@@ -230,15 +236,15 @@ export class LokiDriver extends BaseDriver {
230236
.data()?.[0];
231237

232238
if (!next) {
233-
throw new Error("NO_MATCHING_JOB");
239+
throw new DriverNoMatchingAckError(ack);
234240
}
235241
}
236242

237243
async fail(ack: string, retryIn: number, attempt: number): Promise<void> {
238244
await this.ready();
239245

240246
if (!this._jobs) {
241-
throw new Error("init");
247+
throw new DriverInitializationError();
242248
}
243249
if (ack === null) {
244250
throw new Error("ERR_NULL_ACK");
@@ -263,7 +269,7 @@ export class LokiDriver extends BaseDriver {
263269
.data()?.[0];
264270

265271
if (!next) {
266-
throw new Error("NO_MATCHING_JOB");
272+
throw new DriverNoMatchingAckError(ack);
267273
}
268274
}
269275

@@ -277,7 +283,7 @@ export class LokiDriver extends BaseDriver {
277283
}
278284

279285
if (!this._jobs) {
280-
throw new Error("init");
286+
throw new DriverInitializationError();
281287
}
282288

283289
const err = new MaxAttemptsExceededError(
@@ -307,15 +313,15 @@ export class LokiDriver extends BaseDriver {
307313
.data()?.[0];
308314

309315
if (!next) {
310-
throw new Error("NO_MATCHING_JOB");
316+
throw new DriverNoMatchingAckError(ackVal);
311317
}
312318
}
313319

314320
async ping(ack: string, extendBy = 15): Promise<void> {
315321
await this.ready();
316322

317323
if (!this._jobs) {
318-
throw new Error("init");
324+
throw new DriverInitializationError();
319325
}
320326
if (ack === null) {
321327
throw new Error("ERR_NULL_ACK");
@@ -338,15 +344,15 @@ export class LokiDriver extends BaseDriver {
338344
.data()?.[0];
339345

340346
if (!next) {
341-
throw new Error("NO_MATCHING_JOB");
347+
throw new DriverNoMatchingAckError(ack);
342348
}
343349
}
344350

345351
async promote(ref: string): Promise<void> {
346352
await this.ready();
347353

348354
if (!this._jobs) {
349-
throw new Error("init");
355+
throw new DriverInitializationError();
350356
}
351357

352358
const now = DateTime.now();
@@ -367,15 +373,15 @@ export class LokiDriver extends BaseDriver {
367373
.data()?.[0];
368374

369375
if (!next) {
370-
throw new Error("NO_MATCHING_JOB");
376+
throw new DriverNoMatchingRefError(ref);
371377
}
372378
}
373379

374380
async delay(ref: string, delayBy: number): Promise<void> {
375381
await this.ready();
376382

377383
if (!this._jobs) {
378-
throw new Error("init");
384+
throw new DriverInitializationError();
379385
}
380386

381387
const next = this._jobs
@@ -396,15 +402,15 @@ export class LokiDriver extends BaseDriver {
396402
.data()?.[0];
397403

398404
if (!next) {
399-
throw new Error("NO_MATCHING_JOB");
405+
throw new DriverNoMatchingRefError(ref);
400406
}
401407
}
402408

403409
async replay(ref: string): Promise<void> {
404410
await this.ready();
405411

406412
if (!this._jobs) {
407-
throw new Error("init");
413+
throw new DriverInitializationError();
408414
}
409415

410416
const last = this._jobs
@@ -427,7 +433,7 @@ export class LokiDriver extends BaseDriver {
427433
.data()?.[0];
428434

429435
if (!last) {
430-
throw new Error("NO_MATCHING_JOB");
436+
throw new DriverNoMatchingRefError(ref);
431437
}
432438

433439
const next: LokiDoc = JSON.parse(JSON.stringify(last));
@@ -446,7 +452,7 @@ export class LokiDriver extends BaseDriver {
446452
await this.ready();
447453

448454
if (!this._jobs) {
449-
throw new Error("init");
455+
throw new DriverInitializationError();
450456
}
451457

452458
this._jobs
@@ -465,7 +471,7 @@ export class LokiDriver extends BaseDriver {
465471
await this.ready();
466472

467473
if (!this._jobs) {
468-
throw new Error("init");
474+
throw new DriverInitializationError();
469475
}
470476

471477
// remove all future existing
@@ -481,7 +487,7 @@ export class LokiDriver extends BaseDriver {
481487
await this.ready();
482488

483489
if (!this._jobs) {
484-
throw new Error("init");
490+
throw new DriverInitializationError();
485491
}
486492
if (!ref) {
487493
throw new Error("No ref provided");
@@ -509,7 +515,7 @@ export class LokiDriver extends BaseDriver {
509515
return;
510516
}
511517
if (!this._jobs) {
512-
throw new Error("init");
518+
throw new DriverInitializationError();
513519
}
514520

515521
const next: QueueDoc = {
@@ -554,7 +560,7 @@ export class LokiDriver extends BaseDriver {
554560
await this.ready();
555561

556562
if (!this._jobs) {
557-
throw new Error("init");
563+
throw new DriverInitializationError();
558564
}
559565

560566
this._jobs.addListener("insert", () => {

0 commit comments

Comments
 (0)