Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 49 additions & 17 deletions pkg/check/gsoc/gsoc.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,16 @@ type Options struct {
PostageTTL time.Duration
PostageDepth uint64
PostageLabel string
Chunks int
}

// NewDefaultOptions returns the default options: a 24-hour postage TTL, a
// postage depth of 22, the postage label "test-label", and 3 chunks.
//
// PostageDepth is assigned exactly once. Naming the same field twice in a
// struct literal is rejected by the compiler, so only the current value (22)
// is kept.
func NewDefaultOptions() Options {
	return Options{
		PostageTTL:   24 * time.Hour,
		PostageDepth: 22,
		PostageLabel: "test-label",
		Chunks:       3,
	}
}

Expand Down Expand Up @@ -95,14 +97,14 @@ func (c *Check) Run(ctx context.Context, cluster orchestration.Cluster, opts any
}

c.logger.Infof("send messages with different postage batches sequentially...")
err = run(ctx, uploadClient, listenClient, batches, c.logger, false)
err = run(ctx, uploadClient, listenClient, batches, c.logger, false, o.Chunks)
if err != nil {
return fmt.Errorf("sequential: %w", err)
}
c.logger.Infof("done")

c.logger.Infof("send messages with different postage batches parallel...")
err = run(ctx, uploadClient, listenClient, batches, c.logger, true)
err = run(ctx, uploadClient, listenClient, batches, c.logger, true, o.Chunks)
if err != nil {
return fmt.Errorf("parallel: %w", err)
}
Expand All @@ -111,8 +113,10 @@ func (c *Check) Run(ctx context.Context, cluster orchestration.Cluster, opts any
return nil
}

func run(ctx context.Context, uploadClient *bee.Client, listenClient *bee.Client, batches []string, logger logging.Logger, parallel bool) error {
const numChunks = 10
func run(ctx context.Context, uploadClient *bee.Client, listenClient *bee.Client, batches []string, logger logging.Logger, parallel bool, numChunks int) error {
if numChunks <= 0 {
numChunks = 3
}
privKey, err := crypto.GenerateSecp256k1Key()
if err != nil {
return err
Expand Down Expand Up @@ -171,21 +175,49 @@ func run(ctx context.Context, uploadClient *bee.Client, listenClient *bee.Client
return err
}

select {
case <-time.After(3 * time.Minute):
return fmt.Errorf("timeout: not all messages received")
case <-done:
}
// Wait for all messages to be received or timeout
ticker := time.NewTicker(30 * time.Second)
defer ticker.Stop()
timeout := time.After(3 * time.Minute)

receivedMtx.Lock()
defer receivedMtx.Unlock()
for i := range numChunks {
want := fmt.Sprintf("data %d", i)
if !received[want] {
return fmt.Errorf("message '%s' not received", want)
for {
select {
case <-done:
return nil
case <-timeout:
return fmt.Errorf("timeout: not all messages received")
case <-ticker.C:
receivedMtx.Lock()
if len(received) == numChunks {
receivedMtx.Unlock()
return nil
}

var missing []int
for i := range numChunks {
want := fmt.Sprintf("data %d", i)
if !received[want] {
missing = append(missing, i)
}
}
receivedMtx.Unlock()

if len(missing) == 0 {
continue
}

logger.Infof("gsoc: still missing %d chunks: %v. retrying...", len(missing), missing)

// Retry missing chunks sequentially to avoid flooding
for _, i := range missing {
payload := fmt.Sprintf("data %d", i)
logger.Infof("gsoc: retrying soc to node=%s, payload=%s", uploadClient.Name(), payload)
if err := uploadSoc(ctx, uploadClient, payload, resourceId, batches[i%2], privKey); err != nil {
logger.Errorf("gsoc: retry failed for %s: %v", payload, err)
}
}
}
}
return nil
}

func uploadSoc(ctx context.Context, client *bee.Client, payload string, resourceId []byte, batchID string, privKey *ecdsa.PrivateKey) error {
Expand Down
34 changes: 32 additions & 2 deletions pkg/check/pss/pss.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func NewDefaultOptions() Options {
AddressPrefix: 1,
GasPrice: "",
PostageTTL: 24 * time.Hour,
PostageDepth: 16,
PostageDepth: 22,
PostageLabel: "test-label",
RequestTimeout: 5 * time.Minute,
Seed: random.Int64(),
Expand Down Expand Up @@ -131,8 +131,38 @@ func (c *Check) testPss(nodeAName, nodeBName string, clients map[string]*bee.Cli
defer closer()

tStart := time.Now()
err = nodeA.SendPSSMessage(ctx, addrB.Overlay, addrB.PSSPublicKey, testTopic, o.AddressPrefix, testData, batchID)
c.metrics.SendAndReceiveGauge.WithLabelValues(nodeAName, nodeBName).Set(0)
for range 5 {
err = nodeA.SendPSSMessage(ctx, addrB.Overlay, addrB.PSSPublicKey, testTopic, o.AddressPrefix, testData, batchID)
if err == nil {
break
}

// check if message is received
select {
case msg := <-ch:
if msg == string(testData) {
c.logger.Info("pss: message received despite send failure")
return nil
}
default:
// continue
}

c.logger.Infof("pss: send failed, retrying in 1s: %v", err)
time.Sleep(1 * time.Second)
}
if err != nil {
// check if message is received
select {
case msg := <-ch:
if msg == string(testData) {
c.logger.Info("pss: message received despite send failure")
return nil
}
default:
// continue
}
return err
}
c.logger.Infof("pss: test data sent successfully to node %s. Waiting for response from node %s", nodeAName, nodeBName)
Expand Down