Skip to content

Commit 30db7d3

Browse files
JAORMXclaude
andcommitted
Address PR feedback: fix race condition and encapsulation issues
- Fix race condition in LocalStorage.Close() by collecting keys before deletion - Update Close() comment to reflect actual behavior (clears sessions, not a no-op) - Add setter methods (setTimestamps, setMetadataMap) to ProxySession for proper encapsulation - Update serialization to use setter methods instead of direct field access - Fix StreamableSession constructor to use NewTypedProxySession for proper initialization - Add type assertion check in StreamableSession deserialization 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <[email protected]>
1 parent 2f8f4aa commit 30db7d3

File tree

4 files changed

+41
-24
lines changed

4 files changed

+41
-24
lines changed

pkg/transport/session/proxy_session.go

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,3 +134,24 @@ func (s *ProxySession) DeleteMetadata(key string) {
134134
defer s.mu.Unlock()
135135
delete(s.metadata, key)
136136
}
137+
138+
// setTimestamps updates the created and updated timestamps.
139+
// This is used internally for deserialization to restore session state.
140+
func (s *ProxySession) setTimestamps(created, updated time.Time) {
141+
s.mu.Lock()
142+
defer s.mu.Unlock()
143+
s.created = created
144+
s.updated = updated
145+
}
146+
147+
// setMetadataMap replaces the entire metadata map.
148+
// This is used internally for deserialization to restore session state.
149+
func (s *ProxySession) setMetadataMap(metadata map[string]string) {
150+
s.mu.Lock()
151+
defer s.mu.Unlock()
152+
if metadata == nil {
153+
s.metadata = make(map[string]string)
154+
} else {
155+
s.metadata = metadata
156+
}
157+
}

pkg/transport/session/serialization.go

Lines changed: 11 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -68,28 +68,23 @@ func deserializeSession(data []byte) (Session, error) {
6868
// Use existing NewSSESession constructor
6969
sseSession := NewSSESession(sd.ID)
7070
// Update timestamps to match stored values
71-
sseSession.created = sd.CreatedAt
72-
sseSession.updated = sd.UpdatedAt
71+
sseSession.setTimestamps(sd.CreatedAt, sd.UpdatedAt)
7372
// Restore metadata
74-
sseSession.metadata = sd.Metadata
75-
if sseSession.metadata == nil {
76-
sseSession.metadata = make(map[string]string)
77-
}
73+
sseSession.setMetadataMap(sd.Metadata)
7874
// Note: SSE channels and client info will be recreated when reconnected
7975
session = sseSession
8076

8177
case SessionTypeStreamable:
8278
// Use existing NewStreamableSession constructor
83-
streamSession := NewStreamableSession(sd.ID).(*StreamableSession)
79+
sess := NewStreamableSession(sd.ID)
80+
streamSession, ok := sess.(*StreamableSession)
81+
if !ok {
82+
return nil, fmt.Errorf("failed to create StreamableSession")
83+
}
8484
// Update timestamps to match stored values
85-
streamSession.created = sd.CreatedAt
86-
streamSession.updated = sd.UpdatedAt
87-
streamSession.sessType = SessionTypeStreamable
85+
streamSession.setTimestamps(sd.CreatedAt, sd.UpdatedAt)
8886
// Restore metadata
89-
streamSession.metadata = sd.Metadata
90-
if streamSession.metadata == nil {
91-
streamSession.metadata = make(map[string]string)
92-
}
87+
streamSession.setMetadataMap(sd.Metadata)
9388
session = streamSession
9489

9590
case SessionTypeMCP:
@@ -98,13 +93,9 @@ func deserializeSession(data []byte) (Session, error) {
9893
// Use existing NewTypedProxySession constructor
9994
proxySession := NewTypedProxySession(sd.ID, sd.Type)
10095
// Update timestamps to match stored values
101-
proxySession.created = sd.CreatedAt
102-
proxySession.updated = sd.UpdatedAt
96+
proxySession.setTimestamps(sd.CreatedAt, sd.UpdatedAt)
10397
// Restore metadata
104-
proxySession.metadata = sd.Metadata
105-
if proxySession.metadata == nil {
106-
proxySession.metadata = make(map[string]string)
107-
}
98+
proxySession.setMetadataMap(sd.Metadata)
10899
session = proxySession
109100
}
110101

pkg/transport/session/storage_local.go

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -94,13 +94,18 @@ func (s *LocalStorage) DeleteExpired(ctx context.Context, before time.Time) erro
9494
return nil
9595
}
9696

97-
// Close is a no-op for local storage as there are no resources to clean up.
97+
// Close clears all sessions from local storage.
9898
func (s *LocalStorage) Close() error {
99-
// Clear all sessions on close
99+
// Collect keys first to avoid modifying map during iteration
100+
var toDelete []any
100101
s.sessions.Range(func(key, _ any) bool {
101-
s.sessions.Delete(key)
102+
toDelete = append(toDelete, key)
102103
return true
103104
})
105+
// Clear all sessions
106+
for _, key := range toDelete {
107+
s.sessions.Delete(key)
108+
}
104109
return nil
105110
}
106111

pkg/transport/session/streamable_session.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ type StreamableSession struct {
1717
// NewStreamableSession constructs a new streamable session with buffered channels
1818
func NewStreamableSession(id string) Session {
1919
return &StreamableSession{
20-
ProxySession: &ProxySession{id: id},
20+
ProxySession: NewTypedProxySession(id, SessionTypeStreamable),
2121
MessageCh: make(chan jsonrpc2.Message, 100),
2222
ResponseCh: make(chan jsonrpc2.Message, 100),
2323
}

0 commit comments

Comments
 (0)