Skip to content

Commit 77f61cb

Browse files
high-moctaneclaude
andcommitted
fix: MergeHandler が limit 到達後にリアルタイムイベントをドロップするバグを修正
limitReachedSub フラグが EOSE 後の全イベント(リアルタイム含む)を永久にブロック していた。NIP-01 では limit は initial query にのみ適用されるため、EOSE 後の リアルタイムイベントはパススルーすべき。 limitReachedSub を廃止し、既存の completedSubs による EOSE 後パススルーに統一。 🐛 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Sakura 🌸 <noreply@anthropic.com>
1 parent cf0d849 commit 77f61cb

File tree

2 files changed

+140
-20
lines changed

2 files changed

+140
-20
lines changed

merge_handler.go

Lines changed: 5 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -151,8 +151,7 @@ type mergeSession struct {
151151
pendingOKs map[string]*pendingOK // eventID -> pending OK state
152152
pendingEOSEs map[string]*pendingEOSE // subscriptionID -> pending EOSE state
153153
pendingCounts map[string]*pendingCount // subscriptionID -> pending COUNT state
154-
completedSubs map[string]bool // subscriptionID -> true if EOSE already sent
155-
limitReachedSub map[string]bool // subscriptionID -> true if limit reached (drop events)
154+
completedSubs map[string]bool // subscriptionID -> true if EOSE already sent
156155
}
157156

158157
type pendingOK struct {
@@ -184,11 +183,10 @@ type pendingEOSE struct {
184183
func newMergeSession(numHandlers int) *mergeSession {
185184
return &mergeSession{
186185
numHandlers: numHandlers,
187-
pendingOKs: make(map[string]*pendingOK),
188-
pendingEOSEs: make(map[string]*pendingEOSE),
189-
pendingCounts: make(map[string]*pendingCount),
190-
completedSubs: make(map[string]bool),
191-
limitReachedSub: make(map[string]bool),
186+
pendingOKs: make(map[string]*pendingOK),
187+
pendingEOSEs: make(map[string]*pendingEOSE),
188+
pendingCounts: make(map[string]*pendingCount),
189+
completedSubs: make(map[string]bool),
192190
}
193191
}
194192

@@ -240,7 +238,6 @@ func (s *mergeSession) closeSubscription(subID string) {
240238
delete(s.pendingEOSEs, subID)
241239
delete(s.pendingCounts, subID)
242240
delete(s.completedSubs, subID)
243-
delete(s.limitReachedSub, subID)
244241
}
245242

246243
func (s *mergeSession) processResponse(msg *ServerMsg, handlerIndex int) []*ServerMsg {
@@ -271,11 +268,6 @@ func (s *mergeSession) processResponse(msg *ServerMsg, handlerIndex int) []*Serv
271268
eventCreatedAt := msg.Event.CreatedAt.Unix()
272269
eventID := msg.Event.ID
273270

274-
// If limit reached, drop all subsequent events
275-
if s.limitReachedSub[subID] {
276-
return nil
277-
}
278-
279271
// If EOSE already sent (all handlers finished), pass through without dedup/sort (real-time events)
280272
if s.completedSubs[subID] {
281273
return []*ServerMsg{msg}
@@ -336,7 +328,6 @@ func (s *mergeSession) processResponse(msg *ServerMsg, handlerIndex int) []*Serv
336328
pending.eoseSent = true
337329
delete(s.pendingEOSEs, subID)
338330
s.completedSubs[subID] = true
339-
s.limitReachedSub[subID] = true // Drop subsequent events
340331
return []*ServerMsg{msg, NewServerEOSEMsg(subID)}
341332
}
342333

merge_handler_test.go

Lines changed: 135 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -579,20 +579,149 @@ func TestMergeHandler_Req_Limit_MultipleHandlers(t *testing.T) {
579579
}
580580
done:
581581

582-
// Should have at most 3 EVENTs + 1 EOSE = 4 messages
583-
// (MergeHandler should enforce the limit across all handlers)
584-
eventCount := 0
582+
// Count events before EOSE (should be at most 3 = limit)
583+
// After EOSE, additional events may arrive (real-time passthrough)
584+
eventCountBeforeEOSE := 0
585+
foundEOSE := false
585586
for _, msg := range events {
587+
if msg.Type == MsgTypeEOSE {
588+
foundEOSE = true
589+
break
590+
}
586591
if msg.Type == MsgTypeEvent {
587-
eventCount++
592+
eventCountBeforeEOSE++
588593
}
589594
}
590595

591-
assert.LessOrEqual(t, eventCount, 3, "should have at most 3 events (limit)")
592-
assert.Equal(t, MsgTypeEOSE, events[len(events)-1].Type)
596+
assert.True(t, foundEOSE, "should have EOSE")
597+
assert.LessOrEqual(t, eventCountBeforeEOSE, 3, "should have at most 3 events before EOSE (limit)")
593598
})
594599
}
595600

601+
// TestMergeHandler_Req_Limit_RealTimeEventsAfterEOSE tests that real-time events
602+
// are NOT dropped after limit-triggered EOSE.
603+
// This is the key bug fix test: previously, limitReachedSub permanently blocked
604+
// all events including real-time ones.
605+
func TestMergeHandler_Req_Limit_RealTimeEventsAfterEOSE(t *testing.T) {
606+
synctest.Test(t, func(t *testing.T) {
607+
ctx := context.Background()
608+
609+
// Storage with enough events to trigger limit
610+
storage := NewInMemoryStorage()
611+
storage.Store(ctx, makeEvent("event-1", "pubkey01", 1, 500))
612+
storage.Store(ctx, makeEvent("event-2", "pubkey01", 1, 400))
613+
storage.Store(ctx, makeEvent("event-3", "pubkey01", 1, 300))
614+
615+
// A handler that simulates RouterHandler:
616+
// sends EOSE immediately, then delivers a real-time event via trigger channel
617+
realTimeCh := make(chan struct{})
618+
realTimeEvent := makeEvent("realtime-1", "pubkey01", 1, 600)
619+
rtHandler := &triggeredRealTimeHandler{
620+
triggerCh: realTimeCh,
621+
event: realTimeEvent,
622+
}
623+
624+
mergeHandler := NewMergeHandler(NewStorageHandler(storage), rtHandler)
625+
626+
ctx, cancel := context.WithCancel(ctx)
627+
defer cancel()
628+
629+
send := make(chan *ServerMsg, 20)
630+
recv := make(chan *ClientMsg, 10)
631+
632+
go mergeHandler.ServeNostr(ctx, send, recv)
633+
634+
// Send a REQ with limit=2 (storage has 3 events, so limit will be reached)
635+
limit := int64(2)
636+
recv <- &ClientMsg{
637+
Type: MsgTypeReq,
638+
SubscriptionID: "sub1",
639+
Filters: []*ReqFilter{{Limit: &limit}},
640+
}
641+
642+
synctest.Wait()
643+
644+
// Collect messages until EOSE
645+
var beforeEOSE []*ServerMsg
646+
foundEOSE := false
647+
for {
648+
select {
649+
case msg := <-send:
650+
if msg.Type == MsgTypeEOSE {
651+
foundEOSE = true
652+
goto eoseReceived
653+
}
654+
beforeEOSE = append(beforeEOSE, msg)
655+
default:
656+
goto eoseReceived
657+
}
658+
}
659+
eoseReceived:
660+
assert.True(t, foundEOSE, "should receive EOSE")
661+
assert.LessOrEqual(t, len(beforeEOSE), 2, "should have at most 2 events before EOSE")
662+
663+
// Now trigger a real-time event AFTER EOSE
664+
close(realTimeCh)
665+
666+
synctest.Wait()
667+
668+
// The real-time event should be delivered (NOT dropped)
669+
foundRealTime := false
670+
for {
671+
select {
672+
case msg := <-send:
673+
if msg.Type == MsgTypeEvent && msg.Event.ID == "realtime-1" {
674+
foundRealTime = true
675+
}
676+
default:
677+
goto done
678+
}
679+
}
680+
done:
681+
assert.True(t, foundRealTime, "real-time event should be delivered after limit-triggered EOSE")
682+
})
683+
}
684+
685+
// triggeredRealTimeHandler sends EOSE immediately on REQ,
686+
// then waits for triggerCh to close before sending a real-time event.
687+
// This simulates RouterHandler behavior where real-time events arrive later.
688+
type triggeredRealTimeHandler struct {
689+
triggerCh <-chan struct{}
690+
event *Event
691+
}
692+
693+
func (h *triggeredRealTimeHandler) ServeNostr(ctx context.Context, send chan<- *ServerMsg, recv <-chan *ClientMsg) error {
694+
var subID string
695+
for {
696+
select {
697+
case <-ctx.Done():
698+
return ctx.Err()
699+
case msg, ok := <-recv:
700+
if !ok {
701+
return nil
702+
}
703+
if msg.Type == MsgTypeReq {
704+
subID = msg.SubscriptionID
705+
select {
706+
case <-ctx.Done():
707+
return ctx.Err()
708+
case send <- NewServerEOSEMsg(subID):
709+
}
710+
}
711+
case <-h.triggerCh:
712+
if subID != "" {
713+
select {
714+
case <-ctx.Done():
715+
return ctx.Err()
716+
case send <- NewServerEventMsg(subID, h.event):
717+
}
718+
// Only send once: set triggerCh to nil to stop selecting on it
719+
h.triggerCh = nil
720+
}
721+
}
722+
}
723+
}
724+
596725
// errorHandler is a handler that returns an error.
597726
type errorHandler struct {
598727
err error

0 commit comments

Comments
 (0)