@@ -159,7 +159,11 @@ export class LangGraphAgent extends AbstractAgent {
159
159
// Therefore, this value should either hold null, or the only edition of state that should be used.
160
160
this . activeRun ! . manuallyEmittedState = null ;
161
161
162
- this . activeRun ! . nodeName = input . forwardedProps ?. nodeName ;
162
+ const nodeNameInput = forwardedProps ?. nodeName ;
163
+ this . activeRun ! . nodeName = nodeNameInput ;
164
+ if ( this . activeRun ! . nodeName === '__end__' ) {
165
+ this . activeRun ! . nodeName = undefined ;
166
+ }
163
167
164
168
const threadId = inputThreadId ?? randomUUID ( ) ;
165
169
@@ -176,19 +180,20 @@ export class LangGraphAgent extends AbstractAgent {
176
180
177
181
state . messages = agentStateValues . messages ;
178
182
state = this . langGraphDefaultMergeState ( state , aguiToLangChainMessage , tools ) ;
183
+ const graphInfo = await this . client . assistants . getGraph ( this . assistant . assistant_id ) ;
179
184
180
185
const mode =
181
186
threadId && this . activeRun ! . nodeName != "__end__" && this . activeRun ! . nodeName
182
187
? "continue"
183
188
: "start" ;
184
189
185
190
if ( mode === "continue" && ! forwardedProps ?. command ?. resume ) {
191
+ const nodeBefore = graphInfo . edges . find ( e => e . target === this . activeRun ! . nodeName ) ;
186
192
await this . client . threads . updateState ( threadId , {
187
193
values : state ,
188
- asNode : this . activeRun ! . nodeName ,
194
+ asNode : nodeBefore ?. source ,
189
195
} ) ;
190
196
}
191
-
192
197
this . activeRun ! . schemaKeys = await this . getSchemaKeys ( ) ;
193
198
194
199
const payloadInput = getStreamPayloadInput ( {
@@ -197,8 +202,6 @@ export class LangGraphAgent extends AbstractAgent {
197
202
schemaKeys : this . activeRun ! . schemaKeys ,
198
203
} ) ;
199
204
200
- const graphInfo = await this . client . assistants . getGraph ( this . assistant . assistant_id ) ;
201
-
202
205
let payloadConfig : LangGraphConfig | undefined ;
203
206
const configsToMerge = [ this . assistantConfig , forwardedProps ?. config ] . filter (
204
207
Boolean ,
@@ -244,6 +247,7 @@ export class LangGraphAgent extends AbstractAgent {
244
247
} ) ;
245
248
return subscriber . complete ( ) ;
246
249
}
250
+
247
251
const streamResponse = this . client . runs . stream ( threadId , this . assistant . assistant_id , payload ) ;
248
252
249
253
this . activeRun ! . prevNodeName = null ;
@@ -262,6 +266,7 @@ export class LangGraphAgent extends AbstractAgent {
262
266
if ( ! payload . streamMode . includes ( streamResponseChunk . event as StreamMode ) ) {
263
267
continue ;
264
268
}
269
+
265
270
// Force event type, as data is not properly defined on the LG side.
266
271
type EventsChunkData = {
267
272
__interrupt__ ?: any ;
@@ -295,20 +300,11 @@ export class LangGraphAgent extends AbstractAgent {
295
300
this . activeRun ! . id = metadata . run_id ;
296
301
297
302
if ( currentNodeName && currentNodeName !== this . activeRun ! . nodeName ) {
298
- if ( this . activeRun ! . nodeName ) {
299
- this . dispatchEvent ( {
300
- type : EventType . STEP_FINISHED ,
301
- stepName : this . activeRun ! . nodeName ,
302
- } ) ;
303
+ if ( this . activeRun ! . nodeName && this . activeRun ! . nodeName !== nodeNameInput ) {
304
+ this . endStep ( )
303
305
}
304
306
305
- if ( currentNodeName ) {
306
- this . dispatchEvent ( {
307
- type : EventType . STEP_STARTED ,
308
- stepName : currentNodeName ,
309
- } ) ;
310
- this . activeRun ! . nodeName = currentNodeName ;
311
- }
307
+ this . startStep ( currentNodeName )
312
308
}
313
309
314
310
shouldExit =
@@ -326,7 +322,7 @@ export class LangGraphAgent extends AbstractAgent {
326
322
// we only want to update the node name under certain conditions
327
323
// since we don't need any internal node names to be sent to the frontend
328
324
if ( graphInfo [ "nodes" ] . some ( ( node ) => node . id === currentNodeName ) ) {
329
- this . activeRun ! . nodeName = currentNodeName ;
325
+ this . activeRun ! . nodeName = currentNodeName
330
326
}
331
327
332
328
updatedState = this . activeRun ! . manuallyEmittedState ?? latestStateValues ;
@@ -362,10 +358,17 @@ export class LangGraphAgent extends AbstractAgent {
362
358
}
363
359
364
360
state = await this . client . threads . getState ( threadId ) ;
365
- const interrupts = ( state . tasks ?. [ 0 ] ?. interrupts ?? [ ] ) as Interrupt [ ] ;
366
- this . activeRun ! . nodeName = interrupts
367
- ? this . activeRun ! . nodeName
368
- : Object . keys ( state . metadata . writes ) [ 0 ] ;
361
+ const tasks = state . tasks
362
+ const interrupts = ( tasks ?. [ 0 ] ?. interrupts ?? [ ] ) as Interrupt [ ] ;
363
+ const isEndNode = state . next . length === 0
364
+ const writes = state . metadata . writes ?? { }
365
+
366
+ let newNodeName = this . activeRun ! . nodeName !
367
+
368
+ if ( ! interrupts ?. length ) {
369
+ newNodeName = isEndNode ? '__end__' : ( state . next [ 0 ] ?? Object . keys ( writes ) [ 0 ] ) ;
370
+ }
371
+
369
372
370
373
interrupts . forEach ( ( interrupt ) => {
371
374
this . dispatchEvent ( {
@@ -377,6 +380,12 @@ export class LangGraphAgent extends AbstractAgent {
377
380
} ) ;
378
381
} ) ;
379
382
383
+ if ( this . activeRun ! . nodeName != newNodeName ) {
384
+ this . endStep ( )
385
+ this . startStep ( newNodeName )
386
+ }
387
+
388
+ this . endStep ( )
380
389
this . dispatchEvent ( {
381
390
type : EventType . STATE_SNAPSHOT ,
382
391
snapshot : this . getStateSnapshot ( state . values ) ,
@@ -385,17 +394,13 @@ export class LangGraphAgent extends AbstractAgent {
385
394
type : EventType . MESSAGES_SNAPSHOT ,
386
395
messages : langchainMessagesToAgui ( state . values . messages ?? [ ] ) ,
387
396
} ) ;
388
- if ( this . activeRun ! . nodeName ) {
389
- this . dispatchEvent ( {
390
- type : EventType . STEP_FINISHED ,
391
- stepName : this . activeRun ! . nodeName ! ,
392
- } ) ;
393
- }
397
+
394
398
this . dispatchEvent ( {
395
399
type : EventType . RUN_FINISHED ,
396
400
threadId,
397
401
runId : this . activeRun ! . id ,
398
402
} ) ;
403
+ this . activeRun = undefined ;
399
404
return subscriber . complete ( ) ;
400
405
} catch ( e ) {
401
406
return subscriber . error ( e ) ;
@@ -834,6 +839,25 @@ export class LangGraphAgent extends AbstractAgent {
834
839
tools : [ ...( state . tools ?? [ ] ) , ...tools ] ,
835
840
} ;
836
841
}
842
+
843
+ startStep ( nodeName : string ) {
844
+ this . dispatchEvent ( {
845
+ type : EventType . STEP_STARTED ,
846
+ stepName : nodeName ,
847
+ } ) ;
848
+ this . activeRun ! . nodeName = nodeName ;
849
+ }
850
+
851
+ endStep ( ) {
852
+ if ( ! this . activeRun ! . nodeName ) {
853
+ throw new Error ( "No active step to end" ) ;
854
+ }
855
+ this . dispatchEvent ( {
856
+ type : EventType . STEP_FINISHED ,
857
+ stepName : this . activeRun ! . nodeName ! ,
858
+ } ) ;
859
+ this . activeRun ! . nodeName = undefined ;
860
+ }
837
861
}
838
862
839
863
export * from "./types" ;
0 commit comments