@@ -15,8 +15,7 @@ import (
1515 "github.com/libp2p/go-libp2p-core/peer"
1616 "github.com/libp2p/go-libp2p-core/peerstore"
1717 "github.com/libp2p/go-libp2p-core/routing"
18-
19- multistream "github.com/multiformats/go-multistream"
18+ "github.com/multiformats/go-multistream"
2019
2120 "golang.org/x/xerrors"
2221
@@ -26,12 +25,12 @@ import (
2625 opts "github.com/libp2p/go-libp2p-kad-dht/opts"
2726 pb "github.com/libp2p/go-libp2p-kad-dht/pb"
2827
29- cid "github.com/ipfs/go-cid"
28+ "github.com/ipfs/go-cid"
3029 u "github.com/ipfs/go-ipfs-util"
3130 kb "github.com/libp2p/go-libp2p-kbucket"
32- record "github.com/libp2p/go-libp2p-record"
31+ "github.com/libp2p/go-libp2p-record"
3332 swarmt "github.com/libp2p/go-libp2p-swarm/testing"
34- ci "github.com/libp2p/go-libp2p-testing/ci"
33+ "github.com/libp2p/go-libp2p-testing/ci"
3534 travisci "github.com/libp2p/go-libp2p-testing/ci/travis"
3635 bhost "github.com/libp2p/go-libp2p/p2p/host/basic"
3736 ma "github.com/multiformats/go-multiaddr"
@@ -78,6 +77,36 @@ func (testValidator) Validate(_ string, b []byte) error {
7877 return nil
7978}
8079
80+ type testAtomicPutValidator struct {
81+ testValidator
82+ }
83+
84+ // selects the entry with the 'highest' last byte
85+ func (testAtomicPutValidator ) Select (_ string , bs [][]byte ) (int , error ) {
86+ index := - 1
87+ max := uint8 (0 )
88+ for i , b := range bs {
89+ if bytes .Equal (b , []byte ("valid" )) {
90+ if index == - 1 {
91+ index = i
92+ }
93+ continue
94+ }
95+
96+ str := string (b )
97+ n := str [len (str )- 1 ]
98+ if n > max {
99+ max = n
100+ index = i
101+ }
102+
103+ }
104+ if index == - 1 {
105+ return - 1 , errors .New ("no rec found" )
106+ }
107+ return index , nil
108+ }
109+
81110func setupDHT (ctx context.Context , t * testing.T , client bool ) * IpfsDHT {
82111 d , err := New (
83112 ctx ,
@@ -1107,6 +1136,51 @@ func TestBadProtoMessages(t *testing.T) {
11071136 }
11081137}
11091138
1139+ func TestAtomicPut (t * testing.T ) {
1140+ ctx , cancel := context .WithCancel (context .Background ())
1141+ defer cancel ()
1142+
1143+ d := setupDHT (ctx , t , false )
1144+ d .Validator = testAtomicPutValidator {}
1145+
1146+ // fnc to put a record
1147+ key := "testkey"
1148+ putRecord := func (value []byte ) error {
1149+ rec := record .MakePutRecord (key , value )
1150+ pmes := pb .NewMessage (pb .Message_PUT_VALUE , rec .Key , 0 )
1151+ pmes .Record = rec
1152+ _ , err := d .handlePutValue (ctx , "testpeer" , pmes )
1153+ return err
1154+ }
1155+
1156+ // put a valid record
1157+ if err := putRecord ([]byte ("valid" )); err != nil {
1158+ t .Fatal ("should not have errored on a valid record" )
1159+ }
1160+
1161+ // simultaneous puts for old & new values
1162+ values := [][]byte {[]byte ("newer1" ), []byte ("newer7" ), []byte ("newer3" ), []byte ("newer5" )}
1163+ var wg sync.WaitGroup
1164+ for _ , v := range values {
1165+ wg .Add (1 )
1166+ go func (v []byte ) {
1167+ defer wg .Done ()
1168+ putRecord (v )
1169+ }(v )
1170+ }
1171+ wg .Wait ()
1172+
1173+ // get should return the newest value
1174+ pmes := pb .NewMessage (pb .Message_GET_VALUE , []byte (key ), 0 )
1175+ msg , err := d .handleGetValue (ctx , "testkey" , pmes )
1176+ if err != nil {
1177+ t .Fatalf ("should not have errored on final get, but got %+v" , err )
1178+ }
1179+ if string (msg .GetRecord ().Value ) != "newer7" {
1180+ t .Fatalf ("Expected 'newer7' got '%s'" , string (msg .GetRecord ().Value ))
1181+ }
1182+ }
1183+
11101184func TestClientModeConnect (t * testing.T ) {
11111185 ctx , cancel := context .WithCancel (context .Background ())
11121186 defer cancel ()
0 commit comments