Commit 193c3b4

feat: optimize Redis stream handling with batching
Add STREAM_ORIGIN to environment schema. Improve performance in RealtimeStreams by using TextDecoderStream for simpler text decoding and implementing batching of XADD commands for Redis streams. Limit stream size using MAXLEN option. Update environment variable repository with new variable type. Adjust import statements for Redis key and value types.
1 parent 0ceadc6 commit 193c3b4
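
To illustrate the approach the commit message describes — decode the incoming byte stream with TextDecoderStream, buffer XADD commands, and flush them through an ioredis pipeline with approximate MAXLEN trimming — here is a minimal standalone sketch. It uses illustrative names and constants rather than the exact webapp code, and assumes Node 18+ (for the web-stream globals) plus an ioredis client:

import Redis from "ioredis";

const BATCH_SIZE = 10; // flush the pipeline after this many buffered entries
const MAX_STREAM_LEN = 1000; // approximate cap enforced via MAXLEN ~

async function ingest(body: ReadableStream<Uint8Array>, streamKey: string) {
  const redis = new Redis(); // connects to localhost:6379 by default
  // TextDecoderStream replaces a manual TextDecoder plus carry-over buffer.
  const reader = body.pipeThrough(new TextDecoderStream()).getReader();

  let batch: string[] = [];
  const flush = async () => {
    if (batch.length === 0) return;
    const pipeline = redis.pipeline();
    for (const line of batch) {
      // XADD <key> MAXLEN ~ <n> * data <line>: "~" asks Redis for cheaper,
      // approximate trimming to roughly MAX_STREAM_LEN entries.
      pipeline.xadd(streamKey, "MAXLEN", "~", String(MAX_STREAM_LEN), "*", "data", line);
    }
    await pipeline.exec(); // one round trip for the whole batch
    batch = [];
  };

  while (true) {
    const { done, value } = await reader.read();
    if (done) break;
    for (const line of value.split("\n")) {
      if (line.trim()) {
        batch.push(line);
        if (batch.length >= BATCH_SIZE) await flush();
      }
    }
  }

  await flush(); // send any remainder
  await redis.quit();
}

The single pipeline round trip per batch is what replaces the per-line await redis.xadd(...) of the previous implementation, which paid a network round trip for every ingested line.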

File tree

- apps/webapp/app/env.server.ts
- apps/webapp/app/services/realtimeStreams.server.ts
- apps/webapp/app/v3/environmentVariables/environmentVariablesRepository.server.ts
- packages/core/src/v3/runMetadata/metadataStream.ts

4 files changed: 45 additions, 25 deletions

apps/webapp/app/env.server.ts

Lines changed: 1 addition & 0 deletions
@@ -32,6 +32,7 @@ const EnvironmentSchema = z.object({
   LOGIN_ORIGIN: z.string().default("http://localhost:3030"),
   APP_ORIGIN: z.string().default("http://localhost:3030"),
   API_ORIGIN: z.string().optional(),
+  STREAM_ORIGIN: z.string().optional(),
   ELECTRIC_ORIGIN: z.string().default("http://localhost:3060"),
   APP_ENV: z.string().default(process.env.NODE_ENV),
   SERVICE_NAME: z.string().default("trigger.dev webapp"),

apps/webapp/app/services/realtimeStreams.server.ts

Lines changed: 38 additions & 25 deletions
@@ -1,4 +1,4 @@
-import Redis, { RedisOptions } from "ioredis";
+import Redis, { RedisKey, RedisOptions, RedisValue } from "ioredis";
 import { logger } from "./logger.server";
 
 export type RealtimeStreamsOptions = {
@@ -107,55 +107,68 @@ export class RealtimeStreams {
     streamId: string
   ): Promise<Response> {
     const redis = new Redis(this.options.redis ?? {});
-
     const streamKey = `stream:${runId}:${streamId}`;
 
-    async function cleanup(stream?: TransformStream) {
+    async function cleanup() {
       try {
         await redis.quit();
-        if (stream) {
-          const writer = stream.writable.getWriter();
-          await writer.close(); // Catch in case the stream is already closed
-        }
       } catch (error) {
         logger.error("[RealtimeStreams][ingestData] Error in cleanup:", { error });
       }
     }
 
     try {
-      const reader = stream.getReader();
-      const decoder = new TextDecoder();
-      let buffer = "";
+      // Use TextDecoderStream to simplify text decoding
+      const textStream = stream.pipeThrough(new TextDecoderStream());
+      const reader = textStream.getReader();
+
+      const batchSize = 10; // Adjust this value based on performance testing
+      let batchCommands: Array<[key: RedisKey, ...args: RedisValue[]]> = [];
 
       while (true) {
        const { done, value } = await reader.read();
 
-        logger.debug("[RealtimeStreams][ingestData] Reading data", { streamKey, done });
-
        if (done) {
-          if (buffer) {
-            const data = JSON.parse(buffer);
-            await redis.xadd(streamKey, "*", "data", JSON.stringify(data));
-          }
          break;
        }
 
-        buffer += decoder.decode(value, { stream: true });
-        const lines = buffer.split("\n");
-        buffer = lines.pop() || "";
+        logger.debug("[RealtimeStreams][ingestData] Reading data", { streamKey, value });
+
+        // 'value' is a string containing the decoded text
+        const lines = value.split("\n");
 
        for (const line of lines) {
          if (line.trim()) {
-            const data = JSON.parse(line);
-
-            logger.debug("[RealtimeStreams][ingestData] Ingesting data", { streamKey });
-
-            await redis.xadd(streamKey, "*", "data", JSON.stringify(data));
+            // Avoid unnecessary parsing; assume 'line' is already a JSON string
+            // Add XADD command with MAXLEN option to limit stream size
+            batchCommands.push([streamKey, "MAXLEN", "~", "1000", "*", "data", line]);
+
+            if (batchCommands.length >= batchSize) {
+              // Send batch using a pipeline
+              const pipeline = redis.pipeline();
+              for (const args of batchCommands) {
+                pipeline.xadd(...args);
+              }
+              await pipeline.exec();
+              batchCommands = [];
+            }
          }
        }
      }
 
-      await redis.xadd(streamKey, "*", "data", JSON.stringify({ __end: true }));
+      // Send any remaining commands
+      if (batchCommands.length > 0) {
+        const pipeline = redis.pipeline();
+        for (const args of batchCommands) {
+          pipeline.xadd(...args);
+        }
+        await pipeline.exec();
+      }
+
+      // Send the __end message to indicate the end of the stream
+      const endData = JSON.stringify({ __end: true });
+      await redis.xadd(streamKey, "MAXLEN", "~", "1000", "*", "data", endData);
+
      return new Response(null, { status: 200 });
    } catch (error) {
      console.error("Error in ingestData:", error);
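
For completeness, entries written this way carry a single "data" field whose value is the raw JSON line, followed by a final {"__end":true} entry that marks the end of the stream. A hypothetical consumer (not part of this commit) could read back what has been ingested roughly like this:

import Redis from "ioredis";

// Hypothetical reader sketch, not from this commit: fetch everything currently
// in the stream and stop at the __end sentinel written by ingestData.
async function readIngested(streamKey: string): Promise<string[]> {
  const redis = new Redis();
  const lines: string[] = [];

  // XRANGE returns [id, [field, value, field, value, ...]] tuples in insertion order.
  const entries = await redis.xrange(streamKey, "-", "+");
  for (const [, fields] of entries) {
    const data = fields[1]; // value of the single "data" field
    if (data === JSON.stringify({ __end: true })) break; // end-of-stream marker
    lines.push(data);
  }

  await redis.quit();
  return lines;
}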

apps/webapp/app/v3/environmentVariables/environmentVariablesRepository.server.ts

Lines changed: 4 additions & 0 deletions
@@ -734,6 +734,10 @@ async function resolveBuiltInProdVariables(runtimeEnvironment: RuntimeEnvironmen
       key: "TRIGGER_API_URL",
       value: env.API_ORIGIN ?? env.APP_ORIGIN,
     },
+    {
+      key: "TRIGGER_STREAM_URL",
+      value: env.STREAM_ORIGIN ?? env.API_ORIGIN ?? env.APP_ORIGIN,
+    },
     {
       key: "TRIGGER_RUNTIME_WAIT_THRESHOLD_IN_MS",
       value: String(env.CHECKPOINT_THRESHOLD_IN_MS),

packages/core/src/v3/runMetadata/metadataStream.ts

Lines changed: 2 additions & 0 deletions
@@ -1,3 +1,5 @@
+import { run } from "node:test";
+
 export type MetadataOptions<T> = {
   baseUrl: string;
   runId: string;
