Commit 9a50a1f

Fix parallel transfers between same two peers (#254)

* add test reproducing race
* update test to run parallel transfers between 2 peers
* test(impl): fix parallel transfer test and fix issue

  Update the parallel transfer test to fix the issues that made the test ineffective
  and then update go-graphsync to fix the actual issue
* feat(deps): update to tagged GS

Co-authored-by: tchardin <[email protected]>
1 parent 7870236 commit 9a50a1f
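
For orientation before the diff: the race this commit addresses only appears when several transfers run concurrently between the same pair of peers, which is exactly what the new test below exercises. The following is a minimal sketch of that pattern, condensed from the `TestMultipleParallelTransfers` test added in `impl/integration_test.go` further down; it assumes that test's harness and identifiers (`gsData`, `dt1`/`dt2`, `host1`, `LoadRandomData`) and is not a standalone example of the library API.

```go
// Condensed sketch (see lead-in for assumptions): overlap several pull
// transfers between the same two peers by opening one channel per parallel subtest.
// dt2 is the pulling client, dt1/host1 the provider, as in the full test below.
for _, size := range []int{300000, 256000, 200000, 256000} {
	size := size
	t.Run(fmt.Sprintf("size %d", size), func(t *testing.T) {
		t.Parallel() // subtests overlap, so transfers share the same peer pair concurrently

		// a fresh random DAG of the given size on the provider side
		root, _ := LoadRandomData(ctx, t, gsData.DagService1, size)

		// open a pull channel from the client (dt2) to the provider (host1)
		chid, err := dt2.OpenPullDataChannel(ctx, host1.ID(),
			testutil.NewFakeDTType(), root.(cidlink.Link).Cid, gsData.AllSelector)
		require.NoError(t, err)
		_ = chid // completion/errors are observed via SubscribeToEvents, as in the full test below
	})
}
```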

File tree

3 files changed: +182 -14 lines changed

go.mod

Lines changed: 1 addition & 1 deletion
```diff
@@ -13,7 +13,7 @@ require (
 	github.com/ipfs/go-cid v0.0.7
 	github.com/ipfs/go-datastore v0.4.5
 	github.com/ipfs/go-ds-badger v0.2.6
-	github.com/ipfs/go-graphsync v0.9.0
+	github.com/ipfs/go-graphsync v0.9.1
 	github.com/ipfs/go-ipfs-blockstore v1.0.1
 	github.com/ipfs/go-ipfs-blocksutil v0.0.1
 	github.com/ipfs/go-ipfs-chunker v0.0.5
```

go.sum

Lines changed: 2 additions & 2 deletions
```diff
@@ -223,8 +223,8 @@ github.com/ipfs/go-ds-badger v0.2.6/go.mod h1:02rnztVKA4aZwDuaRPTf8mpqcKmXP7mLl6
 github.com/ipfs/go-ds-leveldb v0.0.1/go.mod h1:feO8V3kubwsEF22n0YRQCffeb79OOYIykR4L04tMOYc=
 github.com/ipfs/go-ds-leveldb v0.4.1/go.mod h1:jpbku/YqBSsBc1qgME8BkWS4AxzF2cEu1Ii2r79Hh9s=
 github.com/ipfs/go-ds-leveldb v0.4.2/go.mod h1:jpbku/YqBSsBc1qgME8BkWS4AxzF2cEu1Ii2r79Hh9s=
-github.com/ipfs/go-graphsync v0.9.0 h1:T22kORlNbJUIm/+avUIfLnAf1BkwKG6aS19NsRVjVVY=
-github.com/ipfs/go-graphsync v0.9.0/go.mod h1:J62ahWT9JbPsFL2UWsUM5rOu0lZJ0LOIH1chHdxGGcw=
+github.com/ipfs/go-graphsync v0.9.1 h1:jo7ZaAZ3lal89RhKxKoRkPzIO8lmOY6KUWA1mDRZ2+U=
+github.com/ipfs/go-graphsync v0.9.1/go.mod h1:J62ahWT9JbPsFL2UWsUM5rOu0lZJ0LOIH1chHdxGGcw=
 github.com/ipfs/go-ipfs-blockstore v0.0.1/go.mod h1:d3WClOmRQKFnJ0Jz/jj/zmksX0ma1gROTlovZKBmN08=
 github.com/ipfs/go-ipfs-blockstore v0.1.0/go.mod h1:5aD0AvHPi7mZc6Ci1WCAhiBQu2IsfTduLl+422H6Rqw=
 github.com/ipfs/go-ipfs-blockstore v0.1.4/go.mod h1:Jxm3XMVjh6R17WvxFEiyKBLUGr86HgIYJW/D/MwqeYQ=
```

impl/integration_test.go

Lines changed: 179 additions & 11 deletions
```diff
@@ -3,6 +3,7 @@ package impl_test
 import (
 	"bytes"
 	"context"
+	"fmt"
 	"math/rand"
 	"os"
 	"testing"
@@ -21,6 +22,7 @@ import (
 	chunker "github.com/ipfs/go-ipfs-chunker"
 	offline "github.com/ipfs/go-ipfs-exchange-offline"
 	ipldformat "github.com/ipfs/go-ipld-format"
+	logging "github.com/ipfs/go-log/v2"
 	"github.com/ipfs/go-merkledag"
 	"github.com/ipfs/go-unixfs/importer/balanced"
 	ihelper "github.com/ipfs/go-unixfs/importer/helpers"
@@ -1724,7 +1726,7 @@ func TestMultipleMessagesInExtension(t *testing.T) {
 	gsData := testutil.NewGraphsyncTestingData(ctx, t, nil, nil)
 	host1 := gsData.Host1 // initiator, data sender
 
-	root, origBytes := LoadRandomData(ctx, t, gsData.DagService1)
+	root, origBytes := LoadRandomData(ctx, t, gsData.DagService1, 256000)
 	gsData.OrigBytes = origBytes
 	rootCid := root.(cidlink.Link).Cid
 	tp1 := gsData.SetupGSTransportHost1()
@@ -1860,8 +1862,174 @@ func TestMultipleMessagesInExtension(t *testing.T) {
 	gsData.VerifyFileTransferred(t, root, true)
 }
 
-func LoadRandomData(ctx context.Context, t *testing.T, dagService ipldformat.DAGService) (ipld.Link, []byte) {
-	data := make([]byte, 256000)
+// completeRevalidator does not pause when sending the last voucher to confirm the deal is completed
+type completeRevalidator struct {
+	*retrievalRevalidator
+}
+
+func (r *completeRevalidator) OnComplete(chid datatransfer.ChannelID) (bool, datatransfer.VoucherResult, error) {
+	return true, r.finalVoucher, nil
+}
+
+func TestMultipleParallelTransfers(t *testing.T) {
+	SetDTLogLevelDebug()
+
+	// Add more sizes here to trigger more transfers.
+	sizes := []int{300000, 256000, 200000, 256000}
+
+	ctx := context.Background()
+
+	gsData := testutil.NewGraphsyncTestingData(ctx, t, nil, nil)
+	host1 := gsData.Host1 // initiator, data sender
+
+	tp1 := gsData.SetupGSTransportHost1()
+	tp2 := gsData.SetupGSTransportHost2()
+
+	dt1, err := NewDataTransfer(gsData.DtDs1, gsData.TempDir1, gsData.DtNet1, tp1)
+	require.NoError(t, err)
+	testutil.StartAndWaitForReady(ctx, t, dt1)
+
+	dt2, err := NewDataTransfer(gsData.DtDs2, gsData.TempDir2, gsData.DtNet2, tp2)
+	require.NoError(t, err)
+	testutil.StartAndWaitForReady(ctx, t, dt2)
+
+	// In this retrieval flow we expect 2 voucher results:
+	// The first one is sent as a response from the initial request telling the client
+	// the provider has accepted the request and is starting to send blocks
+	respVoucher := testutil.NewFakeDTType()
+	encodedRVR, err := encoding.Encode(respVoucher)
+	require.NoError(t, err)
+
+	// The final voucher result is sent by the provider to let the client know the deal is completed
+	finalVoucherResult := testutil.NewFakeDTType()
+	encodedFVR, err := encoding.Encode(finalVoucherResult)
+	require.NoError(t, err)
+
+	sv := testutil.NewStubbedValidator()
+	require.NoError(t, dt1.RegisterVoucherType(&testutil.FakeDTType{}, sv))
+	// Stub in the validator so it returns that exact voucher when calling ValidatePull
+	// this validator will not pause transfer when accepting a transfer and will start
+	// sending blocks immediately
+	sv.StubResult(respVoucher)
+
+	// no need for intermediary voucher results
+	voucherResults := []datatransfer.VoucherResult{}
+
+	pausePoints := []uint64{}
+	srv := &retrievalRevalidator{
+		testutil.NewStubbedRevalidator(), 0, 0, pausePoints, finalVoucherResult, voucherResults,
+	}
+	srv.ExpectSuccessErrResume()
+	require.NoError(t, dt1.RegisterRevalidator(testutil.NewFakeDTType(), srv))
+
+	// Register our response voucher with the client
+	require.NoError(t, dt2.RegisterVoucherResultType(respVoucher))
+
+	// for each size we create a new random DAG of the given size and try to retrieve it
+	for _, size := range sizes {
+		size := size
+		t.Run(fmt.Sprintf("size %d", size), func(t *testing.T) {
+			t.Parallel()
+
+			ctx, cancel := context.WithTimeout(ctx, 4*time.Second)
+			defer cancel()
+
+			errChan := make(chan struct{}, 2)
+
+			clientGotResponse := make(chan struct{}, 1)
+			clientFinished := make(chan struct{}, 1)
+
+			var chid datatransfer.ChannelID
+			chidReceived := make(chan struct{})
+			dt2.SubscribeToEvents(func(event datatransfer.Event, channelState datatransfer.ChannelState) {
+				<-chidReceived
+				if chid != channelState.ChannelID() {
+					return
+				}
+				if event.Code == datatransfer.Error {
+					errChan <- struct{}{}
+				}
+				// Here we verify reception of voucherResults by the client
+				if event.Code == datatransfer.NewVoucherResult {
+					voucherResult := channelState.LastVoucherResult()
+					encodedVR, err := encoding.Encode(voucherResult)
+					require.NoError(t, err)
+
+					// If this voucher result is the response voucher no action is needed
+					// we just know that the provider has accepted the transfer and is sending blocks
+					if bytes.Equal(encodedVR, encodedRVR) {
+						// The test will fail if no response voucher is received
+						clientGotResponse <- struct{}{}
+					}
+
+					// If this voucher result is the final voucher result we need
+					// to send a new voucher to unpause the provider and complete the transfer
+					if bytes.Equal(encodedVR, encodedFVR) {
+						_ = dt2.SendVoucher(ctx, chid, testutil.NewFakeDTType())
+					}
+				}
+
+				if channelState.Status() == datatransfer.Completed {
+					clientFinished <- struct{}{}
+				}
+			})
+
+			providerFinished := make(chan struct{}, 1)
+			dt1.SubscribeToEvents(func(event datatransfer.Event, channelState datatransfer.ChannelState) {
+				<-chidReceived
+				if chid != channelState.ChannelID() {
+					return
+				}
+				if event.Code == datatransfer.Error {
+					fmt.Println(event.Message)
+					errChan <- struct{}{}
+				}
+				if channelState.Status() == datatransfer.Completed {
+					providerFinished <- struct{}{}
+				}
+			})
+
+			root, origBytes := LoadRandomData(ctx, t, gsData.DagService1, size)
+			rootCid := root.(cidlink.Link).Cid
+
+			voucher := testutil.NewFakeDTType()
+			chid, err = dt2.OpenPullDataChannel(ctx, host1.ID(), voucher, rootCid, gsData.AllSelector)
+			require.NoError(t, err)
+			close(chidReceived)
+			// Expect the client to receive a response voucher, the provider to complete the transfer and
+			// the client to finish the transfer
+			for clientGotResponse != nil || providerFinished != nil || clientFinished != nil {
+				select {
+				case <-ctx.Done():
+					reason := "Did not complete successful data transfer"
+					switch true {
+					case clientGotResponse != nil:
+						reason = "client did not get initial response"
+					case clientFinished != nil:
+						reason = "client did not finish"
+					case providerFinished != nil:
+						reason = "provider did not finish"
+					}
+					t.Fatal(reason)
+				case <-clientGotResponse:
+					clientGotResponse = nil
+				case <-providerFinished:
+					providerFinished = nil
+				case <-clientFinished:
+					clientFinished = nil
+				case <-errChan:
+					t.Fatal("received unexpected error")
+				}
+			}
+			sv.VerifyExpectations(t)
+			srv.VerifyExpectations(t)
+			testutil.VerifyHasFile(gsData.Ctx, t, gsData.DagService2, root, origBytes)
+		})
+	}
+}
+
+func LoadRandomData(ctx context.Context, t *testing.T, dagService ipldformat.DAGService, size int) (ipld.Link, []byte) {
+	data := make([]byte, size)
 	rand.New(rand.NewSource(time.Now().UnixNano())).Read(data)
 
 	// import to UnixFS
@@ -1874,7 +2042,7 @@ func LoadRandomData(ctx context.Context, t *testing.T, dagService ipldformat.DAG
 		Dagserv: bufferedDS,
 	}
 
-	db, err := params.New(chunker.NewSizeSplitter(bytes.NewReader(data), int64(1<<10)))
+	db, err := params.New(chunker.NewSizeSplitter(bytes.NewReader(data), 128000))
 	require.NoError(t, err)
 
 	nd, err := balanced.Layout(db)
@@ -1928,10 +2096,10 @@ func (r *receiver) ReceiveRestartExistingChannelRequest(ctx context.Context,
 
 }
 
-//func SetDTLogLevelDebug() {
-//	_ = logging.SetLogLevel("dt-impl", "debug")
-//	_ = logging.SetLogLevel("dt-chanmon", "debug")
-//	_ = logging.SetLogLevel("dt_graphsync", "debug")
-//	_ = logging.SetLogLevel("data_transfer", "debug")
-//	_ = logging.SetLogLevel("data_transfer_network", "debug")
-//}
+func SetDTLogLevelDebug() {
+	_ = logging.SetLogLevel("dt-impl", "debug")
+	_ = logging.SetLogLevel("dt-chanmon", "debug")
+	_ = logging.SetLogLevel("dt_graphsync", "debug")
+	_ = logging.SetLogLevel("data-transfer", "debug")
+	_ = logging.SetLogLevel("data_transfer_network", "debug")
+}
```
