diff --git a/pkg/check/gsoc/gsoc.go b/pkg/check/gsoc/gsoc.go index 87f39099..0334c22b 100644 --- a/pkg/check/gsoc/gsoc.go +++ b/pkg/check/gsoc/gsoc.go @@ -29,14 +29,16 @@ type Options struct { PostageTTL time.Duration PostageDepth uint64 PostageLabel string + Chunks int } // NewDefaultOptions returns new default options func NewDefaultOptions() Options { return Options{ PostageTTL: 24 * time.Hour, - PostageDepth: 17, + PostageDepth: 22, PostageLabel: "test-label", + Chunks: 3, } } @@ -95,14 +97,14 @@ func (c *Check) Run(ctx context.Context, cluster orchestration.Cluster, opts any } c.logger.Infof("send messages with different postage batches sequentially...") - err = run(ctx, uploadClient, listenClient, batches, c.logger, false) + err = run(ctx, uploadClient, listenClient, batches, c.logger, false, o.Chunks) if err != nil { return fmt.Errorf("sequential: %w", err) } c.logger.Infof("done") c.logger.Infof("send messages with different postage batches parallel...") - err = run(ctx, uploadClient, listenClient, batches, c.logger, true) + err = run(ctx, uploadClient, listenClient, batches, c.logger, true, o.Chunks) if err != nil { return fmt.Errorf("parallel: %w", err) } @@ -111,8 +113,10 @@ func (c *Check) Run(ctx context.Context, cluster orchestration.Cluster, opts any return nil } -func run(ctx context.Context, uploadClient *bee.Client, listenClient *bee.Client, batches []string, logger logging.Logger, parallel bool) error { - const numChunks = 10 +func run(ctx context.Context, uploadClient *bee.Client, listenClient *bee.Client, batches []string, logger logging.Logger, parallel bool, numChunks int) error { + if numChunks <= 0 { + numChunks = 3 + } privKey, err := crypto.GenerateSecp256k1Key() if err != nil { return err @@ -171,21 +175,49 @@ func run(ctx context.Context, uploadClient *bee.Client, listenClient *bee.Client return err } - select { - case <-time.After(3 * time.Minute): - return fmt.Errorf("timeout: not all messages received") - case <-done: - } + // Wait for all messages to be received or timeout + ticker := time.NewTicker(30 * time.Second) + defer ticker.Stop() + timeout := time.After(3 * time.Minute) - receivedMtx.Lock() - defer receivedMtx.Unlock() - for i := range numChunks { - want := fmt.Sprintf("data %d", i) - if !received[want] { - return fmt.Errorf("message '%s' not received", want) + for { + select { + case <-done: + return nil + case <-timeout: + return fmt.Errorf("timeout: not all messages received") + case <-ticker.C: + receivedMtx.Lock() + if len(received) == numChunks { + receivedMtx.Unlock() + return nil + } + + var missing []int + for i := range numChunks { + want := fmt.Sprintf("data %d", i) + if !received[want] { + missing = append(missing, i) + } + } + receivedMtx.Unlock() + + if len(missing) == 0 { + continue + } + + logger.Infof("gsoc: still missing %d chunks: %v. retrying...", len(missing), missing) + + // Retry missing chunks sequentially to avoid flooding + for _, i := range missing { + payload := fmt.Sprintf("data %d", i) + logger.Infof("gsoc: retrying soc to node=%s, payload=%s", uploadClient.Name(), payload) + if err := uploadSoc(ctx, uploadClient, payload, resourceId, batches[i%2], privKey); err != nil { + logger.Errorf("gsoc: retry failed for %s: %v", payload, err) + } + } } } - return nil } func uploadSoc(ctx context.Context, client *bee.Client, payload string, resourceId []byte, batchID string, privKey *ecdsa.PrivateKey) error { diff --git a/pkg/check/pss/pss.go b/pkg/check/pss/pss.go index 6856ad67..dd502ee2 100644 --- a/pkg/check/pss/pss.go +++ b/pkg/check/pss/pss.go @@ -33,7 +33,7 @@ func NewDefaultOptions() Options { AddressPrefix: 1, GasPrice: "", PostageTTL: 24 * time.Hour, - PostageDepth: 16, + PostageDepth: 22, PostageLabel: "test-label", RequestTimeout: 5 * time.Minute, Seed: random.Int64(), @@ -131,8 +131,38 @@ func (c *Check) testPss(nodeAName, nodeBName string, clients map[string]*bee.Cli defer closer() tStart := time.Now() - err = nodeA.SendPSSMessage(ctx, addrB.Overlay, addrB.PSSPublicKey, testTopic, o.AddressPrefix, testData, batchID) + c.metrics.SendAndReceiveGauge.WithLabelValues(nodeAName, nodeBName).Set(0) + for range 5 { + err = nodeA.SendPSSMessage(ctx, addrB.Overlay, addrB.PSSPublicKey, testTopic, o.AddressPrefix, testData, batchID) + if err == nil { + break + } + + // check if message is received + select { + case msg := <-ch: + if msg == string(testData) { + c.logger.Info("pss: message received despite send failure") + return nil + } + default: + // continue + } + + c.logger.Infof("pss: send failed, retrying in 1s: %v", err) + time.Sleep(1 * time.Second) + } if err != nil { + // check if message is received + select { + case msg := <-ch: + if msg == string(testData) { + c.logger.Info("pss: message received despite send failure") + return nil + } + default: + // continue + } return err } c.logger.Infof("pss: test data sent successfully to node %s. Waiting for response from node %s", nodeAName, nodeBName)