Skip to content

Commit e5cac9e

Browse files
committed
Reverts consumer index on consumption failure
1 parent 97d9151 commit e5cac9e

File tree

2 files changed

+10
-4
lines changed

2 files changed

+10
-4
lines changed

internal/net/server.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,14 +5,14 @@ import (
55
"ashishkujoy/queue/internal/config"
66
queueinternal "ashishkujoy/queue/internal/queue"
77
netinternal "ashishkujoy/queue/proto"
8-
context "context"
8+
"context"
99
"fmt"
1010
"net"
1111
"sync"
1212

13-
grpc "google.golang.org/grpc"
14-
codes "google.golang.org/grpc/codes"
15-
status "google.golang.org/grpc/status"
13+
"google.golang.org/grpc"
14+
"google.golang.org/grpc/codes"
15+
"google.golang.org/grpc/status"
1616
)
1717

1818
type MessageOutputStream = grpc.ServerStreamingServer[netinternal.QueueMessage]
@@ -67,6 +67,7 @@ func (qs *QueueServer) broadcastMessage() {
6767
if err := qs.serveMessages(consumer); err != nil {
6868
mu.Lock()
6969
defer mu.Unlock()
70+
qs.queueService.RevertDequeue(int(consumer.id))
7071
consumer.closeChannel <- "closed"
7172
closedChannels = append(closedChannels, consumer.id)
7273
}

internal/queue/queue_service.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,3 +40,8 @@ func (qs *QueueService) Dequeue(consumerId int) ([]byte, error) {
4040
qs.consumerIndex.WriteIndex(consumerId, index+1)
4141
return data, nil
4242
}
43+
44+
func (qs *QueueService) RevertDequeue(consumerId int) {
45+
index := qs.consumerIndex.ReadIndex(consumerId)
46+
qs.consumerIndex.WriteIndex(consumerId, index-1)
47+
}

0 commit comments

Comments
 (0)