Skip to content
Closed
Show file tree
Hide file tree
Changes from 39 commits
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
a684873
feat: soc dispersed replica
nugaon Mar 20, 2025
c252d50
feat: new soc putter session
nugaon Mar 20, 2025
2f42145
refactor: lint issue
nugaon Jun 5, 2025
19971d1
test: outline race condition
nugaon Jun 5, 2025
2ff200c
fix: randomize ids at soc generation
nugaon Jun 5, 2025
40c5c17
fix: data race in putter soc
nugaon Jun 6, 2025
b982b20
test: soc api
nugaon Jun 6, 2025
61d8a8d
fix: same replica address logic as in validity
nugaon Jun 6, 2025
7ec15cb
fix: lint issue
nugaon Jun 11, 2025
ec191e7
chore: comments
nugaon Jun 11, 2025
f1039df
Merge remote-tracking branch 'origin/master' into feat/soc-dispersed
nugaon Jun 12, 2025
eafef39
fix: gsoc parallel upload
nugaon Jun 12, 2025
2fd1e12
chore: print out error message for integration test
nugaon Jun 12, 2025
8acb990
fix: parallel gsoc upload
nugaon Jun 12, 2025
8ff9e53
fix: nil reference on redundancy header
nugaon Jun 12, 2025
1f5011c
fix: goroutines access loop variable
nugaon Jun 13, 2025
20671a4
feat: feedfactory integration on bzz and feed
nugaon Jun 13, 2025
e41cb55
test: feed api for red
nugaon Jun 13, 2025
7199f6a
test: bzz api
nugaon Jun 17, 2025
db800a8
feat: soc validation check bytes except first
nugaon Jun 18, 2025
1706ccc
test: add soc valid test
nugaon Jun 18, 2025
031d6f5
feat: flaky attempt to replicate address
nugaon Jun 18, 2025
415c66f
feat: saturating by mirrorbits
nugaon Jun 19, 2025
d3f46a3
fix: flip bit after mirroredbits if necessary
nugaon Jun 19, 2025
342df84
test: change test according to the new validation rules
nugaon Jun 19, 2025
1aca908
test: getter
nugaon Jun 19, 2025
e6c6c95
Merge branch 'master' into feat/soc-dispersed
nugaon Jun 19, 2025
a97908f
fix: race issue
nugaon Jun 25, 2025
7d7b587
docs: openapi changes
nugaon Jun 25, 2025
4bc2c00
Merge branch 'master' into feat/soc-dispersed
nugaon Jul 17, 2025
575e683
fix: socretryinterval instead of retryinterval
nugaon Jul 18, 2025
00fb890
Merge branch 'master' into feat/soc-dispersed
nugaon Sep 16, 2025
e18e95a
fix: identity address using assigned address
nugaon Sep 17, 2025
f3bc134
fix: wg usage
nugaon Sep 17, 2025
58fde4d
fix: wg usage at cac
nugaon Sep 17, 2025
3cabd35
test: remove wg wait for get
nugaon Sep 17, 2025
6eee490
refactor: errc size
nugaon Sep 17, 2025
64aa115
refactor: rename specialgetter
nugaon Sep 17, 2025
1d3b9ff
refactor: time.After instead of time.NewTicker
nugaon Sep 19, 2025
495797d
refactor: comments
nugaon Sep 25, 2025
69b6725
Merge branch 'master' into feat/soc-dispersed
gacevicljubisa Nov 6, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions openapi/Swarm.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -885,6 +885,7 @@ paths:
- $ref: "SwarmCommon.yaml#/components/parameters/SwarmPostageStamp"
- $ref: "SwarmCommon.yaml#/components/parameters/SwarmAct"
- $ref: "SwarmCommon.yaml#/components/parameters/SwarmActHistoryAddress"
- $ref: "SwarmCommon.yaml#/components/parameters/SwarmRedundancyLevelParameter"
requestBody:
required: true
description: The SOC binary data is composed of the span (8 bytes) and the at most 4KB payload.
Expand Down Expand Up @@ -932,6 +933,7 @@ paths:
description: Arbitrary identifier of the related data
- $ref: "SwarmCommon.yaml#/components/parameters/SwarmOnlyRootChunkParameter"
- $ref: "SwarmCommon.yaml#/components/parameters/SwarmCache"
- $ref: "SwarmCommon.yaml#/components/parameters/SwarmRedundancyLevelParameter"
- $ref: "SwarmCommon.yaml#/components/parameters/SwarmRedundancyStrategyParameter"
- $ref: "SwarmCommon.yaml#/components/parameters/SwarmRedundancyFallbackModeParameter"
- $ref: "SwarmCommon.yaml#/components/parameters/SwarmChunkRetrievalTimeoutParameter"
Expand Down Expand Up @@ -983,6 +985,7 @@ paths:
- $ref: "SwarmCommon.yaml#/components/parameters/SwarmPostageBatchId"
- $ref: "SwarmCommon.yaml#/components/parameters/SwarmAct"
- $ref: "SwarmCommon.yaml#/components/parameters/SwarmActHistoryAddress"
- $ref: "SwarmCommon.yaml#/components/parameters/SwarmRedundancyLevelParameter"
responses:
"201":
description: Created
Expand Down Expand Up @@ -1041,6 +1044,7 @@ paths:
- $ref: "SwarmCommon.yaml#/components/parameters/SwarmFeedLegacyResolve"
- $ref: "SwarmCommon.yaml#/components/parameters/SwarmOnlyRootChunkParameter"
- $ref: "SwarmCommon.yaml#/components/parameters/SwarmCache"
- $ref: "SwarmCommon.yaml#/components/parameters/SwarmRedundancyLevelParameter"
- $ref: "SwarmCommon.yaml#/components/parameters/SwarmRedundancyStrategyParameter"
- $ref: "SwarmCommon.yaml#/components/parameters/SwarmRedundancyFallbackModeParameter"
- $ref: "SwarmCommon.yaml#/components/parameters/SwarmChunkRetrievalTimeoutParameter"
Expand Down
9 changes: 6 additions & 3 deletions pkg/api/bzz.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/ethersphere/bee/v2/pkg/log"
"github.com/ethersphere/bee/v2/pkg/manifest"
"github.com/ethersphere/bee/v2/pkg/postage"
"github.com/ethersphere/bee/v2/pkg/replicas"
"github.com/ethersphere/bee/v2/pkg/storage"
"github.com/ethersphere/bee/v2/pkg/storer"
"github.com/ethersphere/bee/v2/pkg/swarm"
Expand Down Expand Up @@ -406,7 +407,8 @@ func (s *Service) serveReference(logger log.Logger, address swarm.Address, pathV
}

ctx := r.Context()
ls := loadsave.NewReadonly(s.storer.Download(cache), s.storer.Cache(), redundancy.DefaultLevel)
g := s.storer.Download(cache)
ls := loadsave.NewReadonly(g, s.storer.Cache(), rLevel)
feedDereferenced := false

ctx, err := getter.SetConfigInContext(ctx, headers.Strategy, headers.FallbackMode, headers.ChunkRetrievalTimeout, logger)
Expand Down Expand Up @@ -434,7 +436,7 @@ FETCH:
// unmarshal as mantaray first and possibly resolve the feed, otherwise
// go on normally.
if !feedDereferenced {
if l, err := s.manifestFeed(ctx, m); err == nil {
if l, err := s.manifestFeed(ctx, m, replicas.NewSocGetter(g, rLevel)); err == nil {
// we have a feed manifest here
ch, cur, _, err := l.At(ctx, time.Now().Unix(), 0)
if err != nil {
Expand Down Expand Up @@ -701,6 +703,7 @@ func manifestMetadataLoad(
func (s *Service) manifestFeed(
ctx context.Context,
m manifest.Interface,
st storage.Getter,
) (feeds.Lookup, error) {
e, err := m.Lookup(ctx, "/")
if err != nil {
Expand Down Expand Up @@ -733,5 +736,5 @@ func (s *Service) manifestFeed(
return nil, fmt.Errorf("node lookup: %s", "feed metadata absent")
}
f := feeds.New(topic, common.BytesToAddress(owner))
return s.feedFactory.NewLookup(*t, f)
return s.feedFactory.NewLookup(*t, f, st)
}
160 changes: 160 additions & 0 deletions pkg/api/bzz_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package api_test
import (
"bytes"
"context"
"encoding/hex"
"errors"
"fmt"
"io"
Expand All @@ -19,6 +20,7 @@ import (
"testing"

"github.com/ethersphere/bee/v2/pkg/api"
"github.com/ethersphere/bee/v2/pkg/feeds"
"github.com/ethersphere/bee/v2/pkg/file/loadsave"
"github.com/ethersphere/bee/v2/pkg/file/redundancy"
"github.com/ethersphere/bee/v2/pkg/jsonhttp"
Expand All @@ -27,7 +29,10 @@ import (
"github.com/ethersphere/bee/v2/pkg/manifest"
mockbatchstore "github.com/ethersphere/bee/v2/pkg/postage/batchstore/mock"
mockpost "github.com/ethersphere/bee/v2/pkg/postage/mock"
"github.com/ethersphere/bee/v2/pkg/replicas"
"github.com/ethersphere/bee/v2/pkg/soc"
testingsoc "github.com/ethersphere/bee/v2/pkg/soc/testing"
"github.com/ethersphere/bee/v2/pkg/storage"
"github.com/ethersphere/bee/v2/pkg/storage/inmemchunkstore"
mockstorer "github.com/ethersphere/bee/v2/pkg/storer/mock"
"github.com/ethersphere/bee/v2/pkg/swarm"
Expand Down Expand Up @@ -809,6 +814,161 @@ func TestFeedIndirection(t *testing.T) {
t.Fatalf("expected file reference, did not got any")
}

// get root chunk of data
// and wrap it in a feed
rootCh, err := storer.ChunkStore().Get(context.Background(), resp.Reference)
if err != nil {
t.Fatal(err)
}
socRootCh := testingsoc.GenerateMockSOC(t, rootCh.Data()[swarm.SpanSize:]).Chunk()

// now use the "content" to mock the feed lookup
// also, use the mocked mantaray chunks that unmarshal
// into a real manifest with the mocked feed values when
// called from the bzz endpoint. then call the bzz endpoint with
// the pregenerated feed root manifest hash

t.Run("feed wrapping", func(t *testing.T) {
var (
look = newMockLookup(-1, 0, socRootCh, nil, &id{}, nil)
factory = newMockFactory(look)
bzzDownloadResource = func(addr, path string) string { return "/bzz/" + addr + "/" + path }
ctx = context.Background()
)
client, _, _, _ = newTestServer(t, testServerOptions{
Storer: storer,
Logger: logger,
Feeds: factory,
})
if err != nil {
t.Fatal(err)
}
m, err := manifest.NewDefaultManifest(
loadsave.New(storer.ChunkStore(), storer.Cache(), pipelineFactory(storer.Cache(), false, 0), redundancy.DefaultLevel),
false,
)
if err != nil {
t.Fatal(err)
}
emptyAddr := make([]byte, 32)
err = m.Add(ctx, manifest.RootPath, manifest.NewEntry(swarm.NewAddress(emptyAddr), map[string]string{
api.FeedMetadataEntryOwner: "8d3766440f0d7b949a5e32995d09619a7f86e632",
api.FeedMetadataEntryTopic: "abcc",
api.FeedMetadataEntryType: "epoch",
}))
if err != nil {
t.Fatal(err)
}
manifRef, err := m.Store(ctx)
if err != nil {
t.Fatal(err)
}

jsonhttptest.Request(t, client, http.MethodGet, bzzDownloadResource(manifRef.String(), ""), http.StatusOK,
jsonhttptest.WithExpectedResponse(updateData),
jsonhttptest.WithExpectedContentLength(len(updateData)),
jsonhttptest.WithExpectedResponseHeader(api.AccessControlExposeHeaders, api.SwarmFeedIndexHeader),
jsonhttptest.WithExpectedResponseHeader(api.AccessControlExposeHeaders, api.ContentDispositionHeader),
jsonhttptest.WithExpectedResponseHeader(api.ContentDispositionHeader, `inline; filename="index.html"`),
jsonhttptest.WithExpectedResponseHeader(api.ContentTypeHeader, "text/html; charset=utf-8"),
)
})

t.Run("redundancy", func(t *testing.T) {
// enough to test two redundancy levels since
tests := []struct {
name string
rLevel redundancy.Level
}{
{
name: "none",
rLevel: redundancy.NONE,
},
{
name: "medium",
rLevel: redundancy.MEDIUM,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
rLevel := tt.rLevel
socRoot, _ := soc.FromChunk(socRootCh)
socPutter := replicas.NewSocPutter(storer, rLevel)
err = socPutter.Put(context.Background(), socRootCh)
if err != nil {
t.Fatalf("failed to put SOC chunk with redundancy: %v", err)
}

m, err := manifest.NewDefaultManifest(
loadsave.New(storer.ChunkStore(), storer.Cache(), pipelineFactory(storer.Cache(), false, rLevel), rLevel),
false,
)
if err != nil {
t.Fatal(err)
}

// Add the feed entry to the manifest
hexId := hex.EncodeToString(socRoot.ID())
hexOwner := hex.EncodeToString(socRoot.OwnerAddress())
err = m.Add(context.Background(), manifest.RootPath, manifest.NewEntry(socRootCh.Address(), map[string]string{
api.FeedMetadataEntryOwner: hexOwner,
api.FeedMetadataEntryTopic: hexId,
api.FeedMetadataEntryType: "sequence",
}))
if err != nil {
t.Fatal(err)
}
manifestRef, err := m.Store(context.Background())
if err != nil {
t.Fatal(err)
}

// Create mockLookup and mockFactory for feed
look := newRedundancyMockLookup(
rLevel,
storer.ChunkStore(),
func() (swarm.Chunk, feeds.Index, feeds.Index) {
return socRootCh, &id{}, &id{}
},
)
feedFactory := newMockFactory(look)

// Update the test server with the feed factory
client, _, _, _ := newTestServer(t, testServerOptions{
Storer: storer,
Logger: log.Noop,
Post: mockpost.New(mockpost.WithAcceptAll()),
Feeds: feedFactory,
})

// remove original chunk from store
cs, ok := storer.ChunkStore().(storage.ChunkStore)
if !ok {
t.Fatalf("chunk store not available for deletion")
}
err = cs.Delete(context.Background(), socRootCh.Address())
if err != nil {
t.Fatalf("Failed to delete soc chunk: %v", err)
}

manifestHex := manifestRef.String()

if rLevel == redundancy.NONE {
jsonhttptest.Request(t, client, http.MethodGet, "/bzz/"+manifestHex+"/", http.StatusNotFound)
return
}
jsonhttptest.Request(t, client, http.MethodGet, "/bzz/"+manifestHex+"/", http.StatusOK,
jsonhttptest.WithExpectedResponse(updateData),
jsonhttptest.WithExpectedContentLength(len(updateData)),
jsonhttptest.WithExpectedResponseHeader(api.AccessControlExposeHeaders, api.SwarmFeedIndexHeader),
jsonhttptest.WithExpectedResponseHeader(api.AccessControlExposeHeaders, api.ContentDispositionHeader),
jsonhttptest.WithExpectedResponseHeader(api.ContentTypeHeader, "text/html; charset=utf-8"),
jsonhttptest.WithExpectedResponseHeader(api.ContentDispositionHeader, `inline; filename="index.html"`),
)
})
}
})

m, err := manifest.NewDefaultManifest(
loadsave.New(storer.ChunkStore(), storer.Cache(), pipelineFactory(storer.Cache(), false, 0), redundancy.DefaultLevel),
false,
Expand Down
26 changes: 18 additions & 8 deletions pkg/api/feed.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/ethersphere/bee/v2/pkg/manifest/mantaray"
"github.com/ethersphere/bee/v2/pkg/manifest/simple"
"github.com/ethersphere/bee/v2/pkg/postage"
"github.com/ethersphere/bee/v2/pkg/replicas"
"github.com/ethersphere/bee/v2/pkg/soc"
"github.com/ethersphere/bee/v2/pkg/storage"
"github.com/ethersphere/bee/v2/pkg/storer"
Expand Down Expand Up @@ -66,15 +67,21 @@ func (s *Service) feedGetHandler(w http.ResponseWriter, r *http.Request) {
}

headers := struct {
OnlyRootChunk bool `map:"Swarm-Only-Root-Chunk"`
OnlyRootChunk bool `map:"Swarm-Only-Root-Chunk"`
RedundancyLevel redundancy.Level `map:"Swarm-Redundancy-Level"`
}{}
if response := s.mapStructure(r.Header, &headers); response != nil {
response("invalid header params", logger, w)
return
}

getter := s.storer.Download(false)
if headers.RedundancyLevel > redundancy.NONE {
getter = replicas.NewSocGetter(getter, headers.RedundancyLevel)
}

f := feeds.New(paths.Topic, paths.Owner)
lookup, err := s.feedFactory.NewLookup(feeds.Sequence, f)
lookup, err := s.feedFactory.NewLookup(feeds.Sequence, f, getter)
if err != nil {
logger.Debug("new lookup failed", "owner", paths.Owner, "error", err)
logger.Error(nil, "new lookup failed")
Expand Down Expand Up @@ -170,11 +177,12 @@ func (s *Service) feedPostHandler(w http.ResponseWriter, r *http.Request) {
}

headers := struct {
BatchID []byte `map:"Swarm-Postage-Batch-Id" validate:"required"`
Pin bool `map:"Swarm-Pin"`
Deferred *bool `map:"Swarm-Deferred-Upload"`
Act bool `map:"Swarm-Act"`
HistoryAddress swarm.Address `map:"Swarm-Act-History-Address"`
BatchID []byte `map:"Swarm-Postage-Batch-Id" validate:"required"`
Pin bool `map:"Swarm-Pin"`
Deferred *bool `map:"Swarm-Deferred-Upload"`
Act bool `map:"Swarm-Act"`
HistoryAddress swarm.Address `map:"Swarm-Act-History-Address"`
RedundancyLevel redundancy.Level `map:"Swarm-Redundancy-Level"`
}{}
if response := s.mapStructure(r.Header, &headers); response != nil {
response("invalid header params", logger, w)
Expand Down Expand Up @@ -231,7 +239,9 @@ func (s *Service) feedPostHandler(w http.ResponseWriter, r *http.Request) {
logger: logger,
}

l := loadsave.New(s.storer.ChunkStore(), s.storer.Cache(), requestPipelineFactory(r.Context(), putter, false, 0), redundancy.DefaultLevel)
rLevel := headers.RedundancyLevel
Copy link
Member

@gacevicljubisa gacevicljubisa Oct 20, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For Get, should the default be None? or Paranoid?


l := loadsave.New(s.storer.ChunkStore(), s.storer.Cache(), requestPipelineFactory(r.Context(), putter, false, 0), rLevel)
feedManifest, err := manifest.NewDefaultManifest(l, false)
if err != nil {
logger.Debug("create manifest failed", "error", err)
Expand Down
Loading
Loading