Skip to content

Commit eda0d1d

Browse files
committed
Adds grpc server
1 parent 0f2e9b4 commit eda0d1d

File tree

11 files changed

+629
-20
lines changed

11 files changed

+629
-20
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1 +1,2 @@
11
.vscode
2+
.idea

cmd/main.go

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
package main
2+
3+
import (
4+
"ashishkujoy/queue/internal/config"
5+
netinternal "ashishkujoy/queue/internal/net"
6+
"log"
7+
"os"
8+
"os/signal"
9+
"syscall"
10+
)
11+
12+
func main() {
13+
conf := config.NewConfigWithMetadataPath("data/segments", "data/metadata", 1024*1024)
14+
server, err := netinternal.NewQueueServer(conf, ":8081")
15+
if err != nil {
16+
log.Fatalf("Failed to create server: %v", err)
17+
return
18+
}
19+
serverStop := make(chan interface{})
20+
stop := make(chan os.Signal, 1)
21+
signal.Notify(stop, syscall.SIGTERM, syscall.SIGINT)
22+
go func() {
23+
println("About to call run")
24+
server.Run(serverStop)
25+
println("Calling run done")
26+
}()
27+
<-stop
28+
serverStop <- "stop"
29+
}

dev-setup.sh

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
2+
## Install golang grpc
3+
4+
brew install protobuf
5+
go install google.golang.org/protobuf/cmd/protoc-gen-go@latest
6+
go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@latest

generate-grcp-code.sh

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
#! /bin/bash
2+
3+
protoc --go_out=. --go_opt=paths=source_relative \
4+
--go-grpc_out=. --go-grpc_opt=paths=source_relative \
5+
proto/queue.proto

go.mod

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,18 @@ module ashishkujoy/queue
22

33
go 1.24
44

5+
require (
6+
github.com/stretchr/testify v1.10.0
7+
google.golang.org/grpc v1.72.0
8+
)
9+
510
require (
611
github.com/davecgh/go-spew v1.1.1 // indirect
7-
github.com/fatih/color v1.18.0 // indirect
8-
github.com/mattn/go-colorable v0.1.14 // indirect
9-
github.com/mattn/go-isatty v0.0.20 // indirect
1012
github.com/pmezard/go-difflib v1.0.0 // indirect
11-
github.com/rakyll/gotest v0.0.6 // indirect
12-
github.com/stretchr/testify v1.10.0 // indirect
13+
golang.org/x/net v0.35.0 // indirect
1314
golang.org/x/sys v0.32.0 // indirect
15+
golang.org/x/text v0.22.0 // indirect
16+
google.golang.org/genproto/googleapis/rpc v0.0.0-20250218202821-56aae31c358a // indirect
17+
google.golang.org/protobuf v1.36.5 // indirect
1418
gopkg.in/yaml.v3 v3.0.1 // indirect
1519
)

go.sum

Lines changed: 33 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,26 +1,44 @@
11
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
22
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
3-
github.com/fatih/color v1.9.0/go.mod h1:eQcE1qtQxscV5RaZvpXrrb8Drkc3/DdQ+uUYCNjL+zU=
4-
github.com/fatih/color v1.18.0 h1:S8gINlzdQ840/4pfAwic/ZE0djQEH3wM94VfqLTZcOM=
5-
github.com/fatih/color v1.18.0/go.mod h1:4FelSpRwEGDpQ12mAdzqdOukCy4u8WUtOY6lkT/6HfU=
6-
github.com/mattn/go-colorable v0.1.4/go.mod h1:U0ppj6V5qS13XJ6of8GYAs25YV2eR4EVcfRqFIhoBtE=
7-
github.com/mattn/go-colorable v0.1.14 h1:9A9LHSqF/7dyVVX6g0U9cwm9pG3kP9gSzcuIPHPsaIE=
8-
github.com/mattn/go-colorable v0.1.14/go.mod h1:6LmQG8QLFO4G5z1gPvYEzlUgJ2wF+stgPZH1UqBm1s8=
9-
github.com/mattn/go-isatty v0.0.8/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s=
10-
github.com/mattn/go-isatty v0.0.11/go.mod h1:PhnuNfih5lzO57/f3n+odYbM4JtupLOxQOAqxQCu2WE=
11-
github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY=
12-
github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
3+
github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY=
4+
github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
5+
github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
6+
github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
7+
github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek=
8+
github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps=
9+
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
10+
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
11+
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
12+
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
1313
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
1414
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
15-
github.com/rakyll/gotest v0.0.6 h1:hBTqkO3jiuwYW/M9gL4bu0oTYcm8J6knQAAPUsJsz1I=
16-
github.com/rakyll/gotest v0.0.6/go.mod h1:SkoesdNCWmiD4R2dljIUcfSnNdVZ12y8qK4ojDkc2Sc=
1715
github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA=
1816
github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
19-
golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
20-
golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
21-
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
17+
go.opentelemetry.io/auto/sdk v1.1.0 h1:cH53jehLUN6UFLY71z+NDOiNJqDdPRaXzTel0sJySYA=
18+
go.opentelemetry.io/auto/sdk v1.1.0/go.mod h1:3wSPjt5PWp2RhlCcmmOial7AvC4DQqZb7a7wCow3W8A=
19+
go.opentelemetry.io/otel v1.34.0 h1:zRLXxLCgL1WyKsPVrgbSdMN4c0FMkDAskSTQP+0hdUY=
20+
go.opentelemetry.io/otel v1.34.0/go.mod h1:OWFPOQ+h4G8xpyjgqo4SxJYdDQ/qmRH+wivy7zzx9oI=
21+
go.opentelemetry.io/otel/metric v1.34.0 h1:+eTR3U0MyfWjRDhmFMxe2SsW64QrZ84AOhvqS7Y+PoQ=
22+
go.opentelemetry.io/otel/metric v1.34.0/go.mod h1:CEDrp0fy2D0MvkXE+dPV7cMi8tWZwX3dmaIhwPOaqHE=
23+
go.opentelemetry.io/otel/sdk v1.34.0 h1:95zS4k/2GOy069d321O8jWgYsW3MzVV+KuSPKp7Wr1A=
24+
go.opentelemetry.io/otel/sdk v1.34.0/go.mod h1:0e/pNiaMAqaykJGKbi+tSjWfNNHMTxoC9qANsCzbyxU=
25+
go.opentelemetry.io/otel/sdk/metric v1.34.0 h1:5CeK9ujjbFVL5c1PhLuStg1wxA7vQv7ce1EK0Gyvahk=
26+
go.opentelemetry.io/otel/sdk/metric v1.34.0/go.mod h1:jQ/r8Ze28zRKoNRdkjCZxfs6YvBTG1+YIqyFVFYec5w=
27+
go.opentelemetry.io/otel/trace v1.34.0 h1:+ouXS2V8Rd4hp4580a8q23bg0azF2nI8cqLYnC8mh/k=
28+
go.opentelemetry.io/otel/trace v1.34.0/go.mod h1:Svm7lSjQD7kG7KJ/MUHPVXSDGz2OX4h0M2jHBhmSfRE=
29+
golang.org/x/net v0.35.0 h1:T5GQRQb2y08kTAByq9L4/bz8cipCdA8FbRTXewonqY8=
30+
golang.org/x/net v0.35.0/go.mod h1:EglIi67kWsHKlRzzVMUD93VMSWGFOMSZgxFjparz1Qk=
2231
golang.org/x/sys v0.32.0 h1:s77OFDvIQeibCmezSnk/q6iAfkdiQaJi4VzroCFrN20=
2332
golang.org/x/sys v0.32.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k=
33+
golang.org/x/text v0.22.0 h1:bofq7m3/HAFvbF51jz3Q9wLg3jkvSPuiZu/pD1XwgtM=
34+
golang.org/x/text v0.22.0/go.mod h1:YRoo4H8PVmsu+E3Ou7cqLVH8oXWIHVoX0jqUWALQhfY=
35+
google.golang.org/genproto/googleapis/rpc v0.0.0-20250218202821-56aae31c358a h1:51aaUVRocpvUOSQKM6Q7VuoaktNIaMCLuhZB6DKksq4=
36+
google.golang.org/genproto/googleapis/rpc v0.0.0-20250218202821-56aae31c358a/go.mod h1:uRxBH1mhmO8PGhU89cMcHaXKZqO+OfakD8QQO0oYwlQ=
37+
google.golang.org/grpc v1.72.0 h1:S7UkcVa60b5AAQTaO6ZKamFp1zMZSU0fGDK2WZLbBnM=
38+
google.golang.org/grpc v1.72.0/go.mod h1:wH5Aktxcg25y1I3w7H69nHfXdOG3UiadoBtjh3izSDM=
39+
google.golang.org/protobuf v1.36.5 h1:tPhr+woSbjfYvY6/GPufUoYizxw1cF/yFoxJ2fmpwlM=
40+
google.golang.org/protobuf v1.36.5/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE=
41+
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
2442
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
2543
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
2644
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

internal/net/server.go

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
package netinternal
2+
3+
import (
4+
"ashishkujoy/queue/internal/config"
5+
queueinternal "ashishkujoy/queue/internal/queue"
6+
netinternal "ashishkujoy/queue/proto"
7+
context "context"
8+
"fmt"
9+
"log"
10+
"net"
11+
12+
grpc "google.golang.org/grpc"
13+
codes "google.golang.org/grpc/codes"
14+
status "google.golang.org/grpc/status"
15+
)
16+
17+
type QueueServer struct {
18+
netinternal.UnimplementedQueueServiceServer
19+
queueService *queueinternal.QueueService
20+
port string
21+
gpServer *grpc.Server
22+
}
23+
24+
func NewQueueServer(config *config.Config, port string) (*QueueServer, error) {
25+
service, err := queueinternal.NewQueueService(config)
26+
if err != nil {
27+
return nil, err
28+
}
29+
30+
gpServer := grpc.NewServer()
31+
server := &QueueServer{
32+
port: port,
33+
queueService: service,
34+
gpServer: gpServer,
35+
}
36+
netinternal.RegisterQueueServiceServer(gpServer, server)
37+
return server, nil
38+
}
39+
40+
func (qs *QueueServer) Enqueue(ctx context.Context, req *netinternal.EnqueuRequest) (*netinternal.EnqueuRequestResponse, error) {
41+
qs.queueService.Enqueue(req.Message)
42+
qs.broadcastMessage()
43+
return nil, status.Errorf(codes.Unimplemented, "method Enqueue not implemented")
44+
}
45+
46+
func (qs *QueueServer) broadcastMessage() {
47+
// panic("unimplemented")
48+
}
49+
50+
func (qs *QueueServer) ObserveQueue(req *netinternal.ObserveQueueRequest, stream grpc.ServerStreamingServer[netinternal.QueueMessage]) error {
51+
return qs.serveMessages(req, stream)
52+
}
53+
54+
func (qs *QueueServer) serveMessages(req *netinternal.ObserveQueueRequest, stream grpc.ServerStreamingServer[netinternal.QueueMessage]) error {
55+
for {
56+
msg, err := qs.queueService.Dequeue(int(req.ConsumerId))
57+
if err != nil {
58+
break
59+
}
60+
err = stream.Send(&netinternal.QueueMessage{Message: msg})
61+
if err != nil {
62+
return err
63+
}
64+
}
65+
66+
return nil
67+
}
68+
69+
func (qs *QueueServer) Run(cancel <-chan interface{}) error {
70+
listener, err := net.Listen("tcp", ":50051")
71+
if err != nil {
72+
fmt.Printf("Error Creating listener %v\n", err)
73+
return err
74+
}
75+
fmt.Println("Created Listener")
76+
go func() {
77+
if err := qs.gpServer.Serve(listener); err != nil {
78+
fmt.Printf("Error Starting grpc serve %v", err)
79+
}
80+
log.Printf("GRPC server listening on %s", qs.port)
81+
82+
}()
83+
<-cancel
84+
listener.Close()
85+
qs.gpServer.Stop()
86+
return nil
87+
}

main

14.6 MB
Binary file not shown.

0 commit comments

Comments
 (0)