Skip to content

Commit 6f6bc1a

Browse files
authored
Merge pull request #6 from KongZ/optmize-mem
Optimize memory usage
2 parents 3bc84a8 + 9a42b96 commit 6f6bc1a

File tree

6 files changed

+160
-38
lines changed

6 files changed

+160
-38
lines changed

pkg/agent/conversation.go

Lines changed: 44 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,9 @@ type Agent struct {
121121
// SessionBackend is the configured backend for session persistence (e.g., memory, filesystem).
122122
SessionBackend string
123123

124+
// sessionManager is a cached SessionManager instance, set in Init and lazily created by getSessionManager if still nil.
125+
sessionManager *sessions.SessionManager
126+
124127
// lastErr is the most recent error run into, for use across the stack
125128
lastErr error
126129

@@ -134,6 +137,18 @@ type Agent struct {
134137
// Assert InMemoryChatStore implements ChatMessageStore
135138
var _ api.ChatMessageStore = &sessions.InMemoryChatStore{}
136139

140+
// getSessionManager returns the cached session manager, lazily initializing it if needed.
141+
func (c *Agent) getSessionManager() (*sessions.SessionManager, error) {
142+
if c.sessionManager == nil {
143+
sm, err := sessions.NewSessionManager(c.SessionBackend)
144+
if err != nil {
145+
return nil, fmt.Errorf("failed to create session manager: %w", err)
146+
}
147+
c.sessionManager = sm
148+
}
149+
return c.sessionManager, nil
150+
}
151+
137152
func (s *Agent) GetSession() *api.Session {
138153
s.sessionMu.Lock()
139154
defer s.sessionMu.Unlock()
@@ -156,7 +171,6 @@ func (c *Agent) addMessage(source api.MessageSource, messageType api.MessageType
156171
Type: messageType,
157172
Payload: payload,
158173
Timestamp: time.Now(),
159-
Metadata: make(map[string]string),
160174
}
161175

162176
// session should always have a ChatMessageStore at this point
@@ -225,6 +239,13 @@ func (s *Agent) Init(ctx context.Context) error {
225239
return fmt.Errorf("agent requires a session to be provided")
226240
}
227241

242+
// Initialize session manager for reuse across agent methods
243+
sessionMgr, err := sessions.NewSessionManager(s.SessionBackend)
244+
if err != nil {
245+
return fmt.Errorf("failed to create session manager: %w", err)
246+
}
247+
s.sessionManager = sessionMgr
248+
228249
// Create a session working directory in the user's home directory
229250
// to avoid read-only filesystem issues in containers
230251
homeDir, err := os.UserHomeDir()
@@ -595,7 +616,7 @@ func (c *Agent) Run(ctx context.Context, initialQuery string) error {
595616
msg := c.addMessage(api.MessageSourceModel, api.MessageTypeText, streamedText)
596617
// If no function calls to be made, this is the final message of the turn
597618
if len(functionCalls) == 0 {
598-
msg.Metadata["is_final"] = "true"
619+
msg.SetMetadata("is_final", "true")
599620
}
600621
}
601622
// If no function calls to be made, we're done
@@ -612,7 +633,7 @@ func (c *Agent) Run(ctx context.Context, initialQuery string) error {
612633
// IMPORTANT: This also prevents UIs from getting blocked on reading from the output channel.
613634
log.V(2).Info("Empty response with no tool calls from LLM.")
614635
msg := c.addMessage(api.MessageSourceAgent, api.MessageTypeText, "Empty response from LLM")
615-
msg.Metadata["is_final"] = "true"
636+
msg.SetMetadata("is_final", "true")
616637
}
617638
continue
618639
}
@@ -788,12 +809,11 @@ func (c *Agent) handleMetaQuery(ctx context.Context, query string) (answer strin
788809
return "Saved session as " + savedSessionID, true, nil
789810

790811
case "sessions":
791-
manager, err := sessions.NewSessionManager(c.SessionBackend)
812+
mgr, err := c.getSessionManager()
792813
if err != nil {
793-
return "", false, fmt.Errorf("failed to create session manager: %w", err)
814+
return "", false, err
794815
}
795-
796-
sessionList, err := manager.ListSessions()
816+
sessionList, err := mgr.ListSessions()
797817
if err != nil {
798818
return "", false, fmt.Errorf("failed to list sessions: %w", err)
799819
}
@@ -840,11 +860,6 @@ func (c *Agent) NewSession() (string, error) {
840860
return "", fmt.Errorf("failed to save current session: %w", err)
841861
}
842862

843-
manager, err := sessions.NewSessionManager(c.SessionBackend)
844-
if err != nil {
845-
return "", fmt.Errorf("failed to create session manager: %w", err)
846-
}
847-
848863
metadata := sessions.Metadata{
849864
ModelID: c.Model,
850865
ProviderID: c.Provider,
@@ -853,7 +868,11 @@ func (c *Agent) NewSession() (string, error) {
853868
metadata.SlackUserID = c.Session.SlackUserID
854869
}
855870

856-
newSession, err := manager.NewSession(metadata)
871+
mgr, err := c.getSessionManager()
872+
if err != nil {
873+
return "", err
874+
}
875+
newSession, err := mgr.NewSession(metadata)
857876
if err != nil {
858877
return "", fmt.Errorf("failed to create new session: %w", err)
859878
}
@@ -910,13 +929,13 @@ func (c *Agent) SaveSession() (string, error) {
910929
c.sessionMu.Lock()
911930
defer c.sessionMu.Unlock()
912931

913-
manager, err := sessions.NewSessionManager(c.SessionBackend)
932+
mgr, err := c.getSessionManager()
914933
if err != nil {
915-
return "", fmt.Errorf("failed to create session manager: %w", err)
934+
return "", err
916935
}
917936

918937
if c.Session != nil {
919-
foundSession, _ := manager.FindSessionByID(c.Session.ID)
938+
foundSession, _ := mgr.FindSessionByID(c.Session.ID)
920939
if foundSession != nil {
921940
return foundSession.ID, nil
922941
}
@@ -930,7 +949,7 @@ func (c *Agent) SaveSession() (string, error) {
930949
SlackUserID: c.Session.SlackUserID,
931950
}
932951

933-
newSession, err := manager.NewSession(metadata)
952+
newSession, err := mgr.NewSession(metadata)
934953
if err != nil {
935954
return "", fmt.Errorf("failed to create new session: %w", err)
936955
}
@@ -953,14 +972,14 @@ func (c *Agent) SaveSession() (string, error) {
953972

954973
// LoadSession loads a session by ID (or latest), updates the agent's state, and re-initializes the chat.
955974
func (c *Agent) LoadSession(sessionID string) error {
956-
manager, err := sessions.NewSessionManager(c.SessionBackend)
975+
mgr, err := c.getSessionManager()
957976
if err != nil {
958-
return fmt.Errorf("failed to create session manager: %w", err)
977+
return err
959978
}
960979

961980
var session *api.Session
962981
if sessionID == "" || sessionID == "latest" {
963-
s, err := manager.GetLatestSession()
982+
s, err := mgr.GetLatestSession()
964983
if err != nil {
965984
return fmt.Errorf("failed to get latest session: %w", err)
966985
}
@@ -969,7 +988,7 @@ func (c *Agent) LoadSession(sessionID string) error {
969988
}
970989
session = s
971990
} else {
972-
s, err := manager.FindSessionByID(sessionID)
991+
s, err := mgr.FindSessionByID(sessionID)
973992
if err != nil {
974993
return fmt.Errorf("failed to get session %q: %w", sessionID, err)
975994
}
@@ -993,7 +1012,7 @@ func (c *Agent) LoadSession(sessionID string) error {
9931012
c.Session.AgentState = api.AgentStateIdle
9941013
}
9951014

996-
if err := manager.UpdateLastAccessed(session); err != nil {
1015+
if err := mgr.UpdateLastAccessed(session); err != nil {
9971016
return fmt.Errorf("failed to update session metadata: %w", err)
9981017
}
9991018

@@ -1262,7 +1281,7 @@ func toMap(v any) (map[string]any, error) {
12621281

12631282
func candidateToShimCandidate(iterator gollm.ChatResponseIterator) (gollm.ChatResponseIterator, error) {
12641283
return func(yield func(gollm.ChatResponse, error) bool) {
1265-
buffer := ""
1284+
var buf strings.Builder
12661285
for response, err := range iterator {
12671286
if err != nil {
12681287
yield(nil, err)
@@ -1278,7 +1297,7 @@ func candidateToShimCandidate(iterator gollm.ChatResponseIterator) (gollm.ChatRe
12781297

12791298
for _, part := range candidate.Parts() {
12801299
if text, ok := part.AsText(); ok {
1281-
buffer += text
1300+
buf.WriteString(text)
12821301
klog.Infof("text is %q", text)
12831302
} else {
12841303
yield(nil, fmt.Errorf("no text part found in candidate"))
@@ -1287,6 +1306,7 @@ func candidateToShimCandidate(iterator gollm.ChatResponseIterator) (gollm.ChatRe
12871306
}
12881307
}
12891308

1309+
buffer := buf.String()
12901310
if buffer == "" {
12911311
yield(nil, nil)
12921312
return

pkg/agent/manager.go

Lines changed: 53 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
"context"
2020
"fmt"
2121
"sync"
22+
"time"
2223

2324
"github.com/KongZ/kubeai-chatbot/pkg/api"
2425
"github.com/KongZ/kubeai-chatbot/pkg/sessions"
@@ -28,22 +29,30 @@ import (
2829
// Factory is a function that creates a new Agent instance.
2930
type Factory func(context.Context) (*Agent, error)
3031

32+
// defaultIdleTimeout is how long an agent can sit idle before being evicted.
33+
const defaultIdleTimeout = 30 * time.Minute
34+
3135
// AgentManager manages the lifecycle of agents and their sessions.
3236
type AgentManager struct {
3337
factory Factory
3438
sessionManager *sessions.SessionManager
3539
agents map[string]*Agent // sessionID -> agent
3640
mu sync.RWMutex
3741
onAgentCreated func(*Agent)
42+
stopCh chan struct{}
43+
closeOnce sync.Once
3844
}
3945

4046
// NewAgentManager creates a new Manager.
4147
func NewAgentManager(factory Factory, sessionManager *sessions.SessionManager) *AgentManager {
42-
return &AgentManager{
48+
m := &AgentManager{
4349
factory: factory,
4450
sessionManager: sessionManager,
4551
agents: make(map[string]*Agent),
52+
stopCh: make(chan struct{}),
4653
}
54+
go m.evictionLoop(defaultIdleTimeout)
55+
return m
4756
}
4857

4958
// SetAgentCreatedCallback sets the callback to be called when a new agent is created.
@@ -80,8 +89,12 @@ func (sm *AgentManager) GetAgent(ctx context.Context, sessionID string) (*Agent,
8089
return sm.startAgent(ctx, session, newAgent)
8190
}
8291

83-
// Close closes all active agents.
92+
// Close closes all active agents and stops the eviction loop.
8493
func (sm *AgentManager) Close() error {
94+
sm.closeOnce.Do(func() {
95+
close(sm.stopCh)
96+
})
97+
8598
sm.mu.Lock()
8699
defer sm.mu.Unlock()
87100

@@ -96,6 +109,44 @@ func (sm *AgentManager) Close() error {
96109
return nil
97110
}
98111

112+
// evictionLoop periodically checks for and evicts idle agents.
113+
func (sm *AgentManager) evictionLoop(idleTimeout time.Duration) {
114+
ticker := time.NewTicker(idleTimeout / 2)
115+
defer ticker.Stop()
116+
for {
117+
select {
118+
case <-ticker.C:
119+
sm.evictIdleAgents(idleTimeout)
120+
case <-sm.stopCh:
121+
return
122+
}
123+
}
124+
}
125+
126+
// evictIdleAgents removes agents that have been idle longer than the timeout.
127+
func (sm *AgentManager) evictIdleAgents(idleTimeout time.Duration) {
128+
now := time.Now()
129+
130+
sm.mu.Lock()
131+
var toClose []*Agent
132+
for id, a := range sm.agents {
133+
session := a.GetSession()
134+
if now.Sub(session.LastModified) > idleTimeout {
135+
klog.Infof("Evicting idle agent for session %s (idle for %v)", id, now.Sub(session.LastModified))
136+
toClose = append(toClose, a)
137+
delete(sm.agents, id)
138+
}
139+
}
140+
sm.mu.Unlock()
141+
142+
// Close evicted agents outside the lock to avoid blocking
143+
for _, a := range toClose {
144+
if err := a.Close(); err != nil {
145+
klog.Errorf("Error closing evicted agent: %v", err)
146+
}
147+
}
148+
}
149+
99150
// ListSessions delegates to the underlying store.
100151
func (sm *AgentManager) ListSessions() ([]*api.Session, error) {
101152
return sm.sessionManager.ListSessions()

pkg/api/models.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,14 @@ func (m *Message) Validate() error {
8989
return validate.Struct(m)
9090
}
9191

92+
// SetMetadata sets a metadata key-value pair, lazily initializing the map if needed.
93+
func (m *Message) SetMetadata(key, value string) {
94+
if m.Metadata == nil {
95+
m.Metadata = make(map[string]string)
96+
}
97+
m.Metadata[key] = value
98+
}
99+
92100
type MessageSource string
93101

94102
const (

pkg/sessions/memory.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,10 @@ func NewInMemoryChatStore() *InMemoryChatStore {
117117
}
118118
}
119119

120+
// maxChatMessages is the upper bound on in-memory messages per session.
121+
// When exceeded, the oldest messages are trimmed to prevent unbounded growth.
122+
const maxChatMessages = 500
123+
120124
// AddChatMessage adds a message to the store.
121125
func (s *InMemoryChatStore) AddChatMessage(record *api.Message) error {
122126
if err := record.Validate(); err != nil {
@@ -125,6 +129,10 @@ func (s *InMemoryChatStore) AddChatMessage(record *api.Message) error {
125129
s.mu.Lock()
126130
defer s.mu.Unlock()
127131
s.messages = append(s.messages, record)
132+
// Trim oldest messages to prevent unbounded memory growth
133+
if len(s.messages) > maxChatMessages {
134+
s.messages = s.messages[len(s.messages)-maxChatMessages:]
135+
}
128136
return nil
129137
}
130138

pkg/tools/kubectl_tool.go

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,13 +21,30 @@ import (
2121
"os"
2222
"os/exec"
2323
"strings"
24+
"sync"
2425
"time"
2526

2627
"github.com/KongZ/kubeai-chatbot/gollm"
2728
"github.com/KongZ/kubeai-chatbot/pkg/api"
2829
"mvdan.cc/sh/v3/shell"
2930
)
3031

32+
var (
	// cachedBaseEnv is the process environment captured by the first
	// baseEnviron call. It is only ever written inside cachedBaseEnvOnce.
	cachedBaseEnv []string

	// cachedBaseEnvOnce guards the one-time capture of cachedBaseEnv.
	cachedBaseEnvOnce sync.Once
)

// baseEnviron returns a fresh copy of the process environment as it was on
// the first call to this function.
//
// os.Environ is consulted exactly once; every later call copies the cached
// snapshot instead. Each caller gets its own slice, so appending to or
// modifying the result does not affect the cache or other callers.
//
// Because the snapshot is taken once, environment changes made afterwards
// (for example via os.Setenv) are not reflected in the returned slice.
func baseEnviron() []string {
	cachedBaseEnvOnce.Do(func() { cachedBaseEnv = os.Environ() })
	return append(make([]string, 0, len(cachedBaseEnv)), cachedBaseEnv...)
}
47+
3148
type ExecResult struct {
3249
Command string `json:"command"`
3350
Stdout string `json:"stdout,omitempty"`
@@ -114,8 +131,8 @@ func (t *Kubectl) Run(ctx context.Context, args map[string]any) (any, error) {
114131
// Reconstruct command for reporting
115132
fullCommand := strings.Join(cmdArgs, " ")
116133

117-
// Prepare environment
118-
env := os.Environ()
134+
// Prepare environment (use cached base to avoid repeated syscalls)
135+
env := baseEnviron()
119136
if kubeconfig != "" {
120137
kubeconfig, err := ExpandShellVar(kubeconfig)
121138
if err != nil {

0 commit comments

Comments
 (0)