|
2 | 2 |
|
3 | 3 | import { AnyTask, ApiClient, InferRunTypes, RealtimeRun } from "@trigger.dev/core/v3"; |
4 | 4 | import { useCallback, useEffect, useId, useRef, useState } from "react"; |
5 | | -import { throttle } from "../utils/throttle.js"; |
6 | 5 | import { KeyedMutator, useSWR } from "../utils/trigger-swr.js"; |
7 | 6 | import { useApiClient, UseApiClientOptions } from "./useApiClient.js"; |
| 7 | +import { createThrottledQueue } from "../utils/throttle.js"; |
8 | 8 |
|
9 | 9 | export type UseRealtimeRunOptions = UseApiClientOptions & { |
10 | 10 | id?: string; |
@@ -78,12 +78,7 @@ export function useRealtimeRun<TTask extends AnyTask>( |
78 | 78 | const abortController = new AbortController(); |
79 | 79 | abortControllerRef.current = abortController; |
80 | 80 |
|
81 | | - await processRealtimeRun( |
82 | | - runId, |
83 | | - apiClient, |
84 | | - throttle(mutateRun, options?.experimental_throttleInMs), |
85 | | - abortControllerRef |
86 | | - ); |
| 81 | + await processRealtimeRun(runId, apiClient, mutateRun, abortControllerRef); |
87 | 82 | } catch (err) { |
88 | 83 | // Ignore abort errors as they are expected. |
89 | 84 | if ((err as any).name === "AbortError") { |
@@ -199,10 +194,11 @@ export function useRealtimeRunWithStreams< |
199 | 194 | await processRealtimeRunWithStreams( |
200 | 195 | runId, |
201 | 196 | apiClient, |
202 | | - throttle(mutateRun, options?.experimental_throttleInMs), |
203 | | - throttle(mutateStreams, options?.experimental_throttleInMs), |
| 197 | + mutateRun, |
| 198 | + mutateStreams, |
204 | 199 | streamsRef, |
205 | | - abortControllerRef |
| 200 | + abortControllerRef, |
| 201 | + options?.experimental_throttleInMs |
206 | 202 | ); |
207 | 203 | } catch (err) { |
208 | 204 | // Ignore abort errors as they are expected. |
@@ -285,13 +281,7 @@ export function useRealtimeRunsWithTag<TTask extends AnyTask>( |
285 | 281 | const abortController = new AbortController(); |
286 | 282 | abortControllerRef.current = abortController; |
287 | 283 |
|
288 | | - await processRealtimeRunsWithTag( |
289 | | - tag, |
290 | | - apiClient, |
291 | | - throttle(mutateRuns, options?.experimental_throttleInMs), |
292 | | - runsRef, |
293 | | - abortControllerRef |
294 | | - ); |
| 284 | + await processRealtimeRunsWithTag(tag, apiClient, mutateRuns, runsRef, abortControllerRef); |
295 | 285 | } catch (err) { |
296 | 286 | // Ignore abort errors as they are expected. |
297 | 287 | if ((err as any).name === "AbortError") { |
@@ -372,13 +362,7 @@ export function useRealtimeBatch<TTask extends AnyTask>( |
372 | 362 | const abortController = new AbortController(); |
373 | 363 | abortControllerRef.current = abortController; |
374 | 364 |
|
375 | | - await processRealtimeBatch( |
376 | | - batchId, |
377 | | - apiClient, |
378 | | - throttle(mutateRuns, options?.experimental_throttleInMs), |
379 | | - runsRef, |
380 | | - abortControllerRef |
381 | | - ); |
| 365 | + await processRealtimeBatch(batchId, apiClient, mutateRuns, runsRef, abortControllerRef); |
382 | 366 | } catch (err) { |
383 | 367 | // Ignore abort errors as they are expected. |
384 | 368 | if ((err as any).name === "AbortError") { |
@@ -486,23 +470,51 @@ async function processRealtimeRunWithStreams< |
486 | 470 | mutateRunData: KeyedMutator<RealtimeRun<TTask>>, |
487 | 471 | mutateStreamData: KeyedMutator<StreamResults<TStreams>>, |
488 | 472 | existingDataRef: React.MutableRefObject<StreamResults<TStreams>>, |
489 | | - abortControllerRef: React.MutableRefObject<AbortController | null> |
| 473 | + abortControllerRef: React.MutableRefObject<AbortController | null>, |
| 474 | + throttleInMs?: number |
490 | 475 | ) { |
491 | 476 | const subscription = apiClient.subscribeToRun<InferRunTypes<TTask>>(runId, { |
492 | 477 | signal: abortControllerRef.current?.signal, |
493 | 478 | }); |
494 | 479 |
|
| 480 | + type StreamUpdate = { |
| 481 | + type: keyof TStreams; |
| 482 | + chunk: any; |
| 483 | + }; |
| 484 | + |
| 485 | + const streamQueue = createThrottledQueue<StreamUpdate>(async (updates) => { |
| 486 | + const nextStreamData = { ...existingDataRef.current }; |
| 487 | + |
| 488 | + // Group updates by type |
| 489 | + const updatesByType = updates.reduce( |
| 490 | + (acc, update) => { |
| 491 | + if (!acc[update.type]) { |
| 492 | + acc[update.type] = []; |
| 493 | + } |
| 494 | + acc[update.type].push(update.chunk); |
| 495 | + return acc; |
| 496 | + }, |
| 497 | + {} as Record<keyof TStreams, any[]> |
| 498 | + ); |
| 499 | + |
| 500 | + // Apply all updates |
| 501 | + for (const [type, chunks] of Object.entries(updatesByType)) { |
| 502 | + // @ts-ignore |
| 503 | + nextStreamData[type] = [...(existingDataRef.current[type] || []), ...chunks]; |
| 504 | + } |
| 505 | + |
| 506 | + await mutateStreamData(nextStreamData); |
| 507 | + }, throttleInMs); |
| 508 | + |
495 | 509 | for await (const part of subscription.withStreams<TStreams>()) { |
496 | 510 | if (part.type === "run") { |
497 | 511 | mutateRunData(part.run); |
498 | 512 | } else { |
499 | | - const nextStreamData = { |
500 | | - ...existingDataRef.current, |
| 513 | + streamQueue.add({ |
| 514 | + type: part.type, |
501 | 515 | // @ts-ignore |
502 | | - [part.type]: [...(existingDataRef.current[part.type] || []), part.chunk], |
503 | | - }; |
504 | | - |
505 | | - mutateStreamData(nextStreamData); |
| 516 | + chunk: part.chunk, |
| 517 | + }); |
506 | 518 | } |
507 | 519 | } |
508 | 520 | } |
|
0 commit comments