Skip to content

Commit f720685

Browse files
Refactoring: end of sync handler for gRPC
1 parent 3d30978 commit f720685

File tree

3 files changed

+10
-3
lines changed

3 files changed

+10
-3
lines changed

examples/grpc/server.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@ func (server *Server) SubscribeOnTime(req *pb.Request, stream pb.TimeService_Sub
6767
server.subscriptions,
6868
NewTimeSubscription(),
6969
nil,
70+
nil,
7071
)
7172
}
7273

pkg/modules/grpc/pb/general.pb.go

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pkg/modules/grpc/server.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ type ServerStream[T any] interface {
9797
var subscriptionsCounter = new(atomic.Uint64)
9898

9999
// DefaultSubscribeOn - default subscribe server handler
100-
func DefaultSubscribeOn[T any, P any](stream ServerStream[P], subscriptions *Subscriptions[T, P], subscription Subscription[T, P], handler func(id uint64) error) error {
100+
func DefaultSubscribeOn[T any, P any](stream ServerStream[P], subscriptions *Subscriptions[T, P], subscription Subscription[T, P], handler func(id uint64) error, onEndOfSync func(id uint64) error) error {
101101
subscriptionID := subscriptionsCounter.Add(1)
102102

103103
if err := stream.SendMsg(&pb.SubscribeResponse{
@@ -108,12 +108,18 @@ func DefaultSubscribeOn[T any, P any](stream ServerStream[P], subscriptions *Sub
108108

109109
if handler != nil {
110110
if err := handler(subscriptionID); err != nil {
111-
return err
111+
return errors.Wrap(err, "synchronization")
112112
}
113113
}
114114

115115
subscriptions.Add(subscriptionID, subscription)
116116

117+
if onEndOfSync != nil {
118+
if err := onEndOfSync(subscriptionID); err != nil {
119+
return errors.Wrap(err, "end of sync handler")
120+
}
121+
}
122+
117123
loop:
118124
for {
119125
select {

0 commit comments

Comments
 (0)