77 netinternal "ashishkujoy/queue/proto"
88 context "context"
99 "fmt"
10- "log"
1110 "net"
1211 "sync"
1312
@@ -50,25 +49,30 @@ func NewQueueServer(config *config.Config, port string) (*QueueServer, error) {
5049}
5150
5251func (qs * QueueServer ) Enqueue (_ context.Context , req * netinternal.EnqueueRequest ) (* netinternal.EnqueueRequestResponse , error ) {
53- //qs.mu.Lock()
54- //defer qs.mu.Unlock()
5552 if err := qs .queueService .Enqueue (req .Message ); err != nil {
5653 return nil , status .Errorf (codes .Internal , "failed to enqueue" )
5754 }
58- qs .broadcastMessage ()
55+ go qs .broadcastMessage ()
5956 return & netinternal.EnqueueRequestResponse {Success : true }, nil
6057}
6158
6259func (qs * QueueServer ) broadcastMessage () {
63- //qs.mu.RLock()
64- //defer qs.mu.RUnlock()
6560 var closedChannels []uint64
61+ mu := sync.Mutex {}
62+ wg := sync.WaitGroup {}
6663 for _ , consumer := range qs .onlineConsumer {
67- if err := qs .serveMessages (consumer ); err != nil {
68- consumer .closeChannel <- "closed"
69- closedChannels = append (closedChannels , consumer .id )
70- }
64+ wg .Add (1 )
65+ go func (consumer * OnlineConsumer ) {
66+ defer wg .Done ()
67+ if err := qs .serveMessages (consumer ); err != nil {
68+ mu .Lock ()
69+ defer mu .Unlock ()
70+ consumer .closeChannel <- "closed"
71+ closedChannels = append (closedChannels , consumer .id )
72+ }
73+ }(consumer )
7174 }
75+ wg .Wait ()
7276 qs .onlineConsumer = internal .Filter (qs .onlineConsumer , func (consumer * OnlineConsumer ) bool {
7377 return ! internal .Contains (closedChannels , consumer .id )
7478 })
@@ -78,8 +82,6 @@ func (qs *QueueServer) ObserveQueue(req *netinternal.ObserveQueueRequest, stream
7882 closeChannel := make (chan interface {})
7983 consumer := & OnlineConsumer {id : req .ConsumerId , stream : stream , closeChannel : closeChannel }
8084 _ = qs .serveMessages (consumer )
81- //qs.mu.Lock()
82- //defer qs.mu.Unlock()
8385 qs .onlineConsumer = append (qs .onlineConsumer , consumer )
8486 <- closeChannel
8587 return nil
@@ -100,22 +102,15 @@ func (qs *QueueServer) serveMessages(consumer *OnlineConsumer) error {
100102 return nil
101103}
102104
103- func (qs * QueueServer ) Run (cancel <- chan interface {} ) error {
105+ func (qs * QueueServer ) Run () error {
104106 listener , err := net .Listen ("tcp" , ":50051" )
105107 if err != nil {
106108 fmt .Printf ("Error Creating listener %v\n " , err )
107109 return err
108110 }
109111 fmt .Println ("Created Listener" )
110- go func () {
111- if err := qs .gpServer .Serve (listener ); err != nil {
112- fmt .Printf ("Error Starting grpc serve %v" , err )
113- }
114- log .Printf ("GRPC server listening on %s" , qs .port )
115-
116- }()
117- <- cancel
118- _ = listener .Close ()
119- qs .gpServer .Stop ()
112+ if err := qs .gpServer .Serve (listener ); err != nil {
113+ return nil
114+ }
120115 return nil
121116}
0 commit comments