Skip to content

Commit d04c24c

Browse files
committed
Removes offline consumer from broadcast
1 parent e5cac9e commit d04c24c

File tree

3 files changed

+20
-13
lines changed

3 files changed

+20
-13
lines changed

cmd/cli/cli-client.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,13 +32,13 @@ func NewCLIOptions() *CLIOptions {
3232
}
3333
}
3434

35-
func createQueueClient() netinternal.QueueServiceClient {
35+
func createQueueClient() (netinternal.QueueServiceClient, *grpc.ClientConn) {
3636
conn, err := grpc.NewClient("localhost:50051", grpc.WithTransportCredentials(insecure.NewCredentials()))
3737
if err != nil {
3838
log.Fatalf("failed to connect: %v", err)
3939
}
4040
client := netinternal.NewQueueServiceClient(conn)
41-
return client
41+
return client, conn
4242
}
4343

4444
func enqueueMsg(cliOptions *CLIOptions, client netinternal.QueueServiceClient) {
@@ -72,7 +72,8 @@ func observeQueueMsg(cliOptions *CLIOptions, client netinternal.QueueServiceClie
7272

7373
func main() {
7474
cliOptions := NewCLIOptions()
75-
client := createQueueClient()
75+
client, conn := createQueueClient()
76+
defer conn.Close()
7677

7778
if cliOptions.publish {
7879
enqueueMsg(cliOptions, client)

cmd/server/main.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,4 +16,7 @@ func main() {
1616
}
1717

1818
err = server.Run()
19+
if err != nil {
20+
log.Fatalf("Server run failed: %v", err)
21+
}
1922
}

internal/net/server.go

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,8 @@ import (
1717

1818
type MessageOutputStream = grpc.ServerStreamingServer[netinternal.QueueMessage]
1919
type OnlineConsumer struct {
20-
id uint64
21-
stream MessageOutputStream
22-
closeChannel chan<- interface{}
20+
id uint64
21+
stream MessageOutputStream
2322
}
2423
type QueueServer struct {
2524
netinternal.UnimplementedQueueServiceServer
@@ -56,6 +55,12 @@ func (qs *QueueServer) Enqueue(_ context.Context, req *netinternal.EnqueueReques
5655
return &netinternal.EnqueueRequestResponse{Success: true}, nil
5756
}
5857

58+
func removeClosedConsumers(closedChannels []uint64, consumers []*OnlineConsumer) []*OnlineConsumer {
59+
return internal.Filter(consumers, func(consumer *OnlineConsumer) bool {
60+
return !internal.Contains(closedChannels, consumer.id)
61+
})
62+
}
63+
5964
func (qs *QueueServer) broadcastMessage() {
6065
var closedChannels []uint64
6166
mu := sync.Mutex{}
@@ -68,23 +73,21 @@ func (qs *QueueServer) broadcastMessage() {
6873
mu.Lock()
6974
defer mu.Unlock()
7075
qs.queueService.RevertDequeue(int(consumer.id))
71-
consumer.closeChannel <- "closed"
7276
closedChannels = append(closedChannels, consumer.id)
7377
}
7478
}(consumer)
7579
}
7680
wg.Wait()
77-
qs.onlineConsumer = internal.Filter(qs.onlineConsumer, func(consumer *OnlineConsumer) bool {
78-
return !internal.Contains(closedChannels, consumer.id)
79-
})
81+
qs.onlineConsumer = removeClosedConsumers(closedChannels, qs.onlineConsumer)
8082
}
8183

8284
func (qs *QueueServer) ObserveQueue(req *netinternal.ObserveQueueRequest, stream grpc.ServerStreamingServer[netinternal.QueueMessage]) error {
83-
closeChannel := make(chan interface{})
84-
consumer := &OnlineConsumer{id: req.ConsumerId, stream: stream, closeChannel: closeChannel}
85+
consumer := &OnlineConsumer{id: req.ConsumerId, stream: stream}
8586
_ = qs.serveMessages(consumer)
8687
qs.onlineConsumer = append(qs.onlineConsumer, consumer)
87-
<-closeChannel
88+
<-stream.Context().Done()
89+
closedConsumers := []uint64{req.ConsumerId}
90+
qs.onlineConsumer = removeClosedConsumers(closedConsumers, qs.onlineConsumer)
8891
return nil
8992
}
9093

0 commit comments

Comments
 (0)