Skip to content

Commit ddba882

Browse files
authored
Merge pull request #226 from jpinsonneau/334
NETOBSERV-334 concurrent map iteration & write fix
2 parents 532816c + 2648b82 commit ddba882

File tree

3 files changed

+38
-6
lines changed

3 files changed

+38
-6
lines changed

pkg/config/config.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,6 @@ import (
2424
"github.com/sirupsen/logrus"
2525
)
2626

27-
type GenericMap map[string]interface{}
28-
2927
var (
3028
Opt = Options{}
3129
PipeLine []Stage

pkg/config/generic_map.go

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
/*
2+
* Copyright (C) 2022 IBM, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*
16+
*/
17+
18+
package config
19+
20+
type GenericMap map[string]interface{}
21+
22+
// Copy will create a flat copy of GenericMap
23+
func (m GenericMap) Copy() GenericMap {
24+
result := GenericMap{}
25+
26+
for k, v := range m {
27+
result[k] = v
28+
}
29+
30+
return result
31+
}

pkg/pipeline/write/write_loki.go

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -109,8 +109,11 @@ func buildLokiConfig(c *api.WriteLoki) (loki.Config, error) {
109109
}
110110

111111
func (l *Loki) ProcessRecord(record config.GenericMap) error {
112+
// copy record before process to avoid alteration on parallel stages
113+
recordCopy := record.Copy()
114+
112115
// Get timestamp from record (default: TimeFlowStart)
113-
timestamp := l.extractTimestamp(record)
116+
timestamp := l.extractTimestamp(recordCopy)
114117

115118
labels := model.LabelSet{}
116119

@@ -119,15 +122,15 @@ func (l *Loki) ProcessRecord(record config.GenericMap) error {
119122
labels[k] = v
120123
}
121124

122-
l.addNonStaticLabels(record, labels)
125+
l.addNonStaticLabels(recordCopy, labels)
123126

124127
// Remove labels and configured ignore list from record
125128
ignoreList := append(l.apiConfig.IgnoreList, l.apiConfig.Labels...)
126129
for _, label := range ignoreList {
127-
delete(record, label)
130+
delete(recordCopy, label)
128131
}
129132

130-
js, err := jsonIter.ConfigCompatibleWithStandardLibrary.Marshal(record)
133+
js, err := jsonIter.ConfigCompatibleWithStandardLibrary.Marshal(recordCopy)
131134
if err != nil {
132135
return err
133136
}

0 commit comments

Comments
 (0)