diff --git a/openapi/Swarm.yaml b/openapi/Swarm.yaml index 7f48e32d329..70115c3c8fc 100644 --- a/openapi/Swarm.yaml +++ b/openapi/Swarm.yaml @@ -879,6 +879,7 @@ paths: - $ref: "SwarmCommon.yaml#/components/parameters/SwarmPostageStamp" - $ref: "SwarmCommon.yaml#/components/parameters/SwarmAct" - $ref: "SwarmCommon.yaml#/components/parameters/SwarmActHistoryAddress" + - $ref: "SwarmCommon.yaml#/components/parameters/SwarmRedundancyLevelParameter" requestBody: required: true description: The SOC binary data is composed of the span (8 bytes) and the at most 4KB payload. @@ -926,6 +927,7 @@ paths: description: Arbitrary identifier of the related data - $ref: "SwarmCommon.yaml#/components/parameters/SwarmOnlyRootChunkParameter" - $ref: "SwarmCommon.yaml#/components/parameters/SwarmCache" + - $ref: "SwarmCommon.yaml#/components/parameters/SwarmRedundancyLevelParameter" - $ref: "SwarmCommon.yaml#/components/parameters/SwarmRedundancyStrategyParameter" - $ref: "SwarmCommon.yaml#/components/parameters/SwarmRedundancyFallbackModeParameter" - $ref: "SwarmCommon.yaml#/components/parameters/SwarmChunkRetrievalTimeoutParameter" @@ -977,6 +979,7 @@ paths: - $ref: "SwarmCommon.yaml#/components/parameters/SwarmPostageBatchId" - $ref: "SwarmCommon.yaml#/components/parameters/SwarmAct" - $ref: "SwarmCommon.yaml#/components/parameters/SwarmActHistoryAddress" + - $ref: "SwarmCommon.yaml#/components/parameters/SwarmRedundancyLevelParameter" responses: "201": description: Created @@ -1035,6 +1038,7 @@ paths: - $ref: "SwarmCommon.yaml#/components/parameters/SwarmFeedLegacyResolve" - $ref: "SwarmCommon.yaml#/components/parameters/SwarmOnlyRootChunkParameter" - $ref: "SwarmCommon.yaml#/components/parameters/SwarmCache" + - $ref: "SwarmCommon.yaml#/components/parameters/SwarmRedundancyLevelParameter" - $ref: "SwarmCommon.yaml#/components/parameters/SwarmRedundancyStrategyParameter" - $ref: "SwarmCommon.yaml#/components/parameters/SwarmRedundancyFallbackModeParameter" - $ref: 
"SwarmCommon.yaml#/components/parameters/SwarmChunkRetrievalTimeoutParameter" diff --git a/pkg/api/bzz.go b/pkg/api/bzz.go index a27e3f813c5..fd24b425da0 100644 --- a/pkg/api/bzz.go +++ b/pkg/api/bzz.go @@ -32,6 +32,7 @@ import ( "github.com/ethersphere/bee/v2/pkg/log" "github.com/ethersphere/bee/v2/pkg/manifest" "github.com/ethersphere/bee/v2/pkg/postage" + "github.com/ethersphere/bee/v2/pkg/replicas" "github.com/ethersphere/bee/v2/pkg/storage" "github.com/ethersphere/bee/v2/pkg/storer" "github.com/ethersphere/bee/v2/pkg/swarm" @@ -406,7 +407,8 @@ func (s *Service) serveReference(logger log.Logger, address swarm.Address, pathV } ctx := r.Context() - ls := loadsave.NewReadonly(s.storer.Download(cache), s.storer.Cache(), redundancy.DefaultLevel) + g := s.storer.Download(cache) + ls := loadsave.NewReadonly(g, s.storer.Cache(), rLevel) feedDereferenced := false ctx, err := getter.SetConfigInContext(ctx, headers.Strategy, headers.FallbackMode, headers.ChunkRetrievalTimeout, logger) @@ -434,7 +436,7 @@ FETCH: // unmarshal as mantaray first and possibly resolve the feed, otherwise // go on normally. 
if !feedDereferenced { - if l, err := s.manifestFeed(ctx, m); err == nil { + if l, err := s.manifestFeed(ctx, m, replicas.NewSocGetter(g, rLevel)); err == nil { // we have a feed manifest here ch, cur, _, err := l.At(ctx, time.Now().Unix(), 0) if err != nil { @@ -701,6 +703,7 @@ func manifestMetadataLoad( func (s *Service) manifestFeed( ctx context.Context, m manifest.Interface, + st storage.Getter, ) (feeds.Lookup, error) { e, err := m.Lookup(ctx, "/") if err != nil { @@ -733,5 +736,5 @@ func (s *Service) manifestFeed( return nil, fmt.Errorf("node lookup: %s", "feed metadata absent") } f := feeds.New(topic, common.BytesToAddress(owner)) - return s.feedFactory.NewLookup(*t, f) + return s.feedFactory.NewLookup(*t, f, st) } diff --git a/pkg/api/bzz_test.go b/pkg/api/bzz_test.go index 7f636476d1f..74ba32cd0bf 100644 --- a/pkg/api/bzz_test.go +++ b/pkg/api/bzz_test.go @@ -7,6 +7,7 @@ package api_test import ( "bytes" "context" + "encoding/hex" "errors" "fmt" "io" @@ -19,6 +20,7 @@ import ( "testing" "github.com/ethersphere/bee/v2/pkg/api" + "github.com/ethersphere/bee/v2/pkg/feeds" "github.com/ethersphere/bee/v2/pkg/file/loadsave" "github.com/ethersphere/bee/v2/pkg/file/redundancy" "github.com/ethersphere/bee/v2/pkg/jsonhttp" @@ -27,7 +29,10 @@ import ( "github.com/ethersphere/bee/v2/pkg/manifest" mockbatchstore "github.com/ethersphere/bee/v2/pkg/postage/batchstore/mock" mockpost "github.com/ethersphere/bee/v2/pkg/postage/mock" + "github.com/ethersphere/bee/v2/pkg/replicas" + "github.com/ethersphere/bee/v2/pkg/soc" testingsoc "github.com/ethersphere/bee/v2/pkg/soc/testing" + "github.com/ethersphere/bee/v2/pkg/storage" "github.com/ethersphere/bee/v2/pkg/storage/inmemchunkstore" mockstorer "github.com/ethersphere/bee/v2/pkg/storer/mock" "github.com/ethersphere/bee/v2/pkg/swarm" @@ -806,6 +811,161 @@ func TestFeedIndirection(t *testing.T) { t.Fatalf("expected file reference, did not got any") } + // get root chunk of data + // and wrap it in a feed + rootCh, err := 
storer.ChunkStore().Get(context.Background(), resp.Reference) + if err != nil { + t.Fatal(err) + } + socRootCh := testingsoc.GenerateMockSOC(t, rootCh.Data()[swarm.SpanSize:]).Chunk() + + // now use the "content" to mock the feed lookup + // also, use the mocked mantaray chunks that unmarshal + // into a real manifest with the mocked feed values when + // called from the bzz endpoint. then call the bzz endpoint with + // the pregenerated feed root manifest hash + + t.Run("feed wrapping", func(t *testing.T) { + var ( + look = newMockLookup(-1, 0, socRootCh, nil, &id{}, nil) + factory = newMockFactory(look) + bzzDownloadResource = func(addr, path string) string { return "/bzz/" + addr + "/" + path } + ctx = context.Background() + ) + client, _, _, _ = newTestServer(t, testServerOptions{ + Storer: storer, + Logger: logger, + Feeds: factory, + }) + if err != nil { + t.Fatal(err) + } + m, err := manifest.NewDefaultManifest( + loadsave.New(storer.ChunkStore(), storer.Cache(), pipelineFactory(storer.Cache(), false, 0), redundancy.DefaultLevel), + false, + ) + if err != nil { + t.Fatal(err) + } + emptyAddr := make([]byte, 32) + err = m.Add(ctx, manifest.RootPath, manifest.NewEntry(swarm.NewAddress(emptyAddr), map[string]string{ + api.FeedMetadataEntryOwner: "8d3766440f0d7b949a5e32995d09619a7f86e632", + api.FeedMetadataEntryTopic: "abcc", + api.FeedMetadataEntryType: "epoch", + })) + if err != nil { + t.Fatal(err) + } + manifRef, err := m.Store(ctx) + if err != nil { + t.Fatal(err) + } + + jsonhttptest.Request(t, client, http.MethodGet, bzzDownloadResource(manifRef.String(), ""), http.StatusOK, + jsonhttptest.WithExpectedResponse(updateData), + jsonhttptest.WithExpectedContentLength(len(updateData)), + jsonhttptest.WithExpectedResponseHeader(api.AccessControlExposeHeaders, api.SwarmFeedIndexHeader), + jsonhttptest.WithExpectedResponseHeader(api.AccessControlExposeHeaders, api.ContentDispositionHeader), + jsonhttptest.WithExpectedResponseHeader(api.ContentDispositionHeader, 
`inline; filename="index.html"`), + jsonhttptest.WithExpectedResponseHeader(api.ContentTypeHeader, "text/html; charset=utf-8"), + ) + }) + + t.Run("redundancy", func(t *testing.T) { + // enough to test two redundancy levels since + tests := []struct { + name string + rLevel redundancy.Level + }{ + { + name: "none", + rLevel: redundancy.NONE, + }, + { + name: "medium", + rLevel: redundancy.MEDIUM, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + rLevel := tt.rLevel + socRoot, _ := soc.FromChunk(socRootCh) + socPutter := replicas.NewSocPutter(storer, rLevel) + err = socPutter.Put(context.Background(), socRootCh) + if err != nil { + t.Fatalf("failed to put SOC chunk with redundancy: %v", err) + } + + m, err := manifest.NewDefaultManifest( + loadsave.New(storer.ChunkStore(), storer.Cache(), pipelineFactory(storer.Cache(), false, rLevel), rLevel), + false, + ) + if err != nil { + t.Fatal(err) + } + + // Add the feed entry to the manifest + hexId := hex.EncodeToString(socRoot.ID()) + hexOwner := hex.EncodeToString(socRoot.OwnerAddress()) + err = m.Add(context.Background(), manifest.RootPath, manifest.NewEntry(socRootCh.Address(), map[string]string{ + api.FeedMetadataEntryOwner: hexOwner, + api.FeedMetadataEntryTopic: hexId, + api.FeedMetadataEntryType: "sequence", + })) + if err != nil { + t.Fatal(err) + } + manifestRef, err := m.Store(context.Background()) + if err != nil { + t.Fatal(err) + } + + // Create mockLookup and mockFactory for feed + look := newRedundancyMockLookup( + rLevel, + storer.ChunkStore(), + func() (swarm.Chunk, feeds.Index, feeds.Index) { + return socRootCh, &id{}, &id{} + }, + ) + feedFactory := newMockFactory(look) + + // Update the test server with the feed factory + client, _, _, _ := newTestServer(t, testServerOptions{ + Storer: storer, + Logger: log.Noop, + Post: mockpost.New(mockpost.WithAcceptAll()), + Feeds: feedFactory, + }) + + // remove original chunk from store + cs, ok := 
storer.ChunkStore().(storage.ChunkStore) + if !ok { + t.Fatalf("chunk store not available for deletion") + } + err = cs.Delete(context.Background(), socRootCh.Address()) + if err != nil { + t.Fatalf("Failed to delete soc chunk: %v", err) + } + + manifestHex := manifestRef.String() + + if rLevel == redundancy.NONE { + jsonhttptest.Request(t, client, http.MethodGet, "/bzz/"+manifestHex+"/", http.StatusNotFound) + return + } + jsonhttptest.Request(t, client, http.MethodGet, "/bzz/"+manifestHex+"/", http.StatusOK, + jsonhttptest.WithExpectedResponse(updateData), + jsonhttptest.WithExpectedContentLength(len(updateData)), + jsonhttptest.WithExpectedResponseHeader(api.AccessControlExposeHeaders, api.SwarmFeedIndexHeader), + jsonhttptest.WithExpectedResponseHeader(api.AccessControlExposeHeaders, api.ContentDispositionHeader), + jsonhttptest.WithExpectedResponseHeader(api.ContentTypeHeader, "text/html; charset=utf-8"), + jsonhttptest.WithExpectedResponseHeader(api.ContentDispositionHeader, `inline; filename="index.html"`), + ) + }) + } + }) + m, err := manifest.NewDefaultManifest( loadsave.New(storer.ChunkStore(), storer.Cache(), pipelineFactory(storer.Cache(), false, 0), redundancy.DefaultLevel), false, diff --git a/pkg/api/feed.go b/pkg/api/feed.go index 313b04c831a..6b5282a2b92 100644 --- a/pkg/api/feed.go +++ b/pkg/api/feed.go @@ -23,6 +23,7 @@ import ( "github.com/ethersphere/bee/v2/pkg/manifest/mantaray" "github.com/ethersphere/bee/v2/pkg/manifest/simple" "github.com/ethersphere/bee/v2/pkg/postage" + "github.com/ethersphere/bee/v2/pkg/replicas" "github.com/ethersphere/bee/v2/pkg/soc" "github.com/ethersphere/bee/v2/pkg/storage" "github.com/ethersphere/bee/v2/pkg/storer" @@ -66,15 +67,21 @@ func (s *Service) feedGetHandler(w http.ResponseWriter, r *http.Request) { } headers := struct { - OnlyRootChunk bool `map:"Swarm-Only-Root-Chunk"` + OnlyRootChunk bool `map:"Swarm-Only-Root-Chunk"` + RedundancyLevel redundancy.Level `map:"Swarm-Redundancy-Level"` }{} if response := 
s.mapStructure(r.Header, &headers); response != nil { response("invalid header params", logger, w) return } + getter := s.storer.Download(false) + if headers.RedundancyLevel > redundancy.NONE { + getter = replicas.NewSocGetter(getter, headers.RedundancyLevel) + } + f := feeds.New(paths.Topic, paths.Owner) - lookup, err := s.feedFactory.NewLookup(feeds.Sequence, f) + lookup, err := s.feedFactory.NewLookup(feeds.Sequence, f, getter) if err != nil { logger.Debug("new lookup failed", "owner", paths.Owner, "error", err) logger.Error(nil, "new lookup failed") @@ -170,11 +177,12 @@ func (s *Service) feedPostHandler(w http.ResponseWriter, r *http.Request) { } headers := struct { - BatchID []byte `map:"Swarm-Postage-Batch-Id" validate:"required"` - Pin bool `map:"Swarm-Pin"` - Deferred *bool `map:"Swarm-Deferred-Upload"` - Act bool `map:"Swarm-Act"` - HistoryAddress swarm.Address `map:"Swarm-Act-History-Address"` + BatchID []byte `map:"Swarm-Postage-Batch-Id" validate:"required"` + Pin bool `map:"Swarm-Pin"` + Deferred *bool `map:"Swarm-Deferred-Upload"` + Act bool `map:"Swarm-Act"` + HistoryAddress swarm.Address `map:"Swarm-Act-History-Address"` + RedundancyLevel redundancy.Level `map:"Swarm-Redundancy-Level"` }{} if response := s.mapStructure(r.Header, &headers); response != nil { response("invalid header params", logger, w) @@ -231,7 +239,9 @@ func (s *Service) feedPostHandler(w http.ResponseWriter, r *http.Request) { logger: logger, } - l := loadsave.New(s.storer.ChunkStore(), s.storer.Cache(), requestPipelineFactory(r.Context(), putter, false, 0), redundancy.DefaultLevel) + rLevel := headers.RedundancyLevel + + l := loadsave.New(s.storer.ChunkStore(), s.storer.Cache(), requestPipelineFactory(r.Context(), putter, false, 0), rLevel) feedManifest, err := manifest.NewDefaultManifest(l, false) if err != nil { logger.Debug("create manifest failed", "error", err) diff --git a/pkg/api/feed_test.go b/pkg/api/feed_test.go index fcfe4b93e68..4ee338ca0af 100644 --- 
a/pkg/api/feed_test.go +++ b/pkg/api/feed_test.go @@ -18,6 +18,9 @@ import ( "strconv" "testing" + "github.com/ethersphere/bee/v2/pkg/replicas" + "github.com/ethersphere/bee/v2/pkg/storage" + "github.com/ethersphere/bee/v2/pkg/api" "github.com/ethersphere/bee/v2/pkg/feeds" "github.com/ethersphere/bee/v2/pkg/file/loadsave" @@ -345,6 +348,108 @@ func TestFeedDirectUpload(t *testing.T) { ) } +// redundancyMockLookup is a specialized mockLookup that uses redundancy SOC getter +type redundancyMockLookup struct { + redundancyLevel redundancy.Level + getter storage.Getter + lastChunkFn func() (swarm.Chunk, feeds.Index, feeds.Index) // Hook to get the latest chunk, index, nextIndex +} + +func newRedundancyMockLookup(rLevel redundancy.Level, getter storage.Getter, lastChunkFn func() (swarm.Chunk, feeds.Index, feeds.Index)) *redundancyMockLookup { + return &redundancyMockLookup{ + redundancyLevel: rLevel, + getter: getter, + lastChunkFn: lastChunkFn, + } +} + +// At overrides mockLookup.At to use redundancy SOC getter +func (l *redundancyMockLookup) At(ctx context.Context, at int64, after uint64) (swarm.Chunk, feeds.Index, feeds.Index, error) { + chunk, cur, next := l.lastChunkFn() + + // Create redundancy SOC getter if redundancy level is set + redGetter := replicas.NewSocGetter(l.getter, l.redundancyLevel) + + // Try to get the chunk with redundancy + redChunk, err := redGetter.Get(ctx, chunk.Address()) + if err != nil { + return nil, nil, nil, err + } + // Use the chunk retrieved with redundancy + return redChunk, cur, next, nil +} + +// TestFeedAPIWithRedundancy tests the feed API with SOC redundancy +func TestFeedAPIWithRedundancy(t *testing.T) { + t.Parallel() + + var ( + redundancyLevel = redundancy.PARANOID // Use highest redundancy level + topic = swarm.RandAddress(t) + mockStorer = mockstorer.New() + feedData = []byte("feed redundancy test data") + ) + socChunk := testingsoc.GenerateMockSOC(t, feedData) + + // Variables to track the last chunk, index, and next 
index + var ( + lastChunk swarm.Chunk + lastIndex feeds.Index + lastNext feeds.Index + ) + + // Provide a hook function to return the latest chunk, index, and next index + lastChunkFn := func() (swarm.Chunk, feeds.Index, feeds.Index) { + return lastChunk, lastIndex, lastNext + } + lastChunk = socChunk.Chunk() + lastIndex = &id{} + lastNext = &id{} + + // Create redundancy-aware lookup that wraps our lookup + redLookup := newRedundancyMockLookup(redundancyLevel, mockStorer.ChunkStore(), lastChunkFn) + factory := newMockFactory(redLookup) + + // Create test server with our custom setup + mp := mockpost.New(mockpost.WithIssuer(postage.NewStampIssuer("", "", batchOk, big.NewInt(3), 11, 10, 1000, true))) + client, _, _, _ := newTestServer(t, testServerOptions{ + Storer: mockStorer, + Post: mp, + Feeds: factory, + }) + + socPutter := replicas.NewSocPutter(mockStorer, redundancyLevel) + + ctx := context.Background() + err := socPutter.Put(ctx, socChunk.Chunk()) + if err != nil { + t.Fatalf("failed to put SOC chunk with redundancy: %v", err) + } + + // Get access to the underlying chunk store + cs, ok := mockStorer.ChunkStore().(storage.ChunkStore) + if !ok { + t.Fatal("Could not access underlying ChunkStore with Delete method") + } + + // Delete the original SOC chunk by using the address tracked in lastSOCAddress + // or use socChunk.Address() directly + err = cs.Delete(context.Background(), socChunk.Address()) + if err != nil { + t.Fatalf("Failed to delete original SOC chunk: %v", err) + } + + feedResource := fmt.Sprintf("/feeds/%s/%s", ownerString, topic) + + // Try to retrieve the feed content with redundancy + jsonhttptest.Request(t, client, http.MethodGet, + feedResource, + http.StatusOK, + jsonhttptest.WithRequestHeader(api.SwarmRedundancyLevelHeader, fmt.Sprintf("%d", redundancyLevel)), + jsonhttptest.WithExpectedResponse(feedData), + ) +} + type factoryMock struct { sequenceCalled bool epochCalled bool @@ -356,7 +461,7 @@ func newMockFactory(mockLookup 
feeds.Lookup) *factoryMock { return &factoryMock{lookup: mockLookup} } -func (f *factoryMock) NewLookup(t feeds.Type, feed *feeds.Feed) (feeds.Lookup, error) { +func (f *factoryMock) NewLookup(t feeds.Type, feed *feeds.Feed, getter storage.Getter) (feeds.Lookup, error) { switch t { case feeds.Sequence: f.sequenceCalled = true diff --git a/pkg/api/soc.go b/pkg/api/soc.go index 85d9bf5aaa3..c7c6147a02f 100644 --- a/pkg/api/soc.go +++ b/pkg/api/soc.go @@ -8,14 +8,17 @@ import ( "bytes" "encoding/hex" "errors" + "fmt" "io" "net/http" "strconv" "github.com/ethersphere/bee/v2/pkg/accesscontrol" "github.com/ethersphere/bee/v2/pkg/cac" + "github.com/ethersphere/bee/v2/pkg/file/redundancy" "github.com/ethersphere/bee/v2/pkg/jsonhttp" "github.com/ethersphere/bee/v2/pkg/postage" + "github.com/ethersphere/bee/v2/pkg/replicas" "github.com/ethersphere/bee/v2/pkg/soc" "github.com/ethersphere/bee/v2/pkg/storer" "github.com/ethersphere/bee/v2/pkg/swarm" @@ -47,10 +50,11 @@ func (s *Service) socUploadHandler(w http.ResponseWriter, r *http.Request) { } headers := struct { - BatchID []byte `map:"Swarm-Postage-Batch-Id"` - StampSig []byte `map:"Swarm-Postage-Stamp"` - Act bool `map:"Swarm-Act"` - HistoryAddress swarm.Address `map:"Swarm-Act-History-Address"` + BatchID []byte `map:"Swarm-Postage-Batch-Id"` + StampSig []byte `map:"Swarm-Postage-Stamp"` + Act bool `map:"Swarm-Act"` + HistoryAddress swarm.Address `map:"Swarm-Act-History-Address"` + RLevel *redundancy.Level `map:"Swarm-Redundancy-Level"` }{} if response := s.mapStructure(r.Header, &headers); response != nil { response("invalid header params", logger, w) @@ -64,11 +68,22 @@ func (s *Service) socUploadHandler(w http.ResponseWriter, r *http.Request) { } var ( - putter storer.PutterSession - err error + basePutter storer.PutterSession // the putter used to store regular chunks + putter storer.PutterSession // the putter used to store SOC replica chunks + err error ) + var rLevel redundancy.Level + if headers.RLevel != nil { + 
rLevel = *headers.RLevel + } + if len(headers.StampSig) != 0 { + if headers.RLevel != nil { + logger.Error(nil, "redundancy level is not supported with stamp signature") + jsonhttp.BadRequest(w, "redundancy level is not supported with stamp signature") + return + } stamp := postage.Stamp{} if err := stamp.UnmarshalBinary(headers.StampSig); err != nil { errorMsg := "Stamp deserialization failure" @@ -84,6 +99,7 @@ func (s *Service) socUploadHandler(w http.ResponseWriter, r *http.Request) { Pin: false, Deferred: false, }, &stamp) + basePutter = putter } else { putter, err = s.newStamperPutter(r.Context(), putterOptions{ BatchID: headers.BatchID, @@ -91,6 +107,10 @@ func (s *Service) socUploadHandler(w http.ResponseWriter, r *http.Request) { Pin: false, Deferred: false, }) + basePutter = putter + if rLevel != redundancy.NONE { + putter = replicas.NewSocPutterSession(putter, rLevel) + } } if err != nil { logger.Debug("get putter failed", "error", err) @@ -183,7 +203,7 @@ func (s *Service) socUploadHandler(w http.ResponseWriter, r *http.Request) { reference := sch.Address() historyReference := swarm.ZeroAddress if headers.Act { - reference, historyReference, err = s.actEncryptionHandler(r.Context(), putter, reference, headers.HistoryAddress) + reference, historyReference, err = s.actEncryptionHandler(r.Context(), basePutter, reference, headers.HistoryAddress) if err != nil { logger.Debug("access control upload failed", "error", err) logger.Error(nil, "access control upload failed") @@ -201,11 +221,16 @@ func (s *Service) socUploadHandler(w http.ResponseWriter, r *http.Request) { } } - err = putter.Done(sch.Address()) + // do not pass sch.Address() since it causes error on parallel GSOC uploads + // in case of deferred upload + // pkg/storer/internal/pinning/pinning.go:collectionPutter.Close -> throws error if pin true but that is not a valid use-case at SOC upload + // pkg/storer/internal/upload/uploadstore.go:uploadPutter.Close -> updates tagID, and the address would 
be set along with it -> not necessary + // in case of directupload it only waits for the waitgroup for chunk upload and do not use swarm address + err = putter.Done(swarm.Address{}) if err != nil { logger.Debug("done split failed", "error", err) logger.Error(nil, "done split failed") - jsonhttp.InternalServerError(ow, "done split failed") + jsonhttp.InternalServerError(ow, fmt.Sprintf("done split failed: %v", err)) // TODO: put it back after fixing parallel upload issue return } if headers.Act { @@ -229,12 +254,14 @@ func (s *Service) socGetHandler(w http.ResponseWriter, r *http.Request) { } headers := struct { - OnlyRootChunk bool `map:"Swarm-Only-Root-Chunk"` + OnlyRootChunk bool `map:"Swarm-Only-Root-Chunk"` + RLevel redundancy.Level `map:"Swarm-Redundancy-Level"` }{} if response := s.mapStructure(r.Header, &headers); response != nil { response("invalid header params", logger, w) return } + rLevel := headers.RLevel address, err := soc.CreateAddress(paths.ID, paths.Owner) if err != nil { @@ -244,6 +271,9 @@ func (s *Service) socGetHandler(w http.ResponseWriter, r *http.Request) { } getter := s.storer.Download(true) + if rLevel != 0 { + getter = replicas.NewSocGetter(getter, rLevel) + } sch, err := getter.Get(r.Context(), address) if err != nil { logger.Error(err, "soc retrieval has been failed") diff --git a/pkg/api/soc_test.go b/pkg/api/soc_test.go index fb34eb82297..2b87c572a51 100644 --- a/pkg/api/soc_test.go +++ b/pkg/api/soc_test.go @@ -22,6 +22,7 @@ import ( testingpostage "github.com/ethersphere/bee/v2/pkg/postage/testing" testingsoc "github.com/ethersphere/bee/v2/pkg/soc/testing" "github.com/ethersphere/bee/v2/pkg/spinlock" + "github.com/ethersphere/bee/v2/pkg/storage" mockstorer "github.com/ethersphere/bee/v2/pkg/storer/mock" "github.com/ethersphere/bee/v2/pkg/swarm" ) @@ -209,3 +210,79 @@ func TestSOC(t *testing.T) { }) }) } + +// Verify that replicas provide fault tolerance +func TestSOCWithRedundancy(t *testing.T) { + + testWithRedundancy := func(t 
*testing.T, redundancyLevel int) { + t.Helper() + + t.Run(fmt.Sprintf("redundancy=%d", redundancyLevel), func(t *testing.T) { + testData := []byte(fmt.Sprintf("redundant-soc-data-%d", redundancyLevel)) + + mockStorer := mockstorer.New() + client, _, _, chanStore := newTestServer(t, testServerOptions{ + Storer: mockStorer, + Post: newTestPostService(), + DirectUpload: true, + }) + + soc := testingsoc.GenerateMockSOC(t, testData) + + chanStore.Subscribe(func(ch swarm.Chunk) { + err := mockStorer.Put(context.Background(), ch) + if err != nil { + t.Fatal(err) + } + }) + + jsonhttptest.Request(t, client, http.MethodPost, + fmt.Sprintf("/soc/%s/%s?sig=%s", + hex.EncodeToString(soc.Owner), + hex.EncodeToString(soc.ID), + hex.EncodeToString(soc.Signature)), + http.StatusCreated, + jsonhttptest.WithRequestHeader(api.SwarmPostageBatchIdHeader, batchOkStr), + jsonhttptest.WithRequestHeader(api.SwarmRedundancyLevelHeader, fmt.Sprintf("%d", redundancyLevel)), + jsonhttptest.WithRequestBody(bytes.NewReader(soc.WrappedChunk.Data())), + jsonhttptest.WithExpectedJSONResponse(api.SocPostResponse{ + Reference: soc.Address(), + }), + ) + + // Wait for replicas to be created in background + time.Sleep(100 * time.Millisecond) + + originalAddress := soc.Address() + + // Delete the original chunk to trigger dispersed retrieval + cs, ok := mockStorer.ChunkStore().(storage.ChunkStore) + if !ok { + t.Fatal("Could not access underlying ChunkStore with Delete method") + } + + err := cs.Delete(context.Background(), originalAddress) + if err != nil { + t.Fatalf("Failed to delete the original chunk: %v", err) + } + + // Try to retrieve the SOC after deletion + if redundancyLevel > 0 { + jsonhttptest.Request(t, client, http.MethodGet, + fmt.Sprintf("/soc/%s/%s", hex.EncodeToString(soc.Owner), hex.EncodeToString(soc.ID)), + http.StatusOK, + jsonhttptest.WithExpectedResponse(soc.WrappedChunk.Data()[swarm.SpanSize:]), + 
jsonhttptest.WithExpectedContentLength(len(soc.WrappedChunk.Data()[swarm.SpanSize:])), + ) + } else { + jsonhttptest.Request(t, client, http.MethodGet, + fmt.Sprintf("/soc/%s/%s", hex.EncodeToString(soc.Owner), hex.EncodeToString(soc.ID)), + http.StatusNotFound, + ) + } + }) + } + + testWithRedundancy(t, 0) + testWithRedundancy(t, 2) +} diff --git a/pkg/feeds/factory/factory.go b/pkg/feeds/factory/factory.go index 1d555416407..18a60b17918 100644 --- a/pkg/feeds/factory/factory.go +++ b/pkg/feeds/factory/factory.go @@ -19,12 +19,17 @@ func New(getter storage.Getter) feeds.Factory { return &factory{getter} } -func (f *factory) NewLookup(t feeds.Type, feed *feeds.Feed) (feeds.Lookup, error) { +func (f *factory) NewLookup(t feeds.Type, feed *feeds.Feed, getter storage.Getter) (feeds.Lookup, error) { + g := f.Getter + if getter != nil { + g = getter + } + switch t { case feeds.Sequence: - return sequence.NewAsyncFinder(f.Getter, feed), nil + return sequence.NewAsyncFinder(g, feed), nil case feeds.Epoch: - return epochs.NewAsyncFinder(f.Getter, feed), nil + return epochs.NewAsyncFinder(g, feed), nil } return nil, feeds.ErrFeedTypeNotFound diff --git a/pkg/feeds/feed.go b/pkg/feeds/feed.go index ac8d232f5ce..31eca88caa3 100644 --- a/pkg/feeds/feed.go +++ b/pkg/feeds/feed.go @@ -25,7 +25,7 @@ var ErrFeedTypeNotFound = errors.New("no such feed type") // Factory creates feed lookups for different types of feeds. 
type Factory interface { - NewLookup(Type, *Feed) (Lookup, error) + NewLookup(Type, *Feed, storage.Getter) (Lookup, error) } // Type enumerates the time-based feed types diff --git a/pkg/node/bootstrap.go b/pkg/node/bootstrap.go index 047c306aaf2..d9aa885f429 100644 --- a/pkg/node/bootstrap.go +++ b/pkg/node/bootstrap.go @@ -326,7 +326,7 @@ func getLatestSnapshot( } f := feeds.New(topic, common.BytesToAddress(owner)) - l, err := feedFactory.NewLookup(*t, f) + l, err := feedFactory.NewLookup(*t, f, nil) if err != nil { return nil, fmt.Errorf("feed lookup failed: %w", err) } diff --git a/pkg/replicas/export_test.go b/pkg/replicas/export_test.go index 271ad71ed0b..e8aee696a3a 100644 --- a/pkg/replicas/export_test.go +++ b/pkg/replicas/export_test.go @@ -4,12 +4,6 @@ package replicas -import "github.com/ethersphere/bee/v2/pkg/storage" - var ( Signer = signer ) - -func Wait(g storage.Getter) { - g.(*getter).wg.Wait() -} diff --git a/pkg/replicas/getter.go b/pkg/replicas/getter.go index b08b5c780e8..6eeb57c82bf 100644 --- a/pkg/replicas/getter.go +++ b/pkg/replicas/getter.go @@ -36,7 +36,6 @@ var ErrSwarmageddon = errors.New("swarmageddon has begun") // (by default, it is assumed to be 4, ie. 
total of 16) // - (not implemented) pivot: replicas with address in the proximity of pivot will be tried first type getter struct { - wg sync.WaitGroup storage.Getter level redundancy.Level } @@ -51,6 +50,9 @@ func (g *getter) Get(ctx context.Context, addr swarm.Address) (ch swarm.Chunk, e ctx, cancel := context.WithCancel(ctx) defer cancel() + var wg sync.WaitGroup + defer wg.Wait() + // channel that the results (retrieved chunks) are gathered to from concurrent // workers each fetching a replica resultC := make(chan swarm.Chunk) @@ -60,7 +62,7 @@ func (g *getter) Get(ctx context.Context, addr swarm.Address) (ch swarm.Chunk, e errcnt := 0 // concurrently call to retrieve chunk using original CAC address - g.wg.Go(func() { + wg.Go(func() { ch, err := g.Getter.Get(ctx, addr) if err != nil { errc <- err @@ -83,8 +85,6 @@ func (g *getter) Get(ctx context.Context, addr swarm.Address) (ch swarm.Chunk, e var wait <-chan time.Time // nil channel to disable case // addresses used are doubling each period of search expansion // (at intervals of RetryInterval) - ticker := time.NewTicker(RetryInterval) - defer ticker.Stop() for level := uint8(0); level <= uint8(g.level); { select { // at least one chunk is retrieved, cancel the rest and return early @@ -101,7 +101,6 @@ func (g *getter) Get(ctx context.Context, addr swarm.Address) (ch swarm.Chunk, e // ticker switches on the address channel case <-wait: - wait = nil next = rr.c level++ target = 1 << level @@ -115,7 +114,7 @@ func (g *getter) Get(ctx context.Context, addr swarm.Address) (ch swarm.Chunk, e continue } - g.wg.Go(func() { + wg.Go(func() { ch, err := g.Getter.Get(ctx, swarm.NewAddress(so.addr)) if err != nil { errc <- err @@ -138,7 +137,7 @@ func (g *getter) Get(ctx context.Context, addr swarm.Address) (ch swarm.Chunk, e continue } next = nil - wait = ticker.C + wait = time.After(RetryInterval) } } diff --git a/pkg/replicas/getter_soc.go b/pkg/replicas/getter_soc.go new file mode 100644 index 
00000000000..93d30854db0 --- /dev/null +++ b/pkg/replicas/getter_soc.go @@ -0,0 +1,136 @@ +// Copyright 2023 The Swarm Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// the code below implements the integration of dispersed replicas in chunk fetching. +// using storage.Getter interface. +package replicas + +import ( + "context" + "errors" + "sync" + "time" + + "github.com/ethersphere/bee/v2/pkg/file/redundancy" + "github.com/ethersphere/bee/v2/pkg/storage" + "github.com/ethersphere/bee/v2/pkg/swarm" +) + +// getter is the private implementation of storage.Getter, an interface for +// retrieving chunks. This getter embeds the original simple chunk getter and extends it +// to a multiplexed variant that fetches chunks with replicas for SOC. +// +// the strategy to retrieve a chunk that has replicas can be configured with a few parameters: +// - SOCRetryInterval: the delay before a new batch of replicas is fetched. +// - depth: 2^{depth} is the total number of additional replicas that have been uploaded +// (by default, it is assumed to be 4, ie. 
total of 16) +// - (not implemented) pivot: replicas with address in the proximity of pivot will be tried first +type socGetter struct { + storage.Getter + level redundancy.Level +} + +var SOCRetryInterval = 300 * time.Millisecond + +// NewSocGetter is the getter constructor +func NewSocGetter(g storage.Getter, level redundancy.Level) storage.Getter { + return &socGetter{Getter: g, level: level} +} + +// Get makes the socGetter satisfy the storage.Getter interface +func (g *socGetter) Get(ctx context.Context, addr swarm.Address) (ch swarm.Chunk, err error) { + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + var wg sync.WaitGroup + defer wg.Wait() + + // channel that the results (retrieved chunks) are gathered to from concurrent + // workers each fetching a replica + resultC := make(chan swarm.Chunk) + // errc collects the errors + errc := make(chan error, g.level.GetReplicaCount()+1) + var errs error + errcnt := 0 + + // concurrently call to retrieve chunk using original SOC address + wg.Add(1) + go func() { + defer wg.Done() + ch, err := g.Getter.Get(ctx, addr) + if err != nil { + errc <- err + return + } + + select { + case resultC <- ch: + case <-ctx.Done(): + } + }() + // counters + n := 0 // counts the replica addresses tried + target := 2 // the number of replicas attempted to download in this batch + total := g.level.GetReplicaCount() + + // + rr := newSocReplicator(addr, g.level) + next := rr.c + var wait <-chan time.Time // nil channel to disable case + // addresses used are doubling each period of search expansion + // (at intervals of RetryInterval) + for level := uint8(0); level <= uint8(g.level); { + select { + // at least one chunk is retrieved, cancel the rest and return early + case chunk := <-resultC: + cancel() + return chunk, nil + + case err = <-errc: + errs = errors.Join(errs, err) + errcnt++ + if errcnt > total { + return nil, errors.Join(ErrSwarmageddon, errs) + } + + // ticker switches on the address channel + case <-wait: + next 
= rr.c + level++ + target = 1 << level + n = 0 + continue + + // getting the addresses in order + case so := <-next: + if so == nil { + next = nil + continue + } + + wg.Add(1) + go func() { + defer wg.Done() + ch, err := g.Getter.Get(ctx, swarm.NewAddress(so.addr)) + if err != nil { + errc <- err + return + } + + select { + case resultC <- ch: + case <-ctx.Done(): + } + }() + n++ + if n < target { + continue + } + next = nil + wait = time.After(SOCRetryInterval) + } + } + + return nil, nil +} diff --git a/pkg/replicas/getter_soc_test.go b/pkg/replicas/getter_soc_test.go new file mode 100644 index 00000000000..ed3e0c6125e --- /dev/null +++ b/pkg/replicas/getter_soc_test.go @@ -0,0 +1,216 @@ +// Copyright 2025 The Swarm Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// This file is a copy of the original getter_test.go file +// and tailored to socGetter implementation. + +package replicas_test + +import ( + "context" + "crypto/rand" + "errors" + "fmt" + "io" + "testing" + "time" + + "github.com/ethersphere/bee/v2/pkg/cac" + "github.com/ethersphere/bee/v2/pkg/crypto" + "github.com/ethersphere/bee/v2/pkg/file/redundancy" + "github.com/ethersphere/bee/v2/pkg/replicas" + "github.com/ethersphere/bee/v2/pkg/soc" + "github.com/ethersphere/bee/v2/pkg/storage" +) + +func TestSOCGetter(t *testing.T) { + t.Parallel() + // failure is a struct that defines a failure scenario to test + type failure struct { + name string + err error + errf func(int, int) func(int) chan struct{} + } + // failures is a list of failure scenarios to test + failures := []failure{ + { + "timeout", + context.Canceled, + func(_, _ int) func(i int) chan struct{} { + return func(i int) chan struct{} { + return nil + } + }, + }, + { + "not found", + storage.ErrNotFound, + func(_, _ int) func(i int) chan struct{} { + c := make(chan struct{}) + close(c) + return func(i int) chan struct{} { + return c + } + }, + }, + } + 
type test struct { + name string + failure failure + level int + count int + found int + } + + var tests []test + for _, f := range failures { + for level, c := range redundancy.GetReplicaCounts() { + for j := 0; j <= c*2+1; j++ { + tests = append(tests, test{ + name: fmt.Sprintf("%s level %d count %d found %d", f.name, level, c, j), + failure: f, + level: level, + count: c, + found: j, + }) + } + } + } + + // initialise the base chunk + chunkLen := 420 + buf := make([]byte, chunkLen) + if _, err := io.ReadFull(rand.Reader, buf); err != nil { + t.Fatal(err) + } + ch, err := cac.New(buf) + if err != nil { + t.Fatal(err) + } + // create soc from cac + // test key to sign soc chunks + privKey, err := crypto.GenerateSecp256k1Key() + if err != nil { + t.Fatal(err) + } + signer := crypto.NewDefaultSigner(privKey) + id := make([]byte, 32) + if _, err := rand.Read(id); err != nil { + t.Fatal(err) + } + s := soc.New(id, ch) + ch, err = s.Sign(signer) + if err != nil { + t.Fatal(err) + } + + // reset retry interval to speed up tests + retryInterval := replicas.SOCRetryInterval + defer func() { replicas.SOCRetryInterval = retryInterval }() + replicas.SOCRetryInterval = 100 * time.Millisecond + + // run the tests + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + // initiate a chunk retrieval session using replicas.Getter + // embedding a testGetter that simulates the behaviour of a chunk store + store := newTestGetter(ch, tc.found, tc.failure.errf(tc.found, tc.count)) + g := replicas.NewSocGetter(store, redundancy.Level(tc.level)) + store.now = time.Now() + ctx, cancel := context.WithCancel(context.Background()) + if tc.found > tc.count { + wait := replicas.SOCRetryInterval / 2 * time.Duration(1+2*tc.level) + go func() { + time.Sleep(wait) + cancel() + }() + } + _, err := g.Get(ctx, ch.Address()) + cancel() + + // test the returned error + if tc.found <= tc.count { + if err != nil { + t.Fatalf("expected no error. 
got %v", err) + } + // if j <= c, the original chunk should be retrieved and the context should be cancelled + t.Run("retrievals cancelled", func(t *testing.T) { + select { + case <-time.After(100 * time.Millisecond): + t.Fatal("timed out waiting for context to be cancelled") + case <-store.cancelled: + } + }) + + } else { + if err == nil { + t.Fatal("expected error, got nil") + } + + t.Run("returns correct error", func(t *testing.T) { + if !errors.Is(err, replicas.ErrSwarmageddon) { + t.Fatalf("incorrect error. want Swarmageddon. got %v", err) + } + if !errors.Is(err, tc.failure.err) { + t.Fatalf("incorrect error. want it to wrap %v. got %v", tc.failure.err, err) + } + }) + } + + attempts := int(store.attempts.Load()) + // the original chunk should be among those attempted for retrieval + addresses := store.addresses[:attempts] + latencies := store.latencies[:attempts] + t.Run("original address called", func(t *testing.T) { + select { + case <-time.After(100 * time.Millisecond): + t.Fatal("timed out waiting for original address to be attempted for retrieval") + case <-store.origCalled: + i := store.origIndex + if i > 2 { + t.Fatalf("original address called too late. want at most 2 (preceding attempts). got %v (latency: %v)", i, latencies[i]) + } + addresses = append(addresses[:i], addresses[i+1:]...) + latencies = append(latencies[:i], latencies[i+1:]...) + attempts-- + } + }) + + t.Run("retrieved count", func(t *testing.T) { + if attempts > tc.count { + t.Fatalf("too many attempts to retrieve a replica: want at most %v. got %v.", tc.count, attempts) + } + if tc.found > tc.count { + if attempts < tc.count { + t.Fatalf("too few attempts to retrieve a replica: want at least %v. got %v.", tc.count, attempts) + } + return + } + maxValue := 2 + for i := 1; i < tc.level && maxValue < tc.found; i++ { + maxValue = maxValue * 2 + } + if attempts > maxValue { + t.Fatalf("too many attempts to retrieve a replica: want at most %v. got %v. 
latencies %v", maxValue, attempts, latencies) + } + }) + + t.Run("dispersion", func(t *testing.T) { + if err := dispersed(redundancy.Level(tc.level), ch, addresses); err != nil { + t.Fatalf("addresses are not dispersed: %v", err) + } + }) + + t.Run("latency", func(t *testing.T) { + counts := redundancy.GetReplicaCounts() + for i, latency := range latencies { + multiplier := latency / replicas.SOCRetryInterval + if multiplier > 0 && i < counts[multiplier-1] { + t.Fatalf("incorrect latency for retrieving replica %d: %v", i, latency) + } + } + }) + }) + } +} diff --git a/pkg/replicas/getter_test.go b/pkg/replicas/getter_test.go index d1d727dd5fd..b11a55d12c8 100644 --- a/pkg/replicas/getter_test.go +++ b/pkg/replicas/getter_test.go @@ -171,7 +171,6 @@ func TestGetter(t *testing.T) { }() } _, err := g.Get(ctx, ch.Address()) - replicas.Wait(g) cancel() // test the returned error diff --git a/pkg/replicas/putter_soc.go b/pkg/replicas/putter_soc.go new file mode 100644 index 00000000000..d9a70f02250 --- /dev/null +++ b/pkg/replicas/putter_soc.go @@ -0,0 +1,92 @@ +// Copyright 2025 The Swarm Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// The code below implements the integration of dispersed replicas in SOC upload +// using the storer.PutterSession interface. 
+package replicas + +import ( + "context" + "errors" + "sync" + + "github.com/ethersphere/bee/v2/pkg/file/redundancy" + "github.com/ethersphere/bee/v2/pkg/storage" + "github.com/ethersphere/bee/v2/pkg/storer" + "github.com/ethersphere/bee/v2/pkg/swarm" +) + +// socPutter is the private implementation of the public storage.Putter interface +// socPutter extends the original putter to a concurrent multiputter +type socPutter struct { + putter storage.Putter + rLevel redundancy.Level +} + +// NewSocPutter is the putter constructor +func NewSocPutter(p storage.Putter, rLevel redundancy.Level) storage.Putter { + return &socPutter{ + putter: p, + rLevel: rLevel, + } +} + +// Put makes the putter satisfy the storage.Putter interface +func (p *socPutter) Put(ctx context.Context, ch swarm.Chunk) error { + errs := []error{} + // Put base chunk first + if err := p.putter.Put(ctx, ch); err != nil { + return err + } + if p.rLevel == 0 { + return nil + } + + rr := newSocReplicator(ch.Address(), p.rLevel) + errc := make(chan error, p.rLevel.GetReplicaCount()) + wg := sync.WaitGroup{} + for r := range rr.c { + wg.Add(1) + go func(r *socReplica) { + defer wg.Done() + // create a new chunk with the replica address + sch := swarm.NewChunk(swarm.NewAddress(r.addr), ch.Data()) + err := p.putter.Put(ctx, sch) + if err != nil { + errc <- err + } + }(r) + } + + wg.Wait() + close(errc) + for err := range errc { + errs = append(errs, err) + } + return errors.Join(errs...) 
+} + +// socPutterSession extends the original socPutter +type socPutterSession struct { + socPutter + ps storer.PutterSession +} + +// NewSocPutterSession is the putterSession constructor +func NewSocPutterSession(p storer.PutterSession, rLevel redundancy.Level) storer.PutterSession { + return &socPutterSession{ + socPutter{ + putter: p, + rLevel: rLevel, + }, p, + } +} + +func (p *socPutterSession) Cleanup() error { + return p.ps.Cleanup() +} + +func (p *socPutterSession) Done(addr swarm.Address) error { + return p.ps.Done(addr) +} diff --git a/pkg/replicas/putter_soc_test.go b/pkg/replicas/putter_soc_test.go new file mode 100644 index 00000000000..e0315eadc42 --- /dev/null +++ b/pkg/replicas/putter_soc_test.go @@ -0,0 +1,231 @@ +// Copyright 2025 The Swarm Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// This file was created as a copy of the original putter_test.go file +// and tailored to the socPutter implementation. 
+ +package replicas_test + +import ( + "context" + "crypto/rand" + "errors" + "fmt" + "io" + "sync/atomic" + "testing" + "time" + + "github.com/ethersphere/bee/v2/pkg/cac" + "github.com/ethersphere/bee/v2/pkg/crypto" + "github.com/ethersphere/bee/v2/pkg/file/redundancy" + "github.com/ethersphere/bee/v2/pkg/replicas" + "github.com/ethersphere/bee/v2/pkg/soc" + "github.com/ethersphere/bee/v2/pkg/storage" + "github.com/ethersphere/bee/v2/pkg/storage/inmemchunkstore" + "github.com/ethersphere/bee/v2/pkg/swarm" +) + +type putterSession struct { + chunkStore storage.ChunkStore + getErrors func(context.Context, swarm.Address) error + putErrors func(context.Context, swarm.Address) error +} + +func (tbp *putterSession) Get(ctx context.Context, addr swarm.Address) (swarm.Chunk, error) { + + g := tbp.getErrors + if g != nil { + return nil, g(ctx, addr) + } + return tbp.chunkStore.Get(ctx, addr) +} + +func (p *putterSession) Put(ctx context.Context, ch swarm.Chunk) error { + g := p.putErrors + if g != nil { + return g(ctx, ch.Address()) + } + + return p.chunkStore.Put(ctx, ch) +} + +func (p *putterSession) Done(address swarm.Address) error { return nil } + +func (p *putterSession) Cleanup() error { return nil } + +func TestSocPutter(t *testing.T) { + t.Parallel() + + // test key to sign soc chunks + privKey, err := crypto.GenerateSecp256k1Key() + if err != nil { + t.Fatal(err) + } + signer := crypto.NewDefaultSigner(privKey) + + tcs := []struct { + level redundancy.Level + length int + }{ + {0, 1}, + {1, 1}, + {2, 1}, + {3, 1}, + {4, 1}, + {0, 4096}, + {1, 4096}, + {2, 4096}, + {3, 4096}, + {4, 4096}, + } + for _, tc := range tcs { + t.Run(fmt.Sprintf("redundancy:%d, size:%d", tc.level, tc.length), func(t *testing.T) { + buf := make([]byte, tc.length) + if _, err := io.ReadFull(rand.Reader, buf); err != nil { + t.Fatal(err) + } + ctx := context.Background() + ch, err := cac.New(buf) + if err != nil { + t.Fatal(err) + } + // create soc from cac + id := make([]byte, 
swarm.HashSize) + if _, err := rand.Read(id); err != nil { + t.Fatal(err) + } + s := soc.New(id, ch) + sch, err := s.Sign(signer) + if err != nil { + t.Fatal(err) + } + + store := inmemchunkstore.New() + defer store.Close() + session := &putterSession{chunkStore: store} + p := replicas.NewSocPutter(session, tc.level) + + if err := p.Put(ctx, sch); err != nil { + t.Fatalf("expected no error. got %v", err) + } + var addrs []swarm.Address + orig := false + _ = store.Iterate(ctx, func(chunk swarm.Chunk) (stop bool, err error) { + if sch.Address().Equal(chunk.Address()) { + orig = true + return false, nil + } + if !soc.Valid(chunk) { + t.Fatalf("chunk %v is not a valid SOC chunk", chunk.Address()) + } + addrs = append(addrs, chunk.Address()) + return false, nil + }) + if !orig { + t.Fatal("original chunk missing") + } + t.Run("dispersion", func(t *testing.T) { + if err := dispersed(tc.level, ch, addrs); err != nil { + t.Fatalf("addresses are not dispersed: %v", err) + } + }) + t.Run("attempts", func(t *testing.T) { + count := tc.level.GetReplicaCount() + if len(addrs) != count { + t.Fatalf("incorrect number of attempts. 
want %v, got %v", count, len(addrs)) + } + }) + + t.Run("replication", func(t *testing.T) { + if err := replicated(store, ch, addrs); err != nil { + t.Fatalf("chunks are not replicas: %v", err) + } + }) + }) + } + t.Run("error handling", func(t *testing.T) { + tcs := []struct { + name string + level redundancy.Level + length int + f func(*putterSession) *putterSession + err []error + }{ + {"put errors", 4, 4096, func(tbp *putterSession) *putterSession { + var j int32 + i := &j + atomic.StoreInt32(i, 0) + tbp.putErrors = func(ctx context.Context, _ swarm.Address) error { + j := atomic.AddInt32(i, 1) + <-time.After(10 * time.Millisecond) + if j == 6 { + return errTestA + } + if j == 12 { + return errTestB + } + return nil + } + return tbp + }, []error{errTestA, errTestB}}, + {"put latencies", 4, 4096, func(tbp *putterSession) *putterSession { + var j int32 + i := &j + atomic.StoreInt32(i, 0) + tbp.putErrors = func(ctx context.Context, _ swarm.Address) error { + j := atomic.AddInt32(i, 1) + if j == 6 { + select { + case <-time.After(100 * time.Millisecond): + case <-ctx.Done(): + return ctx.Err() + } + } + if j == 12 { + return errTestA + } + return nil + } + return tbp + }, []error{errTestA, context.DeadlineExceeded}}, + } + for _, tc := range tcs { + t.Run(tc.name, func(t *testing.T) { + buf := make([]byte, tc.length) + if _, err := io.ReadFull(rand.Reader, buf); err != nil { + t.Fatal(err) + } + ctx := context.Background() + ctx, cancel := context.WithTimeout(ctx, 50*time.Millisecond) + defer cancel() + ch, err := cac.New(buf) + if err != nil { + t.Fatal(err) + } + + id := make([]byte, swarm.HashSize) + if _, err := rand.Read(id); err != nil { + t.Fatal(err) + } + s := soc.New(id, ch) + sch, err := s.Sign(signer) + if err != nil { + t.Fatal(err) + } + + store := inmemchunkstore.New() + defer store.Close() + p := replicas.NewSocPutter(tc.f(&putterSession{chunkStore: store}), tc.level) + errs := p.Put(ctx, sch) + for _, err := range tc.err { + if !errors.Is(errs, 
err) { + t.Fatalf("incorrect error. want it to contain %v. got %v.", tc.err, errs) + } + } + }) + } + }) + +} diff --git a/pkg/replicas/replicas.go b/pkg/replicas/replicas.go index cdeb93cc30e..f6f7328ce87 100644 --- a/pkg/replicas/replicas.go +++ b/pkg/replicas/replicas.go @@ -30,7 +30,7 @@ type replicator struct { addr []byte // chunk address queue [16]*replica // to sort addresses according to di exist [30]bool // maps the 16 distinct nibbles on all levels - sizes [5]int // number of distinct neighnourhoods redcorded for each depth + sizes [5]int // number of distinct neighbourhoods recorded for each depth c chan *replica rLevel redundancy.Level } diff --git a/pkg/replicas/replicas_soc.go b/pkg/replicas/replicas_soc.go new file mode 100644 index 00000000000..d539a2c5cb0 --- /dev/null +++ b/pkg/replicas/replicas_soc.go @@ -0,0 +1,98 @@ +// Copyright 2025 The Swarm Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. 
+ +// Package replicas implements a scheme to replicate chunks +// in such a way that +// - the replicas are optimally dispersed to aid cross-neighbourhood redundancy +// - the replicas addresses can be deduced by retrievers only knowing the address +// of the original content addressed chunk +// - no new chunk validation rules are introduced +package replicas + +import ( + "github.com/ethersphere/bee/v2/pkg/file/redundancy" + "github.com/ethersphere/bee/v2/pkg/swarm" +) + +// socReplicator running the find for replicas +type socReplicator struct { + addr []byte // chunk address + c chan *socReplica + rLevel redundancy.Level +} + +// newSocReplicator socReplicator constructor +func newSocReplicator(addr swarm.Address, rLevel redundancy.Level) *socReplicator { + rr := &socReplicator{ + addr: addr.Bytes(), + c: make(chan *socReplica, rLevel.GetReplicaCount()), + rLevel: rLevel, + } + go rr.replicas() + return rr +} + +// socReplica of the mined SOC chunk (address) that serve as replicas +type socReplica struct { + addr []byte // byte slice of SOC address + nonce uint8 // byte of the mined nonce +} + +// replicate returns a replica params structure seeded with a byte of entropy as argument +func (rr *socReplicator) replicate(i uint8, bitsRequired uint8) (sp *socReplica) { + addr := make([]byte, 32) + copy(addr, rr.addr) + mirroredBits := mirrorBitsToMSB(i, bitsRequired) + // zero out the first leading bitsRequired bits of addr[0] and set mirroredBits of `i` + addr[0] &= 0xFF >> bitsRequired + addr[0] |= mirroredBits + if addr[0] == rr.addr[0] { + // xor MSB after the mirrored bits because the iteration found the original address + addr[0] ^= 1 << (bitsRequired - 1) + } + return &socReplica{addr: addr, nonce: addr[0]} +} + +// replicas enumerates replica parameters (nonce) pushing it in a channel given as argument +// the order of replicas is so that addresses are always maximally dispersed +// in successive sets of addresses. 
+// I.e., the binary tree representing the new addresses prefix bits up to depth is balanced +func (rr *socReplicator) replicas() { + defer close(rr.c) + // number of bits required to represent all replicas + bitsRequired := countBitsRequired(uint8(rr.rLevel.GetReplicaCount() - 1)) + // replicate iteration saturates all leading bits in generated addresses until bitsRequired + for i := uint8(0); i < uint8(rr.rLevel.GetReplicaCount()); i++ { + // create soc replica (with address and nonce) + r := rr.replicate(i, bitsRequired) + rr.c <- r + } +} + +// mirrorBitsToMSB mirrors the lowest n bits of v to the most significant bits of a byte. +// For example, mirrorBitsToMSB(0b00001101, 4) == 0b10110000 +func mirrorBitsToMSB(v byte, n uint8) byte { + var res byte + for i := uint8(0); i < n; i++ { + if (v & (1 << i)) != 0 { + res |= (1 << (7 - i)) + } + } + return res +} + +// countBitsRequired returns the minimum number of bits required to represent value v. +// For 0, it returns 1 (we need 1 bit to represent 0). 
+func countBitsRequired(v uint8) uint8 { + if v == 0 { + return 1 + } + + var bits uint8 + for v > 0 { + bits++ + v >>= 1 + } + return bits +} diff --git a/pkg/soc/validator.go b/pkg/soc/validator.go index 06f2fb72e0a..b767572c579 100644 --- a/pkg/soc/validator.go +++ b/pkg/soc/validator.go @@ -26,5 +26,10 @@ func Valid(ch swarm.Chunk) bool { if err != nil { return false } - return ch.Address().Equal(address) + defaultSoc := ch.Address().Equal(address) + if !defaultSoc { + // check whether the SOC chunk is a replica + return bytes.Equal(ch.Address().Bytes()[1:32], address.Bytes()[1:32]) + } + return true } diff --git a/pkg/soc/validator_test.go b/pkg/soc/validator_test.go index 203e0b1bc84..72995d260da 100644 --- a/pkg/soc/validator_test.go +++ b/pkg/soc/validator_test.go @@ -112,7 +112,7 @@ func TestInvalid(t *testing.T) { name: "wrong soc address", chunk: func() swarm.Chunk { wrongAddressBytes := socAddress.Clone().Bytes() - wrongAddressBytes[0] = 255 - wrongAddressBytes[0] + wrongAddressBytes[1] = 255 - wrongAddressBytes[1] wrongAddress := swarm.NewAddress(wrongAddressBytes) data := makeSocData() return swarm.NewChunk(wrongAddress, data) diff --git a/pkg/storage/storage.go b/pkg/storage/storage.go index 312364e2676..df575a83ce5 100644 --- a/pkg/storage/storage.go +++ b/pkg/storage/storage.go @@ -307,10 +307,7 @@ func IdentityAddress(chunk swarm.Chunk) (swarm.Address, error) { // check the chunk is single owner chunk or cac if sch, err := soc.FromChunk(chunk); err == nil { - socAddress, err := sch.Address() - if err != nil { - return swarm.ZeroAddress, err - } + socAddress := chunk.Address() // cannot use sch.Address() because of SOC replicas h := swarm.NewHasher() _, err = h.Write(socAddress.Bytes()) if err != nil {