Skip to content

Commit 093b7bf

Browse files
committed
add third parameter domainID/srcID to key in template cache for IPFix/v9# This is the commit message #5:
add third parameter domainID/srcID to key in template cache for IPFix/v9
1 parent 9ae42e9 commit 093b7bf

File tree

7 files changed

+87
-43
lines changed

7 files changed

+87
-43
lines changed

ipfix/decoder.go

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -164,17 +164,19 @@ func (d *Decoder) decodeSet(mem MemCache, msg *Message) error {
164164
// This check is somewhat redundant with the switch-clause below, but the retrieve() operation should not be executed inside the loop.
165165
if setHeader.SetID > 255 {
166166
var ok bool
167-
if tr, ok = mem.retrieve(setHeader.SetID, d.raddr); !ok {
167+
if tr, ok = mem.retrieve(setHeader.SetID, d.raddr, msg.Header.DomainID); !ok {
168168
select {
169169
case rpcChan <- RPCRequest{
170170
ID: setHeader.SetID,
171171
IP: d.raddr,
172+
SrcID: msg.Header.DomainID,
172173
}:
173174
default:
174175
}
175-
err = nonfatalError{fmt.Errorf("%s unknown ipfix template id# %d",
176+
err = nonfatalError{fmt.Errorf("%s unknown ipfix template id# %d with domain ID %d",
176177
d.raddr.String(),
177178
setHeader.SetID,
179+
msg.Header.DomainID,
178180
)}
179181
}
180182
}
@@ -196,15 +198,15 @@ func (d *Decoder) decodeSet(mem MemCache, msg *Message) error {
196198
err = tr.unmarshalOpts(d.reader)
197199
}
198200
if err == nil {
199-
mem.insert(tr.TemplateID, d.raddr, tr)
201+
mem.insert(tr.TemplateID, d.raddr, tr, msg.Header.DomainID)
200202
}
201203
} else if setID >= 4 && setID <= 255 {
202-
// Reserved set, do not read any records
203-
break
204-
} else if setID == 0 {
205-
// Invalid set
206-
return fmt.Errorf("failed to decodeSet / invalid setID")
207-
} else {
204+
// Reserved set, do not read any records
205+
break
206+
} else if setID == 0 {
207+
// Invalid set
208+
return fmt.Errorf("failed to decodeSet / invalid setID")
209+
} else {
208210
// Data set
209211
var data []DecodedField
210212
if data, err = d.decodeData(tr); err == nil {

ipfix/decoder_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ import (
3131
var tpl, optsTpl, multiMessage, unknownDatasetMessage []byte
3232

3333
func init() {
34-
// IPFIX packet including template SetID:2, 25 fields
34+
// IPFIX packet including template SetID:2, 25 fields, Domain id 33792
3535
tpl = []byte{
3636
0x0, 0xa, 0x0, 0x7c, 0x58, 0x90, 0xd6, 0x40, 0x28, 0xf7,
3737
0xa0, 0x4a, 0x0, 0x0, 0x84, 0x0, 0x0, 0x2, 0x0, 0x6c, 0x1,
@@ -205,8 +205,8 @@ func TestUnknownDatasetsMessage(t *testing.T) {
205205
t.Error("Did not expect any result datasets, but got", l)
206206
}
207207
expectedErrorStr := `Multiple errors:
208-
- 127.0.0.1 unknown ipfix template id# 264
209-
- 127.0.0.1 unknown ipfix template id# 264`
208+
- 127.0.0.1 unknown ipfix template id# 264 with domain ID 1
209+
- 127.0.0.1 unknown ipfix template id# 264 with domain ID 1`
210210
if err == nil || err.Error() != expectedErrorStr {
211211
t.Error("Received unexpected erorr:", err)
212212
}

ipfix/memcache.go

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -79,27 +79,32 @@ func GetCache(cacheFile string) MemCache {
7979
return m
8080
}
8181

82-
func (m MemCache) getShard(id uint16, addr net.IP) (*TemplatesShard, uint32) {
83-
b := make([]byte, 2)
84-
binary.BigEndian.PutUint16(b, id)
85-
key := append(addr, b...)
86-
82+
func (m MemCache) getShard(templateId uint16, addr net.IP, domainId uint32) (*TemplatesShard, uint32) {
83+
var key []byte
8784
hash := fnv.New32()
85+
dId := make([]byte, 4)
86+
tID := make([]byte, 2)
87+
binary.LittleEndian.PutUint32(dId, domainId)
88+
binary.BigEndian.PutUint16(tID, templateId)
89+
key = append(key, addr...)
90+
key = append(key, dId...)
91+
key = append(key, tID...)
92+
8893
hash.Write(key)
8994
hSum32 := hash.Sum32()
9095

9196
return m[uint(hSum32)%uint(shardNo)], hSum32
9297
}
9398

94-
func (m MemCache) insert(id uint16, addr net.IP, tr TemplateRecord) {
95-
shard, key := m.getShard(id, addr)
99+
func (m MemCache) insert(id uint16, addr net.IP, tr TemplateRecord, domainID uint32) {
100+
shard, key := m.getShard(id, addr, domainID)
96101
shard.Lock()
97102
defer shard.Unlock()
98103
shard.Templates[key] = Data{tr, time.Now().Unix()}
99104
}
100105

101-
func (m MemCache) retrieve(id uint16, addr net.IP) (TemplateRecord, bool) {
102-
shard, key := m.getShard(id, addr)
106+
func (m MemCache) retrieve(id uint16, addr net.IP, domainID uint32) (TemplateRecord, bool) {
107+
shard, key := m.getShard(id, addr, domainID)
103108
shard.RLock()
104109
defer shard.RUnlock()
105110
v, ok := shard.Templates[key]

ipfix/memcache_rpc.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ type RPCConfig struct {
5858
type RPCRequest struct {
5959
ID uint16
6060
IP net.IP
61+
SrcID uint32
6162
}
6263

6364
type vFlowServer struct {
@@ -91,7 +92,7 @@ func NewRPC(mCache MemCache) *IRPC {
9192
func (r *IRPC) Get(req RPCRequest, resp *TemplateRecord) error {
9293
var ok bool
9394

94-
*resp, ok = r.mCache.retrieve(req.ID, req.IP)
95+
*resp, ok = r.mCache.retrieve(req.ID, req.IP, req.SrcID)
9596
if !ok {
9697
return errNotAvail
9798
}
@@ -168,7 +169,7 @@ func RPC(m MemCache, config *RPCConfig) {
168169
continue
169170
}
170171

171-
m.insert(req.ID, req.IP, *tr)
172+
m.insert(req.ID, req.IP, *tr, req.SrcID)
172173
break
173174
}
174175

ipfix/memcache_test.go

Lines changed: 37 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ func TestMemCacheRetrieve(t *testing.T) {
3333
mCache := GetCache("cache.file")
3434
d := NewDecoder(ip, tpl)
3535
d.Decode(mCache)
36-
v, ok := mCache.retrieve(256, ip)
36+
v, ok := mCache.retrieve(256, ip, 33792)
3737
if !ok {
3838
t.Error("expected mCache retrieve status true, got", ok)
3939
}
@@ -48,9 +48,9 @@ func TestMemCacheInsert(t *testing.T) {
4848
mCache := GetCache("cache.file")
4949

5050
tpl.TemplateID = 310
51-
mCache.insert(310, ip, tpl)
51+
mCache.insert(310, ip, tpl, 513)
5252

53-
v, ok := mCache.retrieve(310, ip)
53+
v, ok := mCache.retrieve(310, ip, 513)
5454
if !ok {
5555
t.Error("expected mCache retrieve status true, got", ok)
5656
}
@@ -65,15 +65,46 @@ func TestMemCacheAllSetIds(t *testing.T) {
6565
mCache := GetCache("cache.file")
6666

6767
tpl.TemplateID = 310
68-
mCache.insert(tpl.TemplateID, ip, tpl)
68+
mCache.insert(tpl.TemplateID, ip, tpl, 513)
6969
tpl.TemplateID = 410
70-
mCache.insert(tpl.TemplateID, ip, tpl)
70+
mCache.insert(tpl.TemplateID, ip, tpl, 513)
7171
tpl.TemplateID = 210
72-
mCache.insert(tpl.TemplateID, ip, tpl)
72+
mCache.insert(tpl.TemplateID, ip, tpl, 513)
7373

7474
expected := []int{210, 310, 410}
7575
actual := mCache.allSetIds()
7676
if !reflect.DeepEqual(expected, actual) {
7777
t.Errorf("Expected set IDs %v, got %v", expected, actual)
7878
}
7979
}
80+
81+
func TestMemCache_keyWithDifferentDomainIDs(t *testing.T) {
82+
var tpl TemplateRecord
83+
ip := net.ParseIP("127.0.0.1")
84+
mCache := GetCache("cache.file")
85+
86+
tpl.TemplateID = 310
87+
tpl.FieldCount = 19
88+
mCache.insert(tpl.TemplateID, ip, tpl, 513)
89+
90+
tpl.FieldCount = 21
91+
mCache.insert(tpl.TemplateID, ip, tpl, 514)
92+
93+
v, ok := mCache.retrieve(tpl.TemplateID, ip, 513)
94+
95+
if !ok {
96+
t.Error("expected mCache retrieve status true, got", ok)
97+
}
98+
if v.FieldCount != 19 {
99+
t.Error("expected template id#:310 with Field count#:19, got", v.TemplateID, v.FieldCount)
100+
}
101+
102+
v, ok = mCache.retrieve(tpl.TemplateID, ip, 514)
103+
104+
if !ok {
105+
t.Error("expected mCache retrieve status true, got", ok)
106+
}
107+
if v.FieldCount != 21 {
108+
t.Error("expected template id#:310 with Field count#:21, got", v.TemplateID, v.FieldCount)
109+
}
110+
}

netflow/v9/decoder.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -426,11 +426,11 @@ func (d *Decoder) decodeSet(mem MemCache, msg *Message) error {
426426
// This check is somewhat redundant with the switch-clause below, but the retrieve() operation should not be executed inside the loop.
427427
if setHeader.FlowSetID > 255 {
428428
var ok bool
429-
tr, ok = mem.retrieve(setHeader.FlowSetID, d.raddr)
429+
tr, ok = mem.retrieve(setHeader.FlowSetID, d.raddr, msg.Header.SrcID)
430430
if !ok {
431-
err = nonfatalError(fmt.Errorf("%s unknown netflow template id# %d",
431+
err = nonfatalError(fmt.Errorf("%s unknown netflow template id# %d from sourceID %d",
432432
d.raddr.String(),
433-
setHeader.FlowSetID,
433+
setHeader.FlowSetID, msg.Header.SrcID,
434434
))
435435
}
436436
}
@@ -446,9 +446,9 @@ func (d *Decoder) decodeSet(mem MemCache, msg *Message) error {
446446
err = tr.unmarshalOpts(d.reader)
447447
}
448448
if err == nil {
449-
mem.insert(tr.TemplateID, d.raddr, tr)
449+
mem.insert(tr.TemplateID, d.raddr, tr, msg.Header.SrcID)
450450
}
451-
} else if setId >= 4 && setId <= 255 {
451+
} else if setId <= 255 {
452452
// Reserved set, do not read any records
453453
break
454454
} else {

netflow/v9/memcache.go

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -78,27 +78,32 @@ func GetCache(cacheFile string) MemCache {
7878
return m
7979
}
8080

81-
func (m MemCache) getShard(id uint16, addr net.IP) (*TemplatesShard, uint32) {
82-
b := make([]byte, 2)
83-
binary.BigEndian.PutUint16(b, id)
84-
key := append(addr, b...)
85-
81+
func (m MemCache) getShard(templateId uint16, addr net.IP, srcId uint32) (*TemplatesShard, uint32) {
82+
var key []byte
8683
hash := fnv.New32()
84+
sId := make([]byte, 4)
85+
tID := make([]byte, 2)
86+
binary.LittleEndian.PutUint32(sId, srcId)
87+
binary.BigEndian.PutUint16(tID, templateId)
88+
key = append(key, addr...)
89+
key = append(key, sId...)
90+
key = append(key, tID...)
91+
8792
hash.Write(key)
8893
hSum32 := hash.Sum32()
8994

9095
return m[uint(hSum32)%uint(shardNo)], hSum32
9196
}
9297

93-
func (m *MemCache) insert(id uint16, addr net.IP, tr TemplateRecord) {
94-
shard, key := m.getShard(id, addr)
98+
func (m *MemCache) insert(id uint16, addr net.IP, tr TemplateRecord, srcID uint32) {
99+
shard, key := m.getShard(id, addr, srcID)
95100
shard.Lock()
96101
defer shard.Unlock()
97102
shard.Templates[key] = Data{tr, time.Now().Unix()}
98103
}
99104

100-
func (m *MemCache) retrieve(id uint16, addr net.IP) (TemplateRecord, bool) {
101-
shard, key := m.getShard(id, addr)
105+
func (m *MemCache) retrieve(id uint16, addr net.IP, srcID uint32) (TemplateRecord, bool) {
106+
shard, key := m.getShard(id, addr, srcID)
102107
shard.RLock()
103108
defer shard.RUnlock()
104109
v, ok := shard.Templates[key]

0 commit comments

Comments
 (0)