Skip to content

Commit c9dc7e8

Browse files
DX 593 - Auto executed pipeline (#1039)
* feat: add auto executed pipelines * test: improve test times by shrinking test subjects * feat: add proxy over autopipeline function * add enableAutoPipelining parameter to redis * initalize auto pipeline with static method * add docstrings for autoPipeline methods * add pipelineCounter field to autoPipeline proxy * add test for consecutive awaits with auto pipeline * simplfy auto pipeline tests * fix test descriptions * rm pipelineCounter field from auto pipeline proxy --------- Co-authored-by: CahidArda <[email protected]>
1 parent 682d8bc commit c9dc7e8

File tree

9 files changed

+404
-12
lines changed

9 files changed

+404
-12
lines changed

pkg/auto-pipeline.test.ts

Lines changed: 228 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,228 @@
1+
import { Redis } from "../platforms/nodejs"
2+
import { keygen, newHttpClient } from "./test-utils";
3+
4+
import { afterEach, describe, expect, test } from "bun:test";
5+
import { ScriptLoadCommand } from "./commands/script_load";
6+
7+
8+
const client = newHttpClient();
9+
10+
const { newKey, cleanup } = keygen();
11+
afterEach(cleanup);
12+
13+
describe("Auto pipeline", () => {
14+
test("should execute all commands inside a Promise.all in a single pipeline", async () => {
15+
const persistentKey = newKey();
16+
const persistentKey2 = newKey();
17+
const scriptHash = await new ScriptLoadCommand(["return 1"]).exec(client);
18+
19+
const redis = Redis.autoPipeline({
20+
latencyLogging: false
21+
})
22+
// @ts-expect-error pipelineCounter is not in type but accessible
23+
expect(redis.pipelineCounter).toBe(0)
24+
25+
// all the following commands are in a single pipeline call
26+
const result = await Promise.all([
27+
redis.append(newKey(), "hello"),
28+
redis.bitcount(newKey(), 0, 1),
29+
redis.bitop("and", newKey(), newKey()),
30+
redis.bitpos(newKey(), 1, 0),
31+
redis.dbsize(),
32+
redis.decr(newKey()),
33+
redis.decrby(newKey(), 1),
34+
redis.del(newKey()),
35+
redis.echo("hello"),
36+
redis.eval("return ARGV[1]", [], ["Hello"]),
37+
redis.evalsha(scriptHash, [], ["Hello"]),
38+
redis.exists(newKey()),
39+
redis.expire(newKey(), 5),
40+
redis.expireat(newKey(), Math.floor(new Date().getTime() / 1000) + 60),
41+
redis.flushall(),
42+
redis.flushdb(),
43+
redis.get(newKey()),
44+
redis.getbit(newKey(), 0),
45+
redis.getdel(newKey()),
46+
redis.getset(newKey(), "hello"),
47+
redis.hdel(newKey(), "field"),
48+
redis.hexists(newKey(), "field"),
49+
redis.hget(newKey(), "field"),
50+
redis.hgetall(newKey()),
51+
redis.hincrby(newKey(), "field", 1),
52+
redis.hincrbyfloat(newKey(), "field", 1.5),
53+
redis.hkeys(newKey()),
54+
redis.hlen(newKey()),
55+
redis.hmget(newKey(), newKey()),
56+
redis.hmset(newKey(), { field: "field", value: "value" }),
57+
redis.hscan(newKey(), 0),
58+
redis.hset(newKey(), { field: "value" }),
59+
redis.hsetnx(newKey(), "field", "value"),
60+
redis.hstrlen(newKey(), "field"),
61+
redis.hvals(newKey()),
62+
redis.incr(newKey()),
63+
redis.incrby(newKey(), 1),
64+
redis.incrbyfloat(newKey(), 1.5),
65+
redis.keys("*"),
66+
redis.lindex(newKey(), 0),
67+
redis.linsert(newKey(), "before", "pivot", "value"),
68+
redis.llen(newKey()),
69+
redis.lmove(newKey(), newKey(), "left", "right"),
70+
redis.lpop(newKey()),
71+
redis.lpos(newKey(), "value"),
72+
redis.lpush(persistentKey, "element"),
73+
redis.lpushx(newKey(), "element1", "element2"),
74+
redis.lrange(newKey(), 0, 1),
75+
redis.lrem(newKey(), 1, "value"),
76+
redis.lset(persistentKey, 0, "value"),
77+
redis.ltrim(newKey(), 0, 1),
78+
redis.hrandfield(newKey()),
79+
redis.hrandfield(newKey(), 2),
80+
redis.hrandfield(newKey(), 3, true),
81+
redis.mget<[string, string]>(newKey(), newKey()),
82+
redis.mset({ key1: "value", key2: "value" }),
83+
redis.msetnx({ key3: "value", key4: "value" }),
84+
redis.persist(newKey()),
85+
redis.pexpire(newKey(), 1000),
86+
redis.pexpireat(newKey(), new Date().getTime() + 1000),
87+
redis.ping(),
88+
redis.psetex(newKey(), 1, "value"),
89+
redis.pttl(newKey()),
90+
redis.publish("test", "hello"),
91+
redis.randomkey(),
92+
redis.rename(persistentKey, persistentKey2),
93+
redis.renamenx(persistentKey2, newKey()),
94+
redis.rpop(newKey()),
95+
redis.rpush(newKey(), "element1", "element2"),
96+
redis.rpushx(newKey(), "element1", "element2"),
97+
redis.sadd(newKey(), "memeber1", "member2"),
98+
redis.scan(0),
99+
redis.scard(newKey()),
100+
redis.sdiff(newKey()),
101+
redis.sdiffstore(newKey(), newKey()),
102+
redis.set(newKey(), "value"),
103+
redis.setbit(newKey(), 1, 1),
104+
redis.setex(newKey(), 1, "value"),
105+
redis.setnx(newKey(), "value"),
106+
redis.setrange(newKey(), 1, "value"),
107+
redis.sinter(newKey(), newKey()),
108+
redis.sinterstore(newKey(), newKey()),
109+
redis.sismember(newKey(), "member"),
110+
redis.smembers(newKey()),
111+
redis.smove(newKey(), newKey(), "member"),
112+
redis.spop(newKey()),
113+
redis.srandmember(newKey()),
114+
redis.srem(newKey(), "member"),
115+
redis.sscan(newKey(), 0),
116+
redis.strlen(newKey()),
117+
redis.sunion(newKey()),
118+
redis.sunionstore(newKey(), newKey()),
119+
redis.time(),
120+
redis.touch(newKey()),
121+
redis.ttl(newKey()),
122+
redis.type(newKey()),
123+
redis.unlink(newKey()),
124+
redis.zadd(newKey(), { score: 0, member: "member" }),
125+
redis.zcard(newKey()),
126+
redis.scriptExists(scriptHash),
127+
redis.scriptFlush({ async: true }),
128+
redis.scriptLoad("return 1"),
129+
redis.zcount(newKey(), 0, 1),
130+
redis.zincrby(newKey(), 1, "member"),
131+
redis.zinterstore(newKey(), 1, [newKey()]),
132+
redis.zlexcount(newKey(), "-", "+"),
133+
redis.zpopmax(newKey()),
134+
redis.zpopmin(newKey()),
135+
redis.zrange(newKey(), 0, 1),
136+
redis.zrank(newKey(), "member"),
137+
redis.zrem(newKey(), "member"),
138+
redis.zremrangebylex(newKey(), "-", "+"),
139+
redis.zremrangebyrank(newKey(), 0, 1),
140+
redis.zremrangebyscore(newKey(), 0, 1),
141+
redis.zrevrank(newKey(), "member"),
142+
redis.zscan(newKey(), 0),
143+
redis.zscore(newKey(), "member"),
144+
redis.zunionstore(newKey(), 1, [newKey()]),
145+
redis.zunion(1, [newKey()]),
146+
redis.json.set(newKey(), "$", { hello: "world" })
147+
])
148+
expect(result).toBeTruthy();
149+
expect(result.length).toBe(120); // returns
150+
// @ts-expect-error pipelineCounter is not in type but accessible120 results
151+
expect(redis.pipelineCounter).toBe(1);
152+
});
153+
154+
test("should group async requests with sync requests", async () => {
155+
156+
const redis = Redis.autoPipeline({
157+
latencyLogging: false
158+
})
159+
// @ts-expect-error pipelineCounter is not in type but accessible
160+
expect(redis.pipelineCounter).toBe(0);
161+
162+
// following five commands are added to the pipeline
163+
redis.flushdb();
164+
redis.incr("baz");
165+
redis.incr("baz");
166+
redis.set("foo", "bar");
167+
redis.incr("baz");
168+
169+
// two get calls are added to the pipeline and pipeline
170+
// is executed since we called await
171+
const [fooValue, bazValue] = await Promise.all([
172+
redis.get("foo"),
173+
redis.get("baz")
174+
]);
175+
176+
expect(fooValue).toBe("bar");
177+
expect(bazValue).toBe(3);
178+
// @ts-expect-error pipelineCounter is not in type but accessible
179+
expect(redis.pipelineCounter).toBe(1);
180+
})
181+
182+
test("should execute a pipeline for each consecutive awaited command", async () => {
183+
184+
const redis = Redis.autoPipeline({
185+
latencyLogging: false
186+
});
187+
// @ts-expect-error pipelineCounter is not in type but accessible
188+
expect(redis.pipelineCounter).toBe(0);
189+
190+
redis.flushdb();
191+
192+
const res1 = await redis.incr("baz");
193+
// @ts-expect-error pipelineCounter is not in type but accessible
194+
expect(redis.pipelineCounter).toBe(1);
195+
196+
const res2 = await redis.incr("baz");
197+
// @ts-expect-error pipelineCounter is not in type but accessible
198+
expect(redis.pipelineCounter).toBe(2);
199+
200+
const res3 = await redis.set("foo", "bar");
201+
// @ts-expect-error pipelineCounter is not in type but accessible
202+
expect(redis.pipelineCounter).toBe(3);
203+
204+
expect([res1, res2, res3]).toEqual([1, 2, "OK"]);
205+
206+
});
207+
208+
test("should execute a single pipeline for several commands inside Promise.all", async () => {
209+
210+
const redis = Redis.autoPipeline({
211+
latencyLogging: false
212+
});
213+
// @ts-expect-error pipelineCounter is not in type but accessible
214+
expect(redis.pipelineCounter).toBe(0);
215+
216+
const resArray = await Promise.all([
217+
redis.flushdb(),
218+
redis.incr("baz"),
219+
redis.incr("baz"),
220+
redis.set("foo", "bar"),
221+
redis.get("foo")
222+
]);
223+
// @ts-expect-error pipelineCounter is not in type but accessible
224+
expect(redis.pipelineCounter).toBe(1);
225+
expect(resArray).toEqual(["OK", 1, 2, "OK", "bar"]);
226+
227+
})
228+
});

pkg/auto-pipeline.ts

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
import { Command } from "./commands/command";
2+
import { CommandArgs } from "./types";
3+
import { Pipeline } from "./pipeline";
4+
import { Redis } from "./redis";
5+
6+
// will omit redis only commands since we call Pipeline in the background in auto pipeline
7+
type redisOnly = Exclude<keyof Redis, keyof Pipeline>
8+
9+
export function createAutoPipelineProxy(_redis: Redis) {
10+
11+
const redis = _redis as Redis & {
12+
autoPipelineExecutor: AutoPipelineExecutor;
13+
}
14+
15+
if (!redis.autoPipelineExecutor) {
16+
redis.autoPipelineExecutor = new AutoPipelineExecutor(redis);
17+
}
18+
19+
return new Proxy(redis, {
20+
get: (target, prop: "pipelineCounter" | keyof Pipeline ) => {
21+
22+
// return pipelineCounter of autoPipelineExecutor
23+
if (prop == "pipelineCounter") {
24+
return target.autoPipelineExecutor.pipelineCounter;
25+
}
26+
27+
// If the method is a function on the pipeline, wrap it with the executor logic
28+
if (typeof target.autoPipelineExecutor.pipeline[prop] === "function") {
29+
return (...args: CommandArgs<typeof Command>) => {
30+
return target.autoPipelineExecutor.withAutoPipeline((pipeline) => {
31+
(pipeline[prop] as Function)(...args);
32+
});
33+
};
34+
}
35+
return target.autoPipelineExecutor.pipeline[prop];
36+
},
37+
}) as Omit<Redis, redisOnly>;
38+
}
39+
40+
export class AutoPipelineExecutor {
41+
private pipelinePromises = new WeakMap<Pipeline, Promise<Array<unknown>>>();
42+
private activePipeline: Pipeline | null = null;
43+
private indexInCurrentPipeline = 0;
44+
private redis: Redis;
45+
pipeline: Pipeline; // only to make sure that proxy can work
46+
pipelineCounter: number = 0; // to keep track of how many times a pipeline was executed
47+
48+
constructor(redis: Redis) {
49+
this.redis = redis;
50+
this.pipeline = redis.pipeline();
51+
}
52+
53+
async withAutoPipeline<T>(executeWithPipeline: (pipeline: Pipeline) => unknown): Promise<T> {
54+
const pipeline = this.activePipeline || this.redis.pipeline();
55+
56+
if (!this.activePipeline) {
57+
this.activePipeline = pipeline;
58+
this.indexInCurrentPipeline = 0;
59+
}
60+
61+
const index = this.indexInCurrentPipeline++;
62+
executeWithPipeline(pipeline);
63+
64+
const pipelineDone = this.deferExecution().then(() => {
65+
if (!this.pipelinePromises.has(pipeline)) {
66+
const pipelinePromise = pipeline.exec();
67+
this.pipelineCounter += 1;
68+
69+
this.pipelinePromises.set(pipeline, pipelinePromise);
70+
this.activePipeline = null;
71+
}
72+
return this.pipelinePromises.get(pipeline)!;
73+
});
74+
75+
const results = await pipelineDone;
76+
return results[index] as T;
77+
}
78+
79+
private async deferExecution() {
80+
await Promise.resolve();
81+
return await Promise.resolve();
82+
}
83+
}

pkg/commands/xtrim.test.ts

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -12,20 +12,20 @@ afterAll(cleanup);
1212

1313
describe("XLEN", () => {
1414
test(
15-
"should approximately trim stream to 300 items",
15+
"should approximately trim stream to 30 items",
1616
async () => {
1717
const key = newKey();
1818
const promises = [];
19-
for (let i = 1; i <= 10000; i++) {
19+
for (let i = 1; i <= 1000; i++) {
2020
promises.push(new XAddCommand([key, "*", { [randomID()]: randomID() }]).exec(client));
2121
}
2222
await Promise.all(promises);
23-
await new XTrimCommand([key, { strategy: "MAXLEN", threshold: 300, exactness: "~" }]).exec(
23+
await new XTrimCommand([key, { strategy: "MAXLEN", threshold: 30, exactness: "~" }]).exec(
2424
client,
2525
);
2626
const len = await new XLenCommand([key]).exec(client);
27-
expect(len).toBeGreaterThanOrEqual(290);
28-
expect(len).toBeLessThanOrEqual(310);
27+
expect(len).toBeGreaterThanOrEqual(29);
28+
expect(len).toBeLessThanOrEqual(31);
2929
},
3030
{ timeout: 1000 * 60 },
3131
);
@@ -45,20 +45,20 @@ describe("XLEN", () => {
4545
});
4646

4747
test(
48-
"should trim with MINID and a limit and only remove 10 items that satisfies MINID",
48+
"should trim with MINID and a limit and only remove 2 items that satisfies MINID",
4949
async () => {
5050
const key = newKey();
5151
const baseTimestamp = Date.now();
52-
for (let i = 0; i < 100; i++) {
52+
for (let i = 0; i < 20; i++) {
5353
const id = `${baseTimestamp}-${i}`;
5454
await new XAddCommand([key, id, { data: `value${i}` }]).exec(client);
5555
}
56-
const midRangeId = `${baseTimestamp}-50`;
57-
await new XTrimCommand([key, { strategy: "MINID", threshold: midRangeId, limit: 10 }]).exec(
56+
const midRangeId = `${baseTimestamp}-10`;
57+
await new XTrimCommand([key, { strategy: "MINID", threshold: midRangeId, limit: 2 }]).exec(
5858
client,
5959
);
6060
const len = await new XLenCommand([key]).exec(client);
61-
expect(len).toBeLessThanOrEqual(100);
61+
expect(len).toBeLessThanOrEqual(20);
6262
},
6363
{ timeout: 20000 },
6464
);

pkg/pipeline.test.ts

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,6 @@ describe("use all the things", () => {
140140
.get(newKey())
141141
.getbit(newKey(), 0)
142142
.getdel(newKey())
143-
.getrange(newKey(), 0, 1)
144143
.getset(newKey(), "hello")
145144
.hdel(newKey(), "field")
146145
.hexists(newKey(), "field")
@@ -244,6 +243,6 @@ describe("use all the things", () => {
244243
.json.set(newKey(), "$", { hello: "world" });
245244

246245
const res = await p.exec();
247-
expect(res.length).toEqual(121);
246+
expect(res.length).toEqual(120);
248247
});
249248
});

0 commit comments

Comments
 (0)