1111
1212import { create } from '@bufbuild/protobuf' ;
1313import { useQuery as useTanstackQuery } from '@tanstack/react-query' ;
14- import { useCallback , useEffect , useRef , useState } from 'react' ;
14+ import { useEffect , useReducer , useRef , useState } from 'react' ;
1515import { ONE_MINUTE , ONE_SECOND } from 'react-query/react-query.utils' ;
1616import { toast as sonnerToast } from 'sonner' ;
1717
@@ -101,6 +101,68 @@ function shouldIncludeMessage(msg: TopicMessage, pipelineId: string, serverless:
101101 return msg . keyJson === pipelineId ;
102102}
103103
104+ // --- History streaming (extracted for React Compiler compatibility) ---
105+
106+ type HistoryStreamOpts = {
107+ pipelineId : string ;
108+ serverless : boolean ;
109+ signal : AbortSignal ;
110+ messagesRef : React . RefObject < TopicMessage [ ] > ;
111+ setPhase : React . Dispatch < React . SetStateAction < string | null > > ;
112+ } ;
113+
114+ function handleHistoryMessage ( res : ListMessagesResponse , opts : HistoryStreamOpts ) {
115+ switch ( res . controlMessage . case ) {
116+ case 'data' : {
117+ const msg = convertListMessageData ( res . controlMessage . value ) ;
118+ if ( shouldIncludeMessage ( msg , opts . pipelineId , opts . serverless ) ) {
119+ opts . messagesRef . current ?. push ( msg ) ;
120+ }
121+ break ;
122+ }
123+ case 'phase' :
124+ opts . setPhase ( res . controlMessage . value . phase ) ;
125+ break ;
126+ case 'done' :
127+ opts . setPhase ( null ) ;
128+ break ;
129+ case 'error' : {
130+ const errMsg = res . controlMessage . value . message ;
131+ sonnerToast . error ( 'Failed to search pipeline logs' , { description : errMsg } ) ;
132+ throw new Error ( errMsg ) ;
133+ }
134+ default :
135+ break ;
136+ }
137+ }
138+
139+ async function runHistoryStream ( opts : HistoryStreamOpts ) {
140+ const client = appConfig . consoleClient ;
141+ if ( ! client ) {
142+ throw new Error ( 'Console client not configured' ) ;
143+ }
144+
145+ const req = buildRequest ( opts . pipelineId , false , opts . serverless ) ;
146+
147+ try {
148+ for await ( const res of client . listMessages ( req , {
149+ signal : opts . signal ,
150+ timeoutMs : HISTORY_TIMEOUT_MS ,
151+ } ) ) {
152+ if ( opts . signal . aborted ) {
153+ break ;
154+ }
155+ handleHistoryMessage ( res , opts ) ;
156+ }
157+ } catch ( e ) {
158+ opts . setPhase ( null ) ;
159+ throw e ;
160+ }
161+
162+ opts . setPhase ( null ) ;
163+ return opts . messagesRef . current ?? [ ] ;
164+ }
165+
104166// --- useLogHistory: React Query + ref/interval for incremental streaming ---
105167
106168function useLogHistory ( opts : { pipelineId : string ; serverless : boolean ; enabled : boolean } ) {
@@ -114,62 +176,25 @@ function useLogHistory(opts: { pipelineId: string; serverless: boolean; enabled:
114176 return ;
115177 }
116178 const interval = setInterval ( ( ) => {
117- setStreamingMessages ( [ ...messagesRef . current ] ) ;
179+ setStreamingMessages ( [ ...( messagesRef . current ?? [ ] ) ] ) ;
118180 } , FLUSH_INTERVAL_MS ) ;
119181 return ( ) => clearInterval ( interval ) ;
120182 } , [ phase ] ) ;
121183
122184 const query = useTanstackQuery < TopicMessage [ ] > ( {
123185 queryKey : LOG_HISTORY_KEY ( opts . pipelineId ) ,
124- queryFn : async ( { signal } ) => {
186+ queryFn : ( { signal } ) => {
125187 messagesRef . current = [ ] ;
126188 setPhase ( 'Searching...' ) ;
127189 setStreamingMessages ( [ ] ) ;
128190
129- const client = appConfig . consoleClient ;
130- if ( ! client ) {
131- throw new Error ( 'Console client not configured' ) ;
132- }
133-
134- const req = buildRequest ( opts . pipelineId , false , opts . serverless ) ;
135-
136- try {
137- for await ( const res of client . listMessages ( req , {
138- signal,
139- timeoutMs : HISTORY_TIMEOUT_MS ,
140- } ) ) {
141- if ( signal ?. aborted ) {
142- break ;
143- }
144-
145- switch ( res . controlMessage . case ) {
146- case 'data' : {
147- const msg = convertListMessageData ( res . controlMessage . value ) ;
148- if ( shouldIncludeMessage ( msg , opts . pipelineId , opts . serverless ) ) {
149- messagesRef . current . push ( msg ) ;
150- }
151- break ;
152- }
153- case 'phase' :
154- setPhase ( res . controlMessage . value . phase ) ;
155- break ;
156- case 'done' :
157- setPhase ( null ) ;
158- break ;
159- case 'error' : {
160- const errMsg = res . controlMessage . value . message ;
161- sonnerToast . error ( 'Failed to search pipeline logs' , { description : errMsg } ) ;
162- throw new Error ( errMsg ) ;
163- }
164- default :
165- break ;
166- }
167- }
168- } finally {
169- setPhase ( null ) ;
170- }
171-
172- return messagesRef . current ;
191+ return runHistoryStream ( {
192+ pipelineId : opts . pipelineId ,
193+ serverless : opts . serverless ,
194+ signal,
195+ messagesRef,
196+ setPhase,
197+ } ) ;
173198 } ,
174199 enabled : opts . enabled ,
175200 staleTime : 0 ,
@@ -185,44 +210,78 @@ function useLogHistory(opts: { pipelineId: string; serverless: boolean; enabled:
185210
186211// --- useLogLive: Standalone streaming hook for live tail ---
187212
213+ type LiveState = {
214+ messages : TopicMessage [ ] ;
215+ phase : string | null ;
216+ error : string | null ;
217+ } ;
218+
219+ type LiveAction =
220+ | { type : 'reset' }
221+ | { type : 'start' }
222+ | { type : 'addMessage' ; msg : TopicMessage }
223+ | { type : 'setPhase' ; phase : string }
224+ | { type : 'setError' ; error : string }
225+ | { type : 'done' }
226+ | { type : 'noClient' } ;
227+
228+ const LIVE_INITIAL_STATE : LiveState = { messages : [ ] , phase : null , error : null } ;
229+
230+ function liveReducer ( state : LiveState , action : LiveAction ) : LiveState {
231+ switch ( action . type ) {
232+ case 'reset' :
233+ return LIVE_INITIAL_STATE ;
234+ case 'start' :
235+ return { messages : [ ] , phase : 'Searching...' , error : null } ;
236+ case 'addMessage' :
237+ return { ...state , messages : [ action . msg , ...state . messages ] } ;
238+ case 'setPhase' :
239+ return { ...state , phase : action . phase } ;
240+ case 'setError' :
241+ return { ...state , error : action . error , phase : null } ;
242+ case 'done' :
243+ return { ...state , phase : null } ;
244+ case 'noClient' :
245+ return { messages : [ ] , error : 'Console client not configured' , phase : null } ;
246+ default :
247+ return state ;
248+ }
249+ }
250+
188251type LiveStreamOpts = {
189252 client : NonNullable < typeof appConfig . consoleClient > ;
190253 req : ReturnType < typeof buildRequest > ;
191254 abortController : AbortController ;
192255 pipelineId : string ;
193256 serverless : boolean ;
194257 isMountedRef : React . RefObject < boolean > ;
195- setMessages : React . Dispatch < React . SetStateAction < TopicMessage [ ] > > ;
196- setPhase : React . Dispatch < React . SetStateAction < string | null > > ;
197- setError : React . Dispatch < React . SetStateAction < string | null > > ;
258+ dispatch : React . Dispatch < LiveAction > ;
198259} ;
199260
200261function handleLiveMessage ( res : ListMessagesResponse , opts : LiveStreamOpts ) {
262+ if ( ! opts . isMountedRef . current ) {
263+ return ;
264+ }
201265 switch ( res . controlMessage . case ) {
202266 case 'phase' :
203- if ( opts . isMountedRef . current ) {
204- opts . setPhase ( res . controlMessage . value . phase ) ;
205- }
267+ opts . dispatch ( { type : 'setPhase' , phase : res . controlMessage . value . phase } ) ;
206268 break ;
207269 case 'data' : {
208270 const msg = convertListMessageData ( res . controlMessage . value ) ;
209- if ( shouldIncludeMessage ( msg , opts . pipelineId , opts . serverless ) && opts . isMountedRef . current ) {
210- opts . setMessages ( ( prev ) => [ msg , ... prev ] ) ;
271+ if ( shouldIncludeMessage ( msg , opts . pipelineId , opts . serverless ) ) {
272+ opts . dispatch ( { type : 'addMessage' , msg } ) ;
211273 }
212274 break ;
213275 }
214276 case 'done' :
215- if ( opts . isMountedRef . current ) {
216- opts . setPhase ( null ) ;
217- }
277+ opts . dispatch ( { type : 'done' } ) ;
218278 break ;
219- case 'error' :
220- if ( opts . isMountedRef . current ) {
221- const errMsg = res . controlMessage . value . message ;
222- sonnerToast . error ( 'Failed to search pipeline logs' , { description : errMsg } ) ;
223- opts . setError ( errMsg ) ;
224- }
279+ case 'error' : {
280+ const errMsg = res . controlMessage . value . message ;
281+ sonnerToast . error ( 'Failed to search pipeline logs' , { description : errMsg } ) ;
282+ opts . dispatch ( { type : 'setError' , error : errMsg } ) ;
225283 break ;
284+ }
226285 default :
227286 break ;
228287 }
@@ -246,24 +305,17 @@ async function runLiveStream(opts: LiveStreamOpts) {
246305 if ( opts . isMountedRef . current ) {
247306 const errMsg = e instanceof Error ? e . message : 'Unknown error' ;
248307 sonnerToast . error ( 'Failed to search pipeline logs' , { description : errMsg } ) ;
249- opts . setError ( errMsg ) ;
250- opts . setPhase ( null ) ;
308+ opts . dispatch ( { type : 'setError' , error : errMsg } ) ;
251309 }
252310 } finally {
253311 if ( opts . isMountedRef . current && ! opts . abortController . signal . aborted ) {
254- opts . setPhase ( null ) ;
312+ opts . dispatch ( { type : 'done' } ) ;
255313 }
256314 }
257315}
258316
259- function useLogLive ( opts : { pipelineId : string ; serverless : boolean ; enabled : boolean } ) : {
260- messages : TopicMessage [ ] ;
261- phase : string | null ;
262- error : string | null ;
263- } {
264- const [ messages , setMessages ] = useState < TopicMessage [ ] > ( [ ] ) ;
265- const [ phase , setPhase ] = useState < string | null > ( null ) ;
266- const [ error , setError ] = useState < string | null > ( null ) ;
317+ function useLogLive ( opts : { pipelineId : string ; serverless : boolean ; enabled : boolean } ) : LiveState {
318+ const [ state , dispatch ] = useReducer ( liveReducer , LIVE_INITIAL_STATE ) ;
267319
268320 const abortControllerRef = useRef < AbortController | null > ( null ) ;
269321 const isMountedRef = useRef ( true ) ;
@@ -279,24 +331,19 @@ function useLogLive(opts: { pipelineId: string; serverless: boolean; enabled: bo
279331 useEffect ( ( ) => {
280332 if ( ! opts . enabled ) {
281333 abortControllerRef . current ?. abort ( ) ;
282- setMessages ( [ ] ) ;
283- setPhase ( null ) ;
284- setError ( null ) ;
334+ dispatch ( { type : 'reset' } ) ;
285335 return ;
286336 }
287337
288338 abortControllerRef . current ?. abort ( ) ;
289339 const abortController = new AbortController ( ) ;
290340 abortControllerRef . current = abortController ;
291341
292- setPhase ( 'Searching...' ) ;
293- setError ( null ) ;
294- setMessages ( [ ] ) ;
342+ dispatch ( { type : 'start' } ) ;
295343
296344 const client = appConfig . consoleClient ;
297345 if ( ! client ) {
298- setError ( 'Console client not configured' ) ;
299- setPhase ( null ) ;
346+ dispatch ( { type : 'noClient' } ) ;
300347 return ;
301348 }
302349
@@ -308,17 +355,15 @@ function useLogLive(opts: { pipelineId: string; serverless: boolean; enabled: bo
308355 pipelineId : opts . pipelineId ,
309356 serverless : opts . serverless ,
310357 isMountedRef,
311- setMessages,
312- setPhase,
313- setError,
358+ dispatch,
314359 } ) ;
315360
316361 return ( ) => {
317362 abortController . abort ( ) ;
318363 } ;
319364 } , [ opts . enabled , opts . pipelineId , opts . serverless ] ) ;
320365
321- return { messages , phase , error } ;
366+ return state ;
322367}
323368
324369// --- useLogSearch: Public composition hook ---
@@ -341,10 +386,6 @@ export function useLogSearch({
341386 enabled : enabled && live ,
342387 } ) ;
343388
344- const refresh = useCallback ( ( ) => {
345- history . refetch ( ) ;
346- } , [ history . refetch ] ) ;
347-
348389 if ( live ) {
349390 return {
350391 messages : liveResult . messages ,
@@ -358,6 +399,6 @@ export function useLogSearch({
358399 messages : history . messages ,
359400 phase : history . phase ,
360401 error : history . error ,
361- refresh,
402+ refresh : history . refetch ,
362403 } ;
363404}
0 commit comments