Skip to content

Commit bf1ca98

Browse files
committed
Publishes new messages to online consumers
1 parent 514266d commit bf1ca98

File tree

3 files changed

+72
-19
lines changed

3 files changed

+72
-19
lines changed

cmd/server/main.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,11 @@ func main() {
1919
serverStop := make(chan interface{})
2020
stop := make(chan os.Signal, 1)
2121
signal.Notify(stop, syscall.SIGTERM, syscall.SIGINT)
22-
go func() {
23-
println("About to call run")
24-
server.Run(serverStop)
25-
println("Calling run done")
26-
}()
22+
23+
err = server.Run(serverStop)
24+
if err != nil {
25+
return
26+
}
2727
<-stop
2828
serverStop <- "stop"
2929
}

internal/arrayUtils.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
package internal
2+
3+
func Contains[T comparable](arr []T, element T) bool {
4+
for _, e := range arr {
5+
if e == element {
6+
return true
7+
}
8+
}
9+
10+
return false
11+
}
12+
13+
func Filter[T any](elements []T, f func(T) bool) []T {
14+
var filtered []T
15+
for _, element := range elements {
16+
if f(element) {
17+
filtered = append(filtered, element)
18+
}
19+
}
20+
21+
return filtered
22+
}

internal/net/server.go

Lines changed: 45 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,34 @@
11
package netinternal
22

33
import (
4+
"ashishkujoy/queue/internal"
45
"ashishkujoy/queue/internal/config"
56
queueinternal "ashishkujoy/queue/internal/queue"
67
netinternal "ashishkujoy/queue/proto"
78
context "context"
89
"fmt"
910
"log"
1011
"net"
12+
"sync"
1113

1214
grpc "google.golang.org/grpc"
1315
codes "google.golang.org/grpc/codes"
1416
status "google.golang.org/grpc/status"
1517
)
1618

19+
type MessageOutputStream = grpc.ServerStreamingServer[netinternal.QueueMessage]
20+
type OnlineConsumer struct {
21+
id uint64
22+
stream MessageOutputStream
23+
closeChannel chan<- interface{}
24+
}
1725
type QueueServer struct {
1826
netinternal.UnimplementedQueueServiceServer
19-
queueService *queueinternal.QueueService
20-
port string
21-
gpServer *grpc.Server
27+
queueService *queueinternal.QueueService
28+
port string
29+
gpServer *grpc.Server
30+
onlineConsumer []*OnlineConsumer
31+
mu *sync.RWMutex
2232
}
2333

2434
func NewQueueServer(config *config.Config, port string) (*QueueServer, error) {
@@ -29,15 +39,19 @@ func NewQueueServer(config *config.Config, port string) (*QueueServer, error) {
2939

3040
gpServer := grpc.NewServer()
3141
server := &QueueServer{
32-
port: port,
33-
queueService: service,
34-
gpServer: gpServer,
42+
port: port,
43+
queueService: service,
44+
gpServer: gpServer,
45+
onlineConsumer: make([]*OnlineConsumer, 0),
46+
mu: &sync.RWMutex{},
3547
}
3648
netinternal.RegisterQueueServiceServer(gpServer, server)
3749
return server, nil
3850
}
3951

40-
func (qs *QueueServer) Enqueue(ctx context.Context, req *netinternal.EnqueueRequest) (*netinternal.EnqueueRequestResponse, error) {
52+
func (qs *QueueServer) Enqueue(_ context.Context, req *netinternal.EnqueueRequest) (*netinternal.EnqueueRequestResponse, error) {
53+
//qs.mu.Lock()
54+
//defer qs.mu.Unlock()
4155
if err := qs.queueService.Enqueue(req.Message); err != nil {
4256
return nil, status.Errorf(codes.Internal, "failed to enqueue")
4357
}
@@ -46,21 +60,38 @@ func (qs *QueueServer) Enqueue(ctx context.Context, req *netinternal.EnqueueRequ
4660
}
4761

4862
func (qs *QueueServer) broadcastMessage() {
49-
// panic("unimplemented")
63+
//qs.mu.RLock()
64+
//defer qs.mu.RUnlock()
65+
var closedChannels []uint64
66+
for _, consumer := range qs.onlineConsumer {
67+
if err := qs.serveMessages(consumer); err != nil {
68+
consumer.closeChannel <- "closed"
69+
closedChannels = append(closedChannels, consumer.id)
70+
}
71+
}
72+
qs.onlineConsumer = internal.Filter(qs.onlineConsumer, func(consumer *OnlineConsumer) bool {
73+
return !internal.Contains(closedChannels, consumer.id)
74+
})
5075
}
5176

5277
func (qs *QueueServer) ObserveQueue(req *netinternal.ObserveQueueRequest, stream grpc.ServerStreamingServer[netinternal.QueueMessage]) error {
53-
return qs.serveMessages(req, stream)
78+
closeChannel := make(chan interface{})
79+
consumer := &OnlineConsumer{id: req.ConsumerId, stream: stream, closeChannel: closeChannel}
80+
_ = qs.serveMessages(consumer)
81+
//qs.mu.Lock()
82+
//defer qs.mu.Unlock()
83+
qs.onlineConsumer = append(qs.onlineConsumer, consumer)
84+
<-closeChannel
85+
return nil
5486
}
5587

56-
func (qs *QueueServer) serveMessages(req *netinternal.ObserveQueueRequest, stream grpc.ServerStreamingServer[netinternal.QueueMessage]) error {
88+
func (qs *QueueServer) serveMessages(consumer *OnlineConsumer) error {
5789
for {
58-
msg, err := qs.queueService.Dequeue(int(req.ConsumerId))
90+
msg, err := qs.queueService.Dequeue(int(consumer.id))
5991
if err != nil {
6092
break
6193
}
62-
fmt.Printf("Message dequeued: %v\n", msg)
63-
err = stream.Send(&netinternal.QueueMessage{Message: msg})
94+
err = consumer.stream.Send(&netinternal.QueueMessage{Message: msg})
6495
if err != nil {
6596
return err
6697
}
@@ -84,7 +115,7 @@ func (qs *QueueServer) Run(cancel <-chan interface{}) error {
84115

85116
}()
86117
<-cancel
87-
listener.Close()
118+
_ = listener.Close()
88119
qs.gpServer.Stop()
89120
return nil
90121
}

0 commit comments

Comments
 (0)