Skip to content

Commit 183ba78

Browse files
authored
Relax validation of subobject leaves for flattened fields (#2090)
We added some additional validations for subobjects and arrays of objects in #1498, #1489 and related PRs. These validations only apply to packages with spec starting on 3.0.1. These validations rely on the structure of documents. With the adoption of features like `subobjects: false` or synthetic source the structure is lost, and exceptions based on spec version are not working, so the tests fail for cases where they should not for versions of the spec older than 3.0.1. This happens for example in the `dns` data stream of the `network_traffic` package when LogsDB is enabled.
1 parent 25dfab5 commit 183ba78

File tree

2 files changed

+148
-12
lines changed

2 files changed

+148
-12
lines changed

internal/fields/validate.go

Lines changed: 56 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -662,6 +662,11 @@ func (v *Validator) validateMapElement(root string, elem common.MapStr, doc comm
662662
errs = append(errs, err...)
663663
}
664664
default:
665+
if skipLeafOfObject(root, name, v.specVersion, v.Schema) {
666+
// Till some versions we skip some validations on leaf of objects, check if it is the case.
667+
break
668+
}
669+
665670
err := v.validateScalarElement(key, val, doc)
666671
if err != nil {
667672
errs = append(errs, err)
@@ -820,6 +825,37 @@ func skipValidationForField(key string) bool {
820825
isFieldFamilyMatching("event.module", key) // field is deprecated
821826
}
822827

828+
// skipLeafOfObject checks if the element is a child of an object that was skipped in some previous
829+
// version of the spec. This is relevant in documents that store fields without subobjects.
830+
func skipLeafOfObject(root, name string, specVersion semver.Version, schema []FieldDefinition) bool {
831+
// We are only skipping validation of these fields on versions older than 3.0.1.
832+
if !specVersion.LessThan(semver3_0_1) {
833+
return false
834+
}
835+
836+
// If it doesn't contain a dot in the name, we have traversed its parent, if any.
837+
if !strings.Contains(name, ".") {
838+
return false
839+
}
840+
841+
key := name
842+
if root != "" {
843+
key = root + "." + name
844+
}
845+
_, ancestor := findAncestorElementDefinition(key, schema, func(key string, def *FieldDefinition) bool {
846+
// Don't look for ancestors beyond root, these objects have been already traversed.
847+
if len(key) < len(root) {
848+
return false
849+
}
850+
if !slices.Contains([]string{"group", "object", "nested", "flattened"}, def.Type) {
851+
return false
852+
}
853+
return true
854+
})
855+
856+
return ancestor != nil
857+
}
858+
823859
func isFieldFamilyMatching(family, key string) bool {
824860
return key == family || strings.HasPrefix(key, family+".")
825861
}
@@ -858,19 +894,11 @@ func isArrayOfObjects(val any) bool {
858894
}
859895

860896
func isFlattenedSubfield(key string, schema []FieldDefinition) bool {
861-
for strings.Contains(key, ".") {
862-
i := strings.LastIndex(key, ".")
863-
key = key[:i]
864-
ancestor := FindElementDefinition(key, schema)
865-
if ancestor == nil {
866-
continue
867-
}
868-
if ancestor.Type == "flattened" {
869-
return true
870-
}
871-
}
897+
_, ancestor := findAncestorElementDefinition(key, schema, func(_ string, def *FieldDefinition) bool {
898+
return def.Type == "flattened"
899+
})
872900

873-
return false
901+
return ancestor != nil
874902
}
875903

876904
func findElementDefinitionForRoot(root, searchedKey string, fieldDefinitions []FieldDefinition) *FieldDefinition {
@@ -921,6 +949,22 @@ func findParentElementDefinition(key string, fieldDefinitions []FieldDefinition)
921949
return FindElementDefinition(parentKey, fieldDefinitions)
922950
}
923951

952+
func findAncestorElementDefinition(key string, fieldDefinitions []FieldDefinition, cond func(string, *FieldDefinition) bool) (string, *FieldDefinition) {
953+
for strings.Contains(key, ".") {
954+
i := strings.LastIndex(key, ".")
955+
key = key[:i]
956+
ancestor := FindElementDefinition(key, fieldDefinitions)
957+
if ancestor == nil {
958+
continue
959+
}
960+
if cond(key, ancestor) {
961+
return key, ancestor
962+
}
963+
}
964+
965+
return "", nil
966+
}
967+
924968
// compareKeys checks if `searchedKey` matches with the given `key`. `key` can contain
925969
// wildcards (`*`), that match any sequence of characters in `searchedKey` different to dots.
926970
func compareKeys(key string, def FieldDefinition, searchedKey string) bool {

internal/fields/validate_test.go

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1048,6 +1048,98 @@ func TestValidateStackVersionsWithEcsMappings(t *testing.T) {
10481048
}
10491049
}
10501050

1051+
func TestSkipLeafOfObject(t *testing.T) {
1052+
schema := []FieldDefinition{
1053+
{
1054+
Name: "foo",
1055+
Type: "keyword",
1056+
},
1057+
{
1058+
Name: "flattened",
1059+
Type: "flattened",
1060+
},
1061+
{
1062+
Name: "object",
1063+
Type: "object",
1064+
},
1065+
{
1066+
Name: "nested",
1067+
Type: "nested",
1068+
},
1069+
{
1070+
Name: "group",
1071+
Type: "group",
1072+
Fields: []FieldDefinition{
1073+
{
1074+
Name: "subgroup",
1075+
Type: "object",
1076+
},
1077+
},
1078+
},
1079+
}
1080+
1081+
cases := []struct {
1082+
name string
1083+
version *semver.Version
1084+
expected bool
1085+
}{
1086+
{
1087+
name: "foo.bar",
1088+
version: semver.MustParse("3.0.0"),
1089+
expected: true,
1090+
},
1091+
{
1092+
name: "subgroup.bar",
1093+
version: semver.MustParse("3.0.0"),
1094+
expected: true,
1095+
},
1096+
{
1097+
name: "foo.bar",
1098+
version: semver.MustParse("3.0.1"),
1099+
expected: false,
1100+
},
1101+
{
1102+
name: "subgroup.bar",
1103+
version: semver.MustParse("3.0.1"),
1104+
expected: false,
1105+
},
1106+
}
1107+
1108+
// Cases we expect to skip depending on the version.
1109+
okRoots := []string{"flattened", "object", "group", "nested"}
1110+
for _, root := range okRoots {
1111+
t.Run("empty root with prefix "+root, func(t *testing.T) {
1112+
for _, c := range cases {
1113+
t.Run(c.name+"_"+c.version.String(), func(t *testing.T) {
1114+
found := skipLeafOfObject("", root+"."+c.name, *c.version, schema)
1115+
assert.Equal(t, c.expected, found)
1116+
})
1117+
}
1118+
})
1119+
t.Run(root, func(t *testing.T) {
1120+
for _, c := range cases {
1121+
t.Run(c.name+"_"+c.version.String(), func(t *testing.T) {
1122+
found := skipLeafOfObject(root, c.name, *c.version, schema)
1123+
assert.Equal(t, c.expected, found)
1124+
})
1125+
}
1126+
})
1127+
}
1128+
1129+
// Cases we never expect to skip.
1130+
notOkRoots := []string{"foo", "notexists", "group.subgroup.other"}
1131+
for _, root := range notOkRoots {
1132+
t.Run("not ok "+root, func(t *testing.T) {
1133+
for _, c := range cases {
1134+
t.Run(c.name+"_"+c.version.String(), func(t *testing.T) {
1135+
found := skipLeafOfObject(root, c.name, *c.version, schema)
1136+
assert.Equal(t, false, found)
1137+
})
1138+
}
1139+
})
1140+
}
1141+
}
1142+
10511143
func readTestResults(t *testing.T, path string) (f results) {
10521144
c, err := os.ReadFile(path)
10531145
require.NoError(t, err)

0 commit comments

Comments
 (0)