99import modelengine .fel .core .chat .ChatMessage ;
1010import modelengine .fel .core .chat .ChatOption ;
1111import modelengine .fel .core .memory .Memory ;
12+ import modelengine .fel .core .memory .support .RecentMemory ;
1213import modelengine .fel .engine .activities .AiStart ;
1314import modelengine .fel .engine .activities .FlowCallBack ;
1415import modelengine .fel .engine .operators .models .StreamingConsumer ;
15- import modelengine .fel .engine .operators .sources .Source ;
1616import modelengine .fel .engine .util .StateKey ;
1717import modelengine .fit .waterflow .domain .context .FlowSession ;
1818import modelengine .fit .waterflow .domain .stream .operators .Operators ;
3333 * @since 2024-04-28
3434 */
3535public class Conversation <D , R > {
36+ private static final int DEFAULT_HISTORY_COUNT = 20 ;
37+
3638 private final AiProcessFlow <D , R > flow ;
3739 private final FlowSession session ;
3840 private final AtomicReference <ConverseListener <R >> converseListener = new AtomicReference <>(null );
@@ -66,6 +68,7 @@ public Conversation(AiProcessFlow<D, R> flow, FlowSession session) {
6668 @ SafeVarargs
6769 public final ConverseLatch <R > offer (D ... data ) {
6870 ConverseLatch <R > latch = setListener (this .flow );
71+ this .initMemory ();
6972 FlowSession newSession = FlowSession .newRootSession (this .session , this .session .preserved ());
7073 newSession .getWindow ().setFrom (null );
7174 this .flow .start ().offer (data , newSession );
@@ -85,6 +88,7 @@ public final ConverseLatch<R> offer(D... data) {
8588 public ConverseLatch <R > offer (String nodeId , List <?> data ) {
8689 Validation .notBlank (nodeId , "invalid nodeId." );
8790 ConverseLatch <R > latch = setListener (this .flow );
91+ this .initMemory ();
8892 FlowSession newSession = new FlowSession (this .session );
8993 newSession .getWindow ().setFrom (null );
9094 this .flow .origin ().offer (nodeId , data .toArray (new Object [0 ]), newSession );
@@ -231,4 +235,10 @@ private FlowSession setConverseListener(FlowSession session) {
231235 session .setInnerState (StateKey .CONVERSE_LISTENER , new AtomicReference <>(new ConcurrentHashMap <>()));
232236 return session ;
233237 }
238+
239+ private void initMemory () {
240+ if (this .session .getInnerState (StateKey .HISTORY ) == null ) {
241+ this .session .setInnerState (StateKey .HISTORY , new RecentMemory (DEFAULT_HISTORY_COUNT ));
242+ }
243+ }
234244}
0 commit comments