@@ -630,8 +630,14 @@ def unique_fields(self) -> list[str]: # type: ignore[reportIncompatibleMethodOv
630630
def validate(self, _: "QueryRuleData", __: RuleMeta) -> None:  # type: ignore[reportIncompatibleMethodOverride]
    """Validate an ESQL query while checking TOMLRule.

    At present this is a deliberate no-op: both positional arguments are
    ignored and no checks are performed.
    """
    # Temporarily overridden to a NOP until ES|QL query parsing is supported.
    # TODO: planned dispatch once validation is wired in:
    #   if <ENV VAR> is set:
    #       self.remote_validate_rule
    #   else:
    #       ESQLRuleData validation
634639
# NOTE: the method below is expected to go away.
635641 def validate_integration (
636642 self ,
637643 _ : QueryRuleData ,
@@ -652,7 +658,12 @@ def get_rule_integrations(self, contents: TOMLRuleContents) -> list[str]:
652658 return rule_integrations
653659
654660 def prepare_integration_mappings (
655- self , rule_integrations : list [str ], stack_version : str , package_manifests : Any , integration_schemas : Any
661+ self ,
662+ rule_integrations : list [str ],
663+ stack_version : str ,
664+ package_manifests : Any ,
665+ integration_schemas : Any ,
666+ log : Callable [[str ], None ],
656667 ) -> tuple [dict [str , Any ], dict [str , Any ]]:
657668 """Prepare integration mappings for the given rule integrations."""
658669 integration_mappings : dict [str , Any ] = {}
@@ -670,6 +681,14 @@ def prepare_integration_mappings(
670681 for stream in package_schema :
671682 flat_schema = package_schema [stream ]
672683 stream_mappings = utils .flat_schema_to_index_mapping (flat_schema )
684+ nested_multifields = self .find_nested_multifields (stream_mappings )
685+ for field in nested_multifields :
686+ field_name = str (field ).split (".fields." )[0 ].replace ("." , ".properties." ) + ".fields"
687+ log (
688+ f"Warning: Nested multi-field `{ field } ` found in `{ integration } -{ stream } `. "
689+ f"Removing parent field from schema for ES|QL validation."
690+ )
691+ utils .delete_nested_key_from_dict (stream_mappings , field_name )
673692 utils .combine_dicts (integration_mappings , stream_mappings )
674693 index_lookup [f"{ integration } -{ stream } " ] = stream_mappings
675694
@@ -754,8 +773,58 @@ def execute_query_against_indices(
754773 log (f"Got query columns: { ', ' .join (query_column_names )} " )
755774 return query_columns
756775
def find_nested_multifields(self, mapping: dict[str, Any], path: str = "") -> list[Any]:
    """Recursively search for nested multi-fields in Elasticsearch mappings.

    A field is reported when one of the sub-fields declared under its
    ``fields`` key is a dict that itself contains a ``fields`` key.

    Args:
        mapping: Mapping of field name to field definition. Object fields
            expose their children under a ``properties`` key.
        path: Dotted prefix prepended to every reported field path. Leave
            empty when starting from the root of a mapping.

    Returns:
        Paths of the form ``<dotted.field.path>.fields.<subfield>``, in
        depth-first order: a field's own hits come before those of its
        children. The ``properties`` levels are not part of the path.

    Entries whose definition, ``fields`` value or ``properties`` value is
    not a dict are skipped instead of raising ``AttributeError``.
    """
    found: list[str] = []

    def walk(node: dict[str, Any], prefix: str) -> None:
        for field, properties in node.items():
            if not isinstance(properties, dict):
                continue
            current_path = f"{prefix}.{field}" if prefix else field

            # A sub-field of `fields` that carries its own `fields` key is a nested multi-field.
            multi_fields = properties.get("fields")  # type: ignore[reportUnknownVariableType]
            if isinstance(multi_fields, dict):
                found.extend(
                    f"{current_path}.fields.{subfield}"
                    for subfield, subproperties in multi_fields.items()  # type: ignore[reportUnknownVariableType]
                    if isinstance(subproperties, dict) and "fields" in subproperties
                )

            # Descend into child fields of object mappings.
            children = properties.get("properties")  # type: ignore[reportUnknownVariableType]
            if isinstance(children, dict):
                walk(children, current_path)  # type: ignore[reportUnknownArgumentType]

    walk(mapping, path)
    return found
798+
def get_ecs_schema_mappings(self, current_version: Version) -> dict[str, Any]:
    """Return the ECS schema for a stack version as a nested index mapping.

    The ECS version is looked up from the stack schema map using
    ``str(current_version)``. The flat ECS field list for that version is
    reduced to ``{field: type}`` and passed to
    ``utils.convert_to_nested_schema``. Every ``scaled_float`` field then
    gets its ``scaling_factor`` written onto its leaf entry.

    Args:
        current_version: Stack version whose ECS schema should be loaded.

    Returns:
        The nested schema produced by ``utils.convert_to_nested_schema``,
        with ``scaling_factor`` added to scaled-float leaves.
    """
    ecs_version = get_stack_schemas()[str(current_version)]["ecs"]
    flat_fields = ecs.get_schemas()[ecs_version]["ecs_flat"]

    field_types: dict[str, Any] = {}
    scaling_factors: dict[str, Any] = {}
    for field_name, details in flat_fields.items():
        field_type = details["type"]
        if field_type == "scaled_float":
            # Kept aside: only the type goes through the nested conversion.
            scaling_factors[field_name] = details["scaling_factor"]
        field_types[field_name] = field_type

    nested_schema = utils.convert_to_nested_schema(field_types)

    for field_name, factor in scaling_factors.items():
        *parents, leaf = field_name.split(".")

        # Walk down to the dict holding the leaf; the traversal assumes each
        # parent keeps its children under a "properties" key.
        node = nested_schema
        for parent in parents:
            node = node.setdefault(parent, {}).setdefault("properties", {})

        node[leaf]["scaling_factor"] = factor

    return nested_schema
820+
757821 def prepare_mappings (
758- self , elastic_client : Elasticsearch , indices : list [str ], stack_version : str , contents : TOMLRuleContents
822+ self ,
823+ elastic_client : Elasticsearch ,
824+ indices : list [str ],
825+ stack_version : str ,
826+ contents : TOMLRuleContents ,
827+ log : Callable [[str ], None ],
759828 ) -> tuple [dict [str , Any ], dict [str , Any ], dict [str , Any ]]:
760829 """Prepare index mappings for the given indices and rule integrations."""
761830 existing_mappings , index_lookup = misc .get_existing_mappings (elastic_client , indices )
@@ -768,7 +837,7 @@ def prepare_mappings(
768837 integration_schemas = load_integrations_schemas ()
769838
770839 integration_mappings , integration_index_lookup = self .prepare_integration_mappings (
771- rule_integrations , stack_version , package_manifests , integration_schemas
840+ rule_integrations , stack_version , package_manifests , integration_schemas , log
772841 )
773842
774843 index_lookup .update (integration_index_lookup )
@@ -789,6 +858,12 @@ def prepare_mappings(
789858 raise ValueError ("No mappings found" )
790859 index_lookup .update ({"rule-non-ecs-index" : non_ecs_mapping })
791860
861+ # Load ECS in an index mapping format (nested schema)
862+ current_version = Version .parse (load_current_package_version (), optional_minor_and_patch = True )
863+ ecs_schema = self .get_ecs_schema_mappings (current_version )
864+
865+ index_lookup .update ({"rule-ecs-index" : ecs_schema })
866+
792867 return existing_mappings , index_lookup , combined_mappings
793868
794869 def remote_validate_rule (
@@ -815,18 +890,21 @@ def log(val: str) -> None:
815890
816891 # Get mappings for all matching existing index templates
817892 existing_mappings , index_lookup , combined_mappings = self .prepare_mappings (
818- elastic_client , indices , stack_version , contents
893+ elastic_client , indices , stack_version , contents , log
819894 )
820895 log (f"Collected mappings: { len (existing_mappings )} " )
821896 log (f"Combined mappings prepared: { len (combined_mappings )} " )
822897
823898 # Create remote indices
824899 full_index_str = self .create_remote_indices (elastic_client , existing_mappings , index_lookup , log )
900+ utils .combine_dicts (combined_mappings , index_lookup ["rule-non-ecs-index" ])
901+ utils .combine_dicts (combined_mappings , index_lookup ["rule-ecs-index" ])
825902
826903 # Replace all sources with the test indices
827904 query = contents .data .query # type: ignore[reportUnknownVariableType]
828905 query = query .replace (indices_str , full_index_str ) # type: ignore[reportUnknownVariableType]
829906
907+ # TODO these query_columns are the unique fields
830908 query_columns = self .execute_query_against_indices (elastic_client , query , full_index_str , log ) # type: ignore[reportUnknownVariableType]
831909
832910 # Validate that all fields (columns) are either dynamic fields or correctly mapped
0 commit comments