@@ -3,6 +3,7 @@ package itest
33import (
44 "bytes"
55 "context"
6+ "io"
67 "testing"
78 "time"
89
@@ -61,6 +62,51 @@ type ClientEventStream[T any] interface {
6162type EventSubscription [T any ] struct {
6263 ClientEventStream [T ]
6364 Cancel context.CancelFunc
65+
66+ // ShouldNotify is an optional filter predicate function that can be
67+ // used to filter events received from the client stream.
68+ //
69+ // If set, it will be called for each event received from the stream. If
70+ // it returns true, the event is returned. If it returns false, the
71+ // event is ignored and the next event is received from the stream.
72+ ShouldNotify func (T ) (bool , error )
73+ }
74+
75+ // Recv receives an event from the client stream. If a filter is set, it will
76+ // check if the event matches the filter. If it does, it returns the event.
77+ // If not, it continues receiving events until it finds one that matches the
78+ // filter.
79+ func (e * EventSubscription [T ]) Recv () (T , error ) {
80+ var zero T
81+
82+ // If no filter predicate is set, we can just return the event.
83+ if e .ShouldNotify == nil {
84+ return e .ClientEventStream .Recv ()
85+ }
86+
87+ // If a filter is set, we need to check if the event matches the
88+ // filter. If it does, we return the event. If not, we continue
89+ // receiving events until we find one that matches the filter.
90+ for {
91+ event , err := e .ClientEventStream .Recv ()
92+ if err != nil {
93+ if err == io .EOF {
94+ // Handle end of stream.
95+ return zero , err
96+ }
97+
98+ return zero , err
99+ }
100+
101+ match , err := e .ShouldNotify (event )
102+ if err != nil {
103+ return zero , err
104+ }
105+
106+ if match {
107+ return event , nil
108+ }
109+ }
64110}
65111
66112// CopyRequest is a helper function to copy a request so that we can modify it.
0 commit comments