Skip to content

Commit 514266d

Browse files
committed
Adds cli client
1 parent 5b35316 commit 514266d

File tree

7 files changed

+135
-46
lines changed

7 files changed

+135
-46
lines changed

.gitignore

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
11
.vscode
22
.idea
3-
main
3+
main
4+
data
5+
cli-client
6+
.vscode

cmd/cli/cli-client.go

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
package main
2+
3+
import (
4+
netinternal "ashishkujoy/queue/proto"
5+
"context"
6+
"flag"
7+
"fmt"
8+
"google.golang.org/grpc"
9+
"google.golang.org/grpc/credentials/insecure"
10+
"io"
11+
"log"
12+
"time"
13+
)
14+
15+
type CLIOptions struct {
16+
msg string
17+
publish bool
18+
consumerId uint64
19+
}
20+
21+
func NewCLIOptions() *CLIOptions {
22+
msg := flag.String("msg", "", "message to send")
23+
publish := flag.Bool("publish", false, "publish message to the queue")
24+
consumerId := flag.Uint64("consumer-id", 0, "consumer id")
25+
26+
flag.Parse()
27+
28+
return &CLIOptions{
29+
msg: *msg,
30+
publish: *publish,
31+
consumerId: *consumerId,
32+
}
33+
}
34+
35+
func createQueueClient() netinternal.QueueServiceClient {
36+
conn, err := grpc.NewClient("localhost:50051", grpc.WithTransportCredentials(insecure.NewCredentials()))
37+
if err != nil {
38+
log.Fatalf("failed to connect: %v", err)
39+
}
40+
client := netinternal.NewQueueServiceClient(conn)
41+
return client
42+
}
43+
44+
func enqueueMsg(cliOptions *CLIOptions, client netinternal.QueueServiceClient) {
45+
fmt.Printf("publishing message to queue: %s\n", cliOptions.msg)
46+
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
47+
defer cancel()
48+
_, err := client.Enqueue(ctx, &netinternal.EnqueueRequest{Message: []byte(cliOptions.msg)})
49+
if err != nil {
50+
log.Fatalf("failed to enqueue: %v", err)
51+
}
52+
return
53+
}
54+
55+
func observeQueueMsg(cliOptions *CLIOptions, client netinternal.QueueServiceClient) {
56+
fmt.Printf("Observing message from queue, consumer id = %d\n", cliOptions.consumerId)
57+
queue, err := client.ObserveQueue(context.Background(), &netinternal.ObserveQueueRequest{ConsumerId: cliOptions.consumerId})
58+
if err != nil {
59+
log.Fatalf("failed to observe: %v", err)
60+
}
61+
for {
62+
queueMessage, err := queue.Recv()
63+
if err == io.EOF {
64+
break
65+
}
66+
if err != nil {
67+
log.Fatalf("failed to receive: %v", err)
68+
}
69+
fmt.Printf("received message: %s\n", string(queueMessage.Message))
70+
}
71+
}
72+
73+
func main() {
74+
cliOptions := NewCLIOptions()
75+
client := createQueueClient()
76+
77+
if cliOptions.publish {
78+
enqueueMsg(cliOptions, client)
79+
return
80+
}
81+
82+
observeQueueMsg(cliOptions, client)
83+
}

cmd/main.go renamed to cmd/server/main.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ import (
1111

1212
func main() {
1313
conf := config.NewConfigWithMetadataPath("data/segments", "data/metadata", 1024*1024)
14-
server, err := netinternal.NewQueueServer(conf, ":8081")
14+
server, err := netinternal.NewQueueServer(conf, ":50051")
1515
if err != nil {
1616
log.Fatalf("Failed to create server: %v", err)
1717
return

internal/net/server.go

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,10 +37,12 @@ func NewQueueServer(config *config.Config, port string) (*QueueServer, error) {
3737
return server, nil
3838
}
3939

40-
func (qs *QueueServer) Enqueue(ctx context.Context, req *netinternal.EnqueuRequest) (*netinternal.EnqueuRequestResponse, error) {
41-
qs.queueService.Enqueue(req.Message)
40+
func (qs *QueueServer) Enqueue(ctx context.Context, req *netinternal.EnqueueRequest) (*netinternal.EnqueueRequestResponse, error) {
41+
if err := qs.queueService.Enqueue(req.Message); err != nil {
42+
return nil, status.Errorf(codes.Internal, "failed to enqueue")
43+
}
4244
qs.broadcastMessage()
43-
return nil, status.Errorf(codes.Unimplemented, "method Enqueue not implemented")
45+
return &netinternal.EnqueueRequestResponse{Success: true}, nil
4446
}
4547

4648
func (qs *QueueServer) broadcastMessage() {
@@ -57,6 +59,7 @@ func (qs *QueueServer) serveMessages(req *netinternal.ObserveQueueRequest, strea
5759
if err != nil {
5860
break
5961
}
62+
fmt.Printf("Message dequeued: %v\n", msg)
6063
err = stream.Send(&netinternal.QueueMessage{Message: msg})
6164
if err != nil {
6265
return err

proto/queue.pb.go

Lines changed: 31 additions & 31 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

proto/queue.proto

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,11 @@ syntax = "proto3";
22

33
option go_package = "ashishkujoy/queue/net;netinternal";
44

5-
message EnqueuRequest {
5+
message EnqueueRequest {
66
bytes message = 1;
77
}
88

9-
message EnqueuRequestResponse {
9+
message EnqueueRequestResponse {
1010
bool success = 1;
1111
}
1212

@@ -19,6 +19,6 @@ message QueueMessage {
1919
}
2020

2121
service QueueService {
22-
rpc Enqueue(EnqueuRequest) returns (EnqueuRequestResponse);
22+
rpc Enqueue(EnqueueRequest) returns (EnqueueRequestResponse);
2323
rpc ObserveQueue(ObserveQueueRequest) returns (stream QueueMessage);
2424
}

proto/queue_grpc.pb.go

Lines changed: 7 additions & 7 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)