Skip to content

Commit ae917e4

Browse files
committed
feat(p2p): Add DHT discovery
1 parent aee97c2 commit ae917e4

4 files changed

Lines changed: 358 additions & 4 deletions

File tree

internal/config/vault.go

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,16 @@ import (
88
"github.com/substantialcattle5/sietch/internal/constants"
99
)
1010

11+
type DHTConfig struct {
12+
Enabled bool `yaml:"enabled"`
13+
BootstrapNodes []string `yaml:"bootstrap_peers,omitempty"`
14+
}
15+
16+
type DiscoveryConfig struct {
17+
MDNS bool `yaml:"mdns"`
18+
DHT DHTConfig `yaml:"dht"`
19+
}
20+
1121
// VaultConfig represents the structure for vault.yaml
1222
type VaultConfig struct {
1323
Name string `yaml:"name"`
@@ -21,6 +31,8 @@ type VaultConfig struct {
2131
Deduplication DeduplicationConfig `yaml:"deduplication"`
2232
Sync SyncConfig `yaml:"sync"`
2333
Metadata MetadataConfig `yaml:"metadata"`
34+
35+
Discovery DiscoveryConfig `yaml:"discovery"`
2436
}
2537

2638
// EncryptionConfig contains encryption settings
@@ -284,6 +296,17 @@ func BuildVaultConfigWithDeduplication(
284296
}
285297
}
286298

299+
// Set default discovery settings
300+
config.Discovery = DiscoveryConfig{
301+
MDNS: true,
302+
DHT: DHTConfig{
303+
Enabled: true,
304+
BootstrapNodes: []string{
305+
"/ip4/104.131.131.82/tcp/4001/p2p/QmaCpDMGvV2BGHeYERUEnRQAwe3N8SzbUtfsmvsqQLuvuJ",
306+
},
307+
},
308+
}
309+
287310
return config
288311
}
289312

internal/p2p/dht.go

Lines changed: 184 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1 +1,185 @@
11
package p2p
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"sync"
7+
"time"
8+
9+
"github.com/libp2p/go-libp2p-kad-dht"
10+
"github.com/libp2p/go-libp2p/core/host"
11+
"github.com/libp2p/go-libp2p/core/peer"
12+
"github.com/multiformats/go-multiaddr"
13+
)
14+
15+
// DHTDiscovery implements the config.Discovery interface using Kademlia DHT
16+
type DHTDiscovery struct {
17+
host host.Host
18+
dht *dht.IpfsDHT // Kademlia DHT instance
19+
peerChan chan peer.AddrInfo
20+
ctx context.Context
21+
cancel context.CancelFunc
22+
mutex sync.Mutex
23+
started bool
24+
closed bool
25+
bootstrapPeers []peer.AddrInfo
26+
}
27+
28+
// NewDHTDiscovery creates a new DHT based discovery service
29+
func NewDHTDiscovery(ctx context.Context, h host.Host, bootstrapAddrs []multiaddr.Multiaddr) (*DHTDiscovery, error) {
30+
// Create a discovery context with cancellation
31+
ctx, cancel := context.WithCancel(ctx)
32+
33+
// Parse bootstrap multiaddrs
34+
peers := ParseBootstrapAddrs(bootstrapAddrs)
35+
36+
// Initialize the DHT discovery service
37+
d := &DHTDiscovery{
38+
host: h,
39+
peerChan: make(chan peer.AddrInfo, 32), // Buffer for discovered peers
40+
ctx: ctx,
41+
cancel: cancel,
42+
bootstrapPeers: peers,
43+
}
44+
45+
return d, nil
46+
}
47+
48+
func ParseBootstrapAddrs(bootstrapAddrs []multiaddr.Multiaddr) []peer.AddrInfo {
49+
peers := make([]peer.AddrInfo, 0, len(bootstrapAddrs))
50+
for _, maddr := range bootstrapAddrs {
51+
pi, err := peer.AddrInfoFromP2pAddr(maddr)
52+
if err != nil {
53+
fmt.Println("Skipping invalid peer info:", maddr, err)
54+
continue
55+
}
56+
57+
peers = append(peers, *pi)
58+
}
59+
60+
if len(peers) == 0 {
61+
fmt.Println("No valid bootstrap peers provided, using default bootstrap peers")
62+
peers = dht.GetDefaultBootstrapPeerAddrInfos()
63+
}
64+
return peers
65+
}
66+
67+
// Start initiates the DHT discovery process
68+
func (d *DHTDiscovery) Start(ctx context.Context) error {
69+
d.mutex.Lock()
70+
defer d.mutex.Unlock()
71+
72+
if d.started {
73+
return nil // Already started
74+
}
75+
76+
if d.closed {
77+
return nil // Already closed
78+
}
79+
80+
d.started = true
81+
82+
// Bootstrap and Discovery logic
83+
if err := d.BootstrapDHT(); err != nil {
84+
return err
85+
}
86+
87+
// Start peer discovery routine
88+
go d.DiscoverPeers()
89+
90+
return nil
91+
}
92+
93+
func (d *DHTDiscovery) BootstrapDHT() error {
94+
// Initialize Kademlia DHT
95+
kadDHT, err := dht.New(d.ctx, d.host)
96+
if err != nil {
97+
return err
98+
}
99+
d.dht = kadDHT
100+
101+
for _, p := range d.bootstrapPeers {
102+
fmt.Println("Connecting to bootstrap peer:", p.ID)
103+
if err := d.host.Connect(d.ctx, p); err != nil {
104+
fmt.Println("Failed to connect:", err)
105+
} else {
106+
fmt.Println("Connected to bootstrap peer:", p.ID)
107+
}
108+
}
109+
110+
// Start background DHT bootstrapping (refresh peers etc)
111+
if err := d.dht.Bootstrap(d.ctx); err != nil {
112+
fmt.Println("Failed to bootstrap DHT:", err)
113+
}
114+
return nil
115+
}
116+
117+
// Stop halts the DHT discovery process
118+
func (d *DHTDiscovery) Stop() error {
119+
d.mutex.Lock()
120+
defer d.mutex.Unlock()
121+
122+
if !d.started || d.closed {
123+
return nil
124+
}
125+
126+
// Close the DHT service
127+
if d.dht != nil {
128+
if err := d.dht.Close(); err != nil {
129+
return err
130+
}
131+
}
132+
133+
// Cancel our context and mark as closed
134+
d.cancel()
135+
d.closed = true
136+
close(d.peerChan)
137+
138+
return nil
139+
}
140+
141+
// DiscoveredPeers returns channel of found peers
142+
func (d *DHTDiscovery) DiscoveredPeers() <-chan peer.AddrInfo {
143+
return d.peerChan
144+
}
145+
146+
// FindPeers queries the DHT for peers and sends them to the peerChan
147+
func (d *DHTDiscovery) FindPeers() {
148+
peers := d.dht.RoutingTable().ListPeers()
149+
150+
fmt.Printf("DHT has %d peers in routing table\n", len(peers))
151+
152+
for _, pid := range peers {
153+
if pid == d.host.ID() {
154+
continue // Skip self
155+
}
156+
157+
addrs := d.host.Peerstore().Addrs(pid)
158+
if len(addrs) == 0 {
159+
continue
160+
}
161+
162+
// Send peer info (non-blocking)
163+
select {
164+
case d.peerChan <- peer.AddrInfo{ID: pid, Addrs: addrs}:
165+
fmt.Println("✨ Discovered peer:", pid)
166+
default:
167+
// Skip if channel full
168+
}
169+
}
170+
}
171+
172+
// DiscoverPeers periodically searches for peers in the DHT
173+
func (d *DHTDiscovery) DiscoverPeers() {
174+
ticker := time.NewTicker(30 * time.Second)
175+
defer ticker.Stop()
176+
177+
for {
178+
select {
179+
case <-d.ctx.Done():
180+
return
181+
case <-ticker.C:
182+
d.FindPeers()
183+
}
184+
}
185+
}

internal/p2p/dht_test.go

Lines changed: 150 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,150 @@
1+
package p2p_test
2+
3+
import (
4+
"context"
5+
"testing"
6+
"time"
7+
8+
"github.com/libp2p/go-libp2p"
9+
"github.com/libp2p/go-libp2p/core/network"
10+
"github.com/libp2p/go-libp2p/core/peer"
11+
"github.com/multiformats/go-multiaddr"
12+
13+
// "github.com/substantialcattle5/sietch/internal/config"
14+
"github.com/substantialcattle5/sietch/internal/p2p"
15+
// "github.com/substantialcattle5/sietch/testutil"
16+
)
17+
18+
func TestDHTDiscovery(t *testing.T) {
19+
ctx := context.Background()
20+
21+
// 1. Define static relays
22+
relayAddrs := []string{
23+
"/ip4/104.131.131.82/tcp/4001/p2p/QmaCpDMGvV2BGHeYERUEnRQAwe3N8SzbUtfsmvsqQLuvuJ",
24+
}
25+
26+
var staticRelays []peer.AddrInfo
27+
for _, addr := range relayAddrs {
28+
maddr, err := multiaddr.NewMultiaddr(addr)
29+
if err != nil {
30+
t.Logf("Invalid relay address: %s, error: %v", addr, err)
31+
continue
32+
}
33+
34+
pi, err := peer.AddrInfoFromP2pAddr(maddr)
35+
if err != nil {
36+
t.Logf("Invalid relay peer info: %s, error: %v", addr, err)
37+
continue
38+
}
39+
40+
staticRelays = append(staticRelays, *pi)
41+
}
42+
43+
// 2. Create libp2p host with AutoRelay and NAT support
44+
h, err := libp2p.New(
45+
libp2p.ListenAddrStrings(
46+
"/ip4/0.0.0.0/tcp/0",
47+
"/ip6/::/tcp/0",
48+
),
49+
libp2p.EnableRelay(),
50+
libp2p.EnableAutoRelayWithStaticRelays(staticRelays),
51+
libp2p.NATPortMap(),
52+
)
53+
if err != nil {
54+
t.Fatalf("Failed to create libp2p host: %v", err)
55+
}
56+
defer h.Close()
57+
58+
t.Logf("Libp2p host created. ID: %s", h.ID())
59+
60+
for _, addr := range h.Addrs() {
61+
t.Logf("Listening on: %s", addr)
62+
}
63+
64+
// 3. Relay / NAT diagnostic check
65+
time.AfterFunc(20*time.Second, func() {
66+
t.Log("Performing relay/NAT diagnostic check...")
67+
68+
if len(h.Addrs()) == 0 {
69+
t.Log("No listening addresses found after 20 seconds. Possible NAT issue.")
70+
} else {
71+
t.Log("Listening addresses after 20 seconds:")
72+
for _, addr := range h.Addrs() {
73+
if _, err := addr.ValueForProtocol(multiaddr.P_CIRCUIT); err == nil {
74+
t.Logf(" - Relay address: %s", addr)
75+
} else {
76+
t.Logf(" - Direct address: %s", addr)
77+
}
78+
}
79+
}
80+
})
81+
82+
// 4. Load vault config to get bootstrap nodes
83+
// vaultPath := testutil.TempDir(t, "dht-test")
84+
// mgr, err := config.NewManager(vaultPath)
85+
// if err != nil {
86+
// t.Fatalf("Failed to create vault manager: %v", err)
87+
// }
88+
// vaultConfig, err := mgr.GetConfig()
89+
// if err != nil {
90+
// t.Fatalf("Failed to load vault config: %v", err)
91+
// }
92+
93+
// bootstrapNodes := vaultConfig.Discovery.DHT.BootstrapNodes
94+
// if len(bootstrapNodes) == 0 {
95+
// t.Fatal("No bootstrap nodes configured in vault config")
96+
// }
97+
bootstrapNodeStrs := []string{
98+
"/ip4/104.131.131.82/tcp/4001/p2p/QmaCpDMGvV2BGHeYERUEnRQAwe3N8SzbUtfsmvsqQLuvuJ",
99+
}
100+
t.Logf("Using %d bootstrap nodes from config", len(bootstrapNodeStrs))
101+
102+
var bootstrapNodes []multiaddr.Multiaddr
103+
for _, addr := range bootstrapNodeStrs {
104+
maddr, err := multiaddr.NewMultiaddr(addr)
105+
if err != nil {
106+
t.Fatalf("Invalid bootstrap node address: %s, error: %v", addr, err)
107+
}
108+
bootstrapNodes = append(bootstrapNodes, maddr)
109+
}
110+
111+
// 5. Create DHTDiscovery instance
112+
discovery, err := p2p.NewDHTDiscovery(ctx, h, bootstrapNodes)
113+
if err != nil {
114+
t.Fatalf("Failed to create DHT discovery: %v", err)
115+
}
116+
t.Log("DHT discovery instance created")
117+
118+
// 6. Start discovery
119+
if err := discovery.Start(ctx); err != nil {
120+
t.Fatalf("Failed to start DHT discovery: %v", err)
121+
}
122+
t.Log("DHT discovery started")
123+
124+
// 7. Print discovered peers asynchronously
125+
go func() {
126+
for pi := range discovery.DiscoveredPeers() {
127+
t.Logf("Discovered peer: %s", pi.ID)
128+
}
129+
}()
130+
131+
// 8. Print connection events
132+
h.Network().Notify(&network.NotifyBundle{
133+
ConnectedF: func(n network.Network, c network.Conn) {
134+
t.Logf("Connected to: %s", c.RemotePeer())
135+
},
136+
DisconnectedF: func(n network.Network, c network.Conn) {
137+
t.Logf("Disconnected from: %s", c.RemotePeer())
138+
},
139+
})
140+
141+
// 9. Wait for discovery (adjust duration if needed)
142+
t.Log("⏳ Waiting 30 seconds for peer discovery...")
143+
time.Sleep(30 * time.Second)
144+
145+
// 10. Stop discovery
146+
if err := discovery.Stop(); err != nil {
147+
t.Fatalf("Failed to stop DHTDiscovery: %v", err)
148+
}
149+
t.Log("✅ DHTDiscovery stopped")
150+
}

internal/p2p/discovery.go

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ package p2p
22

33
import (
44
"context"
5-
"fmt"
65

76
"github.com/libp2p/go-libp2p/core/host"
87
"github.com/multiformats/go-multiaddr"
@@ -24,7 +23,5 @@ func (f *Factory) CreateMDNS(h host.Host) (config.Discovery, error) {
2423

2524
// CreateDHT creates a DHT-based discovery service
2625
func (f *Factory) CreateDHT(ctx context.Context, h host.Host, bootstrapAddrs []multiaddr.Multiaddr) (config.Discovery, error) {
27-
// This would be implemented later
28-
// For now just return an error
29-
return nil, fmt.Errorf("DHT discovery not yet implemented")
26+
return NewDHTDiscovery(ctx, h, bootstrapAddrs)
3027
}

0 commit comments

Comments
 (0)