66 changes: 49 additions & 17 deletions pkg/check/gsoc/gsoc.go
@@ -29,14 +29,16 @@ type Options struct {
PostageTTL time.Duration
PostageDepth uint64
PostageLabel string
Chunks int
}

// NewDefaultOptions returns new default options
func NewDefaultOptions() Options {
return Options{
PostageTTL: 24 * time.Hour,
PostageDepth: 17,
PostageDepth: 22,
PostageLabel: "test-label",
Chunks: 3,
}
}

@@ -95,14 +97,14 @@ func (c *Check) Run(ctx context.Context, cluster orchestration.Cluster, opts any
}

c.logger.Infof("send messages with different postage batches sequentially...")
err = run(ctx, uploadClient, listenClient, batches, c.logger, false)
err = run(ctx, uploadClient, listenClient, batches, c.logger, false, o.Chunks)
if err != nil {
return fmt.Errorf("sequential: %w", err)
}
c.logger.Infof("done")

c.logger.Infof("send messages with different postage batches parallel...")
err = run(ctx, uploadClient, listenClient, batches, c.logger, true)
err = run(ctx, uploadClient, listenClient, batches, c.logger, true, o.Chunks)
if err != nil {
return fmt.Errorf("parallel: %w", err)
}
@@ -111,8 +113,10 @@ func (c *Check) Run(ctx context.Context, cluster orchestration.Cluster, opts any
return nil
}

func run(ctx context.Context, uploadClient *bee.Client, listenClient *bee.Client, batches []string, logger logging.Logger, parallel bool) error {
const numChunks = 10
func run(ctx context.Context, uploadClient *bee.Client, listenClient *bee.Client, batches []string, logger logging.Logger, parallel bool, numChunks int) error {
if numChunks <= 0 {
numChunks = 3
Contributor: Should we show a warning to the user that we are changing the number of chunks to 3? Why 3 exactly? (A sketch addressing this follows the run function's diff below.)

}
privKey, err := crypto.GenerateSecp256k1Key()
if err != nil {
return err
@@ -171,21 +175,49 @@ func run(ctx context.Context, uploadClient *bee.Client, listenClient *bee.Client
return err
}

select {
case <-time.After(3 * time.Minute):
return fmt.Errorf("timeout: not all messages received")
case <-done:
}
// Wait for all messages to be received or timeout
ticker := time.NewTicker(30 * time.Second)
defer ticker.Stop()
timeout := time.After(3 * time.Minute)

receivedMtx.Lock()
defer receivedMtx.Unlock()
for i := range numChunks {
want := fmt.Sprintf("data %d", i)
if !received[want] {
return fmt.Errorf("message '%s' not received", want)
for {
select {
case <-done:
return nil
case <-timeout:
return fmt.Errorf("timeout: not all messages received")
case <-ticker.C:
receivedMtx.Lock()
if len(received) == numChunks {
receivedMtx.Unlock()
return nil
}

var missing []int
for i := range numChunks {
want := fmt.Sprintf("data %d", i)
if !received[want] {
missing = append(missing, i)
}
}
receivedMtx.Unlock()

if len(missing) == 0 {
continue
}

logger.Infof("gsoc: still missing %d chunks: %v. retrying...", len(missing), missing)

// Retry missing chunks sequentially to avoid flooding
for _, i := range missing {
Contributor: Should the retry loop check ctx.Done() before each iteration? Without it, cancellation during retry execution won't be detected until all retries complete. (A sketch addressing this follows this file's diff below.)

payload := fmt.Sprintf("data %d", i)
logger.Infof("gsoc: retrying soc to node=%s, payload=%s", uploadClient.Name(), payload)
if err := uploadSoc(ctx, uploadClient, payload, resourceId, batches[i%2], privKey); err != nil {
logger.Errorf("gsoc: retry failed for %s: %v", payload, err)
}
}
}
}
return nil
}
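
On the reviewer's question about the silent fallback at the top of run: a minimal sketch of surfacing the default to the user. It assumes logging.Logger exposes a Warningf method (only Infof and Errorf appear in this diff) and that 3 simply mirrors the Chunks default in NewDefaultOptions; the constant and wording are illustrative, not part of this PR.

// Sketch only: clamp an invalid Chunks value and warn about it instead of changing it silently.
const defaultChunks = 3 // mirrors the Chunks field default in NewDefaultOptions

func normalizeChunks(numChunks int, logger logging.Logger) int {
	if numChunks <= 0 {
		// Warningf is assumed here; substitute the logger's actual warning-level method if it differs.
		logger.Warningf("gsoc: invalid Chunks value %d, falling back to default of %d", numChunks, defaultChunks)
		return defaultChunks
	}
	return numChunks
}

run could then start with numChunks = normalizeChunks(numChunks, logger) instead of the silent reassignment.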

func uploadSoc(ctx context.Context, client *bee.Client, payload string, resourceId []byte, batchID string, privKey *ecdsa.PrivateKey) error {
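Back on the reviewer's question about cancellation in run's retry loop: a minimal sketch that checks ctx.Done() before each retry. The identifiers (missing, uploadSoc, resourceId, batches, privKey, uploadClient) are taken from the diff above; only the select at the top of the loop body is new, and it is illustrative rather than the PR's final code.

// Retry missing chunks sequentially, but stop as soon as the context is cancelled.
for _, i := range missing {
	select {
	case <-ctx.Done():
		return ctx.Err()
	default:
	}
	payload := fmt.Sprintf("data %d", i)
	logger.Infof("gsoc: retrying soc to node=%s, payload=%s", uploadClient.Name(), payload)
	if err := uploadSoc(ctx, uploadClient, payload, resourceId, batches[i%2], privKey); err != nil {
		logger.Errorf("gsoc: retry failed for %s: %v", payload, err)
	}
}

uploadSoc already receives ctx, so an in-flight upload should also be aborted on cancellation (assuming the client honours the context); the explicit check just avoids starting new attempts after cancellation.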
34 changes: 32 additions & 2 deletions pkg/check/pss/pss.go
@@ -33,7 +33,7 @@ func NewDefaultOptions() Options {
AddressPrefix: 1,
GasPrice: "",
PostageTTL: 24 * time.Hour,
PostageDepth: 16,
PostageDepth: 22,
PostageLabel: "test-label",
RequestTimeout: 5 * time.Minute,
Seed: random.Int64(),
@@ -131,8 +131,38 @@ func (c *Check) testPss(nodeAName, nodeBName string, clients map[string]*bee.Cli
defer closer()

tStart := time.Now()
err = nodeA.SendPSSMessage(ctx, addrB.Overlay, addrB.PSSPublicKey, testTopic, o.AddressPrefix, testData, batchID)
c.metrics.SendAndReceiveGauge.WithLabelValues(nodeAName, nodeBName).Set(0)
for range 5 {
err = nodeA.SendPSSMessage(ctx, addrB.Overlay, addrB.PSSPublicKey, testTopic, o.AddressPrefix, testData, batchID)
if err == nil {
break
}

// check if message is received
select {
case msg := <-ch:
if msg == string(testData) {
c.logger.Info("pss: message received despite send failure")
return nil
}
default:
// continue
}

c.logger.Infof("pss: send failed, retrying in 1s: %v", err)
time.Sleep(1 * time.Second)
}
if err != nil {
// check if message is received
select {
case msg := <-ch:
if msg == string(testData) {
c.logger.Info("pss: message received despite send failure")
return nil
}
default:
// continue
}
return err
}
c.logger.Infof("pss: test data sent successfully to node %s. Waiting for response from node %s", nodeAName, nodeBName)
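The pss retry above peeks at the subscription channel in two identical select blocks (inside the retry loop and again after it). A sketch of factoring that non-blocking peek into a helper; receivedAlready is a hypothetical name, and it assumes ch is a string channel, as the msg == string(testData) comparison in the diff suggests.

// receivedAlready does a non-blocking peek at the subscription channel and reports
// whether the expected payload already arrived despite a send failure.
func receivedAlready(ch <-chan string, want string) bool {
	select {
	case msg := <-ch:
		return msg == want
	default:
		return false
	}
}

With such a helper, both duplicated blocks collapse into a single receivedAlready(ch, string(testData)) call, and the retry loop body stays focused on the send and the backoff.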