diff --git a/src/ontoweaver/tabular.py b/src/ontoweaver/tabular.py index 10767eb..99eaebc 100644 --- a/src/ontoweaver/tabular.py +++ b/src/ontoweaver/tabular.py @@ -269,154 +269,127 @@ def make_edge(self, edge_t, id_target, id_source, properties): """ return edge_t(id_source=id_source, id_target=id_target, properties=properties) - def run(self): - """Iterate through dataframe in parallel and map cell values according to YAML file, using a list of transformers.""" + # ========================== + # Helper functions for run + # ========================== - # Thread-safe containers with their respective locks - self._nodes = [] - self._edges = [] - self._errors = [] - self._nodes_lock = threading.Lock() - self._edges_lock = threading.Lock() - self._errors_lock = threading.Lock() - self._row_lock = threading.Lock() - self._transformations_lock = threading.Lock() - self._local_nb_nodes_lock = threading.Lock() + def _make_default_source_node_id(self, row, i, local_nodes, local_errors): + """ + Helper function to create the default source node id for each row. Referred to as default because of possibility + of it changing type with `from_subject` attribute in transformers. + """ - # Function to process a single row and collect operations - def process_row(row_data): - i, row = row_data - local_nodes = [] - local_edges = [] - local_errors = [] - local_rows = 0 - local_transformations = 0 - local_nb_nodes = 0 + logger.debug("\tLabelMaker subject node:") + subject_generator_list = list(self.subject_transformer(row, i)) + if (len(subject_generator_list) > 1): + local_errors.append(self.error(f"You cannot use a transformer yielding multiple IDs as a subject. " + f"Subject Transformer `{self.subject_transformer}` produced multiple IDs: " + f"{subject_generator_list}", indent=2, + exception=exceptions.TransformerInterfaceError)) + + source_id, subject_edge, subject_node = subject_generator_list[0] + + if self.subject_transformer.final_type: + # If a final_type attribute is present in the transformer, use it as the source node type, instead + # of the default type. + subject_node = self.subject_transformer.final_type + + if source_id: + source_node_id = self.make_id(subject_node.__name__, source_id) + + if source_node_id: + logger.debug(f"\t\tDeclared subject ID: {source_node_id}") + local_nodes.append(self.make_node(node_t=subject_node, id=source_node_id, + # FIXME: Should we use the meta-way of accessing node properties as well? + # FIXME: This would require a refactoring of the transformer interfaces and tabular.run. + properties=self.properties(self.subject_transformer.properties_of, + row, i, subject_edge, subject_node, + node=True))) + else: + local_errors.append(self.error(f"Failed to declare subject ID for row #{i}: `{row}`.", + indent=2, exception=exceptions.DeclarationError)) - logger.debug(f"Process row {i}...") - local_rows += 1 - # There can be only one subject, so transformers yielding multiple IDs cannot be used. - logger.debug("\tLabelMaker subject node:") - subject_generator_list = list(self.subject_transformer(row, i)) - if (len(subject_generator_list) > 1): - local_errors.append(self.error(f"You cannot use a transformer yielding multiple IDs as a subject. " - f"Subject Transformer `{self.subject_transformer}` produced multiple IDs: " - f"{subject_generator_list}", indent=2, exception = exceptions.TransformerInterfaceError)) - - source_id, subject_edge, subject_node = subject_generator_list[0] - - if self.subject_transformer.final_type: - # If a final_type attribute is present in the transformer, use it as the source node type, instead + return source_node_id + + else: + local_errors.append(self.error(f"No valid source node identifier from {self.subject_transformer} for {i}th row." + f" This row will be skipped.", indent=2, exception = exceptions.TransformerDataError)) + + def _make_target_node_id(self, row, i, transformer, j, target_id, target_edge, target_node, local_nodes, local_errors): + """ + Helper function to create the target node id for each target transformer. + """ + + if target_id and target_edge and target_node: + + if transformer.final_type: + # If a final_type attribute is present in the transformer, use it as the target node type, instead # of the default type. - subject_node = self.subject_transformer.final_type - - if source_id: - source_node_id = self.make_id(subject_node.__name__, source_id) - - if source_node_id: - logger.debug(f"\t\tDeclared subject ID: {source_node_id}") - local_nodes.append(self.make_node(node_t=subject_node, id=source_node_id, - # FIXME: Should we use the meta-way of accessing node properties as well? - # FIXME: This would require a refactoring of the transformer interfaces and tabular.run. - properties=self.properties(self.subject_transformer.properties_of, - row, i, subject_edge, subject_node, - node=True))) - else: - local_errors.append(self.error(f"Failed to declare subject ID for row #{i}: `{row}`.", - indent=2, exception = exceptions.DeclarationError)) - - # Loop over list of transformer instances and label_maker corresponding nodes and edges. - # FIXME the transformer variable here shadows the transformer module. - for j,transformer in enumerate(self.transformers): - local_transformations += 1 - logger.debug(f"\tCalling transformer: {transformer}...") - for target_id, target_edge, target_node in transformer(row, i): - local_nb_nodes += 1 - if target_id and target_edge and target_node: - - if transformer.final_type: - # If a final_type attribute is present in the transformer, use it as the target node type, instead - # of the default type. - target_node = transformer.final_type - - target_node_id = self.make_id(target_node.__name__, target_id) - logger.debug(f"\t\tMake node {target_node_id}") - local_nodes.append(self.make_node(node_t=target_node, id=target_node_id, - # FIXME: Should we use the meta-way of accessing node properties as well? - # FIXME: This would require a refactoring of the transformer interfaces and tabular.run. - properties=self.properties(transformer.properties_of, row, - i, target_edge, target_node, node=True))) - - # If a `from_subject` attribute is present in the transformer, loop over the transformer - # list to find the transformer instance mapping to the correct type, and then label_maker new - # subject id. - - # FIXME add hook functions to be overloaded. - - # FIXME: Make from_subject reference a list of subjects instead of using the add_edge function. - - if hasattr(transformer, "from_subject"): - - found_valid_subject = False - - for t in self.transformers: - if transformer.from_subject == t.target_type: - found_valid_subject = True - for s_id, s_edge, s_node in t(row, i): - if s_id and s_edge and s_node: - if t.final_type: - s_node = t.final_type - subject_id = s_id - subject_node_id = self.make_id(t.target_type, subject_id) - logger.debug( - f"\t\tMake edge from {subject_node_id} toward {target_node_id}") - local_edges.append( - self.make_edge(edge_t=target_edge, id_source=subject_node_id, - id_target=target_node_id, - properties=self.properties(target_edge.fields(), - row, i, s_edge, s_node))) - - else: - local_errors.append(self.error( - f"No valid identifiers from {t} for {i}th row, when trying to change default subject type", - f"by {transformer} with `from_subject` attribute.", - indent=7, section="transformers", index=j, - exception=exceptions.TransformerDataError)) - continue - - else: - # The transformer instance type does not match the type in the `from_subject` attribute. - continue - - if not found_valid_subject: - local_errors.append(self.error(f"\t\t\tInvalid subject declared from {transformer}." - f" The subject you declared in the `from_subject` directive: `" - f"{transformer.from_subject}` must not be the same as the default subject type.", - exception=exceptions.ConfigError)) - - - else: # no attribute `from_subject` in `transformer` - logger.debug(f"\t\tMake edge {target_edge.__name__} from {source_node_id} toward {target_node_id}") - local_edges.append(self.make_edge(edge_t=target_edge, id_target=target_node_id, - id_source=source_node_id, - properties=self.properties(target_edge.fields(), - row, i, target_edge, target_node))) - else: - local_errors.append(self.error(f"No valid target node identifier from {transformer}" - f" for {i}th row.", indent=2, section="transformers", - index=j, exception = exceptions.TransformerDataError)) - continue + target_node = transformer.final_type + + target_node_id = self.make_id(target_node.__name__, target_id) + logger.debug(f"\t\tMake node {target_node_id}") + local_nodes.append(self.make_node(node_t=target_node, id=target_node_id, + # FIXME: Should we use the meta-way of accessing node properties as well? + # FIXME: This would require a refactoring of the transformer interfaces and tabular.run. + properties=self.properties(transformer.properties_of, row, + i, target_edge, target_node, node=True))) + + else: + local_errors.append(self.error(f"No valid target node identifier from {transformer}" + f" for {i}th row.", indent=2, section="transformers", + index=j, exception=exceptions.TransformerDataError)) + + target_node_id = None + + return target_node_id + + def _make_alternative_source_node_id(self, row, i, transformer, j, target_node_id, target_edge, local_edges, local_errors): + """ + Helper function to create an alternative source node id, in case the target transformer has a `from_subject` attribute. + """ + + found_valid_subject = False + + for t in self.transformers: + if transformer.from_subject == t.target_type: + found_valid_subject = True + for s_id, s_edge, s_node in t(row, i): + if s_id and s_edge and s_node: + if t.final_type: + s_node = t.final_type + subject_id = s_id + subject_node_id = self.make_id(t.target_type, subject_id) + logger.debug( + f"\t\tMake edge from {subject_node_id} toward {target_node_id}") + local_edges.append( + self.make_edge(edge_t=target_edge, id_source=subject_node_id, + id_target=target_node_id, + properties=self.properties(target_edge.fields(), + row, i, s_edge, s_node))) + + else: + local_errors.append(self.error( + f"No valid identifiers from {t} for {i}th row, when trying to change default subject type", + f"by {transformer} with `from_subject` attribute.", + indent=7, section="transformers", index=j, + exception=exceptions.TransformerDataError)) + continue + else: - local_errors.append(self.error(f"No valid source node identifier from {self.subject_transformer} for {i}th row." - f" This row will be skipped.", indent=2, exception = exceptions.TransformerDataError)) - return local_nodes, local_edges, local_errors, local_rows, local_transformations, local_nb_nodes + # The transformer instance type does not match the type in the `from_subject` attribute. + continue - return local_nodes, local_edges, local_errors, local_rows, local_transformations, local_nb_nodes - # End of process_row local function + if not found_valid_subject: + local_errors.append(self.error(f"\t\t\tInvalid subject declared from {transformer}." + f" The subject you declared in the `from_subject` directive: `" + f"{transformer.from_subject}` must not be the same as the default subject type.", + exception=exceptions.ConfigError)) - nb_rows = 0 - nb_transformations = 0 - nb_nodes = 0 + def _run_final_logs(self, process_row, nb_rows, nb_transformations, nb_nodes): + """ + Perform the final logging after processing all rows. + """ if self.parallel_mapping > 0: logger.info(f"Processing dataframe in parallel. Number of workers set to: {self.parallel_mapping} ...") @@ -475,6 +448,88 @@ def process_row(row_data): logger.info( f"Performed {nb_transformations} transformations with {1+len(self.transformers)} node transformers, producing {nb_nodes} nodes for {nb_rows} rows.") + # ============= + # Run function + # ============= + + def run(self): + """Iterate through dataframe in parallel and map cell values according to YAML file, using a list of transformers.""" + + # Thread-safe containers with their respective locks + self._nodes = [] + self._edges = [] + self._errors = [] + self._nodes_lock = threading.Lock() + self._edges_lock = threading.Lock() + self._errors_lock = threading.Lock() + self._row_lock = threading.Lock() + self._transformations_lock = threading.Lock() + self._local_nb_nodes_lock = threading.Lock() + + # Function to process a single row and collect operations + def process_row(row_data): + i, row = row_data + local_nodes = [] + local_edges = [] + local_errors = [] + local_rows = 0 + local_transformations = 0 # Count of transformations for this row. Does not include the property transformers. + local_nb_nodes = 0 + + logger.debug(f"Process row {i}...") + local_rows += 1 + + source_node_id = self._make_default_source_node_id(row, i, local_nodes, local_errors) + + if source_node_id: + local_nb_nodes += 1 + + # Loop over list of transformer instances and label_maker corresponding nodes and edges. + # FIXME the transformer variable here shadows the transformer module. + for j,transformer in enumerate(self.transformers): + local_transformations += 1 + logger.debug(f"\tCalling transformer: {transformer}...") + for target_id, target_edge, target_node in transformer(row, i): + target_node_id = self._make_target_node_id(row, i, transformer, j, target_id, target_edge, + target_node, local_nodes, local_errors) + + #If no valid target node id was created, an error is logged in the `_make_target_node_id` function, + #and we move to the next iteration of the loop. + if target_node_id is None: + continue + else: + local_nb_nodes += 1 + + # If a `from_subject` attribute is present in the transformer, loop over the transformer + # list to find the transformer instance mapping to the correct type, and then label_maker new + # subject id. + + # FIXME add hook functions to be overloaded. + + # FIXME: Make from_subject reference a list of subjects instead of using the add_edge function. + + if hasattr(transformer, "from_subject"): + + self._make_alternative_source_node_id( + row, i, transformer, j, target_node_id, target_edge, local_edges, local_errors) + + + else: # no attribute `from_subject` in `transformer` + logger.debug(f"\t\tMake edge {target_edge.__name__} from {source_node_id} toward {target_node_id}") + local_edges.append(self.make_edge(edge_t=target_edge, id_target=target_node_id, + id_source=source_node_id, + properties=self.properties(target_edge.fields(), + row, i, target_edge, target_node))) + + return local_nodes, local_edges, local_errors, local_rows, local_transformations, local_nb_nodes + # End of process_row local function + + nb_rows = 0 + nb_transformations = 0 + nb_nodes = 0 + + self._run_final_logs(process_row, nb_rows, nb_transformations, nb_nodes) + def extract_table(df: pd.DataFrame, config: dict, parallel_mapping = 0, module = types, affix = "suffix", separator = ":", validate_output = False, raise_errors = True): """ @@ -528,6 +583,8 @@ def __init__(self, super().__init__(raise_errors) self.module = module + + def make_node_class(self, name, properties={}, base=base.Node): """ LabelMaker a node class with the given name and properties. @@ -704,10 +761,37 @@ def __init__(self, config: dict, module=types, validate_output = False, raise_er super().__init__(module, raise_errors = raise_errors) self.config = config self.validate_output = validate_output + if not self.validate_output: + logger.info( + f"Transformer output validation will be skipped. This could result in some empty or `nan` nodes in your knowledge graph." + f" To enable output validation set `validate_output` to `True`.") logger.debug(f"Classes will be created in module '{self.module}'") + + def _get_input_validation_rules(self,): + """ + Extract input data validation schema from yaml file and instantiate a Pandera DataFrameSchema object and validator. + """ + k_validate = ["validate"] + validation_rules = self.get(k_validate) + yaml_validation_rules = yaml.dump(validation_rules, default_flow_style=False) + validator = None + + try: + validation_schema = pa.DataFrameSchema.from_yaml(yaml_validation_rules) + validator = validate.InputValidator(validation_schema, raise_errors=self.raise_errors) + except Exception as e: + self.error(f"Failed to parse the input validation schema: {e}", exception=exceptions.ConfigError) + + return validator + + + # ==================================================================== + # Getter functions for retrieval of dictionary items from YAML config + # ==================================================================== + def get_not(self, keys, pconfig=None): """ Get the first dictionary (key, item) not matching any of the passed keys. @@ -745,6 +829,11 @@ def get(self, keys, pconfig=None): return pconfig[k] return None + + # ================================================================ + # Helper Functions for both Target, Subject and Property parsing + # ================================================================ + def _extract_metadata(self, k_metadata_column, metadata_list, metadata, types, columns): """ Extract metadata and update the metadata dictionary. @@ -804,24 +893,6 @@ def _extract_metadata(self, k_metadata_column, metadata_list, metadata, types, c else: return None - - def _get_input_validation_rules(self,): - """ - Extract input data validation schema from yaml file and instantiate a Pandera DataFrameSchema object and validator. - """ - k_validate = ["validate"] - validation_rules = self.get(k_validate) - yaml_validation_rules = yaml.dump(validation_rules, default_flow_style=False) - validator = None - - try: - validation_schema = pa.DataFrameSchema.from_yaml(yaml_validation_rules) - validator = validate.InputValidator(validation_schema, raise_errors=self.raise_errors) - except Exception as e: - self.error(f"Failed to parse the input validation schema: {e}", exception=exceptions.ConfigError) - - return validator - def _make_output_validator(self, output_validation_rules = None): """ LabelMaker a validator for the output of a transformer. @@ -924,6 +995,11 @@ def _make_branching_dict(self, subject: bool, match_parser, properties_of, metad 'final_type': final_type_class if final_type_class else alt_final_type_class } + + # ============================================================ + # Property parsing + # ============================================================ + def parse_properties(self, properties_of, possible_subject_types, transformers_list): """ Parse the properties of the transformers defined in the YAML mapping, and update the properties_of dictionary. @@ -979,6 +1055,10 @@ def parse_properties(self, properties_of, possible_subject_types, transformers_l return properties_of + # ============================================================ + # Subject parsing + # ============================================================ + def parse_subject(self, properties_of, transformers_list, metadata_list, metadata): """ Parse the subject transformer and its properties from the YAML mapping. @@ -1071,6 +1151,109 @@ def parse_subject(self, properties_of, transformers_list, metadata_list, metadat return possible_subject_types, subject_transformer, source_t, subject_columns + # ============================================================ + # Target Transformer Parsing and Helper Functions + # ============================================================ + + def _make_target_label_maker(self, target, edge, gen_data, columns, transformer_index, multi_type_dictionary): + + label_maker = None + + if (target and not edge) or (edge and not target): + self.error(f"Cannot declare the mapping `{columns}` => `{edge}` (target: `{target}`), " + f"missing either an object or a relation.", "transformers", transformer_index, + indent=2, exception=exceptions.MissingDataError) + elif multi_type_dictionary and "match_type_from_column" in gen_data and not target and not edge: + label_maker = make_labels.MultiTypeOnColumnLabelMaker(raise_errors=self.raise_errors, + match_type_from_column=gen_data["match_type_from_column"]) + + elif multi_type_dictionary and not target and not edge and "match_type_from_column" not in gen_data: + label_maker = make_labels.MultiTypeLabelMaker(raise_errors=self.raise_errors) + else: + label_maker = make_labels.SimpleLabelMaker(raise_errors = self.raise_errors) + + return label_maker + + def _check_target_sanity(self, transformer_keyword_dict, transformer_type, transformer_index): + """ + Preforms sanity checks on target transformer before assigning the target, edge and subject variables, and connected + columns. + """ + + if not transformer_keyword_dict: + self.error(f"No keywords declared for target transformer `{transformer_type}` at index" + f" `{transformer_index}`.", section="transformers", + exception=exceptions.ParsingDeclarationsError) + + elif any(field in transformer_keyword_dict for field in self.k_properties): + if any(field in transformer_keyword_dict for field in self.k_target): + prop = self.get(self.k_properties, transformer_keyword_dict) + target = self.get(self.k_target, transformer_keyword_dict) + self.error(f"ERROR in transformer '{transformer_type}', at index `{transformer_index}`: one cannot " + f"declare a mapping to both properties '{prop}' and object type '{target}'.", "transformers", + transformer_index, exception=exceptions.CardinalityError) + + elif type(transformer_keyword_dict) != dict: + self.error(str(transformer_keyword_dict) + " is not a dictionary", + exception=exceptions.ParsingDeclarationsError) + + else: + columns = self.get(self.k_columns, pconfig=transformer_keyword_dict) + if type(columns) != list: + logger.debug(f"\tDeclared singular column") + # The rowIndex transformer is a special case, where the column does not need to be defined in the mapping. + # FIXME: In next refactoring do not assert `rowIndex` transformer name, in order to have a generic implementation. (ref: https://github.com/oncodash/ontoweaver/pull/153) + if transformer_type != "rowIndex": + assert (type(columns) == str) + columns = [columns] + + target = self.get(self.k_target, pconfig=transformer_keyword_dict) + if type(target) == list: + self.error( + f"You cannot declare multiple objects in transformers. For transformer `{transformer_type}`.", + section="transformers", index=transformer_index, indent=1, exception=exceptions.CardinalityError) + + subject = self.get(self.k_subject, pconfig=transformer_keyword_dict) + if type(subject) == list: + self.error( + f"You cannot declare multiple subjects in transformers. For transformer `{transformer_type}`.", + section="transformers", index=transformer_index, indent=1, exception=exceptions.CardinalityError) + + edge = self.get(self.k_edge, pconfig=transformer_keyword_dict) + if type(edge) == list: + self.error( + f"You cannot declare multiple relations in transformers. For transformer `{transformer_type}`.", + section="transformers", index=transformer_index, indent=1, exception=exceptions.CardinalityError) + + return columns, target, edge, subject + + + def _make_target_classes(self, target, properties_of, edge, source_t, final_type_class, possible_target_types, possible_edge_types, multi_type_dictionary): + """ + Helper function to create the target and edge classes for a target transformer, and store them in the multi_type_dictionary. + """ + + logger.debug(f"\tDeclare node .target for `{target}`...") + target_t = self.make_node_class(target, properties_of.get(target, {})) + possible_target_types.add(target) + logger.debug(f"\t\tDeclared target for `{target}`: {target_t.__name__}") + + logger.debug(f"\tDeclare edge for `{edge}`...") + edge_t = self.make_edge_class(edge, source_t, target_t, properties_of.get(edge, {})) + possible_edge_types.add(edge_t.__name__) + + # "None" key is used to return any type of string, in case no branching is needed. + multi_type_dictionary['None'] = { + 'to_object': target_t, + 'via_relation': edge_t, + 'final_type': final_type_class + } + + return edge_t, target_t + + # ============================================================ + # Target parsing + # ============================================================ def parse_targets(self, transformers_list, properties_of, source_t, metadata_list, metadata): """ @@ -1082,157 +1265,82 @@ def parse_targets(self, transformers_list, properties_of, source_t, metadata_lis possible_edge_types = set() logger.debug(f"Declare types...") - for n_transformer,transformer_types in enumerate(transformers_list): - for transformer_type, field_dict in transformer_types.items(): - if not field_dict: - continue - elif any(field in field_dict for field in self.k_properties): - if any(field in field_dict for field in self.k_target): - prop = self.get(self.k_properties, field_dict) - target = self.get(self.k_target, field_dict) - self.error(f"ERROR in transformer '{transformer_type}': one cannot " - f"declare a mapping to both properties '{prop}' and object type '{target}'.", "transformers", - n_transformer, exception = exceptions.CardinalityError) + # Iterate through the list of target transformers and extract the information needed to create the + # corresponding classes. + for transformer_index, target_transformer_yaml_dict in enumerate(transformers_list): + for transformer_type, transformer_keyword_dict in target_transformer_yaml_dict.items(): + + target_branching = False + + elements = self._check_target_sanity(transformer_keyword_dict, transformer_type, transformer_index) + if elements is None: continue else: - if type(field_dict) != dict: - self.error(str(field_dict)+" is not a dictionary", exception = exceptions.ParsingDeclarationsError) - - columns = self.get(self.k_columns, pconfig=field_dict) - if type(columns) != list: - logger.debug(f"\tDeclared singular column") - # The rowIndex transformer is a special case, where the column does not need to be defined in the mapping. - # FIXME: In next refactoring do not assert `rowIndex` transformer name, in order to have a generic implementation. (ref: https://github.com/oncodash/ontoweaver/pull/153) - if transformer_type != "rowIndex": - assert(type(columns) == str) - columns = [columns] - - target = self.get(self.k_target, pconfig=field_dict) - if type(target) == list: - self.error(f"You cannot declare multiple objects in transformers. For transformer `{transformer_type}`.", - section="transformers", index=n_transformer, indent=1, exception = exceptions.CardinalityError) - - subject = self.get(self.k_subject, pconfig=field_dict) - if type(subject) == list: - self.error(f"You cannot declare multiple subjects in transformers. For transformer `{transformer_type}`.", - section="transformers", index=n_transformer, indent=1, exception = exceptions.CardinalityError) - - edge = self.get(self.k_edge, pconfig=field_dict) - if type(edge) == list: - self.error(f"You cannot declare multiple relations in transformers. For transformer `{transformer_type}`.", - section="transformers", index=n_transformer, indent=1, exception = exceptions.CardinalityError) - - gen_data = self.get_not(self.k_target + self.k_edge + self.k_columns + self.k_final_type, pconfig=field_dict) - - # Extract the final type if defined in the mapping. - final_type = self.get(self.k_final_type, pconfig=field_dict) - - final_type_class = self._extract_final_type_class(final_type, possible_target_types, metadata, - metadata_list, columns, properties_of) - - # Harmonize the use of the `from_subject` and `from_source` synonyms in the configuration, because - # from_subject` is used in the transformer class to refer to the source node type. - if 'from_source' in gen_data: - gen_data['from_subject'] = gen_data['from_source'] - del gen_data['from_source'] - - multi_type_dictionary = {} - - if "match" in gen_data: - - self._make_branching_dict(subject=False, match_parser=gen_data["match"], - properties_of=properties_of, metadata_list=metadata_list, - metadata=metadata, - columns=columns, final_type_class=final_type_class, - multi_type_dictionary=multi_type_dictionary, - possible_node_types=possible_target_types, - possible_edge_types=possible_edge_types) + columns, target, edge, subject = elements + + gen_data = self.get_not(self.k_target + self.k_edge + self.k_columns + self.k_final_type, pconfig=transformer_keyword_dict) + # Extract the final type if defined in the mapping. + final_type = self.get(self.k_final_type, pconfig=transformer_keyword_dict) + final_type_class = self._extract_final_type_class(final_type, possible_target_types, metadata, + metadata_list, columns, properties_of) + + # Harmonize the use of the `from_subject` and `from_source` synonyms in the configuration, because + # from_subject` is used in the transformer class to refer to the source node type. + if 'from_source' in gen_data: + gen_data['from_subject'] = gen_data['from_source'] + del gen_data['from_source'] + + multi_type_dictionary = {} + + #The target transformer is a simple transformer if it does not have a `match` clause. We create a simple multi_type_dictionary, + #with a "None" key, to indicate that no branching is needed. + if target and edge: + edge_t, target_t = self._make_target_classes(target, properties_of, edge, source_t, final_type_class, possible_target_types, possible_edge_types, multi_type_dictionary) # Parse the validation rules for the output of the transformer. Each transformer gets its own # instance of the OutputValidator with (at least) the default output validation rules. - output_validation_rules = self.get(self.k_validate_output, pconfig=field_dict) + output_validation_rules = self.get(self.k_validate_output, pconfig=transformer_keyword_dict) output_validator = self._make_output_validator(output_validation_rules) - - if target and edge: - logger.debug(f"\tDeclare node .target for `{target}`...") - target_t = self.make_node_class(target, properties_of.get(target, {})) - possible_target_types.add(target) - logger.debug(f"\t\tDeclared target for `{target}`: {target_t.__name__}") - if subject: - logger.debug(f"\tDeclare subject for `{subject}`...") - subject_t = self.make_node_class(subject, properties_of.get(subject, {})) - possible_target_types.add(subject_t.__name__) - edge_t = self.make_edge_class(edge, subject_t, target_t, properties_of.get(edge, {})) - possible_edge_types.add(edge_t.__name__) - else: - logger.debug(f"\tDeclare edge for `{edge}`...") - edge_t = self.make_edge_class(edge, source_t, target_t, properties_of.get(edge, {})) - possible_edge_types.add(edge_t.__name__) - - # "None" key is used to return any type of string, in case no branching is needed. - multi_type_dictionary['None'] = { - 'to_object': target_t, - 'via_relation': edge_t, - 'final_type': final_type_class - } - - # Parse the validation rules for the output of the transformer. Each transformer gets its own - # instance of the OutputValidator with (at least) the default output validation rules. - output_validation_rules = self.get(self.k_validate_output, pconfig=field_dict) - output_validator = self._make_output_validator(output_validation_rules) - - logger.debug(f"\tDeclare transformer `{transformer_type}`...") - target_transformer = self.make_transformer_class(transformer_type=transformer_type, - multi_type_dictionary=multi_type_dictionary, - properties=properties_of.get(target, {}), - columns=columns, - output_validator=output_validator, - label_maker=make_labels.SimpleLabelMaker( - raise_errors=self.raise_errors), - raise_errors=self.raise_errors, **gen_data) - transformers.append(target_transformer) - logger.debug(f"\t\tDeclared mapping `{columns}` => `{edge_t.__name__}`") - elif (target and not edge) or (edge and not target): - self.error(f"Cannot declare the mapping `{columns}` => `{edge}` (target: `{target}`), " - f"missing either an object or a relation.", "transformers", n_transformer, - indent=2, exception = exceptions.MissingDataError) - - - elif multi_type_dictionary and "match_type_from_column" in gen_data and not target and not edge: - target_transformer = self.make_transformer_class(transformer_type=transformer_type, - multi_type_dictionary=multi_type_dictionary, - branching_properties=properties_of, - columns=columns, - output_validator=output_validator, - label_maker=make_labels.MultiTypeOnColumnLabelMaker( - raise_errors=self.raise_errors, match_type_from_column=gen_data["match_type_from_column"]), - raise_errors=self.raise_errors, **gen_data) - - transformers.append(target_transformer) - - elif multi_type_dictionary and not target and not edge and "match_type_from_column" not in gen_data: - target_transformer = self.make_transformer_class(transformer_type=transformer_type, - multi_type_dictionary=multi_type_dictionary, - branching_properties=properties_of, - columns=columns, - output_validator=output_validator, - label_maker=make_labels.MultiTypeLabelMaker( - raise_errors=self.raise_errors), - raise_errors=self.raise_errors, **gen_data) - - transformers.append(target_transformer) - - # Declare the metadata for the target and edge types. - - extracted_metadata = self._extract_metadata(self.k_metadata_column, metadata_list, metadata, target, columns) + logger.debug(f"\tDeclare transformer `{transformer_type}`...") + + #The target transformer is a branching transformer if it has a `match` clause. We create a branching dictionary. + #The keys of the dictionary are the regex patterns to be matched against the extracted value of the column. + if "match" in gen_data: + + target_branching = True + self._make_branching_dict(subject=False, match_parser=gen_data["match"], + properties_of=properties_of, metadata_list=metadata_list, + metadata=metadata, + columns=columns, final_type_class=final_type_class, + multi_type_dictionary=multi_type_dictionary, + possible_node_types=possible_target_types, + possible_edge_types=possible_edge_types) + # Parse the validation rules for the output of the transformer. Each transformer gets its own + # instance of the OutputValidator with (at least) the default output validation rules. + output_validation_rules = self.get(self.k_validate_output, pconfig=transformer_keyword_dict) + output_validator = self._make_output_validator(output_validation_rules) + + label_maker = self._make_target_label_maker(target, edge, gen_data, columns, transformer_index, multi_type_dictionary) + target_transformer = self.make_transformer_class(transformer_type=transformer_type, + multi_type_dictionary=multi_type_dictionary, + branching_properties=properties_of if target_branching else None, + properties=properties_of.get(target_t.__name__, + {}) if not target_branching else None, + columns=columns, + output_validator=output_validator, + label_maker=label_maker, + raise_errors=self.raise_errors, **gen_data) + transformers.append(target_transformer) + # Declare the metadata for the target and edge types. + + extracted_metadata = self._extract_metadata(self.k_metadata_column, metadata_list, metadata, target, columns) + if extracted_metadata: + metadata.update(extracted_metadata) + if edge: + extracted_metadata = self._extract_metadata(self.k_metadata_column, metadata_list, metadata, edge, None) if extracted_metadata: metadata.update(extracted_metadata) - if edge: - extracted_metadata = self._extract_metadata(self.k_metadata_column, metadata_list, metadata, edge, None) - if extracted_metadata: - metadata.update(extracted_metadata) - return transformers, possible_target_types, possible_edge_types diff --git a/src/ontoweaver/validate.py b/src/ontoweaver/validate.py index df10c0d..1a6632e 100644 --- a/src/ontoweaver/validate.py +++ b/src/ontoweaver/validate.py @@ -193,8 +193,5 @@ def __init__(self, validation_rules = None, raise_errors = True): def __call__(self, val): - logger.info(f"Transformer output validation skipped. This could result in some empty or `nan` nodes in your knowledge graph." - f" To enable output validation set validate_output to True.") - return True