
Commit 9bab969

feat: add a v1 compatibility mode
When this option is passed, the DHT node will listen on and query using _only_ the old DHT protocol. Importantly, the node won't even pretend to be a new DHT node, because its routing table includes V1 peers.
1 parent 6f1df26 commit 9bab969

5 files changed: +153 −110 lines

dht.go

Lines changed: 20 additions & 6 deletions
@@ -77,10 +77,11 @@ type IpfsDHT struct {
 
 	stripedPutLocks [256]sync.Mutex
 
-	// Primary DHT protocols - we query and respond to these protocols
+	// DHT protocols we query with. We'll only add peers to our routing
+	// table if they speak these protocols.
 	protocols []protocol.ID
 
-	// DHT protocols we can respond to (may contain protocols in addition to the primary protocols)
+	// DHT protocols we can respond to.
 	serverProtocols []protocol.ID
 
 	auto bool
@@ -219,17 +220,30 @@ func NewDHTClient(ctx context.Context, h host.Host, dstore ds.Batching) *IpfsDHT
 }
 
 func makeDHT(ctx context.Context, h host.Host, cfg config) (*IpfsDHT, error) {
-	protocols := []protocol.ID{cfg.protocolPrefix + kad2}
-	serverProtocols := []protocol.ID{cfg.protocolPrefix + kad2, cfg.protocolPrefix + kad1}
+	var protocols, serverProtocols []protocol.ID
 
 	// check if custom test protocols were set
 	if len(cfg.testProtocols) > 0 {
 		protocols = make([]protocol.ID, len(cfg.testProtocols))
-		serverProtocols = make([]protocol.ID, len(cfg.testProtocols))
 		for i, p := range cfg.testProtocols {
 			protocols[i] = cfg.protocolPrefix + p
-			serverProtocols[i] = cfg.protocolPrefix + p
 		}
+		serverProtocols = protocols
+	} else if cfg.v1CompatibleMode {
+		// In compat mode, query/serve using the old protocol.
+		//
+		// DO NOT accept requests on the new protocol. Otherwise:
+		// 1. We'll end up in V2 routing tables.
+		// 2. We'll have V1 peers in our routing table.
+		//
+		// In other words, we'll pollute the V2 network.
+		protocols = []protocol.ID{cfg.protocolPrefix + kad1}
+		serverProtocols = []protocol.ID{cfg.protocolPrefix + kad1}
+	} else {
+		// In v2 mode, serve on both protocols, but only
+		// query/accept peers in v2 mode.
+		protocols = []protocol.ID{cfg.protocolPrefix + kad2}
+		serverProtocols = []protocol.ID{cfg.protocolPrefix + kad2, cfg.protocolPrefix + kad1}
 	}
 
 	dht := &IpfsDHT{
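
To make the new three-way selection easy to scan, here is the same decision as a standalone sketch. The `/ipfs` prefix and the `/kad/1.0.0` and `/kad/2.0.0` protocol IDs are assumptions about the package's `protocolPrefix`, `kad1`, and `kad2` values, not something this diff shows:

package main

import (
	"fmt"

	"github.com/libp2p/go-libp2p-core/protocol"
)

// Assumed values; the real constants live inside go-libp2p-kad-dht.
const (
	prefix = protocol.ID("/ipfs")
	kad1   = protocol.ID("/kad/1.0.0")
	kad2   = protocol.ID("/kad/2.0.0")
)

// selectProtocols mirrors the branch added to makeDHT above: compat mode
// speaks only v1 end to end, while v2 mode queries with v2 but still
// answers v1 requests.
func selectProtocols(v1Compat bool) (query, serve []protocol.ID) {
	if v1Compat {
		return []protocol.ID{prefix + kad1}, []protocol.ID{prefix + kad1}
	}
	return []protocol.ID{prefix + kad2}, []protocol.ID{prefix + kad2, prefix + kad1}
}

func main() {
	query, serve := selectProtocols(true)
	fmt.Println(query, serve) // [/ipfs/kad/1.0.0] [/ipfs/kad/1.0.0]
}

The asymmetry in v2 mode is deliberate: serving v1 keeps old peers working, while refusing to query with v1 keeps v1-only peers out of the routing table.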

dht_options.go

Lines changed: 20 additions & 0 deletions
@@ -52,6 +52,9 @@ type config struct {
 
 	// test parameters
 	testProtocols []protocol.ID
+
+	// set to true if we're operating in v1 dht compatible mode
+	v1CompatibleMode bool
 }
 
 func emptyQueryFilter(_ *IpfsDHT, ai peer.AddrInfo) bool { return true }
@@ -95,6 +98,8 @@ var defaults = func(o *config) error {
 	o.concurrency = 3
 	o.resiliency = 3
 
+	o.v1CompatibleMode = true
+
 	return nil
 }
 
@@ -315,3 +320,18 @@ func customProtocols(protos ...protocol.ID) Option {
 		return nil
 	}
 }
+
+// V1CompatibleMode sets the DHT to operate in V1 compatible mode. In this mode,
+// the DHT node will act like a V1 DHT node (use the V1 protocol names) but will
+// use the V2 query and routing table logic.
+//
+// For now, this option defaults to true for backwards compatibility. In the
+// near future, it will switch to false.
+//
+// This option is perma-unstable and may be removed in the future.
+func V1CompatibleMode(enable bool) Option {
+	return func(c *config) error {
+		c.v1CompatibleMode = enable
+		return nil
+	}
+}
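
For callers, opting in or out is a one-line change at construction time. A minimal usage sketch, assuming the libp2p APIs of this era (`libp2p.New` still took a context):

package main

import (
	"context"

	libp2p "github.com/libp2p/go-libp2p"
	dht "github.com/libp2p/go-libp2p-kad-dht"
)

func main() {
	ctx := context.Background()

	h, err := libp2p.New(ctx)
	if err != nil {
		panic(err)
	}

	// Opt out of compat mode: query with the v2 protocol only, keeping
	// v1-only peers out of the routing table. Omitting the option keeps
	// the current default (true), so existing callers are unaffected.
	d, err := dht.New(ctx, h, dht.V1CompatibleMode(false))
	if err != nil {
		panic(err)
	}
	defer d.Close()
}

Since the doc comment warns that the default will flip to false, callers that depend on compat behavior long-term should pass `V1CompatibleMode(true)` explicitly rather than relying on the default.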

dht_test.go

Lines changed: 4 additions & 3 deletions
@@ -1810,6 +1810,7 @@ func TestProtocolUpgrade(t *testing.T) {
 	defer cancel()
 
 	os := []Option{
+		testPrefix,
 		Mode(ModeServer),
 		NamespacedValidator("v", blankValidator{}),
 		DisableAutoRefresh(),
@@ -1820,19 +1821,19 @@
 	// about other DHT servers in the new DHT.
 
 	dhtA, err := New(ctx, bhost.New(swarmt.GenSwarm(t, ctx, swarmt.OptDisableReuseport)),
-		append([]Option{testPrefix}, os...)...)
+		append([]Option{V1CompatibleMode(false)}, os...)...)
 	if err != nil {
 		t.Fatal(err)
 	}
 
 	dhtB, err := New(ctx, bhost.New(swarmt.GenSwarm(t, ctx, swarmt.OptDisableReuseport)),
-		append([]Option{testPrefix}, os...)...)
+		append([]Option{V1CompatibleMode(false)}, os...)...)
 	if err != nil {
 		t.Fatal(err)
 	}
 
 	dhtC, err := New(ctx, bhost.New(swarmt.GenSwarm(t, ctx, swarmt.OptDisableReuseport)),
-		append([]Option{testPrefix, customProtocols(kad1)}, os...)...)
+		append([]Option{V1CompatibleMode(true)}, os...)...)
 	if err != nil {
 		t.Fatal(err)
 	}

ext_test.go

Lines changed: 107 additions & 95 deletions
@@ -38,13 +38,15 @@ func TestHungRequest(t *testing.T) {
 	if err != nil {
 		t.Fatal(err)
 	}
-	// Hang on every request.
-	hosts[1].SetStreamHandler(d.protocols[0], func(s network.Stream) {
-		defer s.Reset() //nolint
-		<-ctx.Done()
-	})
+	for _, proto := range d.serverProtocols {
+		// Hang on every request.
+		hosts[1].SetStreamHandler(proto, func(s network.Stream) {
+			defer s.Reset() //nolint
+			<-ctx.Done()
+		})
+	}
 
-	require.NoError(t, hosts[0].Peerstore().AddProtocols(hosts[1].ID(), protocol.ConvertToStrings(d.protocols)...))
+	require.NoError(t, hosts[0].Peerstore().AddProtocols(hosts[1].ID(), protocol.ConvertToStrings(d.serverProtocols)...))
 	d.peerFound(ctx, hosts[1].ID(), true)
 
 	ctx1, cancel1 := context.WithTimeout(ctx, 1*time.Second)
@@ -91,10 +93,12 @@ func TestGetFailures(t *testing.T) {
 	}
 
 	// Reply with failures to every message
-	host2.SetStreamHandler(d.protocols[0], func(s network.Stream) {
-		time.Sleep(400 * time.Millisecond)
-		s.Close()
-	})
+	for _, proto := range d.serverProtocols {
+		host2.SetStreamHandler(proto, func(s network.Stream) {
+			time.Sleep(400 * time.Millisecond)
+			s.Close()
+		})
+	}
 
 	host1.Peerstore().AddAddrs(host2.ID(), host2.Addrs(), peerstore.ConnectedAddrTTL)
 	_, err = host1.Network().DialPeer(ctx, host2.ID())
@@ -120,24 +124,26 @@ func TestGetFailures(t *testing.T) {
 
 	t.Log("Timeout test passed.")
 
-	// Reply with failures to every message
-	host2.SetStreamHandler(d.protocols[0], func(s network.Stream) {
-		defer s.Close()
+	for _, proto := range d.serverProtocols {
+		// Reply with failures to every message
+		host2.SetStreamHandler(proto, func(s network.Stream) {
+			defer s.Close()
 
-		pbr := ggio.NewDelimitedReader(s, network.MessageSizeMax)
-		pbw := ggio.NewDelimitedWriter(s)
+			pbr := ggio.NewDelimitedReader(s, network.MessageSizeMax)
+			pbw := ggio.NewDelimitedWriter(s)
 
-		pmes := new(pb.Message)
-		if err := pbr.ReadMsg(pmes); err != nil {
-			// user gave up
-			return
-		}
+			pmes := new(pb.Message)
+			if err := pbr.ReadMsg(pmes); err != nil {
+				// user gave up
+				return
+			}
 
-		resp := &pb.Message{
-			Type: pmes.Type,
-		}
-		_ = pbw.WriteMsg(resp)
-	})
+			resp := &pb.Message{
+				Type: pmes.Type,
+			}
+			_ = pbw.WriteMsg(resp)
+		})
+	}
 
 	// This one should fail with NotFound.
 	// long context timeout to ensure we dont end too early.
@@ -172,7 +178,7 @@ func TestGetFailures(t *testing.T) {
 		Record: rec,
 	}
 
-	s, err := host2.NewStream(context.Background(), host1.ID(), d.protocols[0])
+	s, err := host2.NewStream(context.Background(), host1.ID(), d.protocols...)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -224,36 +230,38 @@ func TestNotFound(t *testing.T) {
 	// Reply with random peers to every message
 	for _, host := range hosts {
 		host := host // shadow loop var
-		host.SetStreamHandler(d.protocols[0], func(s network.Stream) {
-			defer s.Close()
+		for _, proto := range d.serverProtocols {
+			host.SetStreamHandler(proto, func(s network.Stream) {
+				defer s.Close()
 
-			pbr := ggio.NewDelimitedReader(s, network.MessageSizeMax)
-			pbw := ggio.NewDelimitedWriter(s)
-
-			pmes := new(pb.Message)
-			if err := pbr.ReadMsg(pmes); err != nil {
-				panic(err)
-			}
-
-			switch pmes.GetType() {
-			case pb.Message_GET_VALUE:
-				resp := &pb.Message{Type: pmes.Type}
+				pbr := ggio.NewDelimitedReader(s, network.MessageSizeMax)
+				pbw := ggio.NewDelimitedWriter(s)
 
-				ps := []peer.AddrInfo{}
-				for i := 0; i < 7; i++ {
-					p := hosts[rand.Intn(len(hosts))].ID()
-					pi := host.Peerstore().PeerInfo(p)
-					ps = append(ps, pi)
+				pmes := new(pb.Message)
+				if err := pbr.ReadMsg(pmes); err != nil {
+					panic(err)
 				}
 
-				resp.CloserPeers = pb.PeerInfosToPBPeers(d.host.Network(), ps)
-				if err := pbw.WriteMsg(resp); err != nil {
-					panic(err)
+				switch pmes.GetType() {
+				case pb.Message_GET_VALUE:
+					resp := &pb.Message{Type: pmes.Type}
+
+					ps := []peer.AddrInfo{}
+					for i := 0; i < 7; i++ {
+						p := hosts[rand.Intn(len(hosts))].ID()
+						pi := host.Peerstore().PeerInfo(p)
+						ps = append(ps, pi)
+					}
+
+					resp.CloserPeers = pb.PeerInfosToPBPeers(d.host.Network(), ps)
+					if err := pbw.WriteMsg(resp); err != nil {
+						panic(err)
+					}
+				default:
+					panic("Shouldnt recieve this.")
 				}
-			default:
-				panic("Shouldnt recieve this.")
-			}
-		})
+			})
+		}
 	}
 
 	// long timeout to ensure timing is not at play.
@@ -304,33 +312,35 @@ func TestLessThanKResponses(t *testing.T) {
 	// Reply with random peers to every message
 	for _, host := range hosts {
 		host := host // shadow loop var
-		host.SetStreamHandler(d.protocols[0], func(s network.Stream) {
-			defer s.Close()
+		for _, proto := range d.serverProtocols {
+			host.SetStreamHandler(proto, func(s network.Stream) {
+				defer s.Close()
 
-			pbr := ggio.NewDelimitedReader(s, network.MessageSizeMax)
-			pbw := ggio.NewDelimitedWriter(s)
-
-			pmes := new(pb.Message)
-			if err := pbr.ReadMsg(pmes); err != nil {
-				panic(err)
-			}
+				pbr := ggio.NewDelimitedReader(s, network.MessageSizeMax)
+				pbw := ggio.NewDelimitedWriter(s)
 
-			switch pmes.GetType() {
-			case pb.Message_GET_VALUE:
-				pi := host.Peerstore().PeerInfo(hosts[1].ID())
-				resp := &pb.Message{
-					Type:        pmes.Type,
-					CloserPeers: pb.PeerInfosToPBPeers(d.host.Network(), []peer.AddrInfo{pi}),
+				pmes := new(pb.Message)
+				if err := pbr.ReadMsg(pmes); err != nil {
+					panic(err)
 				}
 
-				if err := pbw.WriteMsg(resp); err != nil {
-					panic(err)
+				switch pmes.GetType() {
+				case pb.Message_GET_VALUE:
+					pi := host.Peerstore().PeerInfo(hosts[1].ID())
+					resp := &pb.Message{
+						Type:        pmes.Type,
+						CloserPeers: pb.PeerInfosToPBPeers(d.host.Network(), []peer.AddrInfo{pi}),
+					}
+
+					if err := pbw.WriteMsg(resp); err != nil {
+						panic(err)
+					}
+				default:
+					panic("Shouldnt recieve this.")
 				}
-			default:
-				panic("Shouldnt recieve this.")
-			}
 
-		})
+			})
+		}
 	}
 
 	ctx, cancel := context.WithTimeout(ctx, time.Second*30)
@@ -369,34 +379,36 @@ func TestMultipleQueries(t *testing.T) {
 
 	d.peerFound(ctx, hosts[1].ID(), true)
 
-	// It would be nice to be able to just get a value and succeed but then
-	// we'd need to deal with selectors and validators...
-	hosts[1].SetStreamHandler(d.protocols[0], func(s network.Stream) {
-		defer s.Close()
-
-		pbr := ggio.NewDelimitedReader(s, network.MessageSizeMax)
-		pbw := ggio.NewDelimitedWriter(s)
+	for _, proto := range d.serverProtocols {
+		// It would be nice to be able to just get a value and succeed but then
+		// we'd need to deal with selectors and validators...
+		hosts[1].SetStreamHandler(proto, func(s network.Stream) {
+			defer s.Close()
 
-		pmes := new(pb.Message)
-		if err := pbr.ReadMsg(pmes); err != nil {
-			panic(err)
-		}
+			pbr := ggio.NewDelimitedReader(s, network.MessageSizeMax)
+			pbw := ggio.NewDelimitedWriter(s)
 
-		switch pmes.GetType() {
-		case pb.Message_GET_VALUE:
-			pi := hosts[1].Peerstore().PeerInfo(hosts[0].ID())
-			resp := &pb.Message{
-				Type:        pmes.Type,
-				CloserPeers: pb.PeerInfosToPBPeers(d.host.Network(), []peer.AddrInfo{pi}),
+			pmes := new(pb.Message)
+			if err := pbr.ReadMsg(pmes); err != nil {
+				panic(err)
 			}
 
-			if err := pbw.WriteMsg(resp); err != nil {
-				panic(err)
+			switch pmes.GetType() {
+			case pb.Message_GET_VALUE:
+				pi := hosts[1].Peerstore().PeerInfo(hosts[0].ID())
+				resp := &pb.Message{
+					Type:        pmes.Type,
+					CloserPeers: pb.PeerInfosToPBPeers(d.host.Network(), []peer.AddrInfo{pi}),
+				}
+
+				if err := pbw.WriteMsg(resp); err != nil {
+					panic(err)
+				}
+			default:
+				panic("Shouldnt recieve this.")
 			}
-		default:
-			panic("Shouldnt recieve this.")
-		}
-	})
+		})
+	}
 
 	// long timeout to ensure timing is not at play.
 	ctx, cancel := context.WithTimeout(ctx, time.Second*20)
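
The same register-on-every-server-protocol loop now appears in five tests above. If it keeps spreading, it could be collapsed into a small helper in the test package; a sketch with a hypothetical name, assuming the imports already present in ext_test.go:

// setTestHandler installs one stream handler for every protocol the DHT
// serves, so a test double answers regardless of which protocol version
// the node dials with. Hypothetical helper; not part of this commit.
func setTestHandler(h host.Host, d *IpfsDHT, handler network.StreamHandler) {
	for _, proto := range d.serverProtocols {
		h.SetStreamHandler(proto, handler)
	}
}

Each test would then pass its existing handler closure once instead of wrapping it in a loop.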
