Skip to content

Commit fcd0a5f

Browse files
committed
Week 3 Day 15: Implement P2P GossipSub layer and configure regional OSRM routing backend
1 parent 1c2006e commit fcd0a5f

File tree

11 files changed

+947
-3
lines changed

11 files changed

+947
-3
lines changed

.gitignore

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,3 +72,8 @@ docker/osrm/data/
7272
# Artifacts
7373
artifacts/*
7474
*.tar.gz
75+
76+
# P2P and OSRM Local Data
77+
backend/p2p-node/p2p-daemon
78+
backend/p2p-node/test_ws.js
79+
osrm-data/

backend/p2p-node/api.go

Lines changed: 145 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,145 @@
1+
package main
2+
3+
import (
4+
"encoding/json"
5+
"fmt"
6+
"log"
7+
"net/http"
8+
"sync"
9+
10+
"github.com/gorilla/websocket"
11+
)
12+
13+
var upgrader = websocket.Upgrader{
14+
ReadBufferSize: 1024,
15+
WriteBufferSize: 1024,
16+
CheckOrigin: func(r *http.Request) bool {
17+
return true // Allow all cross-origin connections for local testing
18+
},
19+
}
20+
21+
// APIServer handles HTTP requests and WebSocket connections
22+
type APIServer struct {
23+
port int
24+
pubSubManager *PubSubManager
25+
msgChan chan IncidentMessage
26+
clients map[*websocket.Conn]bool
27+
clientsMux sync.RWMutex
28+
}
29+
30+
// NewAPIServer creates a new API Server instance
31+
func NewAPIServer(port int, psm *PubSubManager, msgChan chan IncidentMessage) *APIServer {
32+
return &APIServer{
33+
port: port,
34+
pubSubManager: psm,
35+
msgChan: msgChan,
36+
clients: make(map[*websocket.Conn]bool),
37+
}
38+
}
39+
40+
// Start boots the HTTP server in a goroutine
41+
func (s *APIServer) Start() error {
42+
mux := http.NewServeMux()
43+
44+
// Setup routes
45+
mux.HandleFunc("/broadcast", s.handleBroadcast)
46+
mux.HandleFunc("/events", s.handleEvents)
47+
48+
go s.broadcastLoop() // Loop to read from msgChan and send to all WS clients
49+
50+
serverAddr := fmt.Sprintf(":%d", s.port)
51+
log.Printf("Starting P2P API Server at HTTP/WS %s", serverAddr)
52+
53+
go func() {
54+
if err := http.ListenAndServe(serverAddr, mux); err != nil {
55+
log.Fatalf("API server failed: %v", err)
56+
}
57+
}()
58+
59+
return nil
60+
}
61+
62+
// handleBroadcast is the HTTP handler for POST /broadcast
63+
func (s *APIServer) handleBroadcast(w http.ResponseWriter, r *http.Request) {
64+
if r.Method != http.MethodPost {
65+
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
66+
return
67+
}
68+
69+
var inc IncidentMessage
70+
if err := json.NewDecoder(r.Body).Decode(&inc); err != nil {
71+
http.Error(w, "Invalid request body", http.StatusBadRequest)
72+
return
73+
}
74+
75+
// Make sure Type is set correctly (for schema consistency)
76+
if inc.Type == "" {
77+
inc.Type = "incident_create"
78+
}
79+
80+
// Publish via GossipSub
81+
if err := s.pubSubManager.Broadcast(inc); err != nil {
82+
log.Printf("Failed to broadcast message: %v\n", err)
83+
http.Error(w, "Failed to broadcast", http.StatusInternalServerError)
84+
return
85+
}
86+
87+
log.Printf("HTTP /broadcast processed: sent incident %s to peers", inc.IncidentID)
88+
89+
w.WriteHeader(http.StatusAccepted)
90+
json.NewEncoder(w).Encode(map[string]string{"status": "broadcasted"})
91+
}
92+
93+
// handleEvents upgrades the GET request to a WebSocket connection
94+
func (s *APIServer) handleEvents(w http.ResponseWriter, r *http.Request) {
95+
conn, err := upgrader.Upgrade(w, r, nil)
96+
if err != nil {
97+
log.Printf("Failed to upgrade WebSocket: %v", err)
98+
return
99+
}
100+
101+
// Register the new client
102+
s.clientsMux.Lock()
103+
s.clients[conn] = true
104+
s.clientsMux.Unlock()
105+
106+
log.Printf("New WebSocket client connected: %s", conn.RemoteAddr().String())
107+
108+
// Read loop just to handle disconnects (we don't expect messages *from* the client on this WS yet)
109+
go func() {
110+
defer func() {
111+
s.clientsMux.Lock()
112+
delete(s.clients, conn)
113+
s.clientsMux.Unlock()
114+
conn.Close()
115+
log.Printf("WebSocket client disconnected")
116+
}()
117+
for {
118+
if _, _, err := conn.ReadMessage(); err != nil {
119+
break
120+
}
121+
}
122+
}()
123+
}
124+
125+
// broadcastLoop reads from the p2p input channel and forwards to all connected WebSockets
126+
func (s *APIServer) broadcastLoop() {
127+
for msg := range s.msgChan {
128+
// Serialize
129+
data, err := json.Marshal(msg)
130+
if err != nil {
131+
log.Printf("Failed to marshal WS broadcast message: %v", err)
132+
continue
133+
}
134+
135+
s.clientsMux.RLock()
136+
for client := range s.clients {
137+
// Write the serialized json message to the websocket client
138+
if err := client.WriteMessage(websocket.TextMessage, data); err != nil {
139+
log.Printf("Failed to send message to WS client: %v", err)
140+
client.Close() // Typically best to close and let the read loop delete it
141+
}
142+
}
143+
s.clientsMux.RUnlock()
144+
}
145+
}

backend/p2p-node/discovery.go

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"log"
7+
"time"
8+
9+
"github.com/libp2p/go-libp2p/core/host"
10+
"github.com/libp2p/go-libp2p/core/peer"
11+
"github.com/libp2p/go-libp2p/p2p/discovery/mdns"
12+
)
13+
14+
const discoveryServiceTag = "openrescue.p2p"
15+
16+
// discoveryNotifee gets notified when we find a new peer via mDNS discovery
17+
type discoveryNotifee struct {
18+
h host.Host
19+
}
20+
21+
// HandlePeerFound connects to peers discovered via mDNS
22+
func (n *discoveryNotifee) HandlePeerFound(pi peer.AddrInfo) {
23+
fmt.Printf("mDNS Discovery: Found peer %s\n", pi.ID)
24+
// Don't try to connect to ourselves
25+
if pi.ID == n.h.ID() {
26+
return
27+
}
28+
29+
// Connect to the peer
30+
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
31+
defer cancel()
32+
33+
err := n.h.Connect(ctx, pi)
34+
if err != nil {
35+
fmt.Printf("mDNS Discovery: Failed connecting to peer %s: %s\n", pi.ID.String(), err)
36+
} else {
37+
fmt.Printf("mDNS Discovery: Automatically connected to peer %s\n", pi.ID.String())
38+
}
39+
}
40+
41+
// setupDiscovery creates an mDNS discovery service
42+
func setupDiscovery(h host.Host) error {
43+
s := mdns.NewMdnsService(h, discoveryServiceTag, &discoveryNotifee{h: h})
44+
if s == nil {
45+
return fmt.Errorf("failed creating mDNS service")
46+
}
47+
48+
if err := s.Start(); err != nil {
49+
return fmt.Errorf("failed starting mDNS service: %v", err)
50+
}
51+
52+
log.Printf("mDNS Peer Discovery started (service tag: %s)\n", discoveryServiceTag)
53+
return nil
54+
}

backend/p2p-node/go.mod

Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
module openrescue/p2p-node
2+
3+
go 1.24.6
4+
5+
require (
6+
github.com/gorilla/websocket v1.5.3
7+
github.com/libp2p/go-libp2p v0.47.0
8+
github.com/libp2p/go-libp2p-pubsub v0.15.0
9+
)
10+
11+
require (
12+
github.com/benbjohnson/clock v1.3.5 // indirect
13+
github.com/beorn7/perks v1.0.1 // indirect
14+
github.com/cespare/xxhash/v2 v2.3.0 // indirect
15+
github.com/davidlazar/go-crypto v0.0.0-20200604182044-b73af7476f6c // indirect
16+
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.4.0 // indirect
17+
github.com/dunglas/httpsfv v1.1.0 // indirect
18+
github.com/flynn/noise v1.1.0 // indirect
19+
github.com/gogo/protobuf v1.3.2 // indirect
20+
github.com/google/uuid v1.6.0 // indirect
21+
github.com/hashicorp/golang-lru/v2 v2.0.7 // indirect
22+
github.com/huin/goupnp v1.3.0 // indirect
23+
github.com/ipfs/go-cid v0.5.0 // indirect
24+
github.com/jackpal/go-nat-pmp v1.0.2 // indirect
25+
github.com/jbenet/go-temp-err-catcher v0.1.0 // indirect
26+
github.com/klauspost/cpuid/v2 v2.2.10 // indirect
27+
github.com/koron/go-ssdp v0.0.6 // indirect
28+
github.com/libp2p/go-buffer-pool v0.1.0 // indirect
29+
github.com/libp2p/go-flow-metrics v0.2.0 // indirect
30+
github.com/libp2p/go-libp2p-asn-util v0.4.1 // indirect
31+
github.com/libp2p/go-msgio v0.3.0 // indirect
32+
github.com/libp2p/go-netroute v0.3.0 // indirect
33+
github.com/libp2p/go-reuseport v0.4.0 // indirect
34+
github.com/libp2p/go-yamux/v5 v5.0.1 // indirect
35+
github.com/libp2p/zeroconf/v2 v2.2.0 // indirect
36+
github.com/marten-seemann/tcp v0.0.0-20210406111302-dfbc87cc63fd // indirect
37+
github.com/miekg/dns v1.1.66 // indirect
38+
github.com/mikioh/tcpinfo v0.0.0-20190314235526-30a79bb1804b // indirect
39+
github.com/mikioh/tcpopt v0.0.0-20190314235656-172688c1accc // indirect
40+
github.com/minio/sha256-simd v1.0.1 // indirect
41+
github.com/mr-tron/base58 v1.2.0 // indirect
42+
github.com/multiformats/go-base32 v0.1.0 // indirect
43+
github.com/multiformats/go-base36 v0.2.0 // indirect
44+
github.com/multiformats/go-multiaddr v0.16.1 // indirect
45+
github.com/multiformats/go-multiaddr-dns v0.4.1 // indirect
46+
github.com/multiformats/go-multiaddr-fmt v0.1.0 // indirect
47+
github.com/multiformats/go-multibase v0.2.0 // indirect
48+
github.com/multiformats/go-multicodec v0.9.1 // indirect
49+
github.com/multiformats/go-multihash v0.2.3 // indirect
50+
github.com/multiformats/go-multistream v0.6.1 // indirect
51+
github.com/multiformats/go-varint v0.0.7 // indirect
52+
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
53+
github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58 // indirect
54+
github.com/pion/datachannel v1.5.10 // indirect
55+
github.com/pion/dtls/v2 v2.2.12 // indirect
56+
github.com/pion/dtls/v3 v3.0.6 // indirect
57+
github.com/pion/ice/v4 v4.0.10 // indirect
58+
github.com/pion/interceptor v0.1.40 // indirect
59+
github.com/pion/logging v0.2.3 // indirect
60+
github.com/pion/mdns/v2 v2.0.7 // indirect
61+
github.com/pion/randutil v0.1.0 // indirect
62+
github.com/pion/rtcp v1.2.15 // indirect
63+
github.com/pion/rtp v1.8.19 // indirect
64+
github.com/pion/sctp v1.8.39 // indirect
65+
github.com/pion/sdp/v3 v3.0.13 // indirect
66+
github.com/pion/srtp/v3 v3.0.6 // indirect
67+
github.com/pion/stun v0.6.1 // indirect
68+
github.com/pion/stun/v3 v3.0.0 // indirect
69+
github.com/pion/transport/v2 v2.2.10 // indirect
70+
github.com/pion/transport/v3 v3.0.7 // indirect
71+
github.com/pion/turn/v4 v4.0.2 // indirect
72+
github.com/pion/webrtc/v4 v4.1.2 // indirect
73+
github.com/prometheus/client_golang v1.22.0 // indirect
74+
github.com/prometheus/client_model v0.6.2 // indirect
75+
github.com/prometheus/common v0.64.0 // indirect
76+
github.com/prometheus/procfs v0.16.1 // indirect
77+
github.com/quic-go/qpack v0.6.0 // indirect
78+
github.com/quic-go/quic-go v0.59.0 // indirect
79+
github.com/quic-go/webtransport-go v0.10.0 // indirect
80+
github.com/spaolacci/murmur3 v1.1.0 // indirect
81+
github.com/wlynxg/anet v0.0.5 // indirect
82+
go.uber.org/dig v1.19.0 // indirect
83+
go.uber.org/fx v1.24.0 // indirect
84+
go.uber.org/mock v0.5.2 // indirect
85+
go.uber.org/multierr v1.11.0 // indirect
86+
go.uber.org/zap v1.27.0 // indirect
87+
golang.org/x/crypto v0.41.0 // indirect
88+
golang.org/x/exp v0.0.0-20250606033433-dcc06ee1d476 // indirect
89+
golang.org/x/mod v0.27.0 // indirect
90+
golang.org/x/net v0.43.0 // indirect
91+
golang.org/x/sync v0.16.0 // indirect
92+
golang.org/x/sys v0.35.0 // indirect
93+
golang.org/x/text v0.28.0 // indirect
94+
golang.org/x/time v0.12.0 // indirect
95+
golang.org/x/tools v0.36.0 // indirect
96+
google.golang.org/protobuf v1.36.6 // indirect
97+
lukechampine.com/blake3 v1.4.1 // indirect
98+
)

0 commit comments

Comments
 (0)