Skip to content

Commit 8eac256

Browse files
authored
NETOBSERV-2186: Set sampling in flows (#931)
1 parent 13e630b commit 8eac256

File tree

3 files changed

+61
-10
lines changed

3 files changed

+61
-10
lines changed

pkg/api/transform_filter.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,8 @@
1818
package api
1919

2020
type TransformFilter struct {
21-
Rules []TransformFilterRule `yaml:"rules,omitempty" json:"rules,omitempty" doc:"list of filter rules, each includes:"`
21+
Rules []TransformFilterRule `yaml:"rules,omitempty" json:"rules,omitempty" doc:"list of filter rules, each includes:"`
22+
SamplingField string `yaml:"samplingField,omitempty" json:"samplingField,omitempty" doc:"sampling field name to be set when sampling is used; if the field already exists in flows, its value is multiplied with the new sampling"`
2223
}
2324

2425
func (tf *TransformFilter) Preprocess() {

pkg/pipeline/transform/transform_filter.go

Lines changed: 36 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,9 @@ var (
3939
)
4040

4141
type Filter struct {
42-
Rules []api.TransformFilterRule
43-
KeepRules []predicateRule
42+
Rules []api.TransformFilterRule
43+
KeepRules []predicateRule
44+
SamplingField string
4445
}
4546

4647
type predicateRule struct {
@@ -56,7 +57,7 @@ func (f *Filter) Transform(entry config.GenericMap) (config.GenericMap, bool) {
5657
if len(f.KeepRules) > 0 {
5758
keep := false
5859
for _, r := range f.KeepRules {
59-
if applyPredicate(outputEntry, r) {
60+
if applyKeepPredicate(outputEntry, r, f.SamplingField) {
6061
keep = true
6162
break
6263
}
@@ -182,11 +183,21 @@ func isRemoveEntrySatisfied(entry config.GenericMap, rules []*api.RemoveEntryRul
182183
return true
183184
}
184185

185-
func applyPredicate(entry config.GenericMap, rule predicateRule) bool {
186-
if !rollSampling(rule.sampling) {
187-
return false
186+
// Returns true if flow must be kept. If sampling is configured, set the Sampling field on that flow.
187+
func applyKeepPredicate(entry config.GenericMap, rule predicateRule, samplingField string) bool {
188+
if rule.predicate(entry) {
189+
if rule.sampling > 0 {
190+
if rollSampling(rule.sampling) {
191+
if len(samplingField) > 0 {
192+
storeSampling(entry, int(rule.sampling), samplingField)
193+
}
194+
return true // predicate true and sampled-in
195+
}
196+
return false // sampled-out
197+
}
198+
return true // predicate true / no sampling
188199
}
189-
return rule.predicate(entry)
200+
return false // predicate false
190201
}
191202

192203
func sample(entry config.GenericMap, rules []*api.SamplingCondition) bool {
@@ -202,12 +213,27 @@ func rollSampling(value uint16) bool {
202213
return value == 0 || (rndgen.Intn(int(value)) == 0)
203214
}
204215

216+
func storeSampling(entry config.GenericMap, value int, samplingField string) {
217+
if current, found := entry[samplingField]; found {
218+
if cast, err := utils.ConvertToInt(current); err == nil {
219+
if cast == 0 {
220+
cast = 1
221+
}
222+
entry[samplingField] = cast * value
223+
}
224+
} else {
225+
entry[samplingField] = value
226+
}
227+
}
228+
205229
// NewTransformFilter create a new filter transform
206230
func NewTransformFilter(params config.StageParam) (Transformer, error) {
207231
tlog.Debugf("entering NewTransformFilter")
208232
keepRules := []predicateRule{}
209233
rules := []api.TransformFilterRule{}
234+
var samplingField string
210235
if params.Transform != nil && params.Transform.Filter != nil {
236+
samplingField = params.Transform.Filter.SamplingField
211237
params.Transform.Filter.Preprocess()
212238
for i := range params.Transform.Filter.Rules {
213239
baseRules := &params.Transform.Filter.Rules[i]
@@ -226,8 +252,9 @@ func NewTransformFilter(params config.StageParam) (Transformer, error) {
226252
}
227253
}
228254
transformFilter := &Filter{
229-
Rules: rules,
230-
KeepRules: keepRules,
255+
Rules: rules,
256+
KeepRules: keepRules,
257+
SamplingField: samplingField,
231258
}
232259
return transformFilter, nil
233260
}

pkg/pipeline/transform/transform_filter_test.go

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -714,6 +714,7 @@ func Test_Transform_KeepEntry(t *testing.T) {
714714

715715
func Test_Transform_KeepEntrySampling(t *testing.T) {
716716
newFilter := api.TransformFilter{
717+
SamplingField: "sampling",
717718
Rules: []api.TransformFilterRule{
718719
{
719720
Type: api.KeepEntryQuery,
@@ -724,6 +725,11 @@ func Test_Transform_KeepEntrySampling(t *testing.T) {
724725
Type: api.KeepEntryQuery,
725726
KeepEntryQuery: `namespace="B"`,
726727
},
728+
{
729+
Type: api.KeepEntryQuery,
730+
KeepEntryQuery: `namespace="C"`,
731+
KeepEntrySampling: 10,
732+
},
727733
},
728734
}
729735

@@ -738,23 +744,40 @@ func Test_Transform_KeepEntrySampling(t *testing.T) {
738744
input = append(input, config.GenericMap{
739745
"namespace": "B",
740746
})
747+
input = append(input, config.GenericMap{
748+
"namespace": "C",
749+
"sampling": 50, // First-pass sampling already exists
750+
})
741751
}
742752

743753
countA := 0
744754
countB := 0
755+
countC := 0
745756

746757
for _, flow := range input {
747758
if out, ok := tf.Transform(flow); ok {
748759
switch out["namespace"] {
749760
case "A":
750761
countA++
762+
sampling, ok := out["sampling"]
763+
assert.True(t, ok)
764+
assert.Equal(t, 10, sampling)
751765
case "B":
752766
countB++
767+
_, ok := out["sampling"]
768+
assert.False(t, ok)
769+
case "C":
770+
countC++
771+
sampling, ok := out["sampling"]
772+
assert.True(t, ok)
773+
assert.Equal(t, 500, sampling)
753774
}
754775
}
755776
}
756777

757778
assert.Less(t, countA, 300)
758779
assert.Greater(t, countA, 30)
759780
assert.Equal(t, countB, 1000)
781+
assert.Less(t, countC, 300)
782+
assert.Greater(t, countC, 30)
760783
}

0 commit comments

Comments
 (0)