Skip to content

Commit 2f2db65

Browse files
authored
Merge pull request #240 from jotak/copy-before-writes
Copy maps before write
2 parents 16d9c36 + c9abab9 commit 2f2db65

File tree

2 files changed

+43
-41
lines changed

2 files changed

+43
-41
lines changed

pkg/pipeline/transform/transform_filter.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,8 @@ func (f *Filter) Transform(input []config.GenericMap) []config.GenericMap {
3232
log.Debugf("f = %v", f)
3333
output := make([]config.GenericMap, 0)
3434
for _, entry := range input {
35-
outputEntry := entry
35+
// copy input entry before transform to avoid alteration on parallel stages
36+
outputEntry := entry.Copy()
3637
addToOutput := true
3738
for _, rule := range f.Rules {
3839
log.Debugf("rule = %v", rule)

pkg/pipeline/transform/transform_network.go

Lines changed: 41 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -39,17 +39,18 @@ type Network struct {
3939
api.TransformNetwork
4040
}
4141

42-
func (n *Network) Transform(inputEntries []config.GenericMap) []config.GenericMap {
42+
func (n *Network) Transform(input []config.GenericMap) []config.GenericMap {
4343
outputEntries := make([]config.GenericMap, 0)
44-
for _, entry := range inputEntries {
44+
for _, entry := range input {
4545
outputEntry := n.TransformEntry(entry)
4646
outputEntries = append(outputEntries, outputEntry)
4747
}
4848
return outputEntries
4949
}
5050

5151
func (n *Network) TransformEntry(inputEntry config.GenericMap) config.GenericMap {
52-
outputEntries := inputEntry
52+
// copy input entry before transform to avoid alteration on parallel stages
53+
outputEntry := inputEntry.Copy()
5354

5455
for _, rule := range n.Rules {
5556
switch rule.Type {
@@ -59,28 +60,28 @@ func (n *Network) TransformEntry(inputEntry config.GenericMap) config.GenericMap
5960
panic(err)
6061
}
6162
buf := &bytes.Buffer{}
62-
err = template.Execute(buf, outputEntries)
63+
err = template.Execute(buf, outputEntry)
6364
if err != nil {
6465
panic(err)
6566
}
6667
FlowIDFieldsAsString := buf.String()
6768
isNew := connection_tracking.CT.AddFlow(FlowIDFieldsAsString)
6869
if isNew {
6970
if rule.Parameters != "" {
70-
outputEntries[rule.Output] = rule.Parameters
71+
outputEntry[rule.Output] = rule.Parameters
7172
} else {
72-
outputEntries[rule.Output] = true
73+
outputEntry[rule.Output] = true
7374
}
7475
}
7576

7677
case api.TransformNetworkOperationName("AddRegExIf"):
77-
matched, err := regexp.MatchString(rule.Parameters, fmt.Sprintf("%s", outputEntries[rule.Input]))
78+
matched, err := regexp.MatchString(rule.Parameters, fmt.Sprintf("%s", outputEntry[rule.Input]))
7879
if err != nil {
7980
continue
8081
}
8182
if matched {
82-
outputEntries[rule.Output] = outputEntries[rule.Input]
83-
outputEntries[rule.Output+"_Matched"] = true
83+
outputEntry[rule.Output] = outputEntry[rule.Input]
84+
outputEntry[rule.Output+"_Matched"] = true
8485
}
8586
case api.TransformNetworkOperationName("AddIf"):
8687
expressionString := fmt.Sprintf("val %s", rule.Parameters)
@@ -89,81 +90,81 @@ func (n *Network) TransformEntry(inputEntry config.GenericMap) config.GenericMap
8990
log.Errorf("Can't evaluate AddIf rule: %+v expression: %v. err %v", rule, expressionString, err)
9091
continue
9192
}
92-
result, evaluateErr := expression.Evaluate(map[string]interface{}{"val": outputEntries[rule.Input]})
93+
result, evaluateErr := expression.Evaluate(map[string]interface{}{"val": outputEntry[rule.Input]})
9394
if evaluateErr == nil && result.(bool) {
94-
outputEntries[rule.Output] = outputEntries[rule.Input]
95-
outputEntries[rule.Output+"_Evaluate"] = true
95+
outputEntry[rule.Output] = outputEntry[rule.Input]
96+
outputEntry[rule.Output+"_Evaluate"] = true
9697
}
9798
case api.TransformNetworkOperationName("AddSubnet"):
98-
_, ipv4Net, err := net.ParseCIDR(fmt.Sprintf("%v%s", outputEntries[rule.Input], rule.Parameters))
99+
_, ipv4Net, err := net.ParseCIDR(fmt.Sprintf("%v%s", outputEntry[rule.Input], rule.Parameters))
99100
if err != nil {
100-
log.Errorf("Can't find subnet for IP %v and prefix length %s - err %v", outputEntries[rule.Input], rule.Parameters, err)
101+
log.Errorf("Can't find subnet for IP %v and prefix length %s - err %v", outputEntry[rule.Input], rule.Parameters, err)
101102
continue
102103
}
103-
outputEntries[rule.Output] = ipv4Net.String()
104+
outputEntry[rule.Output] = ipv4Net.String()
104105
case api.TransformNetworkOperationName("AddLocation"):
105106
var locationInfo *location.Info
106-
err, locationInfo := location.GetLocation(fmt.Sprintf("%s", outputEntries[rule.Input]))
107+
err, locationInfo := location.GetLocation(fmt.Sprintf("%s", outputEntry[rule.Input]))
107108
if err != nil {
108-
log.Errorf("Can't find location for IP %v err %v", outputEntries[rule.Input], err)
109+
log.Errorf("Can't find location for IP %v err %v", outputEntry[rule.Input], err)
109110
continue
110111
}
111-
outputEntries[rule.Output+"_CountryName"] = locationInfo.CountryName
112-
outputEntries[rule.Output+"_CountryLongName"] = locationInfo.CountryLongName
113-
outputEntries[rule.Output+"_RegionName"] = locationInfo.RegionName
114-
outputEntries[rule.Output+"_CityName"] = locationInfo.CityName
115-
outputEntries[rule.Output+"_Latitude"] = locationInfo.Latitude
116-
outputEntries[rule.Output+"_Longitude"] = locationInfo.Longitude
112+
outputEntry[rule.Output+"_CountryName"] = locationInfo.CountryName
113+
outputEntry[rule.Output+"_CountryLongName"] = locationInfo.CountryLongName
114+
outputEntry[rule.Output+"_RegionName"] = locationInfo.RegionName
115+
outputEntry[rule.Output+"_CityName"] = locationInfo.CityName
116+
outputEntry[rule.Output+"_Latitude"] = locationInfo.Latitude
117+
outputEntry[rule.Output+"_Longitude"] = locationInfo.Longitude
117118
case api.TransformNetworkOperationName("AddService"):
118-
protocol := fmt.Sprintf("%v", outputEntries[rule.Parameters])
119-
portNumber, err := strconv.Atoi(fmt.Sprintf("%v", outputEntries[rule.Input]))
119+
protocol := fmt.Sprintf("%v", outputEntry[rule.Parameters])
120+
portNumber, err := strconv.Atoi(fmt.Sprintf("%v", outputEntry[rule.Input]))
120121
if err != nil {
121-
log.Errorf("Can't convert port to int: Port %v - err %v", outputEntries[rule.Input], err)
122+
log.Errorf("Can't convert port to int: Port %v - err %v", outputEntry[rule.Input], err)
122123
continue
123124
}
124125
service := netdb.GetServByPort(portNumber, netdb.GetProtoByName(protocol))
125126
if service == nil {
126127
protocolAsNumber, err := strconv.Atoi(fmt.Sprintf("%v", protocol))
127128
if err != nil {
128-
log.Infof("Can't find service name for Port %v and protocol %v - err %v", outputEntries[rule.Input], protocol, err)
129+
log.Infof("Can't find service name for Port %v and protocol %v - err %v", outputEntry[rule.Input], protocol, err)
129130
continue
130131
}
131132
service = netdb.GetServByPort(portNumber, netdb.GetProtoByNumber(protocolAsNumber))
132133
if service == nil {
133-
log.Infof("Can't find service name for Port %v and protocol %v - err %v", outputEntries[rule.Input], protocol, err)
134+
log.Infof("Can't find service name for Port %v and protocol %v - err %v", outputEntry[rule.Input], protocol, err)
134135
continue
135136
}
136137
}
137-
outputEntries[rule.Output] = service.Name
138+
outputEntry[rule.Output] = service.Name
138139
case api.TransformNetworkOperationName("AddKubernetes"):
139140
var kubeInfo *kubernetes.Info
140-
kubeInfo, err := kubernetes.Data.GetInfo(fmt.Sprintf("%s", outputEntries[rule.Input]))
141+
kubeInfo, err := kubernetes.Data.GetInfo(fmt.Sprintf("%s", outputEntry[rule.Input]))
141142
if err != nil {
142-
log.Debugf("Can't find kubernetes info for IP %v err %v", outputEntries[rule.Input], err)
143+
log.Debugf("Can't find kubernetes info for IP %v err %v", outputEntry[rule.Input], err)
143144
continue
144145
}
145-
outputEntries[rule.Output+"_Namespace"] = kubeInfo.Namespace
146-
outputEntries[rule.Output+"_Name"] = kubeInfo.Name
147-
outputEntries[rule.Output+"_Type"] = kubeInfo.Type
148-
outputEntries[rule.Output+"_OwnerName"] = kubeInfo.Owner.Name
149-
outputEntries[rule.Output+"_OwnerType"] = kubeInfo.Owner.Type
146+
outputEntry[rule.Output+"_Namespace"] = kubeInfo.Namespace
147+
outputEntry[rule.Output+"_Name"] = kubeInfo.Name
148+
outputEntry[rule.Output+"_Type"] = kubeInfo.Type
149+
outputEntry[rule.Output+"_OwnerName"] = kubeInfo.Owner.Name
150+
outputEntry[rule.Output+"_OwnerType"] = kubeInfo.Owner.Type
150151
if rule.Parameters != "" {
151152
for labelKey, labelValue := range kubeInfo.Labels {
152-
outputEntries[rule.Parameters+"_"+labelKey] = labelValue
153+
outputEntry[rule.Parameters+"_"+labelKey] = labelValue
153154
}
154155
}
155156
if kubeInfo.HostIP != "" {
156-
outputEntries[rule.Output+"_HostIP"] = kubeInfo.HostIP
157+
outputEntry[rule.Output+"_HostIP"] = kubeInfo.HostIP
157158
if kubeInfo.HostName != "" {
158-
outputEntries[rule.Output+"_HostName"] = kubeInfo.HostName
159+
outputEntry[rule.Output+"_HostName"] = kubeInfo.HostName
159160
}
160161
}
161162
default:
162163
log.Panicf("unknown type %s for transform.Network rule: %v", rule.Type, rule)
163164
}
164165
}
165166

166-
return outputEntries
167+
return outputEntry
167168
}
168169

169170
// NewTransformNetwork create a new transform

0 commit comments

Comments
 (0)