@@ -418,17 +418,14 @@ async function addMessagesHandler(
418418 )
419419 ) ;
420420 }
421- let order : number | undefined ;
422421 let threadId = parentMessageId ;
423- if ( chatId ) {
424- const maxMessage = await getMaxMessage ( ctx , chatId ) ;
425- // If the previous message isn't our parent, we make a new thread.
426- threadId =
427- parentMessageId && maxMessage ?. _id === parentMessageId
428- ? maxMessage . threadId ?? parentMessageId
429- : parentMessageId ;
430- order = maxMessage ?. order ?? - 1 ;
431- }
422+ const maxMessage = await getMaxMessage ( ctx , chatId , userId ) ;
423+ // If the previous message isn't our parent, we make a new thread.
424+ threadId =
425+ parentMessageId && maxMessage ?. _id === parentMessageId
426+ ? maxMessage . threadId ?? parentMessageId
427+ : parentMessageId ;
428+ let order = maxMessage ?. order ?? - 1 ;
432429 const toReturn : Doc < "messages" > [ ] = [ ] ;
433430 if ( messages . length > 0 ) {
434431 for ( const { message, fileId, id } of messages ) {
@@ -455,18 +452,37 @@ async function addMessagesHandler(
455452 return { messages : toReturn } ;
456453}
457454
458- async function getMaxMessage ( ctx : QueryCtx , chatId : Id < "chats" > ) {
459- return mergedStream (
460- [ "success" as const , "pending" as const ] . map ( ( status ) =>
461- stream ( ctx . db , schema )
462- . query ( "messages" )
463- . withIndex ( "chatId_status_tool_order_stepOrder" , ( q ) =>
464- q . eq ( "chatId" , chatId ) . eq ( "status" , status ) . eq ( "tool" , false )
465- )
466- . order ( "desc" )
467- ) ,
468- [ "order" , "stepOrder" ]
469- ) . first ( ) ;
455+ async function getMaxMessage (
456+ ctx : QueryCtx ,
457+ chatId : Id < "chats" > | undefined ,
458+ userId : string | undefined
459+ ) {
460+ assert ( chatId || userId , "One of chatId or userId is required" ) ;
461+ if ( chatId ) {
462+ return mergedStream (
463+ [ "success" as const , "pending" as const ] . map ( ( status ) =>
464+ stream ( ctx . db , schema )
465+ . query ( "messages" )
466+ . withIndex ( "chatId_status_tool_order_stepOrder" , ( q ) =>
467+ q . eq ( "chatId" , chatId ) . eq ( "status" , status ) . eq ( "tool" , false )
468+ )
469+ . order ( "desc" )
470+ ) ,
471+ [ "order" , "stepOrder" ]
472+ ) . first ( ) ;
473+ } else {
474+ return mergedStream (
475+ [ "success" as const , "pending" as const ] . map ( ( status ) =>
476+ stream ( ctx . db , schema )
477+ . query ( "messages" )
478+ . withIndex ( "userId_status_tool_order_stepOrder" , ( q ) =>
479+ q . eq ( "userId" , userId ) . eq ( "status" , status ) . eq ( "tool" , false )
480+ )
481+ . order ( "desc" )
482+ ) ,
483+ [ "order" , "stepOrder" ]
484+ ) . first ( ) ;
485+ }
470486}
471487
472488const addStepsArgs = {
@@ -766,46 +782,66 @@ export const _fetchVectorMessages = internalQuery({
766782 }
767783 const included : Record < string , Set < number > > = { } ;
768784 for ( const m of messages ) {
769- if ( ! included [ chatId ] ) {
770- included [ chatId ] = new Set ( ) ;
785+ const searchId = m . chatId ?? m . userId ! ;
786+ if ( ! included [ searchId ] ) {
787+ included [ searchId ] = new Set ( ) ;
771788 }
772- included [ m . chatId ] . add ( m . order ! ) ;
789+ included [ searchId ] . add ( m . order ! ) ;
773790 }
774- const ranges : Record < Id < "chats" > , Doc < "messages" > [ ] > = { } ;
791+ const ranges : Record < string , Doc < "messages" > [ ] > = { } ;
775792 const { before, after } = args . messageRange ;
776793 for ( const m of messages ) {
794+ const searchId = m . chatId ?? m . userId ! ;
777795 const order = m . order ! ;
778796 let earliest = order - before ;
779797 let latest = order + after ;
780798 for ( ; earliest <= latest ; earliest ++ ) {
781- if ( ! included [ m . chatId ! ] . has ( earliest ) ) {
799+ if ( ! included [ searchId ] . has ( earliest ) ) {
782800 break ;
783801 }
784802 }
785803 for ( ; latest >= earliest ; latest -- ) {
786- if ( ! included [ m . chatId ! ] . has ( latest ) ) {
804+ if ( ! included [ searchId ] . has ( latest ) ) {
787805 break ;
788806 }
789807 }
790808 for ( let i = earliest ; i <= latest ; i ++ ) {
791- included [ m . chatId ! ] . add ( i ) ;
809+ included [ searchId ] . add ( i ) ;
792810 }
793811 if ( earliest !== latest ) {
794- const surrounding = await ctx . db
795- . query ( "messages" )
796- . withIndex ( "chatId_status_tool_order_stepOrder" , ( q ) =>
797- q
798- . eq ( "chatId" , m . chatId )
799- . eq ( "status" , "success" )
800- . eq ( "tool" , false )
801- . gt ( "order" , earliest )
802- . lt ( "order" , latest )
803- )
804- . collect ( ) ;
805- if ( ! ranges [ m . chatId ! ] ) {
806- ranges [ m . chatId ! ] = [ ] ;
812+ if ( m . chatId ) {
813+ const surrounding = await ctx . db
814+ . query ( "messages" )
815+ . withIndex ( "chatId_status_tool_order_stepOrder" , ( q ) =>
816+ q
817+ . eq ( "chatId" , m . chatId )
818+ . eq ( "status" , "success" )
819+ . eq ( "tool" , false )
820+ . gt ( "order" , earliest )
821+ . lt ( "order" , latest )
822+ )
823+ . collect ( ) ;
824+ if ( ! ranges [ searchId ] ) {
825+ ranges [ searchId ] = [ ] ;
826+ }
827+ ranges [ searchId ] . push ( ...surrounding ) ;
828+ } else {
829+ const surrounding = await ctx . db
830+ . query ( "messages" )
831+ . withIndex ( "userId_status_tool_order_stepOrder" , ( q ) =>
832+ q
833+ . eq ( "userId" , m . userId ! )
834+ . eq ( "status" , "success" )
835+ . eq ( "tool" , false )
836+ . gt ( "order" , earliest )
837+ . lt ( "order" , latest )
838+ )
839+ . collect ( ) ;
840+ if ( ! ranges [ searchId ] ) {
841+ ranges [ searchId ] = [ ] ;
842+ }
843+ ranges [ searchId ] . push ( ...surrounding ) ;
807844 }
808- ranges [ m . chatId ! ] . push ( ...surrounding ) ;
809845 }
810846 }
811847 return Object . values ( ranges )
0 commit comments