Commit d558a59

jmozah authored and fjl committed
swarm/storage: pyramid chunker re-write (#14382)
1 parent 3c86563 commit d558a59

File tree

12 files changed: +1010 -235 lines changed

.gitignore

Lines changed: 3 additions & 0 deletions

@@ -30,3 +30,6 @@ build/_vendor/pkg
 # travis
 profile.tmp
 profile.cov
+
+# IdeaIDE
+.idea

swarm/network/depo.go

Lines changed: 2 additions & 2 deletions

@@ -29,12 +29,12 @@ import (
 // Handler for storage/retrieval related protocol requests
 // implements the StorageHandler interface used by the bzz protocol
 type Depo struct {
-	hashfunc   storage.Hasher
+	hashfunc   storage.SwarmHasher
 	localStore storage.ChunkStore
 	netStore   storage.ChunkStore
 }

-func NewDepo(hash storage.Hasher, localStore, remoteStore storage.ChunkStore) *Depo {
+func NewDepo(hash storage.SwarmHasher, localStore, remoteStore storage.ChunkStore) *Depo {
 	return &Depo{
 		hashfunc:   hash,
 		localStore: localStore,
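
Note: the Hasher to SwarmHasher rename refers to types introduced elsewhere in this commit, outside the hunks shown here. Inferred from how the chunker hunks below use them, they presumably look along these lines; treat this as a sketch, not the commit's verbatim definitions:

// Sketch of the assumed hasher types (defined elsewhere in swarm/storage).
package storage

import "hash"

// SwarmHash extends hash.Hash so the chunker can seed the hash state with
// the 8-byte subtree-size prefix when resetting (see hashChunk below).
type SwarmHash interface {
	hash.Hash
	ResetWithLength([]byte)
}

// SwarmHasher is the hash constructor type that replaces the old Hasher.
type SwarmHasher func() SwarmHash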

swarm/storage/chunker.go

Lines changed: 58 additions & 35 deletions

@@ -20,9 +20,9 @@ import (
 	"encoding/binary"
 	"errors"
 	"fmt"
-	"hash"
 	"io"
 	"sync"
+	"time"
 )

 /*
@@ -50,14 +50,6 @@ data_{i} := size(subtree_{i}) || key_{j} || key_{j+1} .... || key_{j+n-1}
 The underlying hash function is configurable
 */

-const (
-	defaultHash = "SHA3"
-	// defaultHash = "BMTSHA3" // http://golang.org/pkg/hash/#Hash
-	// defaultHash = "SHA256" // http://golang.org/pkg/hash/#Hash
-	defaultBranches int64 = 128
-	// hashSize int64 = hasherfunc.New().Size() // hasher knows about its own length in bytes
-	// chunksize int64 = branches * hashSize // chunk is defined as this
-)

 /*
 Tree chunker is a concrete implementation of data chunking.
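
The hunk header above quotes the intermediate-chunk layout, data_{i} := size(subtree_{i}) || key_{j} || ... || key_{j+n-1}. A minimal standalone sketch of that layout, assuming 32-byte keys and the 8-byte little-endian size prefix that the hashChunk hunk further down relies on (packChunk and hashSize here are illustrative names, not the commit's):

package main

import (
	"encoding/binary"
	"fmt"
)

const hashSize = 32 // assumed key length in bytes

// packChunk lays out an intermediate chunk: an 8-byte subtree size,
// followed by the concatenated child keys.
func packChunk(subtreeSize uint64, childKeys [][]byte) []byte {
	data := make([]byte, 8, 8+len(childKeys)*hashSize)
	binary.LittleEndian.PutUint64(data[:8], subtreeSize) // size(subtree_i)
	for _, key := range childKeys {
		data = append(data, key...) // key_j || key_{j+1} || ...
	}
	return data
}

func main() {
	key := make([]byte, hashSize)
	fmt.Println(len(packChunk(4096*128, [][]byte{key, key}))) // 8 + 2*32 = 72
}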
@@ -67,25 +59,19 @@ If all is well it is possible to implement this by simply composing readers so t
 The hashing itself does use extra copies and allocation though, since it does need it.
 */

-type ChunkerParams struct {
-	Branches int64
-	Hash     string
-}
-
-func NewChunkerParams() *ChunkerParams {
-	return &ChunkerParams{
-		Branches: defaultBranches,
-		Hash:     defaultHash,
-	}
-}
+var (
+	errAppendOppNotSuported = errors.New("Append operation not supported")
+	errOperationTimedOut    = errors.New("operation timed out")
+)

 type TreeChunker struct {
 	branches int64
-	hashFunc Hasher
+	hashFunc SwarmHasher
 	// calculated
 	hashSize  int64 // self.hashFunc.New().Size()
 	chunkSize int64 // hashSize* branches
-	workerCount int
+	workerCount int64        // the number of worker routines used
+	workerLock  sync.RWMutex // lock for the worker count
 }

 func NewTreeChunker(params *ChunkerParams) (self *TreeChunker) {
@@ -94,7 +80,8 @@ func NewTreeChunker(params *ChunkerParams) (self *TreeChunker) {
 	self.branches = params.Branches
 	self.hashSize = int64(self.hashFunc().Size())
 	self.chunkSize = self.hashSize * self.branches
-	self.workerCount = 1
+	self.workerCount = 0
+
 	return
 }

@@ -114,13 +101,31 @@ type hashJob struct {
 	parentWg *sync.WaitGroup
 }

-func (self *TreeChunker) Split(data io.Reader, size int64, chunkC chan *Chunk, swg, wwg *sync.WaitGroup) (Key, error) {
+func (self *TreeChunker) incrementWorkerCount() {
+	self.workerLock.Lock()
+	defer self.workerLock.Unlock()
+	self.workerCount += 1
+}
+
+func (self *TreeChunker) getWorkerCount() int64 {
+	self.workerLock.RLock()
+	defer self.workerLock.RUnlock()
+	return self.workerCount
+}

+func (self *TreeChunker) decrementWorkerCount() {
+	self.workerLock.Lock()
+	defer self.workerLock.Unlock()
+	self.workerCount -= 1
+}
+
+func (self *TreeChunker) Split(data io.Reader, size int64, chunkC chan *Chunk, swg, wwg *sync.WaitGroup) (Key, error) {
 	if self.chunkSize <= 0 {
 		panic("chunker must be initialised")
 	}

-	jobC := make(chan *hashJob, 2*processors)
+
+	jobC := make(chan *hashJob, 2*ChunkProcessors)
 	wg := &sync.WaitGroup{}
 	errC := make(chan error)
 	quitC := make(chan bool)
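
Why the new helpers take a lock: Split and split spawn hashWorker goroutines concurrently, so the old bare workerCount++ was a data race. A minimal standalone illustration of the guarded-counter pattern (the names are illustrative, not the commit's):

package main

import (
	"fmt"
	"sync"
)

type workerCounter struct {
	mu sync.RWMutex // RWMutex: the hot path mostly reads the count
	n  int64
}

func (c *workerCounter) inc() { c.mu.Lock(); c.n++; c.mu.Unlock() }
func (c *workerCounter) dec() { c.mu.Lock(); c.n--; c.mu.Unlock() }
func (c *workerCounter) get() int64 {
	c.mu.RLock()
	defer c.mu.RUnlock()
	return c.n
}

func main() {
	var c workerCounter
	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func() { defer wg.Done(); c.inc() }() // safe under go test -race
	}
	wg.Wait()
	fmt.Println(c.get()) // 100
}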
@@ -129,6 +134,8 @@ func (self *TreeChunker) Split(data io.Reader, size int64, chunkC chan *Chunk, s
 	if wwg != nil {
 		wwg.Add(1)
 	}
+
+	self.incrementWorkerCount()
 	go self.hashWorker(jobC, chunkC, errC, quitC, swg, wwg)

 	depth := 0
@@ -157,17 +164,24 @@ func (self *TreeChunker) Split(data io.Reader, size int64, chunkC chan *Chunk, s
 		close(errC)
 	}()

-	//TODO: add a timeout
-	if err := <-errC; err != nil {
-		close(quitC)
-		return nil, err
+
+	defer close(quitC)
+	select {
+	case err := <-errC:
+		if err != nil {
+			return nil, err
+		}
+	case <-time.NewTimer(splitTimeout).C:
+		return nil, errOperationTimedOut
 	}

 	return key, nil
 }

 func (self *TreeChunker) split(depth int, treeSize int64, key Key, data io.Reader, size int64, jobC chan *hashJob, chunkC chan *Chunk, errC chan error, quitC chan bool, parentWg, swg, wwg *sync.WaitGroup) {

+	//
+
 	for depth > 0 && size < treeSize {
 		treeSize /= self.branches
 		depth--
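
The hunk above replaces the TODO with a real deadline: block on the first value from errC, but give up after splitTimeout, a package-level constant defined elsewhere in the commit. A runnable sketch of the same select pattern; the timeout value here is illustrative:

package main

import (
	"errors"
	"fmt"
	"time"
)

// splitTimeout stands in for the commit's package-level constant; the
// value is illustrative only.
const splitTimeout = 2 * time.Second

var errOperationTimedOut = errors.New("operation timed out")

// waitWithTimeout mirrors the new Split logic: the first receive from errC
// wins (a closed channel yields nil, i.e. success), else the timer fires.
func waitWithTimeout(errC chan error) error {
	select {
	case err := <-errC:
		return err
	case <-time.NewTimer(splitTimeout).C:
		return errOperationTimedOut
	}
}

func main() {
	errC := make(chan error)
	go func() { close(errC) }()        // simulate a successful split
	fmt.Println(waitWithTimeout(errC)) // <nil>
}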
@@ -223,12 +237,15 @@ func (self *TreeChunker) split(depth int, treeSize int64, key Key, data io.Reade
 	// parentWg.Add(1)
 	// go func() {
 	childrenWg.Wait()
-	if len(jobC) > self.workerCount && self.workerCount < processors {
+
+	worker := self.getWorkerCount()
+	if int64(len(jobC)) > worker && worker < ChunkProcessors {
 		if wwg != nil {
 			wwg.Add(1)
 		}
-		self.workerCount++
+		self.incrementWorkerCount()
 		go self.hashWorker(jobC, chunkC, errC, quitC, swg, wwg)
+
 	}
 	select {
 	case jobC <- &hashJob{key, chunk, size, parentWg}:
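
The rewritten condition is a backpressure heuristic: add a hash worker only while the queued-job backlog exceeds the live worker count, and never beyond the ChunkProcessors cap. Rendered as a hypothetical standalone helper (not in the commit):

// shouldSpawnWorker is a hypothetical extraction of the spawn condition.
func shouldSpawnWorker(queuedJobs, liveWorkers, limit int64) bool {
	return queuedJobs > liveWorkers && liveWorkers < limit
}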
@@ -237,6 +254,8 @@
 }

 func (self *TreeChunker) hashWorker(jobC chan *hashJob, chunkC chan *Chunk, errC chan error, quitC chan bool, swg, wwg *sync.WaitGroup) {
+	defer self.decrementWorkerCount()
+
 	hasher := self.hashFunc()
 	if wwg != nil {
 		defer wwg.Done()
@@ -249,7 +268,6 @@ func (self *TreeChunker) hashWorker(jobC chan *hashJob, chunkC chan *Chunk, errC
 				return
 			}
 			// now we got the hashes in the chunk, then hash the chunks
-			hasher.Reset()
 			self.hashChunk(hasher, job, chunkC, swg)
 		case <-quitC:
 			return
@@ -260,9 +278,11 @@ func (self *TreeChunker) hashWorker(jobC chan *hashJob, chunkC chan *Chunk, errC
 // The treeChunkers own Hash hashes together
 // - the size (of the subtree encoded in the Chunk)
 // - the Chunk, ie. the contents read from the input reader
-func (self *TreeChunker) hashChunk(hasher hash.Hash, job *hashJob, chunkC chan *Chunk, swg *sync.WaitGroup) {
-	hasher.Write(job.chunk)
+func (self *TreeChunker) hashChunk(hasher SwarmHash, job *hashJob, chunkC chan *Chunk, swg *sync.WaitGroup) {
+	hasher.ResetWithLength(job.chunk[:8]) // 8 bytes of length
+	hasher.Write(job.chunk[8:])           // minus 8 []byte length
 	h := hasher.Sum(nil)
+
 	newChunk := &Chunk{
 		Key:   h,
 		SData: job.chunk,
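
ResetWithLength also explains the hasher.Reset() removal two hunks up: resetting now happens inside the SwarmHash, which absorbs the 8-byte length prefix in the same step. One plausible adapter from a plain hash.Hash, an assumption rather than the commit's code:

package storage

import "hash"

// hashWithLength is a hypothetical adapter from hash.Hash to the SwarmHash
// interface used above.
type hashWithLength struct {
	hash.Hash
}

// ResetWithLength resets the wrapped hash and immediately absorbs the
// 8-byte length prefix, replacing the old explicit hasher.Reset() call.
func (h *hashWithLength) ResetWithLength(length []byte) {
	h.Reset()
	h.Write(length)
}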
@@ -285,6 +305,10 @@ func (self *TreeChunker) hashChunk(hasher hash.Hash, job *hashJob, chunkC chan *
 	}
 }

+func (self *TreeChunker) Append(key Key, data io.Reader, chunkC chan *Chunk, swg, wwg *sync.WaitGroup) (Key, error) {
+	return nil, errAppendOppNotSuported
+}
+
 // LazyChunkReader implements LazySectionReader
 type LazyChunkReader struct {
 	key Key // root key
@@ -298,7 +322,6 @@ type LazyChunkReader struct {

 // implements the Joiner interface
 func (self *TreeChunker) Join(key Key, chunkC chan *Chunk) LazySectionReader {
-
 	return &LazyChunkReader{
 		key:    key,
 		chunkC: chunkC,
