Skip to content

Commit 2ec6234

Browse files
authored
fix: more reliable pss and gsoc (#462)
* fix: pss and gsoc * chore: fix lint * chore: add timeout * fix: review comments * fix: graceful shutdown * fix: review comments * fix: review comments * fix: use 10m request timeout
1 parent 1642c39 commit 2ec6234

File tree

6 files changed

+143
-132
lines changed

6 files changed

+143
-132
lines changed

config/config.yaml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -247,8 +247,8 @@ checks:
247247
postage-ttl: 24h
248248
postage-depth: 21
249249
postage-label: test-label
250-
request-timeout: 5m
251-
timeout: 5m
250+
request-timeout: 10m
251+
timeout: 30m
252252
type: pss
253253
pullsync:
254254
options:

config/local.yaml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -223,8 +223,8 @@ checks:
223223
postage-ttl: 24h
224224
postage-depth: 21
225225
postage-label: test-label
226-
request-timeout: 5m
227-
timeout: 5m
226+
request-timeout: 10m
227+
timeout: 30m
228228
type: pss
229229
ci-pushsync-chunks:
230230
options:

config/public-testnet.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ checks:
9393
postage-ttl: 336h
9494
postage-depth: 21
9595
postage-label: test-label
96-
request-timeout: 5m
96+
request-timeout: 10m
9797
timeout: 30m
9898
type: pss
9999
pt-soc:

pkg/check/gsoc/gsoc.go

Lines changed: 24 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ import (
66
"encoding/binary"
77
"encoding/hex"
88
"fmt"
9-
"net/http"
109
"strconv"
1110
"sync"
1211
"time"
@@ -19,7 +18,7 @@ import (
1918
"github.com/ethersphere/beekeeper/pkg/beekeeper"
2019
"github.com/ethersphere/beekeeper/pkg/logging"
2120
"github.com/ethersphere/beekeeper/pkg/orchestration"
22-
"github.com/gorilla/websocket"
21+
"github.com/ethersphere/beekeeper/pkg/wslistener"
2322
"golang.org/x/sync/errgroup"
2423
)
2524

@@ -122,22 +121,36 @@ func run(ctx context.Context, uploadClient *bee.Client, listenClient *bee.Client
122121
if err != nil {
123122
return err
124123
}
125-
logger.Infof("gsoc: socAddress=%s, listner node address=%s", socAddress, addresses.Overlay)
124+
logger.Infof("gsoc: socAddress=%s, listener node address=%s, node=%s", socAddress, addresses.Overlay, listenClient.Name())
126125

127-
listener := &socListener{}
128-
ch, err := listener.Listen(ctx, listenClient, socAddress, logger)
126+
ctx, cancel := context.WithCancel(ctx)
127+
ch, closer, err := wslistener.ListenWebSocket(ctx, listenClient, fmt.Sprintf("/gsoc/subscribe/%s", socAddress), logger)
129128
if err != nil {
129+
cancel()
130130
return fmt.Errorf("listen websocket: %w", err)
131131
}
132-
defer listener.Close()
132+
133+
defer func() {
134+
cancel()
135+
closer()
136+
}()
133137

134138
received := make(map[string]bool, numChunks)
135139
receivedMtx := new(sync.Mutex)
140+
done := make(chan struct{})
136141

137142
go func() {
138143
for p := range ch {
139144
receivedMtx.Lock()
145+
if !received[p] {
146+
logger.Infof("gsoc: received message %s on node %s", p, listenClient.Name())
147+
}
140148
received[p] = true
149+
if len(received) == numChunks {
150+
close(done)
151+
receivedMtx.Unlock()
152+
return
153+
}
141154
receivedMtx.Unlock()
142155
}
143156
}()
@@ -151,16 +164,14 @@ func run(ctx context.Context, uploadClient *bee.Client, listenClient *bee.Client
151164
return err
152165
}
153166

154-
// wait for listener to receive all messages
155-
time.Sleep(5 * time.Second)
156-
listener.Close()
167+
select {
168+
case <-time.After(1 * time.Minute):
169+
return fmt.Errorf("timeout: not all messages received")
170+
case <-done:
171+
}
157172

158173
receivedMtx.Lock()
159174
defer receivedMtx.Unlock()
160-
if len(received) != numChunks {
161-
return fmt.Errorf("expected %d messages, got %d", numChunks, len(received))
162-
}
163-
164175
for i := 0; i < numChunks; i++ {
165176
want := fmt.Sprintf("data %d", i)
166177
if !received[want] {
@@ -289,58 +300,3 @@ func makeSoc(msg string, id []byte, privKey *ecdsa.PrivateKey) (*socData, error)
289300
Data: ch.Data(),
290301
}, nil
291302
}
292-
293-
type socListener struct {
294-
listening bool
295-
ws *websocket.Conn
296-
ch chan string
297-
}
298-
299-
func (s *socListener) Close() {
300-
if !s.listening {
301-
return
302-
}
303-
304-
s.listening = false
305-
s.ws.Close()
306-
close(s.ch)
307-
}
308-
309-
func (s *socListener) Listen(ctx context.Context, client *bee.Client, addr swarm.Address, logger logging.Logger) (<-chan string, error) {
310-
dialer := &websocket.Dialer{
311-
Proxy: http.ProxyFromEnvironment,
312-
HandshakeTimeout: 45 * time.Second,
313-
}
314-
315-
ws, _, err := dialer.DialContext(ctx, fmt.Sprintf("ws://%s/gsoc/subscribe/%s", client.Host(), addr), http.Header{})
316-
if err != nil {
317-
return nil, err
318-
}
319-
s.ws = ws
320-
s.ch = make(chan string)
321-
s.listening = true
322-
323-
go func(nodeName string) {
324-
for {
325-
select {
326-
case <-ctx.Done():
327-
return
328-
default:
329-
msgType, data, err := ws.ReadMessage()
330-
if err != nil {
331-
logger.WithField("node", nodeName).Infof("gsoc: websocket error: %v", err)
332-
return
333-
}
334-
if msgType != websocket.BinaryMessage {
335-
logger.WithField("node", nodeName).Info("gsoc: websocket received non-binary message")
336-
continue
337-
}
338-
339-
logger.WithField("node", nodeName).Infof("gsoc: websocket received message: %s", string(data))
340-
s.ch <- string(data)
341-
}
342-
}
343-
}(client.Name())
344-
345-
return s.ch, nil
346-
}

pkg/check/pss/pss.go

Lines changed: 21 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -2,18 +2,16 @@ package pss
22

33
import (
44
"context"
5-
"errors"
65
"fmt"
76
"math/rand"
8-
"net/http"
97
"time"
108

119
"github.com/ethersphere/beekeeper/pkg/bee"
1210
"github.com/ethersphere/beekeeper/pkg/beekeeper"
1311
"github.com/ethersphere/beekeeper/pkg/logging"
1412
"github.com/ethersphere/beekeeper/pkg/orchestration"
1513
"github.com/ethersphere/beekeeper/pkg/random"
16-
"github.com/gorilla/websocket"
14+
"github.com/ethersphere/beekeeper/pkg/wslistener"
1715
)
1816

1917
// Options represents check options
@@ -100,96 +98,60 @@ func pickAtRandom(r *rand.Rand, names []string, skip string) string {
10098
}
10199
}
102100

103-
var (
104-
errDataMismatch = errors.New("pss: data sent and received are not equal")
105-
errWebsocketConnection = errors.New("pss: websocket connection terminated with an error")
106-
)
107-
108101
var (
109102
testData = []byte("Hello Swarm :)")
110103
testTopic = "test"
111104
)
112105

113106
func (c *Check) testPss(nodeAName, nodeBName string, clients map[string]*bee.Client, o Options) error {
114107
ctx, cancel := context.WithTimeout(context.Background(), o.RequestTimeout)
108+
defer cancel()
115109

116110
nodeA := clients[nodeAName]
117111
nodeB := clients[nodeBName]
118112

119113
addrB, err := nodeB.Addresses(ctx)
120114
if err != nil {
121-
cancel()
122115
return err
123116
}
124117

125118
batchID, err := nodeA.GetOrCreateMutableBatch(ctx, o.PostageTTL, o.PostageDepth, o.PostageLabel)
126119
if err != nil {
127-
cancel()
128120
return fmt.Errorf("node %s: batched id %w", nodeAName, err)
129121
}
130122
c.logger.Infof("node %s: batched id %s", nodeAName, batchID)
131123

132-
ch, close, err := listenWebsocket(ctx, nodeB.Host(), testTopic, c.logger)
124+
ch, closer, err := wslistener.ListenWebSocket(ctx, nodeB, fmt.Sprintf("/pss/subscribe/%s", testTopic), c.logger)
133125
if err != nil {
134-
cancel()
135126
return err
136127
}
137128

138129
c.logger.Infof("pss: sending test data to node %s and listening on node %s", nodeAName, nodeBName)
139130

131+
defer closer()
132+
140133
tStart := time.Now()
141134
err = nodeA.SendPSSMessage(ctx, addrB.Overlay, addrB.PSSPublicKey, testTopic, o.AddressPrefix, testData, batchID)
142135
if err != nil {
143-
close()
144-
cancel()
145136
return err
146137
}
138+
c.logger.Infof("pss: test data sent successfully to node %s. Waiting for response from node %s", nodeAName, nodeBName)
147139

148-
msg, ok := <-ch
149-
if ok {
150-
if msg == string(testData) {
151-
c.logger.Info("pss: websocket connection received correct message")
152-
c.metrics.SendAndReceiveGauge.WithLabelValues(nodeAName, nodeBName).Set(time.Since(tStart).Seconds())
153-
} else {
154-
err = errDataMismatch
140+
for {
141+
select {
142+
case <-time.After(1 * time.Minute):
143+
return fmt.Errorf("correct message not received after %s", 1*time.Minute)
144+
case msg, ok := <-ch:
145+
if !ok {
146+
return fmt.Errorf("ws closed before receiving correct message")
147+
}
148+
149+
if msg == string(testData) {
150+
c.logger.Info("pss: websocket connection received correct message")
151+
c.metrics.SendAndReceiveGauge.WithLabelValues(nodeAName, nodeBName).Set(time.Since(tStart).Seconds())
152+
return nil
153+
}
154+
c.logger.Infof("pss: received incorrect message. trying again. want %s, got %s", string(testData), msg)
155155
}
156-
} else {
157-
err = errWebsocketConnection
158-
}
159-
160-
cancel()
161-
close()
162-
163-
if err != nil {
164-
return err
165156
}
166-
167-
return nil
168-
}
169-
170-
func listenWebsocket(ctx context.Context, host string, topic string, logger logging.Logger) (<-chan string, func(), error) {
171-
dialer := &websocket.Dialer{
172-
Proxy: http.ProxyFromEnvironment,
173-
HandshakeTimeout: 45 * time.Second,
174-
}
175-
176-
ws, _, err := dialer.DialContext(ctx, fmt.Sprintf("ws://%s/pss/subscribe/%s", host, topic), http.Header{})
177-
if err != nil {
178-
return nil, nil, err
179-
}
180-
181-
ch := make(chan string)
182-
183-
go func() {
184-
_, data, err := ws.ReadMessage()
185-
if err != nil {
186-
logger.Infof("pss: websocket error %v", err)
187-
close(ch)
188-
return
189-
}
190-
191-
ch <- string(data)
192-
}()
193-
194-
return ch, func() { ws.Close() }, nil
195157
}

0 commit comments

Comments
 (0)