Skip to content

Commit b386216

Browse files
author
Mohammad Hamdan
committed
Location Tracker (Version 0.1.0)
1 parent 0bdad32 commit b386216

File tree

8 files changed

+302
-29
lines changed

8 files changed

+302
-29
lines changed

go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ go 1.24.5
44

55
require (
66
github.com/golang/protobuf v1.5.4
7+
github.com/google/uuid v1.6.0
78
github.com/gorilla/websocket v1.5.3
89
github.com/redis/go-redis/v9 v9.11.0
910
github.com/stretchr/testify v1.10.0

main.go

Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"log"
7+
"net/http"
8+
"os"
9+
"os/signal"
10+
"sync"
11+
"syscall"
12+
"time"
13+
14+
"github.com/MohammadAsDjangoDev/geo_tracker/src/config"
15+
"github.com/MohammadAsDjangoDev/geo_tracker/src/infrastructure/repositories/redisrepo"
16+
"github.com/MohammadAsDjangoDev/geo_tracker/src/infrastructure/transport/grpcserver"
17+
"github.com/MohammadAsDjangoDev/geo_tracker/src/infrastructure/transport/websocket"
18+
"github.com/MohammadAsDjangoDev/geo_tracker/src/usecases"
19+
"github.com/redis/go-redis/v9"
20+
)
21+
22+
func main() {
23+
// Load config.yaml
24+
cfg, err := config.LoadConfig("config.yaml")
25+
if err != nil {
26+
log.Fatalf("Failed to load config.yaml: %v", err)
27+
}
28+
29+
redisAddr := fmt.Sprintf("%s:%d", cfg.Redis.Host, cfg.Redis.Port)
30+
log.Printf("Connecting to Redis at %s ...", redisAddr)
31+
32+
redisClient := redis.NewClient(&redis.Options{
33+
Addr: redisAddr,
34+
Username: cfg.Redis.Username,
35+
Password: cfg.Redis.Password,
36+
DB: cfg.Redis.DB,
37+
})
38+
39+
// Ping Redis to verify connection before proceeding
40+
if err := redisClient.Ping(context.Background()).Err(); err != nil {
41+
log.Fatalf("Failed to connect to Redis: %v", err)
42+
}
43+
log.Println("Successfully connected to Redis")
44+
45+
defer func() {
46+
log.Println("Closing Redis connection...")
47+
if err := redisClient.Close(); err != nil {
48+
log.Printf("Error closing Redis connection: %v", err)
49+
} else {
50+
log.Println("Redis connection closed")
51+
}
52+
}()
53+
54+
// Repository and Service
55+
repo := redisrepo.NewRedisTrackedItemRepository(redisClient)
56+
trackingService := usecases.NewTrackingService(repo)
57+
58+
// WebSocket server
59+
wsServer := websocket.NewServer(trackingService)
60+
61+
// gRPC server
62+
grpcSrv, err := grpcserver.RunGRPCServer(trackingService, 50051)
63+
if err != nil {
64+
log.Fatalf("failed to start gRPC server: %v", err)
65+
}
66+
67+
var wg sync.WaitGroup
68+
wg.Add(2)
69+
70+
// Run WebSocket server
71+
go func() {
72+
defer wg.Done()
73+
if err := wsServer.Start(":8080"); err != nil && err != http.ErrServerClosed {
74+
log.Printf("WebSocket server error: %v", err)
75+
}
76+
}()
77+
78+
// Run gRPC server goroutine, replaced blocking select{} with channel wait
79+
done := make(chan struct{})
80+
go func() {
81+
defer wg.Done()
82+
<-done
83+
}()
84+
85+
// Listen for termination signals (Ctrl+C)
86+
sigChan := make(chan os.Signal, 1)
87+
signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM)
88+
89+
<-sigChan // wait for signal
90+
log.Println("Shutdown signal received, stopping servers...")
91+
92+
// Graceful shutdown context with timeout
93+
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
94+
defer cancel()
95+
96+
// Shutdown WebSocket server
97+
if err := wsServer.Stop(ctx); err != nil {
98+
log.Printf("Error shutting down WebSocket server: %v", err)
99+
}
100+
101+
// Shutdown gRPC server
102+
grpcSrv.Stop()
103+
104+
// Signal gRPC goroutine to exit
105+
close(done)
106+
107+
// Wait for goroutines to finish
108+
wg.Wait()
109+
log.Println("Servers stopped, exiting.")
110+
}

src/infrastructure/repositories/redis_tracked_item_repository.go renamed to src/infrastructure/repositories/redisrepo/redis_tracked_item_repository.go

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package repositories
1+
package redisrepo
22

33
import (
44
"context"
@@ -40,13 +40,31 @@ func (r *RedisTrackedItemRepository) Save(ctx context.Context, item entities.Tra
4040
return r.client.Set(ctx, key, data, 0).Err()
4141
}
4242

43+
func (r *RedisTrackedItemRepository) Update(ctx context.Context, item entities.TrackedItem) error {
44+
key := r.key(item.ID)
45+
exists, err := r.client.Exists(ctx, key).Result()
46+
if err != nil {
47+
return err
48+
}
49+
if exists == 0 {
50+
return irepo.ErrItemNotFound
51+
}
52+
53+
data, err := json.Marshal(item)
54+
if err != nil {
55+
return err
56+
}
57+
58+
return r.client.Set(ctx, key, data, 0).Err()
59+
}
60+
4361
func (r *RedisTrackedItemRepository) UpdatePosition(ctx context.Context, id entities.ItemID, newPos entities.GeoPos) error {
4462
item, err := r.FindByID(ctx, id)
4563
if err != nil {
4664
return err
4765
}
4866
item.Position = newPos
49-
return r.Save(ctx, item)
67+
return r.Update(ctx, item)
5068
}
5169

5270
func (r *RedisTrackedItemRepository) Delete(ctx context.Context, id entities.ItemID) error {

src/infrastructure/transport/grpcserver/server.go

Lines changed: 34 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,6 @@ import (
44
"context"
55
"fmt"
66
"net"
7-
"os"
8-
"os/signal"
9-
"syscall"
107

118
"github.com/MohammadAsDjangoDev/geo_tracker/src/entities"
129
"github.com/MohammadAsDjangoDev/geo_tracker/src/interfaces/grpcapi"
@@ -18,47 +15,62 @@ import (
1815
"google.golang.org/protobuf/types/known/emptypb"
1916
)
2017

18+
// Server struct holds grpc.Server and your existing Server implementation
2119
type Server struct {
2220
grpcapi.UnimplementedTrackingServiceServer
2321
service usecases.TrackingService
22+
grpcSrv *grpc.Server
23+
lis net.Listener
2424
}
2525

2626
func NewServer(service usecases.TrackingService) *Server {
27-
return &Server{service: service}
27+
return &Server{
28+
service: service,
29+
}
2830
}
2931

30-
// RunGRPCServer starts the gRPC server on the specified port with the given TrackingService implementation.
31-
func RunGRPCServer(service usecases.TrackingService, port int) error {
32+
// RunGRPCServer starts the gRPC server and returns a Server instance for graceful shutdown.
33+
// Important: this function no longer listens for OS signals — caller (main) should handle signals
34+
// and call the returned Server.Stop() to perform a graceful shutdown.
35+
func RunGRPCServer(service usecases.TrackingService, port int) (*Server, error) {
3236
lis, err := net.Listen("tcp", fmt.Sprintf(":%d", port))
3337
if err != nil {
34-
return fmt.Errorf("failed to listen: %w", err)
38+
return nil, fmt.Errorf("failed to listen: %w", err)
3539
}
3640

37-
grpcServer := grpc.NewServer()
41+
grpcSrv := grpc.NewServer()
3842
srv := NewServer(service)
43+
srv.grpcSrv = grpcSrv
44+
srv.lis = lis
3945

40-
// Register your server implementation with the gRPC server
41-
grpcapi.RegisterTrackingServiceServer(grpcServer, srv)
42-
43-
// Optional: enable reflection for easier debugging and tooling support (e.g., grpcurl)
44-
reflection.Register(grpcServer)
46+
grpcapi.RegisterTrackingServiceServer(grpcSrv, srv)
47+
reflection.Register(grpcSrv)
4548

46-
// Graceful shutdown on SIGINT or SIGTERM
49+
// Serve in a goroutine so RunGRPCServer returns immediately and caller controls lifecycle.
4750
go func() {
48-
c := make(chan os.Signal, 1)
49-
signal.Notify(c, os.Interrupt, syscall.SIGTERM)
50-
<-c
51-
fmt.Println("Shutting down gRPC server...")
52-
grpcServer.GracefulStop()
51+
if err := grpcSrv.Serve(lis); err != nil {
52+
// Serve returns a non-nil error only on failure; log it for visibility.
53+
// This goroutine ends when Serve returns (e.g., after GracefulStop()).
54+
fmt.Printf("gRPC server stopped serving: %v\n", err)
55+
}
5356
}()
5457

5558
fmt.Printf("gRPC server listening on port %d\n", port)
56-
if err := grpcServer.Serve(lis); err != nil {
57-
return fmt.Errorf("failed to serve gRPC server: %w", err)
59+
return srv, nil
60+
}
61+
62+
// Stop gracefully stops the gRPC server and closes the listener
63+
func (s *Server) Stop() {
64+
if s.grpcSrv != nil {
65+
s.grpcSrv.GracefulStop()
66+
}
67+
if s.lis != nil {
68+
_ = s.lis.Close()
5869
}
59-
return nil
6070
}
6171

72+
// RPC handlers (unchanged)
73+
6274
func (s *Server) Register(ctx context.Context, req *grpcapi.RegisterRequest) (*emptypb.Empty, error) {
6375
if req.Item == nil {
6476
return nil, status.Error(codes.InvalidArgument, "item is required")

src/infrastructure/transport/websocket/server.go

Lines changed: 45 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,10 @@ package websocket
22

33
import (
44
"context"
5+
"io"
56
"log"
67
"net/http"
8+
"strings"
79
"sync"
810

911
"github.com/MohammadAsDjangoDev/geo_tracker/src/entities"
@@ -16,6 +18,8 @@ type Server struct {
1618
Upgrader websocket.Upgrader
1719
Clients map[entities.ItemID]*websocket.Conn
1820
Mu sync.Mutex
21+
22+
httpServer *http.Server
1923
}
2024

2125
func NewServer(service usecases.TrackingService) *Server {
@@ -63,16 +67,26 @@ func (s *Server) HandleWebSocket(w http.ResponseWriter, r *http.Request) {
6367
Latitude float64 `json:"latitude"`
6468
Longitude float64 `json:"longitude"`
6569
}
66-
if err := conn.ReadJSON(&update); err != nil {
67-
log.Println("Connection closed or error:", err)
70+
err := conn.ReadJSON(&update)
71+
72+
if err != nil {
73+
if websocket.IsCloseError(err, websocket.CloseNormalClosure, websocket.CloseGoingAway) {
74+
log.Printf("WebSocket closed normally: %s", itemID)
75+
} else if err == io.EOF {
76+
log.Printf("WebSocket closed (EOF): %s", itemID)
77+
} else if strings.Contains(err.Error(), "close 1006") || strings.Contains(err.Error(), "unexpected EOF") {
78+
log.Printf("WebSocket closed with 1006 or unexpected EOF (abnormal but expected): %s", itemID)
79+
} else {
80+
log.Printf("WebSocket read error: %v", err)
81+
}
6882
break
6983
}
7084

7185
pos := entities.GeoPos{
7286
Latitude: update.Latitude,
7387
Longitude: update.Longitude,
7488
}
75-
err := s.Tracking.UpdatePosition(ctx, itemID, pos)
89+
err = s.Tracking.UpdatePosition(ctx, itemID, pos)
7690
if err != nil {
7791
log.Println("Failed to update position:", err)
7892
conn.WriteMessage(websocket.TextMessage, []byte("error: failed to update position"))
@@ -86,7 +100,33 @@ func (s *Server) HandleWebSocket(w http.ResponseWriter, r *http.Request) {
86100
}
87101

88102
func (s *Server) Start(addr string) error {
89-
http.HandleFunc("/ws", s.HandleWebSocket)
103+
mux := http.NewServeMux()
104+
mux.HandleFunc("/ws", s.HandleWebSocket)
105+
106+
s.httpServer = &http.Server{
107+
Addr: addr,
108+
Handler: mux,
109+
}
110+
90111
log.Println("WebSocket server started on", addr)
91-
return http.ListenAndServe(addr, nil)
112+
return s.httpServer.ListenAndServe()
113+
}
114+
115+
// Stop gracefully shuts down the HTTP server and closes all open websocket connections
116+
func (s *Server) Stop(ctx context.Context) error {
117+
if s.httpServer != nil {
118+
if err := s.httpServer.Shutdown(ctx); err != nil {
119+
return err
120+
}
121+
}
122+
123+
s.Mu.Lock()
124+
for id, conn := range s.Clients {
125+
_ = conn.Close()
126+
delete(s.Clients, id)
127+
}
128+
s.Mu.Unlock()
129+
130+
log.Println("WebSocket server stopped")
131+
return nil
92132
}

0 commit comments

Comments
 (0)