Skip to content

Commit b6d35cb

Browse files
committed
Some tweaks
1 parent bad04db commit b6d35cb

File tree

4 files changed

+93
-17
lines changed

4 files changed

+93
-17
lines changed

apps/webapp/app/env.server.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -255,9 +255,9 @@ const EnvironmentSchema = z.object({
255255
EVENTS_MIN_CONCURRENCY: z.coerce.number().int().default(1),
256256
EVENTS_MAX_CONCURRENCY: z.coerce.number().int().default(10),
257257
EVENTS_MAX_BATCH_SIZE: z.coerce.number().int().default(500),
258-
EVENTS_MEMORY_PRESSURE_THRESHOLD: z.coerce.number().int().default(2000),
259-
EVENTS_LOAD_SHEDDING_THRESHOLD: z.coerce.number().int().default(5000),
260-
EVENTS_LOAD_SHEDDING_ENABLED: z.coerce.boolean().default(true),
258+
EVENTS_MEMORY_PRESSURE_THRESHOLD: z.coerce.number().int().default(5000),
259+
EVENTS_LOAD_SHEDDING_THRESHOLD: z.coerce.number().int().default(100000),
260+
EVENTS_LOAD_SHEDDING_ENABLED: z.string().default("1"),
261261
SHARED_QUEUE_CONSUMER_POOL_SIZE: z.coerce.number().int().default(10),
262262
SHARED_QUEUE_CONSUMER_INTERVAL_MS: z.coerce.number().int().default(100),
263263
SHARED_QUEUE_CONSUMER_NEXT_TICK_INTERVAL_MS: z.coerce.number().int().default(100),

apps/webapp/app/v3/dynamicFlushScheduler.server.ts

Lines changed: 24 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
1+
import { Logger } from "@trigger.dev/core/logger";
12
import { nanoid } from "nanoid";
23
import pLimit from "p-limit";
3-
import { logger } from "~/services/logger.server";
4-
import { TaskEventKind } from "@trigger.dev/database";
54

65
export type DynamicFlushSchedulerConfig<T> = {
76
batchSize: number;
@@ -49,6 +48,8 @@ export class DynamicFlushScheduler<T> {
4948
private readonly isDroppableEvent?: (item: T) => boolean;
5049
private isLoadShedding: boolean = false;
5150

51+
private readonly logger: Logger = new Logger("EventRepo.DynamicFlushScheduler", "debug");
52+
5253
constructor(config: DynamicFlushSchedulerConfig<T>) {
5354
this.batchQueue = [];
5455
this.currentBatch = [];
@@ -98,16 +99,17 @@ export class DynamicFlushScheduler<T> {
9899

99100
if (!this.isLoadShedding) {
100101
this.isLoadShedding = true;
101-
logger.warn("Load shedding activated", {
102-
totalQueuedItems: this.totalQueuedItems,
103-
threshold: this.loadSheddingThreshold,
104-
droppedCount: dropped.length,
105-
});
106102
}
103+
104+
this.logger.warn("Load shedding", {
105+
totalQueuedItems: this.totalQueuedItems,
106+
threshold: this.loadSheddingThreshold,
107+
droppedCount: dropped.length,
108+
});
107109
}
108110
} else if (this.isLoadShedding && this.totalQueuedItems < this.loadSheddingThreshold * 0.8) {
109111
this.isLoadShedding = false;
110-
logger.info("Load shedding deactivated", {
112+
this.logger.info("Load shedding deactivated", {
111113
totalQueuedItems: this.totalQueuedItems,
112114
threshold: this.loadSheddingThreshold,
113115
totalDropped: this.metrics.droppedEvents,
@@ -183,7 +185,7 @@ export class DynamicFlushScheduler<T> {
183185
this.metrics.flushedBatches++;
184186
this.metrics.totalItemsFlushed += itemCount;
185187

186-
logger.debug("Batch flushed successfully", {
188+
this.logger.debug("Batch flushed successfully", {
187189
flushId,
188190
itemCount,
189191
duration,
@@ -195,7 +197,7 @@ export class DynamicFlushScheduler<T> {
195197
this.consecutiveFlushFailures++;
196198
this.metrics.failedBatches++;
197199

198-
logger.error("Error flushing batch", {
200+
this.logger.error("Error flushing batch", {
199201
flushId,
200202
itemCount,
201203
error,
@@ -223,13 +225,21 @@ export class DynamicFlushScheduler<T> {
223225
});
224226
}
225227

228+
private lastConcurrencyAdjustment: number = Date.now();
229+
226230
private adjustConcurrency(backOff: boolean = false): void {
227231
const currentConcurrency = this.limiter.concurrency;
228232
let newConcurrency = currentConcurrency;
229-
233+
230234
// Calculate pressure metrics - moved outside the if/else block
231235
const queuePressure = this.totalQueuedItems / this.memoryPressureThreshold;
232236
const timeSinceLastFlush = Date.now() - this.lastFlushTime;
237+
const timeSinceLastAdjustment = Date.now() - this.lastConcurrencyAdjustment;
238+
239+
// Don't adjust too frequently (except for backoff)
240+
if (!backOff && timeSinceLastAdjustment < 1000) {
241+
return;
242+
}
233243

234244
if (backOff) {
235245
// Reduce concurrency on failures
@@ -258,12 +268,13 @@ export class DynamicFlushScheduler<T> {
258268
if (newConcurrency !== currentConcurrency) {
259269
this.limiter = pLimit(newConcurrency);
260270

261-
logger.info("Adjusted flush concurrency", {
271+
this.logger.info("Adjusted flush concurrency", {
262272
previousConcurrency: currentConcurrency,
263273
newConcurrency,
264274
queuePressure,
265275
totalQueuedItems: this.totalQueuedItems,
266276
currentBatchSize: this.currentBatchSize,
277+
memoryPressureThreshold: this.memoryPressureThreshold,
267278
});
268279
}
269280
}
@@ -276,7 +287,7 @@ export class DynamicFlushScheduler<T> {
276287
droppedByKind[kind] = count;
277288
});
278289

279-
logger.info("DynamicFlushScheduler metrics", {
290+
this.logger.info("DynamicFlushScheduler metrics", {
280291
totalQueuedItems: this.totalQueuedItems,
281292
batchQueueLength: this.batchQueue.length,
282293
currentBatchLength: this.currentBatch.length,

apps/webapp/app/v3/eventRepository.server.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1349,7 +1349,7 @@ function initializeEventRepo() {
13491349
maxBatchSize: env.EVENTS_MAX_BATCH_SIZE,
13501350
memoryPressureThreshold: env.EVENTS_MEMORY_PRESSURE_THRESHOLD,
13511351
loadSheddingThreshold: env.EVENTS_LOAD_SHEDDING_THRESHOLD,
1352-
loadSheddingEnabled: env.EVENTS_LOAD_SHEDDING_ENABLED,
1352+
loadSheddingEnabled: env.EVENTS_LOAD_SHEDDING_ENABLED === "1",
13531353
redis: {
13541354
port: env.PUBSUB_REDIS_PORT,
13551355
host: env.PUBSUB_REDIS_HOST,

references/hello-world/src/trigger/example.ts

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -373,3 +373,68 @@ export const largeAttributesTask = task({
373373
});
374374
},
375375
});
376+
377+
export const lotsOfLogsParentTask = task({
378+
id: "lots-of-logs-parent",
379+
run: async (payload: { count: number }, { ctx }) => {
380+
logger.info("Hello, world from the lots of logs parent task", { count: payload.count });
381+
await lotsOfLogsTask.batchTriggerAndWait(
382+
Array.from({ length: 20 }, (_, i) => ({
383+
payload: { count: payload.count },
384+
}))
385+
);
386+
},
387+
});
388+
389+
export const lotsOfLogsTask = task({
390+
id: "lots-of-logs",
391+
run: async (payload: { count: number }, { ctx }) => {
392+
logger.info("Hello, world from the lots of logs task", { count: payload.count });
393+
394+
for (let i = 0; i < payload.count; i++) {
395+
logger.info("Hello, world from the lots of logs task", { count: i });
396+
}
397+
398+
await setTimeout(1000);
399+
400+
for (let i = 0; i < payload.count; i++) {
401+
logger.info("Hello, world from the lots of logs task", { count: i });
402+
}
403+
404+
await setTimeout(1000);
405+
406+
for (let i = 0; i < payload.count; i++) {
407+
logger.info("Hello, world from the lots of logs task", { count: i });
408+
}
409+
410+
await setTimeout(1000);
411+
412+
for (let i = 0; i < payload.count; i++) {
413+
logger.info("Hello, world from the lots of logs task", { count: i });
414+
}
415+
416+
await setTimeout(1000);
417+
418+
for (let i = 0; i < payload.count; i++) {
419+
logger.info("Hello, world from the lots of logs task", { count: i });
420+
}
421+
422+
await setTimeout(1000);
423+
424+
for (let i = 0; i < payload.count; i++) {
425+
logger.info("Hello, world from the lots of logs task", { count: i });
426+
}
427+
428+
await setTimeout(1000);
429+
430+
for (let i = 0; i < payload.count; i++) {
431+
logger.info("Hello, world from the lots of logs task", { count: i });
432+
}
433+
434+
await setTimeout(1000);
435+
436+
for (let i = 0; i < payload.count; i++) {
437+
logger.info("Hello, world from the lots of logs task", { count: i });
438+
}
439+
},
440+
});

0 commit comments

Comments
 (0)