Skip to content

Commit ea9f8c4

Browse files
authored
Merge pull request #93 from DCSO/unicorn
add testdata submission for flow reports
2 parents ff210ee + e310092 commit ea9f8c4

File tree

5 files changed

+121
-13
lines changed

5 files changed

+121
-13
lines changed

README.md

Lines changed: 26 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -213,7 +213,7 @@ FEVER can optionally inject in-band test data into downstream submissions, such
213213
}
214214
```
215215
will be created and forwarded.
216-
* For passive DNS observation submissions, use the `pdns.test-domain` config item to insert a dummy entry for that domain, e.g. for `pdns.tests-domain` set to `heartbeat.fever-heartbeat`:
216+
* For passive DNS observation submissions, use the `pdns.test-domain` config item to insert a dummy entry for that domain, e.g. for `pdns.test-domain` set to `heartbeat.fever-heartbeat`:
217217
```json
218218
{
219219
"timestamp_start": "2021-12-07T18:18:00.029197078Z",
@@ -230,11 +230,34 @@ FEVER can optionally inject in-band test data into downstream submissions, such
230230
}
231231
]
232232
},
233-
...
233+
...
234234
}
235235
}
236236
```
237-
237+
* For flow report submission, use the `flowreport.testdata*` config items to insert a dummy flow for that specific IPs and ports, e.g. for :
238+
```yaml
239+
flowreport:
240+
# ...
241+
testdata-srcip: 0.0.0.1
242+
testdata-destip: 0.0.0.2
243+
testdata-destport: 99999
244+
```
245+
we would get
246+
```json
247+
{
248+
"sensor-id": "XXX",
249+
"time-start": "2021-12-08T13:53:36.442182896+01:00",
250+
"time-end": "2021-12-08T13:53:46.490743527+01:00",
251+
"tuples": {
252+
"0.0.0.1_0.0.0.2_99999": {
253+
"count": 1,
254+
"total_bytes_toclient": 23,
255+
"total_bytes_toserver": 42
256+
}
257+
},
258+
...
259+
}
260+
```
238261

239262
## Author/Contact
240263

cmd/fever/cmds/run.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -441,6 +441,12 @@ func mainfunc(cmd *cobra.Command, args []string) {
441441
}).Info("compression of flow stats")
442442
}
443443
ua := processing.MakeUnicornAggregator(submitter, unicornSleep, dummyMode)
444+
testSrcIP := viper.GetString("flowreport.testdata-srcip")
445+
testDestIP := viper.GetString("flowreport.testdata-destip")
446+
testDestPort := viper.GetInt64("flowreport.testdata-destport")
447+
if testSrcIP != "" && testDestIP != "" {
448+
ua.EnableTestFlow(testSrcIP, testDestIP, testDestPort)
449+
}
444450
dispatcher.RegisterHandler(ua)
445451
ua.Run()
446452
defer func() {

fever.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,11 @@ flowreport:
8080
submission-exchange: aggregations
8181
# Set to true to disable gzip compression for uploads.
8282
nocompress: false
83+
# If both srcip and destip are non-empty, inject an extra flow record for
84+
# these towards the given destination port.
85+
#testdata-srcip: 0.0.0.1
86+
#testdata-destip: 0.0.0.2
87+
#testdata-destport: 99999
8388

8489
# Configuration for metrics (i.e. InfluxDB) submission.
8590
metrics:

processing/unicorn_aggregator.go

Lines changed: 34 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ package processing
66
import (
77
"bytes"
88
"encoding/json"
9+
"fmt"
910
"os"
1011
"strconv"
1112
"sync"
@@ -41,6 +42,9 @@ type UnicornAggregator struct {
4142
StringBuf bytes.Buffer
4243
UnicornTuplesMutex sync.RWMutex `json:"-"`
4344
UnicornProxyMapMutex sync.RWMutex `json:"-"`
45+
TestFlowSrcIP string
46+
TestFlowDestIP string
47+
TestFlowDestPort int64
4448
}
4549

4650
// MakeUnicornAggregate creates a new empty UnicornAggregate object.
@@ -59,12 +63,13 @@ func MakeUnicornAggregator(statsSubmitter util.StatsSubmitter,
5963
Logger: log.WithFields(log.Fields{
6064
"domain": "aggregate",
6165
}),
62-
Submitter: statsSubmitter,
63-
DummyMode: dummyMode,
64-
SubmitPeriod: submitPeriod,
65-
CloseChan: make(chan bool),
66-
ClosedChan: make(chan bool),
67-
Aggregate: *MakeUnicornAggregate(),
66+
Submitter: statsSubmitter,
67+
DummyMode: dummyMode,
68+
SubmitPeriod: submitPeriod,
69+
CloseChan: make(chan bool),
70+
ClosedChan: make(chan bool),
71+
Aggregate: *MakeUnicornAggregate(),
72+
TestFlowDestPort: 99999,
6873
}
6974
return a
7075
}
@@ -86,6 +91,16 @@ func (a *UnicornAggregator) stop() {
8691
}
8792

8893
func (a *UnicornAggregator) submit(submitter util.StatsSubmitter, dummyMode bool) {
94+
if a.TestFlowSrcIP != "" && a.TestFlowDestIP != "" {
95+
// Inject test flow into aggregation
96+
a.CountFlowTuple(
97+
fmt.Sprintf("%s_%s_%d", a.TestFlowSrcIP,
98+
a.TestFlowDestIP, a.TestFlowDestPort),
99+
23,
100+
42,
101+
20, // count 20 to ensure some limits are met downstream
102+
)
103+
}
89104
// Lock the current measurements for submission. Since this is a blocking
90105
// operation, we don't want this to depend on how long submitter.Submit()
91106
// takes but keep it independent of that. Hence we take the time to create
@@ -122,14 +137,16 @@ func (a *UnicornAggregator) submit(submitter util.StatsSubmitter, dummyMode bool
122137

123138
}
124139

125-
// CountFlowTuple increments the flow tuple counter for the given key.
140+
// CountFlowTuple increments the flow tuple counter for the given key. If addCnt
141+
// is >1, then the caller is responsible for providing the correct (sub-total)
142+
// counts for bytestoclient and bytestoserver.
126143
func (a *UnicornAggregator) CountFlowTuple(key string, bytestoclient int64,
127-
bytestoserver int64) {
144+
bytestoserver int64, addCnt int64) {
128145
a.UnicornTuplesMutex.Lock()
129146
if _, ok := a.Aggregate.FlowTuples[key]; !ok {
130147
a.Aggregate.FlowTuples[key] = make(map[string]int64)
131148
}
132-
a.Aggregate.FlowTuples[key]["count"]++
149+
a.Aggregate.FlowTuples[key]["count"] += addCnt
133150
a.Aggregate.FlowTuples[key]["total_bytes_toclient"] += bytestoclient
134151
a.Aggregate.FlowTuples[key]["total_bytes_toserver"] += bytestoserver
135152
a.UnicornTuplesMutex.Unlock()
@@ -187,7 +204,7 @@ func (a *UnicornAggregator) Consume(e *types.Entry) error {
187204
a.StringBuf.Write([]byte("_"))
188205
a.StringBuf.Write([]byte(strconv.Itoa(int(e.DestPort))))
189206
a.CountFlowTuple(a.StringBuf.String(), e.BytesToClient,
190-
e.BytesToServer)
207+
e.BytesToServer, 1)
191208
a.StringBuf.Reset()
192209
}
193210

@@ -210,3 +227,10 @@ func (a *UnicornAggregator) GetName() string {
210227
func (a *UnicornAggregator) GetEventTypes() []string {
211228
return []string{"http", "flow"}
212229
}
230+
231+
// EnableTestFlow adds a dummy flow with the given specs to each aggregation
232+
func (a *UnicornAggregator) EnableTestFlow(srcip, dstip string, dstport int64) {
233+
a.TestFlowSrcIP = srcip
234+
a.TestFlowDestIP = dstip
235+
a.TestFlowDestPort = dstport
236+
}

processing/unicorn_aggregator_test.go

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -184,6 +184,56 @@ func TestUnicornAggregator(t *testing.T) {
184184
}
185185
}
186186

187+
func TestUnicornAggregatorWithTestdata(t *testing.T) {
188+
rand.Seed(time.Now().UTC().UnixNano())
189+
dsub := &testSubmitter{
190+
Data: make([]string, 0),
191+
}
192+
f := MakeUnicornAggregator(dsub, 500*time.Millisecond, false)
193+
f.EnableTestFlow("1.2.3.4", "5.6.7.8", 33333)
194+
f.Run()
195+
196+
for {
197+
if dsub.GetTotalAggs() < 1 {
198+
log.Debug(dsub.GetTotalAggs())
199+
time.Sleep(100 * time.Millisecond)
200+
} else {
201+
break
202+
}
203+
}
204+
205+
consumeWaitChan := make(chan bool)
206+
f.Stop(consumeWaitChan)
207+
<-consumeWaitChan
208+
209+
var d UnicornAggregate
210+
211+
err := json.Unmarshal([]byte(dsub.Data[0]), &d)
212+
if err != nil {
213+
t.Fatal(err)
214+
}
215+
if v, ok := d.FlowTuples["1.2.3.4_5.6.7.8_33333"]; ok {
216+
if val, ok := v["count"]; ok {
217+
if val != 20 {
218+
t.Fatalf("wrong value: %v", val)
219+
}
220+
}
221+
if val, ok := v["total_bytes_toclient"]; ok {
222+
if val != 23 {
223+
t.Fatalf("wrong value: %v", val)
224+
}
225+
}
226+
if val, ok := v["total_bytes_toserver"]; ok {
227+
if val != 42 {
228+
t.Fatalf("wrong value: %v", val)
229+
}
230+
}
231+
} else {
232+
t.Fatalf("missing key in map: %v", d.FlowTuples)
233+
}
234+
235+
}
236+
187237
func TestUnicornAggregatorWithDispatch(t *testing.T) {
188238
rand.Seed(time.Now().UTC().UnixNano())
189239
dsub := &testSubmitter{

0 commit comments

Comments
 (0)