Skip to content

Commit e77aeb6

Browse files
committed
Send dns reg chunks in separate goroutines
1 parent 8a41fea commit e77aeb6

File tree

4 files changed

+85
-47
lines changed

4 files changed

+85
-47
lines changed

pkg/registrars/dns-registrar/examples/client/main.go

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -27,16 +27,14 @@ func main() {
2727
pubKey, err := hex.DecodeString(key)
2828
util.Check(err)
2929

30-
req, err := requester.NewRequester(&requester.Config{
31-
TransportMethod: requester.UDP,
32-
Target: addr.String(),
33-
BaseDomain: *baseDomain,
34-
Pubkey: pubKey,
35-
})
36-
37-
util.Check(err)
38-
39-
tworeq, err := tworeqresp.NewRequester(req, 80)
30+
tworeq, err := tworeqresp.NewRequester(func() (tworeqresp.Onerequester, error) {
31+
return requester.NewRequester(&requester.Config{
32+
TransportMethod: requester.UDP,
33+
Target: addr.String(),
34+
BaseDomain: *baseDomain,
35+
Pubkey: pubKey,
36+
})
37+
}, 80)
4038
util.Check(err)
4139

4240
reader := bufio.NewReader(os.Stdin)

pkg/registrars/dns-registrar/tworeqresp/requester.go

Lines changed: 68 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -14,19 +14,20 @@ type dialFunc = func(ctx context.Context, network, addr string) (net.Conn, error
1414

1515
const idLen = 8
1616

17-
type onerequester interface {
17+
type Onerequester interface {
1818
RequestAndRecv(sendBytes []byte) ([]byte, error)
1919
Close() error
2020
SetDialer(dialer dialFunc) error
2121
}
2222

2323
type Requester struct {
24-
parent onerequester
25-
mtu uint
24+
createRequester func() (Onerequester, error)
25+
mtu uint
26+
dialer dialFunc
2627
}
2728

28-
func NewRequester(parent onerequester, mtu uint) (*Requester, error) {
29-
return &Requester{parent: parent, mtu: mtu}, nil
29+
func NewRequester(createRequester func() (Onerequester, error), mtu uint) (*Requester, error) {
30+
return &Requester{createRequester: createRequester, mtu: mtu}, nil
3031
}
3132

3233
func (r *Requester) RequestAndRecv(sendBytes []byte) ([]byte, error) {
@@ -39,32 +40,71 @@ func (r *Requester) RequestAndRecv(sendBytes []byte) ([]byte, error) {
3940

4041
parts := splitIntoChunks(sendBytes, int(r.mtu))
4142

42-
for i, partBytes := range parts {
43-
toSend := &pb.DnsPartReq{Id: id[:], PartNum: proto.Uint32(uint32(i)), TotalParts: proto.Uint32(uint32(len(parts))), Data: partBytes}
44-
toSendBytes, err := proto.Marshal(toSend)
45-
if err != nil {
46-
return nil, fmt.Errorf("error marshal part %v: %v", i, err)
47-
}
43+
resCh := make(chan []byte, len(parts))
44+
errCh := make(chan error, len(parts))
45+
waitCh := make(chan struct{}, len(parts))
4846

49-
respBytes, err := r.parent.RequestAndRecv(toSendBytes)
50-
if err != nil {
51-
return nil, fmt.Errorf("error request part %v: %v", i, err)
52-
}
47+
for i, partBytes := range parts {
48+
i := i
49+
partBytes := partBytes
50+
go func() {
51+
toSend := &pb.DnsPartReq{Id: id[:], PartNum: proto.Uint32(uint32(i)), TotalParts: proto.Uint32(uint32(len(parts))), Data: partBytes}
52+
toSendBytes, err := proto.Marshal(toSend)
53+
if err != nil {
54+
errCh <- fmt.Errorf("error marshal part %v: %v", i, err)
55+
return
56+
}
57+
58+
req, err := r.createRequester()
59+
if err != nil {
60+
errCh <- fmt.Errorf("error creating requester in part %v: %v", i, err)
61+
return
62+
}
63+
64+
if r.dialer != nil {
65+
err = req.SetDialer(r.dialer)
66+
if err != nil {
67+
errCh <- fmt.Errorf("error setting dialer in part %v: %v", i, err)
68+
return
69+
}
70+
}
71+
72+
respBytes, err := req.RequestAndRecv(toSendBytes)
73+
if err != nil {
74+
errCh <- fmt.Errorf("error request part %v: %v", i, err)
75+
return
76+
}
77+
78+
resp := &pb.DnsPartResp{}
79+
err = proto.Unmarshal(respBytes, resp)
80+
if err != nil {
81+
errCh <- fmt.Errorf("error unmarshal response: %v", err)
82+
return
83+
}
84+
85+
if resp.GetWaiting() {
86+
waitCh <- struct{}{}
87+
return
88+
}
89+
90+
resCh <- resp.GetData()
91+
92+
}()
93+
}
5394

54-
resp := &pb.DnsPartResp{}
55-
err = proto.Unmarshal(respBytes, resp)
56-
if err != nil {
57-
return nil, fmt.Errorf("error unmarshal response: %v", err)
58-
}
95+
errs := []error{}
5996

60-
if resp.GetWaiting() {
61-
continue
97+
for range parts {
98+
select {
99+
case res := <-resCh:
100+
return res, nil
101+
case err = <-errCh:
102+
errs = append(errs, err)
103+
case <-waitCh:
62104
}
63-
64-
return resp.GetData(), nil
65105
}
66106

67-
return nil, fmt.Errorf("no response")
107+
return nil, fmt.Errorf("errors occurred: %v", errs)
68108
}
69109

70110
func splitIntoChunks(data []byte, mtu int) [][]byte {
@@ -81,10 +121,11 @@ func splitIntoChunks(data []byte, mtu int) [][]byte {
81121

82122
// Close closes the parent transport
83123
func (r *Requester) Close() error {
84-
return r.parent.Close()
124+
return nil
85125
}
86126

87127
// SetDialer sets the parent dialer
88128
func (r *Requester) SetDialer(dialer dialFunc) error {
89-
return r.parent.SetDialer(dialer)
129+
r.dialer = dialer
130+
return nil
90131
}

pkg/registrars/dns-registrar/tworeqresp/requester_test.go

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,19 @@
11
package tworeqresp
22

33
import (
4+
"sync/atomic"
45
"testing"
56

67
pb "github.com/refraction-networking/conjure/proto"
78
"google.golang.org/protobuf/proto"
89
)
910

1011
type mockRequester struct {
11-
calls int
12+
calls atomic.Uint32
1213
}
1314

1415
func (r *mockRequester) RequestAndRecv([]byte) ([]byte, error) {
15-
r.calls += 1
16+
r.calls.Add(1)
1617
return proto.Marshal(&pb.DnsPartResp{Waiting: proto.Bool(true)})
1718
}
1819

@@ -47,13 +48,13 @@ func TestSpliting(t *testing.T) {
4748
} {
4849
t.Run(testCase.name, func(t *testing.T) {
4950
parent := &mockRequester{}
50-
requester, err := NewRequester(parent, mtu)
51+
requester, err := NewRequester(func() (Onerequester, error) { return parent, nil }, mtu)
5152
if err != nil {
5253
t.Fatalf("error creating requester: %v", err)
5354
}
5455
_, _ = requester.RequestAndRecv(testCase.data)
55-
if parent.calls != testCase.chunksExpected {
56-
t.Fatalf("calls: %v, expected: %v", parent.calls, testCase.chunksExpected)
56+
if int(parent.calls.Load()) != testCase.chunksExpected {
57+
t.Fatalf("calls: %v, expected: %v", parent.calls.Load(), testCase.chunksExpected)
5758
}
5859
})
5960

pkg/registrars/registration/dns-registrar.go

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -61,16 +61,14 @@ func createRequester(config *Config) (*requester.Requester, error) {
6161

6262
// NewDNSRegistrar creates a DNSRegistrar from config
6363
func NewDNSRegistrar(config *Config) (*DNSRegistrar, error) {
64-
req, err := createRequester(config)
65-
if err != nil {
66-
return nil, fmt.Errorf("error creating requester: %v", err)
67-
}
6864

6965
mtu := config.DNSRegistrarMTU
7066
if mtu == 0 {
7167
mtu = defaultMTU
7268
}
73-
tworeq, err := tworeqresp.NewRequester(req, mtu)
69+
tworeq, err := tworeqresp.NewRequester(func() (tworeqresp.Onerequester, error) {
70+
return createRequester(config)
71+
}, mtu)
7472
if err != nil {
7573
return nil, fmt.Errorf("error adding fragmentation layer: %v", err)
7674
}

0 commit comments

Comments
 (0)