-
Notifications
You must be signed in to change notification settings - Fork 58
Expand file tree
/
Copy pathredis.ts
More file actions
128 lines (107 loc) · 3.66 KB
/
redis.ts
File metadata and controls
128 lines (107 loc) · 3.66 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
import { ulid } from 'ulid';
import { Package } from '../../utils/index.js';
import { validateOptions } from '../../utils/helpers.js';
import { RedisJob } from '../interfaces/job.js';
import { RedisQueueOptionsDto } from '../schema/index.js';
import { InternalMessage } from '../strategy/index.js';
import { PollQueueDriver } from '../strategy/pollQueueDriver.js';
/**
 * Lua script that promotes due jobs from a delayed sorted set to a ready list.
 *
 * Inputs (as read by the script):
 * - `KEYS[1]`: sorted set holding delayed jobs (the source).
 * - `KEYS[2]`: list that receives the jobs (the destination).
 * - `ARGV[1]`: upper score bound, converted with `tonumber`; members whose
 *   score is at or below it are selected.
 *
 * Behaviour: selects at most 20 members (`max_members`) via `ZRANGEBYSCORE`
 * with `WITHSCORES`, so the reply alternates member, score — hence the loop
 * step of 2. Each selected member is removed from the source with `ZREM` and
 * appended to the destination with `RPUSH`.
 *
 * Returns an array of `{member, score}` pairs for every member that was moved.
 *
 * NOTE(review): the inline Lua comment "Get the member with the lowest score"
 * understates the query — it fetches up to `max_members` members, not one.
 * The comment is part of the script text, so it is left unchanged here.
 */
const FIND_DELAYED_JOB = `
local source_key = KEYS[1]
local destination_key = KEYS[2]
local score_limit = tonumber(ARGV[1])
local max_members = 20
-- Get the member with the lowest score
local results = redis.call('ZRANGEBYSCORE', source_key, '-inf', score_limit, 'WITHSCORES', 'LIMIT', 0, max_members)
local processed = {}
for i = 1, #results, 2 do
local member = results[i]
local score = results[i+1]
-- Remove the member from the sorted set
redis.call('ZREM', source_key, member)
-- Push the member to the destination list
redis.call('RPUSH', destination_key, member)
table.insert(processed, {member, score})
end
return processed
`;
/**
 * Poll-based queue driver that stores jobs in Redis through `ioredis`.
 *
 * Ready jobs live in a Redis list keyed `<prefix>::<queue>`; delayed jobs live
 * in a sorted set keyed `<prefix>::<queue>::delayed`, scored by the timestamp
 * at which they become due. `scheduledTask` moves due jobs from the sorted set
 * to the list by running the `findDelayedJob` Lua command.
 *
 * `ioredis` is loaded lazily via `Package.load`, so every public operation
 * awaits `initializeModules()` before touching the client.
 */
export class RedisQueueDriver implements PollQueueDriver {
  // Typed as `any` because ioredis is resolved at runtime, not imported statically.
  private client: any;
  private queuePrefix: string;
  private IORedis: any;
  /**
   * Initialisation that is currently running (or has completed). Sharing it
   * guarantees that concurrent callers never create more than one connection.
   */
  private initPromise?: Promise<void>;

  /**
   * @param options Driver options; validated against `RedisQueueOptionsDto`
   *   and also passed as-is to the ioredis constructor. `options.prefix`
   *   overrides the default key prefix `intent_queue`.
   */
  constructor(private options: Record<string, any>) {
    validateOptions(this.options, RedisQueueOptionsDto, {
      cls: 'RedisQueueDriver',
    });
    this.queuePrefix = this.options.prefix || 'intent_queue';
    // Start connecting eagerly. A constructor cannot await, so a failure here
    // is intentionally not propagated: it would otherwise become an unhandled
    // rejection. The same error is raised again by the first operation, since
    // every operation awaits `initializeModules()` (which retries on failure).
    void this.initializeModules().catch(() => undefined);
  }

  /** No-op; the connection is set up by `initializeModules()`. */
  async init(): Promise<void> {}

  /**
   * Enqueues a message.
   *
   * When `rawPayload.delay` is later than the current time (`Date.now()`), the
   * message goes to the delayed sorted set; otherwise it is appended to the
   * ready list of `rawPayload.queue`.
   *
   * @param message JSON-encoded job; a fresh `id` is stamped onto it.
   * @param rawPayload Carries the target `queue` and the `delay` timestamp.
   */
  async push(message: string, rawPayload: InternalMessage): Promise<void> {
    await this.initializeModules();
    if (rawPayload.delay > Date.now()) {
      await this.pushToDelayedQueue(message, rawPayload);
      return;
    }

    await this.client.rpush(
      this.getQueue(`${rawPayload.queue}`),
      this.getProcessedMessage(message),
    );
  }

  /**
   * Pops one message from the head of the ready list.
   *
   * @param options Must contain `queue`, the logical queue name.
   * @returns A single-element array wrapping the message, or an empty array
   *   when the pop yields nothing.
   */
  async pull(options: Record<string, any>): Promise<RedisJob[]> {
    await this.initializeModules();
    const data = await this.client.lpop(this.getQueue(options.queue));
    return data ? [new RedisJob({ message: data })] : [];
  }

  /** No-op: `pull` already removes the message from the list via `LPOP`. */
  async remove(): Promise<void> {
    return;
  }

  /**
   * Deletes both the ready list and the delayed sorted set of a queue.
   *
   * @param options Must contain `queue`, the logical queue name.
   */
  async purge(options: Record<string, any>): Promise<void> {
    await this.initializeModules();
    await this.client.del(this.getQueue(options.queue));
    await this.client.del(this.getDelayedQueue(options.queue));
  }

  /**
   * @param options Must contain `queue`, the logical queue name.
   * @returns Length of the ready list; delayed jobs are not counted.
   */
  async count(options: Record<string, any>): Promise<number> {
    await this.initializeModules();
    return await this.client.llen(this.getQueue(options.queue));
  }

  /**
   * Adds a message to the delayed sorted set, scored by `rawPayload.delay`.
   *
   * @param message JSON-encoded job; a fresh `id` is stamped onto it.
   * @param rawPayload Carries the target `queue` and the `delay` score.
   */
  async pushToDelayedQueue(
    message: string,
    rawPayload: InternalMessage,
  ): Promise<void> {
    await this.initializeModules();
    await this.client.zadd(
      this.getDelayedQueue(`${rawPayload.queue}`),
      rawPayload.delay,
      this.getProcessedMessage(message),
    );
  }

  /**
   * Parses the JSON message, sets its `id` to a newly generated ULID and
   * serialises it again.
   *
   * @throws SyntaxError when `message` is not valid JSON.
   */
  getProcessedMessage(message: string): string {
    const data = JSON.parse(message);
    data.id = ulid();
    return JSON.stringify(data);
  }

  /**
   * Runs the `findDelayedJob` Lua command to move delayed jobs whose score is
   * at or below the current time into the ready list.
   *
   * @param options Must contain `queue`, the logical queue name.
   */
  async scheduledTask(options: Record<string, any>): Promise<void> {
    await this.initializeModules();
    await this.client.findDelayedJob(
      this.getDelayedQueue(options.queue),
      this.getQueue(options.queue),
      Date.now(),
    );
  }

  /** @returns Redis key of the delayed sorted set for `queue`. */
  getDelayedQueue(queue: string): string {
    return `${this.queuePrefix}::${queue}::delayed`;
  }

  /** @returns Redis key of the ready list for `queue`. */
  getQueue(queue: string): string {
    return `${this.queuePrefix}::${queue}`;
  }

  /**
   * Ensures the ioredis client exists and has the `findDelayedJob` command.
   *
   * Safe to call concurrently: all callers share one in-flight initialisation,
   * so only a single client is ever created. If initialisation fails, the
   * shared promise is cleared so that a later call can try again.
   */
  async initializeModules(): Promise<void> {
    if (this.IORedis && this.client) return;

    if (!this.initPromise) {
      this.initPromise = this.createClient().catch((err: unknown) => {
        // Forget the failed attempt so the next caller retries instead of
        // receiving the same cached rejection forever.
        this.initPromise = undefined;
        throw err;
      });
    }

    return this.initPromise;
  }

  /** Loads ioredis, builds the client and registers the Lua command. */
  private async createClient(): Promise<void> {
    const { Redis } = await Package.load('ioredis');
    const client = new Redis(this.options);
    client.defineCommand('findDelayedJob', {
      numberOfKeys: 2,
      lua: FIND_DELAYED_JOB,
    });
    // Publish the fields only once the client is fully configured, so the
    // fast path above never observes a client without the custom command.
    this.IORedis = Redis;
    this.client = client;
  }
}