Skip to content

Commit 169f758

Browse files
committed
Temp commit, ammend it later
1 parent 0f2e9b4 commit 169f758

File tree

6 files changed

+159
-0
lines changed

6 files changed

+159
-0
lines changed

cmd/main.go

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
package main
2+
3+
import (
4+
"ashishkujoy/queue/internal/config"
5+
netinternal "ashishkujoy/queue/internal/net"
6+
"log"
7+
"os"
8+
"os/signal"
9+
"syscall"
10+
)
11+
12+
func main() {
13+
conf := config.NewConfigWithMetadataPath("data/segments", "data/metadata", 1024*1024)
14+
server, err := netinternal.NewQueueServer(conf, ":8081")
15+
if err != nil {
16+
log.Fatalf("Failed to create server: %v", err)
17+
return
18+
}
19+
serverStop := make(chan interface{})
20+
stop := make(chan os.Signal, 1)
21+
signal.Notify(stop, syscall.SIGTERM, syscall.SIGINT)
22+
go func() {
23+
println("About to call run")
24+
server.Run(serverStop)
25+
println("Calling run done")
26+
}()
27+
<-stop
28+
serverStop <- "stop"
29+
}

generate-grcp-code.sh

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
#! /bin/bash
2+
3+
protoc --go_out=. --go_opt=paths=source_relative \
4+
--go-grpc_out=. --go-grpc_opt=paths=source_relative \
5+
proto/queue.proto

go.mod

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,11 @@ require (
1010
github.com/pmezard/go-difflib v1.0.0 // indirect
1111
github.com/rakyll/gotest v0.0.6 // indirect
1212
github.com/stretchr/testify v1.10.0 // indirect
13+
golang.org/x/net v0.35.0 // indirect
1314
golang.org/x/sys v0.32.0 // indirect
15+
golang.org/x/text v0.22.0 // indirect
16+
google.golang.org/genproto/googleapis/rpc v0.0.0-20250218202821-56aae31c358a // indirect
17+
google.golang.org/grpc v1.72.0 // indirect
18+
google.golang.org/protobuf v1.36.5 // indirect
1419
gopkg.in/yaml.v3 v3.0.1 // indirect
1520
)

go.sum

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,21 @@ github.com/rakyll/gotest v0.0.6 h1:hBTqkO3jiuwYW/M9gL4bu0oTYcm8J6knQAAPUsJsz1I=
1616
github.com/rakyll/gotest v0.0.6/go.mod h1:SkoesdNCWmiD4R2dljIUcfSnNdVZ12y8qK4ojDkc2Sc=
1717
github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA=
1818
github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
19+
golang.org/x/net v0.35.0 h1:T5GQRQb2y08kTAByq9L4/bz8cipCdA8FbRTXewonqY8=
20+
golang.org/x/net v0.35.0/go.mod h1:EglIi67kWsHKlRzzVMUD93VMSWGFOMSZgxFjparz1Qk=
1921
golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
2022
golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
2123
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
2224
golang.org/x/sys v0.32.0 h1:s77OFDvIQeibCmezSnk/q6iAfkdiQaJi4VzroCFrN20=
2325
golang.org/x/sys v0.32.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k=
26+
golang.org/x/text v0.22.0 h1:bofq7m3/HAFvbF51jz3Q9wLg3jkvSPuiZu/pD1XwgtM=
27+
golang.org/x/text v0.22.0/go.mod h1:YRoo4H8PVmsu+E3Ou7cqLVH8oXWIHVoX0jqUWALQhfY=
28+
google.golang.org/genproto/googleapis/rpc v0.0.0-20250218202821-56aae31c358a h1:51aaUVRocpvUOSQKM6Q7VuoaktNIaMCLuhZB6DKksq4=
29+
google.golang.org/genproto/googleapis/rpc v0.0.0-20250218202821-56aae31c358a/go.mod h1:uRxBH1mhmO8PGhU89cMcHaXKZqO+OfakD8QQO0oYwlQ=
30+
google.golang.org/grpc v1.72.0 h1:S7UkcVa60b5AAQTaO6ZKamFp1zMZSU0fGDK2WZLbBnM=
31+
google.golang.org/grpc v1.72.0/go.mod h1:wH5Aktxcg25y1I3w7H69nHfXdOG3UiadoBtjh3izSDM=
32+
google.golang.org/protobuf v1.36.5 h1:tPhr+woSbjfYvY6/GPufUoYizxw1cF/yFoxJ2fmpwlM=
33+
google.golang.org/protobuf v1.36.5/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE=
2434
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
2535
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
2636
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

internal/net/server.go

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
package netinternal
2+
3+
import (
4+
"ashishkujoy/queue/internal/config"
5+
queueinternal "ashishkujoy/queue/internal/queue"
6+
context "context"
7+
"fmt"
8+
"log"
9+
"net"
10+
11+
grpc "google.golang.org/grpc"
12+
codes "google.golang.org/grpc/codes"
13+
status "google.golang.org/grpc/status"
14+
)
15+
16+
type QueueServer struct {
17+
UnimplementedQueueServiceServer
18+
queueService *queueinternal.QueueService
19+
port string
20+
gpServer *grpc.Server
21+
}
22+
23+
func NewQueueServer(config *config.Config, port string) (*QueueServer, error) {
24+
service, err := queueinternal.NewQueueService(config)
25+
if err != nil {
26+
return nil, err
27+
}
28+
29+
gpServer := grpc.NewServer()
30+
server := &QueueServer{
31+
port: port,
32+
queueService: service,
33+
gpServer: gpServer,
34+
}
35+
RegisterQueueServiceServer(gpServer, server)
36+
return server, nil
37+
}
38+
39+
func (qs *QueueServer) Enqueue(ctx context.Context, req *EnqueuRequest) (*EnqueuRequestResponse, error) {
40+
qs.queueService.Enqueue(req.Message)
41+
qs.broadcastMessage()
42+
return nil, status.Errorf(codes.Unimplemented, "method Enqueue not implemented")
43+
}
44+
45+
func (qs *QueueServer) broadcastMessage() {
46+
// panic("unimplemented")
47+
}
48+
49+
func (qs *QueueServer) ObserveQueue(req *ObserveQueueRequest, stream grpc.ServerStreamingServer[QueueMessage]) error {
50+
return qs.serveMessages(req, stream)
51+
}
52+
53+
func (qs *QueueServer) serveMessages(req *ObserveQueueRequest, stream grpc.ServerStreamingServer[QueueMessage]) error {
54+
for {
55+
msg, err := qs.queueService.Dequeue(int(req.ConsumerId))
56+
if err != nil {
57+
break
58+
}
59+
err = stream.Send(&QueueMessage{Message: msg})
60+
if err != nil {
61+
return err
62+
}
63+
}
64+
65+
return nil
66+
}
67+
68+
func (qs *QueueServer) Run(cancel <-chan interface{}) error {
69+
listener, err := net.Listen("tcp", ":50051")
70+
if err != nil {
71+
fmt.Printf("Error Creating listener %v\n", err)
72+
return err
73+
}
74+
fmt.Println("Created Listener")
75+
go func() {
76+
if err := qs.gpServer.Serve(listener); err != nil {
77+
fmt.Printf("Error Starting grpc serve %v", err)
78+
}
79+
log.Printf("GRPC server listening on %s", qs.port)
80+
81+
}()
82+
<-cancel
83+
listener.Close()
84+
qs.gpServer.Stop()
85+
return nil
86+
}

proto/queue.proto

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
syntax = "proto3";
2+
3+
option go_package = "ashishkujoy/queue/net;netinternal";
4+
5+
message EnqueuRequest {
6+
bytes message = 1;
7+
}
8+
9+
message EnqueuRequestResponse {
10+
bool success = 1;
11+
}
12+
13+
message ObserveQueueRequest {
14+
uint64 consumerId = 1;
15+
}
16+
17+
message QueueMessage {
18+
bytes message = 1;
19+
}
20+
21+
service QueueService {
22+
rpc Enqueue(EnqueuRequest) returns (EnqueuRequestResponse);
23+
rpc ObserveQueue(ObserveQueueRequest) returns (stream QueueMessage);
24+
}

0 commit comments

Comments
 (0)