Skip to content

Commit 2146c2d

Browse files
authored
Merge pull request #193 from quirrel-dev/reduce-redis-client-number
concentrate all subscriptions onto one client
2 parents 6f849a3 + 1bff6bc commit 2146c2d

File tree

6 files changed

+67
-46
lines changed

6 files changed

+67
-46
lines changed

package-lock.json

Lines changed: 18 additions & 13 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,12 +34,12 @@
3434
"@types/chai": "4.2.22",
3535
"@types/debug": "4.1.7",
3636
"@types/ioredis": "4.27.5",
37+
"@types/minimatch": "^3.0.5",
3738
"@types/mocha": "9.0.0",
3839
"@types/node": "16.6.1",
3940
"@types/pino": "6.3.11",
4041
"chai": "4.3.4",
4142
"delay": "5.0.0",
42-
"glob": "7.2.0",
4343
"mocha": "9.1.2",
4444
"nyc": "15.1.0",
4545
"ts-node": "10.2.1",
@@ -48,6 +48,7 @@
4848
"dependencies": {
4949
"ioredis": "^4.27.1",
5050
"ioredis-mock": "^5.5.6",
51+
"minimatch": "^3.0.4",
5152
"opentracing": "^0.14.5",
5253
"pino": "^6.11.3"
5354
}

src/activity/activity.ts

Lines changed: 25 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ import { Redis } from "ioredis";
22
import { Closable } from "../Closable";
33
import { decodeRedisKey, encodeRedisKey } from "../encodeRedisKey";
44
import { Job } from "../Job";
5-
import { Producer } from "../producer/producer";
5+
import minimatch from "minimatch";
66

77
/**
88
* Like String.split, but has a maximum number of delimiters it picks up.
@@ -34,19 +34,21 @@ function splitEvent(message: string, maxParts: number, delimiter = ":") {
3434
}
3535

3636
export type SubscriptionOptions = { queue?: string; id?: string };
37-
export type OnActivity = (event: OnActivityEvent) => Promise<void> | void;
37+
export type OnActivity<ScheduleType extends string> = (
38+
event: OnActivityEvent<ScheduleType>
39+
) => Promise<void> | void;
3840

39-
export type OnActivityEvent =
40-
| ScheduledEvent
41+
export type OnActivityEvent<ScheduleType extends string> =
42+
| ScheduledEvent<ScheduleType>
4143
| DeletedEvent
4244
| RequestedEvent
4345
| InvokedEvent
4446
| RescheduledEvent
4547
| AcknowledgedEvent;
4648

47-
interface ScheduledEvent {
49+
interface ScheduledEvent<ScheduleType extends string> {
4850
type: "scheduled";
49-
job: Job;
51+
job: Job<ScheduleType>;
5052
}
5153

5254
interface InvokedEvent {
@@ -81,35 +83,34 @@ interface AcknowledgedEvent {
8183
}
8284

8385
export class Activity<ScheduleType extends string> implements Closable {
84-
private redis;
85-
private producer;
86-
86+
private readonly pattern: string;
8787
constructor(
88-
redisFactory: () => Redis,
89-
private readonly onEvent: OnActivity,
88+
private readonly redis: Redis,
89+
private readonly onEvent: OnActivity<ScheduleType>,
9090
options: SubscriptionOptions = {}
9191
) {
92-
this.redis = redisFactory();
93-
this.producer = new Producer<ScheduleType>(redisFactory, null as any);
94-
9592
this.redis.on("pmessage", (_pattern, channel, message) =>
9693
this.handleMessage(channel, message)
9794
);
9895

99-
if (options.queue) {
100-
options.queue = encodeRedisKey(options.queue);
101-
}
96+
const queue = options.queue ? encodeRedisKey(options.queue) : "*";
97+
const id = options.id ? encodeRedisKey(options.id) : "*";
98+
this.pattern = `${queue}:${id}`;
10299

103-
if (options.id) {
104-
options.id = encodeRedisKey(options.id);
105-
}
100+
this.redis.psubscribe(this.pattern);
101+
}
106102

107-
this.redis.psubscribe(`${options.queue ?? "*"}:${options.id ?? "*"}`);
103+
private matchesPattern(channel: string) {
104+
return minimatch(channel, this.pattern);
108105
}
109106

110107
private async handleMessage(channel: string, message: string) {
108+
if (!this.matchesPattern(channel)) {
109+
return;
110+
}
111+
111112
const [_type, ...args] = splitEvent(message, 9);
112-
const type = _type as OnActivityEvent["type"];
113+
const type = _type as OnActivityEvent<ScheduleType>["type"];
113114

114115
const channelParts = channel.split(":").map(decodeRedisKey);
115116
if (channelParts.length !== 2) {
@@ -141,7 +142,7 @@ export class Activity<ScheduleType extends string> implements Closable {
141142
retry: JSON.parse(retryJson),
142143
schedule: schedule_type
143144
? {
144-
type: schedule_type,
145+
type: schedule_type as ScheduleType,
145146
meta: schedule_meta,
146147
times: max_times ? Number(max_times) : undefined,
147148
}
@@ -166,7 +167,6 @@ export class Activity<ScheduleType extends string> implements Closable {
166167
}
167168

168169
async close() {
169-
await this.redis.quit();
170-
await this.producer.close();
170+
await this.redis.punsubscribe(this.pattern);
171171
}
172172
}

src/index.ts

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -61,15 +61,16 @@ export default class Owl<ScheduleType extends string> {
6161
);
6262
}
6363

64+
private activityRedis?: Redis;
65+
6466
public createActivity(
65-
onEvent: OnActivity,
67+
onEvent: OnActivity<ScheduleType>,
6668
options: SubscriptionOptions = {}
6769
) {
68-
return new Activity<ScheduleType>(
69-
this.redisFactory,
70-
onEvent,
71-
options
72-
);
70+
if (!this.activityRedis) {
71+
this.activityRedis = this.redisFactory();
72+
}
73+
return new Activity(this.activityRedis, onEvent, options);
7374
}
7475

7576
public async runMigrations() {

test/functional/activity.test.ts

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,16 @@ describeAcrossBackends("Activity", (backend) => {
1919
}
2020

2121
it("publishes all relevant information", async () => {
22+
const secondActivityEvents = [];
23+
const secondActivity = env.owl.createActivity(
24+
(event) => {
25+
secondActivityEvents.push(event);
26+
},
27+
{
28+
id: "nonexistent",
29+
queue: "nada",
30+
}
31+
);
2232
const currentDate = new Date();
2333

2434
await env.producer.enqueue({
@@ -139,5 +149,9 @@ describeAcrossBackends("Activity", (backend) => {
139149
],
140150
200
141151
);
152+
153+
expect(secondActivityEvents).to.be.empty;
154+
155+
await secondActivity.close();
142156
});
143157
});

test/functional/support.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -157,7 +157,7 @@ export function makeActivityEnv(backend: Backend, fail?: WorkerFailPredicate) {
157157

158158
const activityEnv: typeof workerEnv & {
159159
activity: Activity<"every">;
160-
events: OnActivityEvent[];
160+
events: OnActivityEvent<"every">[];
161161
} = workerEnv as any;
162162

163163
activityEnv.activity = null as any;

0 commit comments

Comments
 (0)