
Commit 68b3fd1

Update README.md
1 parent 49c8953 commit 68b3fd1


README.md

Lines changed: 33 additions & 18 deletions
@@ -1,28 +1,43 @@
 go-stream
 =========
 
-This library is a framework for stream processing analysis.
+This library is a framework for stream processing analysis. It is meant to be used as a library by Go programs
+that need to do stream processing of large volumes of data.
 
 It is made up of a graph connecting a source to one or more operators, terminating at a sink.
-Operators pass data from one to another with Go channels. An example graph is:
+Operators pass data from one to another with Go channels. An example graph that encodes objects with snappy is:
 
-    input := make(chan stream.Object)
-
-    passthruFn := func(in int) []int {
-        return []int{in}
-    }
-
-    FirstOp := mapper.NewOp(passthruFn, "First PT no")
-    FirstOp.SetIn(input)
-    SecondOp := mapper.NewOp(passthruFn, "2nd PT no")
-
-    ch := stream.NewChain()
-    ch.Add(FirstOp)
-    ch.Add(SecondOp)
+    var from *util.MemoryBuffer
+    // fill up from
 
-    ch.Start()
-
-Now any data sent through the input channel will be processed by the library.
+    var to *util.MemoryBuffer
+
+    ch := stream.NewOrderedChain()
+    ch.Add(source.NewNextReaderSource(from))
+    timingOp, counter, dur := timing.NewTimingOp()
+    ch.Add(timingOp)
+    ch.Add(compress.NewSnappyEncodeOp())
+    ch.Add(sink.NewWriterSink(to))
+
+    ch.Start()
+
+    log.Printf("RES: Compress Snappy.\t\tRatio %v", float64(to.ByteSize())/float64(from.ByteSize()))
+    log.Printf("RES: Compress Snappy.\t\tBuffered Items: %d\tItem: %v\ttook: %v\trate: %d\tsize: %E\tsize/item: %E", to.Len(), *counter, *dur, int(float64(*counter)/(*dur).Seconds()), float64(to.ByteSize()), float64(to.ByteSize())/float64(*counter))
+
+Operators are the main components of a chain.
+They process tuples to produce results. Sources are operators with no input. Sinks are operators
+with no output. Operators implement stream.Operator. An operator that takes input also implements stream.In; one that produces output also implements stream.Out.
+
+Mappers give a simple way to implement operators. mapper.NewOp() takes a function of the form
+func(input stream.Object, out Outputer), which processes the input and emits results through the Outputer object.
+Mappers are automatically parallelized. Generators provide a way to give mappers thread-local storage through closures.
+You can also give mappers special behavior to run after they have processed the last tuple.
+
+You can also split the data of a chain into other chains. stream.Fanout takes input and copies it to N other chains.
+Distributor takes input and routes each tuple to one of N chains according to a mapping function.
+
+Chains can be ordered or unordered. Ordered chains preserve the order of tuples from input to output
+(although the operators still run in parallel).
 
 Compiling:
 go build
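
To make the operator and mapper paragraphs in the updated README concrete, here is a minimal sketch of a custom operator built with mapper.NewOp(), wired into a chain the same way as the snappy example. The function follows the documented form func(input stream.Object, out Outputer); the emit call on the Outputer (out.Out() below), the byte-slice tuple type, and the uppercase transformation are illustrative assumptions, since this README does not show the Outputer's method set:

    // Sketch of a custom mapper operator. The Outputer emit call below is an
    // assumption; consult the mapper package for its real interface.
    upperFn := func(input stream.Object, out Outputer) {
        data, ok := input.([]byte)
        if !ok {
            return // skip tuples that are not byte slices
        }
        out.Out() <- bytes.ToUpper(data) // hypothetical emit method
    }

    ch := stream.NewChain()
    ch.Add(source.NewNextReaderSource(from)) // from: a filled *util.MemoryBuffer, as above
    ch.Add(mapper.NewOp(upperFn, "uppercase mapper"))
    ch.Add(sink.NewWriterSink(to))           // to: another *util.MemoryBuffer
    ch.Start()

Because mappers are parallelized automatically, the same function may run on several goroutines at once, so it should not share mutable state; the Generators mentioned above are the README's mechanism for giving each worker its own state through closures.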

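The last paragraph of the README distinguishes ordered and unordered chains, and both constructors already appear in this commit. A short sketch of the choice, reusing the pipeline pieces from the snappy example (from and to are the *util.MemoryBuffer values shown there):

    // stream.NewChain() builds an unordered chain: operators running in
    // parallel may emit tuples in a different order than they arrived.
    // stream.NewOrderedChain() preserves input order end to end while the
    // operators still run in parallel.
    ch := stream.NewOrderedChain() // swap in stream.NewChain() if order does not matter

    ch.Add(source.NewNextReaderSource(from))
    ch.Add(compress.NewSnappyEncodeOp())
    ch.Add(sink.NewWriterSink(to))
    ch.Start()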