diff --git a/.gitignore b/.gitignore index f487cae..fdd70f4 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,4 @@ +/build/* /venv/ /.idea/ /.vscode/ diff --git a/hypergraphx/__init__.py b/hypergraphx/__init__.py index a1a5ba0..f4f48db 100644 --- a/hypergraphx/__init__.py +++ b/hypergraphx/__init__.py @@ -1,5 +1,8 @@ -from hypergraphx.core.directed_hypergraph import DirectedHypergraph +from hypergraphx.core.i_hypergraph import IHypergraph +from hypergraphx.core.i_undirected_hypergraph import IUndirectedHypergraph + from hypergraphx.core.hypergraph import Hypergraph +from hypergraphx.core.directed_hypergraph import DirectedHypergraph from hypergraphx.core.multiplex_hypergraph import MultiplexHypergraph from hypergraphx.core.temporal_hypergraph import TemporalHypergraph from . import readwrite diff --git a/hypergraphx/core/I_hypergraph.py b/hypergraphx/core/I_hypergraph.py new file mode 100644 index 0000000..fe43b2d --- /dev/null +++ b/hypergraphx/core/I_hypergraph.py @@ -0,0 +1,809 @@ +from abc import ABC, abstractmethod +from typing import Dict, List, Set, Tuple, Union, Optional, Any +import copy +import numpy as np +from sklearn.preprocessing import LabelEncoder +from collections import Counter + + +class IHypergraph(ABC): + """ + Abstract base class defining the common interface for all hypergraph implementations. + + This class specifies the required properties and methods that must be implemented + by all hypergraph types: Hypergraph, TemporalHypergraph, MultiplexHypergraph, and DirectedHypergraph. + + Contains functionality common to both directed and undirected hypergraphs. + """ + + def __init__( + self, + edge_list: Optional[List]=None, + weighted: bool = False, + weights:Optional[List[int]]=None, + hypergraph_metadata: Optional[Dict] = None, + node_metadata: Optional[Dict] = None, + edge_metadata: Optional[List[Dict]] = None + ): + """ + Initialize a Hypergraph. + + Parameters + ---------- + edge_list : list, optional + A list of hyperedges. Format varies by implementation. + weighted : bool, optional + Indicates whether the hypergraph is weighted. Default is False. + weights : list of floats, optional + A list of weights corresponding to each edge in `edge_list`. Required if `weighted` is True. + hypergraph_metadata : dict, optional + Metadata for the hypergraph. Default is an empty dictionary. + node_metadata : dict, optional + A dictionary of metadata for nodes, where keys are node identifiers and values are metadata dictionaries. + edge_metadata : list of dicts, optional + A list of metadata dictionaries corresponding to the edges in `edge_list`. + + Raises + ------ + ValueError + If `edge_list` and `weights` have mismatched lengths when `weighted` is True. + """ + # Initialize hypergraph metadata + self._hypergraph_metadata = hypergraph_metadata or dict() + self._hypergraph_metadata.update({"weighted": weighted}) + + self._weighted:bool = weighted + self._weights:dict = dict() + self._node_metadata:Dict[Any, Dict] = node_metadata or dict() + self._edge_metadata:Dict[Tuple, Dict] = { + edge_list[i]: edge_metadata[i] + for i in range(len(edge_list)) + } if edge_metadata and edge_list else dict() + + # store _edge_list and _reverse_edge_list as dictionaries + # keys of _edge_list are edges + # values of _edge_list are edge id's, ie integers + self._edge_list:dict = dict() + self._reverse_edge_list:dict = dict() + self._next_edge_id:int = 0 + + self._incidences_metadata = {} + + # ============================================================================= + # Node Management (Shared Implementation) + # ============================================================================= + + def add_node(self, + node: Any, + metadata: Optional[Dict] = None) -> None: + """ + Add a node to the hypergraph. If the node is already in the hypergraph, nothing happens. + + Parameters + ---------- + node : object + The node to add. + metadata : dict, optional + Metadata for the node. + + Returns + ------- + None + """ + if metadata is None: + metadata = {} + # Implementation varies by subclass due to different adjacency structures + if node not in self._node_metadata: + self._node_metadata[node] = metadata + + def add_nodes(self, + node_list: List[Any], + metadata: Optional[List | Dict] = None) -> None: + """ + Add a list of nodes to the hypergraph. + + Parameters + ---------- + node_list : list + The list of nodes to add. + metadata : list or dict + The list of nodes' metadata to add. + + Returns + ------- + None + """ + if metadata is None: + metadata = {} + # if the metadata was provided in list form, convert it to a dict + if isinstance(metadata, List): + if len(node_list) == len(metadata): + metadata = { + node_list[i]: metadata[i] + for i in range(len(node_list)) + } + else: + raise ValueError(f"len({node_list}) != len({metadata})") + + for node in node_list: + self.add_node(node, metadata=metadata.get(node)) + + @abstractmethod + def get_nodes(self, metadata: bool = False): + """ + Get all nodes in the hypergraph. + + Parameters + ---------- + metadata : bool, optional + If True, return node metadata dictionary. If False, return list of nodes. + + Returns + ------- + list or dict + List of nodes or dictionary of node metadata. + """ + pass + + @abstractmethod + def remove_node(self, node: Any, keep_edges: bool = False) -> None: + """ + Remove a node from the hypergraph. + + Parameters + ---------- + node : object + The node to remove. + keep_edges : bool, optional + If True, edges incident to the node are kept but updated to exclude the node. + If False, edges incident to the node are removed entirely. Default is False. + + Raises + ------ + ValueError + If the node is not in the hypergraph. + """ + pass + + @abstractmethod + def remove_nodes(self, node_list: List[Any], keep_edges: bool = False) -> None: + """ + Remove a list of nodes from the hypergraph. + + Parameters + ---------- + node_list : list + The list of nodes to remove. + keep_edges : bool, optional + If True, edges incident to the nodes are kept but updated to exclude the nodes. + If False, edges incident to the nodes are removed entirely. Default is False. + + Returns + ------- + None + + Raises + ------ + KeyError + If any of the nodes is not in the hypergraph. + """ + pass + + @abstractmethod + def check_node(self, node: Any) -> bool: + """ + Check if a node exists in the hypergraph. + + Parameters + ---------- + node : object + The node to check. + + Returns + ------- + bool + True if the node exists, False otherwise. + """ + pass + + # ============================================================================= + # Edge Management (Abstract - varies by implementation) + # ============================================================================= + + @abstractmethod + def add_edge(self, edge, *args, **kwargs) -> None: + """ + Add an edge to the hypergraph. + + Note: Signature varies by implementation. + """ + pass + + def add_edges(self, + edge_list:List[Tuple[Tuple, Tuple]], + + weights:List[int]=None, + metadata:List[Dict]=None, + *args, + **kwargs) -> None: + """Add a list of hyperedges to the hypergraph. If a hyperedge is already in the hypergraph, its weight is updated. + + Parameters + ---------- + edge_list : list + The list of hyperedges to add. + edge_layer : list + The list of layers to which the hyperedges belong. + weights : list, optional + The list of weights of the hyperedges. If the hypergraph is weighted, this must be provided. + metadata : list, optional + The list of metadata of the hyperedges. + + Returns + ------- + None + + Raises + ------ + ValueError + If the hypergraph is weighted and no weights are provided or if the hypergraph is not weighted and weights are provided. + """ + if weights is not None and not self._weighted: + print( + "Warning: weights are provided but the hypergraph is not weighted. The hypergraph will be weighted." + ) + self._weighted = True + + if self._weighted and weights is not None: + if len(set(edge_list)) != len(list(edge_list)): + raise ValueError( + "If weights are provided, the edge list must not contain repeated edges." + ) + if len(list(edge_list)) != len(list(weights)): + raise ValueError("The number of edges and weights must be the same.") + + for i, edge in enumerate(edge_list): + self.add_edge( + edge=edge, + weight=( + weights[i] if self._weighted and weights is not None else None + ), + metadata=metadata[i] if metadata is not None else None, + *args, + **kwargs, + ) + + @abstractmethod + def remove_edge(self, edge, *args, **kwargs) -> None: + """ + Remove an edge from the hypergraph. + + Note: Signature varies by implementation. + """ + pass + + @abstractmethod + def remove_edges(self, edge_list) -> None: + """ + Remove multiple edges from the hypergraph. + + Parameters + ---------- + edge_list : list + The list of edges to remove. + + Returns + ------- + None + + Raises + ------ + KeyError + If any edge is not in the hypergraph. + """ + pass + + @abstractmethod + def get_edges(self, *args, **kwargs): + """ + Get edges from the hypergraph. + + Note: Parameters vary by implementation due to different filtering capabilities. + """ + pass + + @abstractmethod + def get_neighbors(self, node, order: int = None, size: int = None): + """ + Get the neighbors of a node in the hypergraph. + + Parameters + ---------- + node : object + The node of interest. + order : int + The order of the hyperedges to consider. + size : int + The size of the hyperedges to consider. + + Returns + ------- + set + The neighbors of the node. + + Raises + ------ + ValueError + If order and size are both specified or neither are specified. + """ + pass + + @abstractmethod + def get_incident_edges(self, node, order: int = None, size: int = None) -> List[Tuple]: + """ + Get the incident edges of a node. + + Parameters + ---------- + node : object + The node of interest. + order : int, optional + The order of the hyperedges to consider. If None, all hyperedges are considered. + size : int, optional + The size of the hyperedges to consider. If None, all hyperedges are considered. + + Returns + ------- + list + The list of incident edges. + """ + pass + + def check_edge(self, edge, *args, **kwargs) -> bool: + """ + Check if an edge exists in the hypergraph. + + Parameters + ---------- + edge : tuple + The edge to check. + + Returns + ------- + bool + True if the edge is in the hypergraph, False otherwise. + + """ + edge = self._canon_edge(edge) + k = self._restructure_query_edge(edge, *args, **kwargs) + return k in self._edge_list + + def get_edge_list(self) -> Dict[Tuple, int]: + """Get the edge list dictionary.""" + return self._edge_list + + def set_edge_list(self, edge_list: List[Tuple]): + """Set the edge list dictionary.""" + self._edge_list = { + e: i for i, e in enumerate(edge_list) + } + self._next_edge_id = len(edge_list) + + # ============================================================================= + # Weight Management + # ============================================================================= + + def get_weight(self, edge, *args, **kwargs): + """Returns the weight of the specified edge. + + Parameters + ---------- + edge : tuple + The edge to get the weight of. + + Returns + ------- + float + Weight of the specified edge. + """ + edge = self._canon_edge(edge) + k = self._restructure_query_edge(edge, *args, **kwargs) + if k not in self._edge_list: + raise ValueError("Edge {} not in hypergraph.".format(k)) + else: + return self._weights[self._edge_list[k]] + + def set_weight(self, edge, weight, *args, **kwargs) -> None: + """Sets the weight of the specified edge. + + Parameters + ---------- + edge : tuple + The edge to set the weight of. + + weight : float + The weight to set. + + Returns + ------- + None + + Raises + ------ + ValueError + If the edge is not in the hypergraph. + """ + if not self._weighted and weight != 1: + raise ValueError( + "If the hypergraph is not weighted, weight can be 1 or None." + ) + + edge = self._canon_edge(edge) + k = self._restructure_query_edge(edge, *args, **kwargs) + if k not in self._edge_list: + raise ValueError("Edge {} not in hypergraph.".format(edge)) + else: + self._weights[self._edge_list[k]] = weight + + @abstractmethod + def get_weights(self, order=None, size=None, up_to=False, asdict=False): + """Returns the list of weights of the edges in the hypergraph. If order is specified, it returns the list of weights of the edges of the specified order. + If size is specified, it returns the list of weights of the edges of the specified size. If both order and size are specified, it raises a ValueError. + If up_to is True, it returns the list of weights of the edges of order smaller or equal to the specified order. + + Parameters + ---------- + order : int, optional + Order of the edges to get the weights of. + + size : int, optional + Size of the edges to get the weights of. + + up_to : bool, optional + If True, it returns the list of weights of the edges of order smaller or equal to the specified order. Default is False. + + Returns + ------- + list + List of weights of the edges in the hypergraph. + + Raises + ------ + ValueError + If both order and size are specified. + + """ + pass + + # ============================================================================= + # Structural Information (Shared Implementation) + # ============================================================================= + + def num_nodes(self) -> int: + """ + Returns the number of nodes in the hypergraph. + + Returns + ------- + int + Number of nodes in the hypergraph. + """ + return len(self.get_nodes()) + + @abstractmethod + def num_edges(self) -> int: + """Returns the number of edges in the hypergraph. + + Returns + ------- + int + Number of edges in the hypergraph. + """ + pass + + @abstractmethod + def get_sizes(self) -> List[int]: + """Returns the list of sizes of the hyperedges in the hypergraph. + + Returns + ------- + list + List of sizes of the hyperedges in the hypergraph. + + """ + pass + + def max_size(self) -> int: + """ + Returns the maximum size of the hypergraph. + + Returns + ------- + int + Maximum size of the hypergraph. + """ + sizes = self.get_sizes() + return max(sizes) if sizes else 0 + + def max_order(self) -> int: + """ + Returns the maximum order of the hypergraph. + + Returns + ------- + int + Maximum order of the hypergraph. + """ + return self.max_size() - 1 + + def get_orders(self) -> List[int]: + """ + Get the order of each edge in the hypergraph. + + Returns + ------- + list + A list of integers representing the order of each edge. + """ + return [size - 1 for size in self.get_sizes()] + + def distribution_sizes(self) -> Dict[int, int]: + """ + Returns the distribution of sizes of the hyperedges in the hypergraph. + + Returns + ------- + dict + Distribution of sizes of the hyperedges in the hypergraph. + """ + return dict(Counter(self.get_sizes())) + + @abstractmethod + def is_uniform(self) -> bool: + """ + Check if the hypergraph is uniform, i.e. all hyperedges have the same size. + + Returns + ------- + bool + True if the hypergraph is uniform, False otherwise. + """ + pass + + def is_weighted(self) -> bool: + """ + Check if the hypergraph is weighted. + + Returns + ------- + bool + True if the hypergraph is weighted, False otherwise. + """ + return self._weighted + + # ============================================================================= + # Metadata Management (Shared Implementation) + # ============================================================================= + + # Hypergraph metadata + def get_hypergraph_metadata(self): + """Get hypergraph metadata.""" + return self._hypergraph_metadata + + def set_hypergraph_metadata(self, metadata): + """Set hypergraph metadata.""" + self._hypergraph_metadata = metadata + + def set_attr_to_hypergraph_metadata(self, field, value): + """Set an attribute in hypergraph metadata.""" + self._hypergraph_metadata[field] = value + + # Node metadata + def get_node_metadata(self, node): + """Get metadata for a specific node.""" + if node not in self._node_metadata: + raise ValueError("Node {} not in hypergraph.".format(node)) + return self._node_metadata[node] + + def set_node_metadata(self, node, metadata): + """Set metadata for a specific node.""" + if node not in self._node_metadata: + raise ValueError("Node {} not in hypergraph.".format(node)) + self._node_metadata[node] = metadata + + def get_all_nodes_metadata(self): + """Get metadata for all nodes.""" + return self._node_metadata + + def set_attr_to_node_metadata(self, node, field, value): + """Set an attribute in node metadata.""" + if node not in self._node_metadata: + raise ValueError("Node {} not in hypergraph.".format(node)) + self._node_metadata[node][field] = value + + def remove_attr_from_node_metadata(self, node, field): + """Remove an attribute from node metadata.""" + if node not in self._node_metadata: + raise ValueError("Node {} not in hypergraph.".format(node)) + del self._node_metadata[node][field] + + # Edge metadata + def get_edge_metadata(self, edge, *args, **kwargs) -> dict: + """ + Get metadata for a specific edge. + """ + edge = self._canon_edge(edge) + k = self._restructure_query_edge(edge, *args, **kwargs) + if k not in self._edge_list: + raise ValueError("Edge {} not in hypergraph.".format(edge)) + return dict(self._edge_metadata[k]) + + def set_edge_metadata(self, edge, metadata:Dict, *args, **kwargs): + edge = self._canon_edge(edge) + k = self._restructure_query_edge(edge, *args, **kwargs) + if k not in self._edge_list: + raise ValueError("Edge {} not in hypergraph.".format(edge)) + self._edge_metadata[k] = metadata + + def get_all_edges_metadata(self) -> Dict[Tuple, Dict]: + """Get metadata for all edges.""" + return self._edge_metadata + + def set_attr_to_edge_metadata(self, edge, field, value, *args, **kwargs): + edge = self._canon_edge(edge) + k = self._restructure_query_edge(edge, *args, **kwargs) + if k not in self._edge_metadata: + raise ValueError("Edge {} not in hypergraph.".format(edge)) + self._edge_metadata[k][field] = value + + def remove_attr_from_edge_metadata(self, edge, field, *args, **kwargs): + edge = self._canon_edge(edge) + k = self._restructure_query_edge(edge, *args, **kwargs) + if k not in self._edge_metadata: + raise ValueError("Edge {} not in hypergraph.".format(edge)) + del self._edge_metadata[k][field] + + # Incidence metadata + @abstractmethod + def get_incidence_metadata(self, edge, node): + """Get incidence metadata for a specific edge-node pair.""" + pass + + @abstractmethod + def set_incidence_metadata(self, edge, node, metadata): + """Set incidence metadata for a specific edge-node pair.""" + pass + + @abstractmethod + def get_all_incidences_metadata(self): + """Get all incidence metadata.""" + pass + + @abstractmethod + def _restructure_query_edge(self, k: Tuple[Tuple, Any]): + """ + An implementation-specific helper for modifying a query edge + prior to metadata retrieval. + """ + pass + + # ============================================================================= + # Utility Methods (Shared Implementation) + # ============================================================================= + + @abstractmethod + def _canon_edge(self, edge: Tuple) -> Tuple: + """ + Gets the canonical form of an edge (sorts the inner tuples) + Works for hyperedges but WILL BREAK FOR METAEDGES + TODO: Add recursive canonicalization for future metagraph integration + """ + pass + + @abstractmethod + def clear(self): + """Clear all data from the hypergraph.""" + pass + + def copy(self): + """ + Returns a copy of the hypergraph. + + Returns + ------- + IHypergraph + A copy of the hypergraph. + """ + return copy.deepcopy(self) + + def __str__(self): + """ + Returns a string representation of the hypergraph. + + Returns + ------- + str + A string representation of the hypergraph. + """ + title = "Hypergraph with {} nodes and {} edges.\n".format( + self.num_nodes(), self.num_edges() + ) + details = "Distribution of hyperedge sizes: {}".format( + self.distribution_sizes() + ) + return title + details + + def __len__(self): + """ + Returns the number of edges in the hypergraph. + + Returns + ------- + int + The number of edges in the hypergraph. + """ + return len(self._edge_list) + + def __iter__(self): + """ + Returns an iterator over the edges in the hypergraph. + + Returns + ------- + iterator + An iterator over the edges in the hypergraph. + """ + return iter(self._edge_list.items()) + + # ============================================================================= + # Serialization Support (Abstract - implementation-specific) + # ============================================================================= + + @abstractmethod + def expose_data_structures(self) -> Dict: + """ + Expose the internal data structures of the hypergraph for serialization. + + Returns + ------- + dict + A dictionary containing all internal attributes of the hypergraph. + """ + pass + + @abstractmethod + def populate_from_dict(self, data: Dict) -> None: + """ + Populate the attributes of the hypergraph from a dictionary. + + Parameters + ---------- + data : dict + A dictionary containing the attributes to populate the hypergraph. + """ + pass + + @abstractmethod + def expose_attributes_for_hashing(self) -> dict: + """ + Expose relevant attributes for hashing. + + Returns + ------- + dict + A dictionary containing key attributes for hashing. + """ + pass + + def get_mapping(self): + """ + Map the nodes of the hypergraph to integers in [0, n_nodes). + + Returns + ------- + LabelEncoder + The mapping. + """ + encoder = LabelEncoder() + encoder.fit(self.get_nodes()) + return encoder \ No newline at end of file diff --git a/hypergraphx/core/I_undirected_hypergraph.py b/hypergraphx/core/I_undirected_hypergraph.py new file mode 100644 index 0000000..5bcf65f --- /dev/null +++ b/hypergraphx/core/I_undirected_hypergraph.py @@ -0,0 +1,601 @@ +import copy +import warnings +from abc import abstractmethod +from typing import Tuple, Any, List, Dict, Optional, Set, Union +from collections import Counter + +from hypergraphx.core.i_hypergraph import IHypergraph + + +class IUndirectedHypergraph(IHypergraph): + """ + Abstract base class for undirected hypergraphs that provides common functionality + for adjacency list management, canonical edge handling, and shared operations. + + This class serves as an intermediate layer between IHypergraph and concrete + implementations like Hypergraph, MultiplexHypergraph, and TemporalHypergraph. + """ + + def __init__( + self, + edge_list: Optional[List] = None, + weighted: bool = False, + weights: Optional[List[int]] = None, + hypergraph_metadata: Optional[Dict] = None, + node_metadata: Optional[Dict] = None, + edge_metadata: Optional[List[Dict]] = None + ): + """ + Initialize the undirected hypergraph base class. + + Parameters + ---------- + edge_list : list of tuples, optional + A list of hyperedges, where each hyperedge is represented as a tuple of nodes. + weighted : bool, optional + Indicates whether the hypergraph is weighted. Default is False. + weights : list of floats, optional + A list of weights corresponding to each edge in `edge_list`. + hypergraph_metadata : dict, optional + Metadata for the hypergraph. Default is an empty dictionary. + node_metadata : dict, optional + A dictionary of metadata for nodes. + edge_metadata : list of dicts, optional + A list of metadata dictionaries corresponding to the edges. + """ + super().__init__( + edge_list=edge_list, + weighted=weighted, + weights=weights, + hypergraph_metadata=hypergraph_metadata, + node_metadata=node_metadata, + edge_metadata=edge_metadata + ) + + # Initialize adjacency list - common to all undirected hypergraph implementations + self._adj = {} + + # ============================================================================= + # Common Node Management Implementation + # ============================================================================= + + def add_node(self, node: Any, metadata: Optional[Dict] = None) -> None: + """ + Add a node to the hypergraph. If the node is already in the hypergraph, nothing happens. + + Parameters + ---------- + node : object + The node to add. + metadata : dict, optional + Metadata for the node. + """ + # Call parent implementation for metadata handling + super().add_node(node, metadata) + + # Add to adjacency list if not already present + if node not in self._adj: + self._adj[node] = [] + + def get_nodes(self, metadata: bool = False): + """ + Get all nodes in the hypergraph. + + Parameters + ---------- + metadata : bool, optional + If True, return node metadata dictionary. If False, return list of nodes. + + Returns + ------- + list or dict + List of nodes or dictionary of node metadata. + """ + if metadata: + return {node: self.get_node_metadata(node) for node in self._adj.keys()} + else: + return list(self._adj.keys()) + + def check_node(self, node: Any) -> bool: + """ + Check if a node exists in the hypergraph. + + Parameters + ---------- + node : object + The node to check. + + Returns + ------- + bool + True if the node exists, False otherwise. + """ + return node in self._adj + + def remove_nodes(self, node_list: List[Any], keep_edges: bool = False) -> None: + """ + Remove a list of nodes from the hypergraph. + + Parameters + ---------- + node_list : list + The list of nodes to remove. + keep_edges : bool, optional + If True, edges incident to the nodes are kept but updated to exclude the nodes. + If False, edges incident to the nodes are removed entirely. Default is False. + """ + for node in node_list: + self.remove_node(node, keep_edges=keep_edges) + + # ============================================================================= + # Common Edge Management Implementation + # ============================================================================= + + def add_edges(self, edge_list: List, weights: Optional[List] = None, metadata: Optional[List[Dict]] = None, **kwargs) -> None: + """ + Add multiple edges to the hypergraph. + + Parameters + ---------- + edge_list : list + The list of edges to add. + weights : list, optional + The list of weights for the edges. + metadata : list, optional + The list of metadata dictionaries for the edges. + **kwargs + Additional parameters specific to subclass implementations. + """ + if weights is not None and not self._weighted: + warnings.warn( + "Weights are provided but the hypergraph is not weighted. The weights will be ignored.", + UserWarning, + ) + self._weighted = True + + if self._weighted and weights is not None: + if len(set(edge_list)) != len(list(edge_list)): + raise ValueError( + "If weights are provided, the edge list must not contain repeated edges." + ) + if len(list(edge_list)) != len(list(weights)): + raise ValueError("The number of edges and weights must be the same.") + + for i, edge in enumerate(edge_list): + weight = weights[i] if self._weighted and weights is not None else None + edge_metadata = metadata[i] if metadata is not None else None + self._add_edge_implementation(edge, weight, edge_metadata, **kwargs) + + @abstractmethod + def _add_edge_implementation(self, edge, weight, metadata, **kwargs): + """ + Abstract method for adding a single edge. Must be implemented by subclasses. + + Parameters + ---------- + edge : tuple + The edge to add. + weight : float, optional + The weight of the edge. + metadata : dict, optional + The metadata of the edge. + **kwargs + Additional parameters specific to subclass implementations. + """ + pass + + def remove_edges(self, edge_list: List) -> None: + """ + Remove multiple edges from the hypergraph. + + Parameters + ---------- + edge_list : list + The list of edges to remove. + """ + for edge in edge_list: + self.remove_edge(edge) + + # ============================================================================= + # Common Neighbor and Incident Edge Methods + # ============================================================================= + + def get_neighbors(self, node: Any, order: int = None, size: int = None) -> Set: + """ + Get the neighbors of a node in the hypergraph. + + Parameters + ---------- + node : object + The node of interest. + order : int, optional + The order of the hyperedges to consider. + size : int, optional + The size of the hyperedges to consider. + + Returns + ------- + set + The neighbors of the node. + + Raises + ------ + ValueError + If the node is not in the hypergraph or if both order and size are specified. + """ + if node not in self._adj: + raise ValueError("Node {} not in hypergraph.".format(node)) + if order is not None and size is not None: + raise ValueError("Order and size cannot be both specified.") + + neigh = set() + edges = self.get_incident_edges(node, order=order, size=size) + for edge in edges: + # Extract nodes from edge (handling different edge formats) + edge_nodes = self._extract_nodes_from_edge(edge) + neigh.update(edge_nodes) + + # Remove the node itself from its neighbors + neigh.discard(node) + return neigh + + @abstractmethod + def _extract_nodes_from_edge(self, edge) -> List: + """ + Extract node list from an edge representation. + Must be implemented by subclasses based on their edge format. + + Parameters + ---------- + edge : object + The edge representation. + + Returns + ------- + list + List of nodes in the edge. + """ + pass + + # ============================================================================= + # Common Structural Information Methods + # ============================================================================= + + def num_edges(self, order: int = None, size: int = None, up_to: bool = False) -> int: + """ + Get the number of edges in the hypergraph. + + Parameters + ---------- + order : int, optional + The order of edges to count. + size : int, optional + The size of edges to count. + up_to : bool, optional + If True, count edges up to the specified order/size. + + Returns + ------- + int + Number of edges matching the criteria. + """ + if order is not None and size is not None: + raise ValueError("Order and size cannot be both specified.") + + if order is None and size is None: + return len(self._edge_list) + else: + if size is not None: + order = size - 1 + count = 0 + for edge_key in self._edge_list: + edge_size = self._get_edge_size(edge_key) + edge_order = edge_size - 1 + if not up_to: + if edge_order == order: + count += 1 + else: + if edge_order <= order: + count += 1 + return count + + @abstractmethod + def _get_edge_size(self, edge_key) -> int: + """ + Get the size of an edge given its key representation. + Must be implemented by subclasses based on their edge format. + + Parameters + ---------- + edge_key : object + The edge key representation. + + Returns + ------- + int + Size of the edge. + """ + pass + + def get_sizes(self) -> List[int]: + """ + Get the sizes of all edges in the hypergraph. + + Returns + ------- + list + List of edge sizes. + """ + return [self._get_edge_size(edge_key) for edge_key in self._edge_list.keys()] + + def is_uniform(self) -> bool: + """ + Check if the hypergraph is uniform (all edges have the same size). + + Returns + ------- + bool + True if the hypergraph is uniform, False otherwise. + """ + if not self._edge_list: + return True + + sizes = self.get_sizes() + return len(set(sizes)) <= 1 + + # ============================================================================= + # Common Weight Management Methods + # ============================================================================= + + def get_weights(self, order: int = None, size: int = None, up_to: bool = False, asdict: bool = False): + """ + Get weights of edges in the hypergraph. + + Parameters + ---------- + order : int, optional + The order of edges to get weights for. + size : int, optional + The size of edges to get weights for. + up_to : bool, optional + If True, get weights for edges up to the specified order/size. + asdict : bool, optional + If True, return as dictionary mapping edges to weights. + + Returns + ------- + list or dict + Weights of the edges. + """ + if order is not None and size is not None: + raise ValueError("Order and size cannot be both specified.") + + if order is None and size is None: + w = { + edge: self._weights[self._edge_list[edge]] for edge in self._edge_list.keys() + } + else: + if size is not None: + order = size - 1 + w = {} + for edge_key in self._edge_list: + edge_size = self._get_edge_size(edge_key) + edge_order = edge_size - 1 + if not up_to: + if edge_order == order: + w[edge_key] = self._weights[self._edge_list[edge_key]] + else: + if edge_order <= order: + w[edge_key] = self._weights[self._edge_list[edge_key]] + + return w if asdict else list(w.values()) + + # ============================================================================= + # Common Utility Methods + # ============================================================================= + + def _canon_edge(self, edge: Tuple) -> Tuple: + """ + Get the canonical form of an edge by sorting its components. + This default implementation works for simple undirected edges. + Subclasses can override for more complex edge structures. + + Parameters + ---------- + edge : tuple + The edge to canonicalize. + + Returns + ------- + tuple + The canonical form of the edge. + """ + return tuple(sorted(edge)) + + def get_adj_dict(self) -> Dict: + """ + Get the adjacency dictionary. + + Returns + ------- + dict + The adjacency dictionary mapping nodes to lists of incident edge IDs. + """ + return self._adj + + def set_adj_dict(self, adj: Dict) -> None: + """ + Set the adjacency dictionary. + + Parameters + ---------- + adj : dict + The adjacency dictionary to set. + """ + self._adj = adj + + def clear(self) -> None: + """Clear all data from the hypergraph.""" + super().clear() + self._adj.clear() + + # ============================================================================= + # Common Analysis Methods (delegated to external modules) + # ============================================================================= + + def degree(self, node: Any, order: int = None, size: int = None): + """Get the degree of a node.""" + from hypergraphx.measures.degree import degree + return degree(self, node, order=order, size=size) + + def degree_sequence(self, order: int = None, size: int = None): + """Get the degree sequence of the hypergraph.""" + from hypergraphx.measures.degree import degree_sequence + return degree_sequence(self, order=order, size=size) + + def degree_distribution(self, order: int = None, size: int = None): + """Get the degree distribution of the hypergraph.""" + from hypergraphx.measures.degree import degree_distribution + return degree_distribution(self, order=order, size=size) + + def isolated_nodes(self, size: int = None, order: int = None): + """Get isolated nodes in the hypergraph.""" + from hypergraphx.utils.cc import isolated_nodes + return isolated_nodes(self, size=size, order=order) + + def is_isolated(self, node: Any, size: int = None, order: int = None): + """Check if a node is isolated.""" + from hypergraphx.utils.cc import is_isolated + return is_isolated(self, node, size=size, order=order) + + # Connected Components + def is_connected(self, size: int = None, order: int = None): + """Check if the hypergraph is connected.""" + from hypergraphx.utils.cc import is_connected + return is_connected(self, size=size, order=order) + + def connected_components(self, size: int = None, order: int = None): + """Get connected components of the hypergraph.""" + from hypergraphx.utils.cc import connected_components + return connected_components(self, size=size, order=order) + + def node_connected_component(self, node: Any, size: int = None, order: int = None): + """Get the connected component containing a specific node.""" + from hypergraphx.utils.cc import node_connected_component + return node_connected_component(self, node, size=size, order=order) + + def num_connected_components(self, size: int = None, order: int = None): + """Get the number of connected components.""" + from hypergraphx.utils.cc import num_connected_components + return num_connected_components(self, size=size, order=order) + + def largest_component(self, size: int = None, order: int = None): + """Get the largest connected component.""" + from hypergraphx.utils.cc import largest_component + return largest_component(self, size=size, order=order) + + def largest_component_size(self, size: int = None, order: int = None): + """Get the size of the largest connected component.""" + from hypergraphx.utils.cc import largest_component_size + return largest_component_size(self, size=size, order=order) + + # Matrix operations + def binary_incidence_matrix(self, return_mapping: bool = False): + """Get the binary incidence matrix.""" + from hypergraphx.linalg import binary_incidence_matrix + return binary_incidence_matrix(self, return_mapping) + + def incidence_matrix(self, return_mapping: bool = False): + """Get the incidence matrix.""" + from hypergraphx.linalg import incidence_matrix + return incidence_matrix(self, return_mapping) + + def adjacency_matrix(self, return_mapping: bool = False): + """Get the adjacency matrix.""" + from hypergraphx.linalg import adjacency_matrix + return adjacency_matrix(self, return_mapping) + + def dual_random_walk_adjacency(self, return_mapping: bool = False): + """Get the dual random walk adjacency matrix.""" + from hypergraphx.linalg import dual_random_walk_adjacency + return dual_random_walk_adjacency(self, return_mapping) + + def adjacency_factor(self, t: int = 0): + """Get the adjacency factor.""" + from hypergraphx.linalg import adjacency_factor + return adjacency_factor(self, t) + + # ============================================================================= + # Common Serialization Support + # ============================================================================= + + def expose_data_structures(self) -> Dict: + """ + Expose the internal data structures for serialization. + Base implementation that subclasses can extend. + + Returns + ------- + dict + A dictionary containing the internal data structures. + """ + base_data = super().expose_data_structures() + base_data["_adj"] = self._adj + return base_data + + def populate_from_dict(self, data: Dict) -> None: + """ + Populate the attributes from a dictionary. + Base implementation that subclasses can extend. + + Parameters + ---------- + data : dict + A dictionary containing the attributes to populate. + """ + super().populate_from_dict(data) + self._adj = data.get("_adj", {}) + + # ============================================================================= + # Abstract Methods That Must Be Implemented by Subclasses + # ============================================================================= + + @abstractmethod + def remove_node(self, node: Any, keep_edges: bool = False) -> None: + """ + Remove a node from the hypergraph. + Must be implemented by subclasses due to edge format differences. + """ + pass + + @abstractmethod + def add_edge(self, edge, weight=None, metadata=None, **kwargs) -> None: + """ + Add an edge to the hypergraph. + Must be implemented by subclasses due to edge format differences. + """ + pass + + @abstractmethod + def remove_edge(self, edge, **kwargs) -> None: + """ + Remove an edge from the hypergraph. + Must be implemented by subclasses due to edge format differences. + """ + pass + + @abstractmethod + def get_edges(self, metadata: bool = False, **kwargs): + """ + Get edges from the hypergraph. + Must be implemented by subclasses due to edge format differences. + """ + pass + + @abstractmethod + def get_incident_edges(self, node: Any, order: int = None, size: int = None): + """ + Get incident edges of a node. + Must be implemented by subclasses due to edge format differences. + """ + pass \ No newline at end of file diff --git a/hypergraphx/core/directed_hypergraph.py b/hypergraphx/core/directed_hypergraph.py index 0f47c44..a65047b 100644 --- a/hypergraphx/core/directed_hypergraph.py +++ b/hypergraphx/core/directed_hypergraph.py @@ -1,8 +1,10 @@ import copy -from typing import Tuple, List +from typing import Tuple, List, Any, Optional, Dict from sklearn.preprocessing import LabelEncoder +from hypergraphx.core.i_hypergraph import IHypergraph + def _get_edge_size(edge): """ @@ -20,7 +22,7 @@ def _get_edge_size(edge): return len(edge[0]) + len(edge[1]) -class DirectedHypergraph: +class DirectedHypergraph(IHypergraph): """ A Directed Hypergraph is a generalization of a graph in which hyperedges have a direction. Each hyperedge connects a set of source nodes to a set of target nodes. @@ -28,12 +30,12 @@ class DirectedHypergraph: def __init__( self, - edge_list=None, - weighted=False, - weights=None, - hypergraph_metadata=None, - node_metadata=None, - edge_metadata=None, + edge_list: Optional[List]=None, + weighted: bool = False, + weights:Optional[List[int]]=None, + hypergraph_metadata: Optional[Dict] = None, + node_metadata: Optional[Dict] = None, + edge_metadata: Optional[List[Dict]] = None ): """ Initialize a Directed Hypergraph. @@ -60,23 +62,22 @@ def __init__( If `edge_list` and `weights` have mismatched lengths when `weighted` is True. If `edge_list` contains improperly formatted edges. """ - # Initialize hypergraph metadata - self._hypergraph_metadata = hypergraph_metadata or {} - self._hypergraph_metadata.update( - {"weighted": weighted, "type": "DirectedHypergraph"} + # Call parent constructor + super().__init__( + edge_list=None, # We'll handle edge_list separately + weighted=weighted, + weights=weights, + hypergraph_metadata=hypergraph_metadata, + node_metadata=node_metadata, + edge_metadata=edge_metadata ) + + # Update hypergraph metadata with type + self._hypergraph_metadata.update({"type": "DirectedHypergraph"}) - # Initialize core attributes - self._weighted = weighted + # Initialize DirectedHypergraph-specific attributes self._adj_source = {} self._adj_target = {} - self._edge_list = {} - self._node_metadata = {} - self._edge_metadata = {} - self._incidences_metadata = {} - self._reverse_edge_list = {} - self._weights = {} - self._next_edge_id = 0 # Add node metadata if provided if node_metadata: @@ -89,7 +90,10 @@ def __init__( raise ValueError("Edge list and weights must have the same length.") self.add_edges(edge_list, weights=weights, metadata=edge_metadata) - # Nodes + # ============================================================================= + # Node Management Implementation + # ============================================================================= + def add_node(self, node, metadata=None): """ Add a node to the hypergraph. If the node is already in the hypergraph, nothing happens. @@ -98,35 +102,20 @@ def add_node(self, node, metadata=None): ---------- node : object The node to add. + metadata : dict, optional + Metadata for the node. Returns ------- None """ - if metadata is None: - self._node_metadata[node] = {} + # Call parent method for metadata handling + super().add_node(node, metadata) + + # DirectedHypergraph-specific initialization if node not in self._adj_source: self._adj_source[node] = [] self._adj_target[node] = [] - self._node_metadata[node] = {} - if self._node_metadata[node] == {}: - self._node_metadata[node] = metadata - - def add_nodes(self, node_list: list): - """ - Add a list of nodes to the hypergraph. - - Parameters - ---------- - node_list : list - The list of nodes to add. - - Returns - ------- - None - """ - for node in node_list: - self.add_node(node) def remove_node(self, node, keep_edges=False): """Remove a node from the hypergraph, with an option to keep or remove edges incident to it.""" @@ -144,6 +133,10 @@ def remove_node(self, node, keep_edges=False): del self._adj_source[node] del self._adj_target[node] + + # Remove from parent's node metadata + if node in self._node_metadata: + del self._node_metadata[node] def remove_nodes(self, node_list, keep_edges=False): """ @@ -153,10 +146,9 @@ def remove_nodes(self, node_list, keep_edges=False): ---------- node_list : list The list of nodes to remove. - keep_edges : bool, optional - If True, the edges incident to the nodes are kept, but the nodes are removed from the edges. If False, the edges incident to the nodes are removed. Default is False. - + If True, the edges incident to the nodes are kept, but the nodes are removed from the edges. + If False, the edges incident to the nodes are removed. Default is False. Returns ------- @@ -189,9 +181,8 @@ def check_node(self, node): ------- bool True if the node is in the hypergraph, False otherwise. - """ - return node in self._adj_source or self._adj_target + return node in self._adj_source def get_neighbors(self, node, order: int = None, size: int = None): """ @@ -226,7 +217,8 @@ def get_neighbors(self, node, order: int = None, size: int = None): neigh = set() edges = self.get_incident_edges(node) for edge in edges: - neigh.update(edge) + neigh.update(edge[0]) # Add source nodes + neigh.update(edge[1]) # Add target nodes if node in neigh: neigh.remove(node) return neigh @@ -236,7 +228,8 @@ def get_neighbors(self, node, order: int = None, size: int = None): neigh = set() edges = self.get_incident_edges(node, order=order) for edge in edges: - neigh.update(edge) + neigh.update(edge[0]) # Add source nodes + neigh.update(edge[1]) # Add target nodes if node in neigh: neigh.remove(node) return neigh @@ -279,7 +272,6 @@ def get_sources(self): ------- list List of sources of the hyperedges in the hypergraph. - """ return [edge[0] for edge in self._edge_list.keys()] @@ -290,11 +282,13 @@ def get_targets(self): ------- list List of targets of the hyperedges in the hypergraph. - """ return [edge[1] for edge in self._edge_list.keys()] - # Edges + # ============================================================================= + # Edge Management Implementation + # ============================================================================= + def add_edge(self, edge: Tuple[Tuple, Tuple], weight=None, metadata=None): """Add a directed hyperedge to the hypergraph. If the hyperedge already exists, its weight is updated. @@ -358,9 +352,10 @@ def add_edge(self, edge: Tuple[Tuple, Tuple], weight=None, metadata=None): else: self.set_edge_metadata(edge, {}) - def add_edges( - self, edge_list: List[Tuple[Tuple, Tuple]], weights=None, metadata=None - ): + def add_edges(self, + edge_list: List[Tuple[Tuple, Tuple]], + weights=None, + metadata=None): """Add a list of directed hyperedges to the hypergraph. If a hyperedge is already in the hypergraph, its weight is updated. Parameters @@ -552,7 +547,7 @@ def remove_edge(self, edge: Tuple[Tuple, Tuple]): del self._reverse_edge_list[e_idx] del self._weights[e_idx] - del self._edge_metadata[e_idx] + del self._edge_metadata[edge] del self._edge_list[edge] else: @@ -578,43 +573,9 @@ def remove_edges(self, edge_list): for edge in edge_list: self.remove_edge(edge) - def set_edge_list(self, edge_list): - self._edge_list = edge_list - - def get_edge_list(self): - return self._edge_list - - """def add_empty_edge(self, name, metadata): - pass - Don't know if needed - """ - - def check_edge(self, edge: Tuple[Tuple, Tuple]): - """Checks if the specified edge is in the hypergraph. - - Parameters - ---------- - edge : tuple - The edge to check. - - Returns - ------- - bool - True if the edge is in the hypergraph, False otherwise. - - """ - edge = (tuple(sorted(edge[0])), tuple(sorted(edge[1]))) - return edge in self._edge_list - - # Weight - def get_weight(self, edge: Tuple[Tuple, Tuple]): - """Returns the weight of the specified directed edge.""" - edge = (tuple(sorted(edge[0])), tuple(sorted(edge[1]))) - if edge in self._edge_list: - idx = self._edge_list[edge] - return self._weights[idx] - else: - raise ValueError(f"Edge {edge} not in hypergraph.") + # ============================================================================= + # Weight Management Implementation + # ============================================================================= def get_weights(self, order=None, size=None, up_to=False, asdict=False): """Returns the list of weights of the edges in the hypergraph. If order is specified, it returns the list of weights of the edges of the specified order. @@ -625,10 +586,8 @@ def get_weights(self, order=None, size=None, up_to=False, asdict=False): ---------- order : int, optional Order of the edges to get the weights of. - size : int, optional Size of the edges to get the weights of. - up_to : bool, optional If True, it returns the list of weights of the edges of order smaller or equal to the specified order. Default is False. @@ -641,7 +600,6 @@ def get_weights(self, order=None, size=None, up_to=False, asdict=False): ------ ValueError If both order and size are specified. - """ w = None if order is not None and size is not None: @@ -665,23 +623,9 @@ def get_weights(self, order=None, size=None, up_to=False, asdict=False): else: return list(w.values()) - def set_weight(self, edge: Tuple[Tuple, Tuple], weight: float): - """Sets the weight of the specified directed edge.""" - if not self._weighted and weight != 1: - raise ValueError( - "If the hypergraph is not weighted, weight can be 1 or None." - ) - edge = (tuple(sorted(edge[0])), tuple(sorted(edge[1]))) - if edge in self._edge_list: - idx = self._edge_list[edge] - self._weights[idx] = weight - else: - raise ValueError(f"Edge {edge} not in hypergraph.") - - # Info - def num_nodes(self): - """Returns the number of nodes in the hypergraph.""" - return len(self.get_nodes()) + # ============================================================================= + # Structural Information Implementation + # ============================================================================= def num_edges(self): """Returns the number of directed edges in the hypergraph.""" @@ -694,67 +638,9 @@ def get_sizes(self): ------- list List of sizes of the hyperedges in the hypergraph. - """ return [len(edge[0]) + len(edge[1]) for edge in self._edge_list.keys()] - def max_size(self): - """ - Returns the maximum size of the hypergraph. - - Returns - ------- - int - Maximum size of the hypergraph. - """ - return max(self.get_sizes()) - - def max_order(self): - """ - Returns the maximum order of the hypergraph. - - Returns - ------- - int - Maximum order of the hypergraph. - """ - return self.max_size() - 1 - - def distribution_sizes(self): - """ - Returns the distribution of sizes of the hyperedges in the hypergraph. - - Returns - ------- - collections.Counter - Distribution of sizes of the hyperedges in the hypergraph. - """ - from collections import Counter - - return dict(Counter(self.get_sizes())) - - def get_orders(self): - """Returns the list of orders of the hyperedges in the hypergraph. - - Returns - ------- - list - List of orders of the hyperedges in the hypergraph. - - """ - return [len(edge[0]) + len(edge[1]) - 1 for edge in self._edge_list.keys()] - - def is_weighted(self): - """ - Check if the hypergraph is weighted. - - Returns - ------- - bool - True if the hypergraph is weighted, False otherwise. - """ - return self._weighted - def is_uniform(self): """ Check if the hypergraph is uniform, i.e. all hyperedges have the same size. @@ -767,16 +653,19 @@ def is_uniform(self): uniform = True sz = None for edge in self._edge_list: - edge = set(edge[0]).union(set(edge[1])) + edge_nodes = set(edge[0]).union(set(edge[1])) if sz is None: - sz = len(edge) + sz = len(edge_nodes) else: - if len(edge) != sz: + if len(edge_nodes) != sz: uniform = False break return uniform - # Adj + # ============================================================================= + # Utility and DirectedHypergraph-specific methods + # ============================================================================= + def get_adj_dict(self, source_target): if source_target == "source": return self._adj_source @@ -797,249 +686,84 @@ def set_adj_dict(self, adj_dict, source_target): "Invalid value for source_target. Must be 'source' or 'target'." ) - # Degree + # Degree methods def degree(self, node, order=None, size=None): from hypergraphx.measures.degree import degree - return degree(self, node, order=order, size=size) def degree_sequence(self, order=None, size=None): from hypergraphx.measures.degree import degree_sequence - return degree_sequence(self, order=order, size=size) def degree_distribution(self, order=None, size=None): from hypergraphx.measures.degree import degree_distribution - return degree_distribution(self, order=order, size=size) - # Connected Components - """def is_connected(self, size=None, order=None): - from hypergraphx.utils.cc import is_connected - - return is_connected(self, size=size, order=order)""" - # TODO - - """def connected_components(self, size=None, order=None): - from hypergraphx.utils.cc import connected_components - - return connected_components(self, size=size, order=order)""" - # TODO - - """def node_connected_component(self, node, size=None, order=None): - from hypergraphx.utils.cc import node_connected_component - - return node_connected_component(self, node, size=size, order=order)""" - # TODO - - """def num_connected_components(self, size=None, order=None): - from hypergraphx.utils.cc import num_connected_components - - return num_connected_components(self, size=size, order=order)""" - # TODO - - """def largest_component(self, size=None, order=None): - from hypergraphx.utils.cc import largest_component - - return largest_component(self, size=size, order=order)""" - # TODO - - '''def subhypergraph_largest_component(self, size=None, order=None): - """ - Returns a subhypergraph induced by the nodes in the largest component of the hypergraph. - - Parameters - ---------- - size: int, optional - The size of the hyperedges to consider - order: int, optional - The order of the hyperedges to consider - - Returns - ------- - Hypergraph - Subhypergraph induced by the nodes in the largest component of the hypergraph. - """ - nodes = self.largest_component(size=size, order=order) - return self.subhypergraph(nodes)''' - # TODO - - """def largest_component_size(self, size=None, order=None): - from hypergraphx.utils.cc import largest_component_size - - return largest_component_size(self, size=size, order=order)""" - # TODO - - # Matrix - """def binary_incidence_matrix(self, return_mapping: bool = False): - from hypergraphx.linalg import binary_incidence_matrix - - return binary_incidence_matrix(self, return_mapping)""" - # TODO - - """def incidence_matrix(self, return_mapping: bool = False): - from hypergraphx.linalg import incidence_matrix - - return incidence_matrix(self, return_mapping)""" - # TODO - - """def adjacency_matrix(self, return_mapping: bool = False): - from hypergraphx.linalg import adjacency_matrix - - return adjacency_matrix(self, return_mapping)""" - # TODO - - # Utility + # Utility methods def isolated_nodes(self, size=None, order=None): from hypergraphx.utils.cc import isolated_nodes - return isolated_nodes(self, size=size, order=order) def is_isolated(self, node, size=None, order=None): from hypergraphx.utils.cc import is_isolated - return is_isolated(self, node, size=size, order=order) def to_line_graph(self, distance="intersection", s: int = 1, weighted=False): from hypergraphx.representations.projections import directed_line_graph - return directed_line_graph(self, distance, s, weighted) - # Metadata - def set_hypergraph_metadata(self, metadata): - self._hypergraph_metadata = metadata - - def get_hypergraph_metadata(self): - return self._hypergraph_metadata - - def set_node_metadata(self, node, metadata): - if node not in self._adj_source: - raise ValueError("Node {} not in hypergraph.".format(node)) - self._node_metadata[node] = metadata - - def get_node_metadata(self, node): - if node not in self._adj_source: - raise ValueError("Node {} not in hypergraph.".format(node)) - return self._node_metadata[node] - - def get_all_nodes_metadata(self): - return list(self._node_metadata.values()) + def _canon_edge(self, edge: Tuple) -> Tuple: + """ + Gets the canonical form of an edge (sorts the inner tuples) + Works for hyperedges but WILL BREAK FOR METAEDGES + TODO: Add recursive canonicalization for future metagraph integration + """ + return (tuple(sorted(edge[0])), tuple(sorted(edge[1]))) + + def _restructure_query_edge(self, k: Tuple[Tuple, Any]): + """ + An implementation-specific helper for modifying a query edge + prior to metadata retrieval. + """ + return k - def set_edge_metadata(self, edge, metadata): - edge = (tuple(sorted(edge[0])), tuple(sorted(edge[1]))) - if edge not in self._edge_list: - raise ValueError("Edge {} not in hypergraph.".format(edge)) - idx = self._edge_list[edge] - self._edge_metadata[idx] = metadata + # ============================================================================= + # Metadata Management Implementation + # ============================================================================= - def get_edge_metadata(self, edge): - edge = (tuple(sorted(edge[0])), tuple(sorted(edge[1]))) + def get_incidence_metadata(self, edge, node): + edge = self._canon_edge(edge) if edge not in self._edge_list: raise ValueError("Edge {} not in hypergraph.".format(edge)) - idx = self._edge_list[edge] - return self._edge_metadata[idx] - - def get_all_edges_metadata(self): - return self._edge_metadata + return self._incidences_metadata[(edge, node)] def set_incidence_metadata(self, edge, node, metadata): - edge = (tuple(sorted(edge[0])), tuple(sorted(edge[1]))) + edge = self._canon_edge(edge) if edge not in self._edge_list: raise ValueError("Edge {} not in hypergraph.".format(edge)) self._incidences_metadata[(edge, node)] = metadata - def get_incidence_metadata(self, edge, node): - edge = (tuple(sorted(edge[0])), tuple(sorted(edge[1]))) - if edge not in self._edge_list: - raise ValueError("Edge {} not in hypergraph.".format(edge)) - return self._incidences_metadata[(edge, node)] - def get_all_incidences_metadata(self): return {k: v for k, v in self._incidences_metadata.items()} - def set_attr_to_hypergraph_metadata(self, field, value): - self._hypergraph_metadata[field] = value - - def set_attr_to_node_metadata(self, node, field, value): - if node not in self._node_metadata: - raise ValueError("Node {} not in hypergraph.".format(node)) - self._node_metadata[node][field] = value - - def set_attr_to_edge_metadata(self, edge, field, value): - edge = (tuple(sorted(edge[0])), tuple(sorted(edge[1]))) - if edge not in self._edge_metadata: - raise ValueError("Edge {} not in hypergraph.".format(edge)) - self._edge_metadata[self._edge_list[edge]][field] = value - - def remove_attr_from_node_metadata(self, node, field): - if node not in self._node_metadata: - raise ValueError("Node {} not in hypergraph.".format(node)) - del self._node_metadata[node][field] - - def remove_attr_from_edge_metadata(self, edge, field): - edge = (tuple(sorted(edge[0])), tuple(sorted(edge[1]))) - if edge not in self._edge_metadata: - raise ValueError("Edge {} not in hypergraph.".format(edge)) - del self._edge_metadata[self._edge_list[edge]][field] + # ============================================================================= + # Utility Methods Implementation + # ============================================================================= - # Basic Functions def clear(self): self._edge_list.clear() self._adj_source.clear() self._adj_target.clear() self._incidences_metadata.clear() + self._node_metadata.clear() + self._edge_metadata.clear() + self._weights.clear() + self._reverse_edge_list.clear() - def copy(self): - """ - Returns a copy of the hypergraph. - - Returns - ------- - Hypergraph - A copy of the hypergraph. - """ - return copy.deepcopy(self) - - def __str__(self): - """ - Returns a string representation of the hypergraph. + # ============================================================================= + # Serialization Support Implementation + # ============================================================================= - Returns - ------- - str - A string representation of the hypergraph. - """ - title = "Hypergraph with {} nodes and {} edges.\n".format( - self.num_nodes(), self.num_edges() - ) - details = "Distribution of hyperedge sizes: {}".format( - self.distribution_sizes() - ) - return title + details - - def __len__(self): - """ - Returns the number of edges in the hypergraph. - - Returns - ------- - int - The number of edges in the hypergraph. - """ - return len(self._edge_list) - - def __iter__(self): - """ - Returns an iterator over the edges in the hypergraph. - - Returns - ------- - iterator - An iterator over the edges in the hypergraph. - """ - return iter(self._edge_list.items()) - - # Data Structure Extra def expose_data_structures(self): """ Expose the internal data structures of the directed hypergraph for serialization. @@ -1101,7 +825,7 @@ def expose_attributes_for_hashing(self): { "nodes": sorted_edge, "weight": self._weights.get(edge_id, 1), - "metadata": self._edge_metadata.get(edge_id, {}), + "metadata": self.get_edge_metadata(edge), } ) @@ -1115,17 +839,4 @@ def expose_attributes_for_hashing(self): "hypergraph_metadata": self._hypergraph_metadata, "edges": edges, "nodes": nodes, - } - - def get_mapping(self): - """ - Map the nodes of the hypergraph to integers in [0, n_nodes). - - Returns - ------- - LabelEncoder - The mapping. - """ - encoder = LabelEncoder() - encoder.fit(self.get_nodes()) - return encoder + } \ No newline at end of file diff --git a/hypergraphx/core/hypergraph.py b/hypergraphx/core/hypergraph.py index 126c64d..cf86137 100644 --- a/hypergraphx/core/hypergraph.py +++ b/hypergraphx/core/hypergraph.py @@ -1,24 +1,29 @@ import copy import warnings +from typing import Tuple, Any, List, Dict, Optional from sklearn.preprocessing import LabelEncoder +from hypergraphx.core.i_undirected_hypergraph import IUndirectedHypergraph -class Hypergraph: + +class Hypergraph(IUndirectedHypergraph): """ A Hypergraph is a generalization of a graph where an edge (hyperedge) can connect any number of nodes. It is represented as a set of nodes and a set of hyperedges, where each hyperedge is a subset of nodes. + + This implementation now inherits from IUndirectedHypergraph to leverage common functionality. """ def __init__( self, - edge_list=None, - weighted=False, - weights=None, - hypergraph_metadata=None, - node_metadata=None, - edge_metadata=None, + edge_list: Optional[List]=None, + weighted: bool = False, + weights:Optional[List[int]]=None, + hypergraph_metadata: Optional[Dict] = None, + node_metadata: Optional[Dict] = None, + edge_metadata: Optional[List[Dict]] = None ): """ Initialize a Hypergraph. @@ -43,75 +48,90 @@ def __init__( ValueError If `edge_list` and `weights` have mismatched lengths when `weighted` is True. """ - # Initialize hypergraph metadata - self._hypergraph_metadata = hypergraph_metadata or {} - self._hypergraph_metadata.update({"weighted": weighted, "type": "Hypergraph"}) - - # Initialize core attributes - self._weighted = weighted - self._adj = {} - self._edge_list = {} - self._weights = {} - self._incidences_metadata = {} - self._node_metadata = {} - self._edge_metadata = {} + # Call parent constructor (IUndirectedHypergraph) + super().__init__(edge_list, weighted, weights, hypergraph_metadata, node_metadata, edge_metadata) + + # Set hypergraph type in metadata + self._hypergraph_metadata.update({"type": "Hypergraph"}) + + # Initialize Hypergraph-specific attributes self._empty_edges = {} - self._reverse_edge_list = {} - self._next_edge_id = 0 - # Add node metadata if provided + # Add node metadata if provided (using parent's add_nodes method) if node_metadata: - for node, metadata in node_metadata.items(): - self.add_node(node, metadata=metadata) + self.add_nodes( + list(node_metadata.keys()), + metadata=node_metadata + ) # Add edges if provided if edge_list: if weighted and weights is not None and len(edge_list) != len(weights): raise ValueError("Edge list and weights must have the same length.") - self.add_edges(edge_list, weights=weights, metadata=edge_metadata) + self.add_edges( + edge_list, + weights=weights, + metadata=edge_metadata + ) - # Nodes - def add_node(self, node, metadata=None): - """ - Add a node to the hypergraph. If the node is already in the hypergraph, nothing happens. + # ============================================================================= + # Implementation of Abstract Methods from IUndirectedHypergraph + # ============================================================================= + def _add_edge_implementation(self, edge, weight, metadata, **kwargs): + """ + Implementation of abstract method for adding a single edge. + Parameters ---------- - node : object - The node to add. + edge : tuple + The edge to add. + weight : float, optional + The weight of the edge. + metadata : dict, optional + The metadata of the edge. + **kwargs + Additional parameters (unused for Hypergraph). + """ + self.add_edge(edge, weight=weight, metadata=metadata) + def _extract_nodes_from_edge(self, edge) -> List: + """ + Extract node list from an edge representation. + For Hypergraph, edges are simple tuples of nodes. + + Parameters + ---------- + edge : tuple + The edge representation (tuple of nodes). + Returns ------- - None + list + List of nodes in the edge. """ - if metadata is None: - metadata = {} - if node not in self._adj: - self._adj[node] = [] - self._node_metadata[node] = {} - if self._node_metadata[node] == {}: - self._node_metadata[node] = metadata + return list(edge) - def add_nodes(self, node_list: list, metadata=None): + def _get_edge_size(self, edge_key) -> int: """ - Add a list of nodes to the hypergraph. - + Get the size of an edge given its key representation. + For Hypergraph, edge keys are tuples of nodes. + Parameters ---------- - node_list : list - The list of nodes to add. - + edge_key : tuple + The edge key representation (tuple of nodes). + Returns ------- - None + int + Size of the edge. """ - for node in node_list: - try: - self.add_node(node, metadata[node] if metadata is not None else None) - except KeyError: - raise ValueError( - "The metadata dictionary must contain an entry for each node in the node list." - ) + return len(edge_key) + + # ============================================================================= + # Hypergraph-Specific Node Management Implementation + # ============================================================================= def remove_node(self, node, keep_edges=False): """Remove a node from the hypergraph. @@ -121,7 +141,8 @@ def remove_node(self, node, keep_edges=False): node The node to remove. keep_edges : bool, optional - If True, the edges incident to the node are kept, but the node is removed from the edges. If False, the edges incident to the node are removed. Default is False. + If True, the edges incident to the node are kept, but the node is removed from the edges. + If False, the edges incident to the node are removed. Default is False. Returns ------- @@ -134,6 +155,7 @@ def remove_node(self, node, keep_edges=False): """ if node not in self._adj: raise KeyError("Node {} not in hypergraph.".format(node)) + if not keep_edges: self.remove_edges( [self._reverse_edge_list[edge_id] for edge_id in self._adj[node]] @@ -149,157 +171,16 @@ def remove_node(self, node, keep_edges=False): ) to_remove.append(edge) self.remove_edges(to_remove) + del self._adj[node] - - def remove_nodes(self, node_list, keep_edges=False): - """ - Remove a list of nodes from the hypergraph. - - Parameters - ---------- - node_list : list - The list of nodes to remove. - - keep_edges : bool, optional - If True, the edges incident to the nodes are kept, but the nodes are removed from the edges. If False, the edges incident to the nodes are removed. Default is False. - - - Returns - ------- - None - - Raises - ------ - KeyError - If any of the nodes is not in the hypergraph. - """ - for node in node_list: - self.remove_node(node, keep_edges=keep_edges) - - def get_nodes(self, metadata=False): - """ - Returns the list of nodes in the hypergraph. If metadata is True, it returns a list of tuples (node, metadata). - - Parameters - ---------- - metadata : bool, optional - - Returns - ------- - list - List of nodes in the hypergraph. If metadata is True, it returns a list of tuples (node, metadata). - """ - if not metadata: - return list(self._adj.keys()) - else: - return {node: self.get_node_metadata(node) for node in self._adj.keys()} - - def check_node(self, node): - """Checks if the specified node is in the hypergraph. - - Parameters - ---------- - node : Object - The node to check. - - Returns - ------- - bool - True if the node is in the hypergraph, False otherwise. - - """ - return node in self._adj - - def get_neighbors(self, node, order: int = None, size: int = None): - """ - Get the neighbors of a node in the hypergraph. - - Parameters - ---------- - node : object - The node of interest. - order : int - The order of the hyperedges to consider. - size : int - The size of the hyperedges to consider. - - Returns - ------- - set - The neighbors of the node. - - Raises - ------ - ValueError - If order and size are both specified or neither are specified. - """ - if node not in self._adj: - raise ValueError("Node {} not in hypergraph.".format(node)) - if order is not None and size is not None: - raise ValueError("Order and size cannot be both specified.") - if order is None and size is None: - neigh = set() - edges = self.get_incident_edges(node) - for edge in edges: - neigh.update(edge) - if node in neigh: - neigh.remove(node) - return neigh - else: - if order is None: - order = size - 1 - neigh = set() - edges = self.get_incident_edges(node, order=order) - for edge in edges: - neigh.update(edge) - if node in neigh: - neigh.remove(node) - return neigh - - def get_incident_edges(self, node, order: int = None, size: int = None): - """ - Get the incident hyperedges of a node in the hypergraph. - - Parameters - ---------- - node : object - The node of interest. - order : int - The order of the hyperedges to consider. - size : int - The size of the hyperedges to consider. - - Returns - ------- - list - The incident hyperedges of the node. - - Raises - ------ - ValueError - If the node is not in the hypergraph. - - """ - if node not in self._adj: - raise ValueError("Node {} not in hypergraph.".format(node)) - if order is not None and size is not None: - raise ValueError("Order and size cannot be both specified.") - if order is None and size is None: - return list( - [self._reverse_edge_list[edge_id] for edge_id in self._adj[node]] - ) - else: - if order is None: - order = size - 1 - return list( - [ - self._reverse_edge_list[edge_id] - for edge_id in self._adj[node] - if len(self._reverse_edge_list[edge_id]) - 1 == order - ] - ) - - # Edges + # Remove from parent's node metadata + if node in self._node_metadata: + del self._node_metadata[node] + + # ============================================================================= + # Hypergraph-Specific Edge Management Implementation + # ============================================================================= + def add_edge(self, edge, weight=None, metadata=None): """Add a hyperedge to the hypergraph. If the hyperedge is already in the hypergraph, its weight is updated. @@ -329,8 +210,7 @@ def add_edge(self, edge, weight=None, metadata=None): if weight is None: weight = 1 - edge = tuple(sorted(edge)) - order = len(edge) - 1 + edge = self._canon_edge(edge) if metadata is None: metadata = {} @@ -342,65 +222,15 @@ def add_edge(self, edge, weight=None, metadata=None): elif edge in self._edge_list and self._weighted: self._weights[self._edge_list[edge]] += weight - if metadata is not None: - self._edge_metadata[self._edge_list[edge]] = metadata - else: - self._edge_metadata[self._edge_list[edge]] = {} + # Set edge metadata using parent method + if edge in self._edge_list: + self.set_edge_metadata(edge, metadata) + # Update adjacency list for node in edge: self.add_node(node) - self._adj[node].append(self._edge_list[edge]) - - def add_edges(self, edge_list, weights=None, metadata=None): - """Add a list of hyperedges to the hypergraph. If a hyperedge is already in the hypergraph, its weight is updated. - - Parameters - ---------- - edge_list : list - The list of hyperedges to add. - - weights : list, optional - The list of weights of the hyperedges. If the hypergraph is weighted, this must be provided. - - metadata : list, optional - The list of metadata of the hyperedges. - - Returns - ------- - None - - Raises - ------ - ValueError - If the hypergraph is weighted and no weights are provided or if the hypergraph is not weighted and weights are provided. - - """ - if weights is not None and not self._weighted: - warnings.warn( - "Weights are provided but the hypergraph is not weighted. The weights will be ignored.", - UserWarning, - ) - self._weighted = True - - if self._weighted and weights is not None: - if len(set(edge_list)) != len(list(edge_list)): - raise ValueError( - "If weights are provided, the edge list must not contain repeated edges." - ) - if len(list(edge_list)) != len(list(weights)): - raise ValueError("The number of edges and weights must be the same.") - - i = 0 - if edge_list is not None: - for edge in edge_list: - self.add_edge( - edge, - weight=( - weights[i] if self._weighted and weights is not None else None - ), - metadata=metadata[i] if metadata is not None else None, - ) - i += 1 + if self._edge_list[edge] not in self._adj[node]: + self._adj[node].append(self._edge_list[edge]) def remove_edge(self, edge): """Remove an edge from the hypergraph. @@ -419,68 +249,25 @@ def remove_edge(self, edge): KeyError If the edge is not in the hypergraph. """ - edge = tuple(sorted(edge)) + edge = self._canon_edge(edge) if edge not in self._edge_list: raise KeyError("Edge {} not in hypergraph.".format(edge)) - for node in edge: - try: - self._adj[node].remove(self._edge_list[edge]) - except KeyError: - pass - - del self._reverse_edge_list[self._edge_list[edge]] - del self._edge_metadata[self._edge_list[edge]] - del self._weights[self._edge_list[edge]] - del self._edge_list[edge] - - def remove_edges(self, edge_list): - """ - Remove a list of edges from the hypergraph. - - Parameters - ---------- - edge_list : list - The list of edges to remove. - - Returns - ------- - None - - Raises - ------ - KeyError - """ - for edge in edge_list: - self.remove_edge(edge) - - def set_edge_list(self, edge_list): - self._edge_list = edge_list - - def get_edge_list(self): - return self._edge_list - - def add_empty_edge(self, name, metadata): - if name not in self._empty_edges: - self._empty_edges[name] = metadata - else: - raise ("Edge {} already in hypergraph.".format(name)) - - def check_edge(self, edge): - """Checks if the specified edge is in the hypergraph. - - Parameters - ---------- - edge : tuple - The edge to check. + edge_id = self._edge_list[edge] - Returns - ------- - bool - True if the edge is in the hypergraph, False otherwise. + del self._reverse_edge_list[edge_id] + if edge_id in self._weights: + del self._weights[edge_id] + if edge in self._edge_metadata: + del self._edge_metadata[edge] + + # Remove from adjacency lists + for node in edge: + if node in self._adj and edge_id in self._adj[node]: + self._adj[node].remove(edge_id) - """ - return tuple(sorted(edge)) in self._edge_list + # Remove from the edge list + del self._edge_list[edge] def get_edges( self, @@ -491,6 +278,7 @@ def get_edges( keep_isolated_nodes=False, metadata=False, ): + """Get edges from the hypergraph with various filtering options.""" if order is not None and size is not None: raise ValueError("Order and size cannot be both specified.") if not subhypergraph and keep_isolated_nodes: @@ -514,31 +302,21 @@ def get_edges( if len(edge) - 1 <= order ] + edge_metadata = [self.get_edge_metadata(edge) for edge in edges] + edge_weights = [self.get_weight(edge) for edge in edges] if self._weighted else None if subhypergraph and keep_isolated_nodes: h = Hypergraph(weighted=self._weighted) - h.add_nodes(list(self.get_nodes())) - if self._weighted: - edge_weights = [self.get_weight(edge) for edge in edges] - h.add_edges(edges, edge_weights) - else: - h.add_edges(edges) - - for node in h.get_nodes(): - h.set_node_metadata(node, self.get_node_metadata(node)) - for edge in edges: - h.set_edge_metadata(edge, self.get_edge_metadata(edge)) + nodes = list(self.get_nodes()) + node_metadata = [self.get_node_metadata(node) for node in nodes] + h.add_nodes(nodes, metadata=node_metadata) + h.add_edges(edges, weights=edge_weights, metadata=edge_metadata) return h + elif subhypergraph: h = Hypergraph(weighted=self._weighted) - if self._weighted: - edge_weights = [self.get_weight(edge) for edge in edges] - h.add_edges(edges, edge_weights) - else: - h.add_edges(edges) - - for edge in edges: - h.set_edge_metadata(edge, self.get_edge_metadata(edge)) + h.add_edges(edges, weights=edge_weights, metadata=edge_metadata) return h + else: return ( edges @@ -546,253 +324,69 @@ def get_edges( else {edge: self.get_edge_metadata(edge) for edge in edges} ) - # Weight - def set_weight(self, edge, weight): - """Sets the weight of the specified edge. - - Parameters - ---------- - edge : tuple - The edge to set the weight of. - - weight : float - The weight to set. - - Returns - ------- - None - - Raises - ------ - ValueError - If the edge is not in the hypergraph. - """ - if not self._weighted and weight != 1: - raise ValueError( - "If the hypergraph is not weighted, weight can be 1 or None." - ) - - edge = tuple(sorted(edge)) - if edge not in self._edge_list: - raise ValueError("Edge {} not in hypergraph.".format(edge)) - edge_id = self._edge_list[edge] - self._weights[edge_id] = weight - - def get_weight(self, edge): - """Returns the weight of the specified edge. - - Parameters - ---------- - edge : tuple - The edge to get the weight of. - - Returns - ------- - float - Weight of the specified edge. + def get_incident_edges(self, node, order: int = None, size: int = None): """ - edge = tuple(sorted(edge)) - if edge in self._edge_list: - edge_id = self._edge_list[edge] - return self._weights[edge_id] - else: - raise ValueError("Edge {} not in hypergraph.".format(edge)) - - def get_weights(self, order=None, size=None, up_to=False, asdict=False): - """Returns the list of weights of the edges in the hypergraph. If order is specified, it returns the list of weights of the edges of the specified order. - If size is specified, it returns the list of weights of the edges of the specified size. If both order and size are specified, it raises a ValueError. - If up_to is True, it returns the list of weights of the edges of order smaller or equal to the specified order. + Get the incident hyperedges of a node in the hypergraph. Parameters ---------- - order : int, optional - Order of the edges to get the weights of. - - size : int, optional - Size of the edges to get the weights of. - - up_to : bool, optional - If True, it returns the list of weights of the edges of order smaller or equal to the specified order. Default is False. + node : object + The node of interest. + order : int + The order of the hyperedges to consider. + size : int + The size of the hyperedges to consider. Returns ------- list - List of weights of the edges in the hypergraph. + The incident hyperedges of the node. Raises ------ ValueError - If both order and size are specified. - - """ - w = None - if order is not None and size is not None: - raise ValueError("Order and size cannot be both specified.") - if order is None and size is None: - w = { - edge: self._weights[self._edge_list[edge]] for edge in self.get_edges() - } - - if size is not None: - order = size - 1 - - if w is None: - w = { - edge: self._weights[self._edge_list[edge]] - for edge in self.get_edges(order=order, up_to=up_to) - } - - if asdict: - return w - else: - return list(w.values()) - - # Info - def max_order(self): - """ - Returns the maximum order of the hypergraph. - - Returns - ------- - int - Maximum order of the hypergraph. - """ - return self.max_size() - 1 - - def max_size(self): - """ - Returns the maximum size of the hypergraph. - - Returns - ------- - int - Maximum size of the hypergraph. - """ - return max(self.get_sizes()) - - def num_nodes(self): - """ - Returns the number of nodes in the hypergraph. - - Returns - ------- - int - Number of nodes in the hypergraph. - """ - return len(list(self.get_nodes())) - - def num_edges(self, order=None, size=None, up_to=False): - """Returns the number of edges in the hypergraph. If order is specified, it returns the number of edges of the specified order. - If size is specified, it returns the number of edges of the specified size. If both order and size are specified, it raises a ValueError. - If up_to is True, it returns the number of edges of order smaller or equal to the specified order. - - Parameters - ---------- - order : int, optional - Order of the edges to count. - size : int, optional - Size of the edges to count. - up_to : bool, optional - If True, it returns the number of edges of order smaller or equal to the specified order. Default is False. - - Returns - ------- - int - Number of edges in the hypergraph. + If the node is not in the hypergraph. """ + if node not in self._adj: + raise ValueError("Node {} not in hypergraph.".format(node)) if order is not None and size is not None: raise ValueError("Order and size cannot be both specified.") - if order is None and size is None: - return len(self._edge_list) + return list( + [self._reverse_edge_list[edge_id] for edge_id in self._adj[node]] + ) else: - if size is not None: + if order is None: order = size - 1 - if not up_to: - s = 0 - for edge in self._edge_list: - if len(edge) - 1 == order: - s += 1 - return s - else: - s = 0 - for edge in self._edge_list: - if len(edge) - 1 <= order: - s += 1 - return s - - def get_sizes(self): - """Returns the list of sizes of the hyperedges in the hypergraph. - - Returns - ------- - list - List of sizes of the hyperedges in the hypergraph. - - """ - return [len(edge) for edge in self._edge_list.keys()] - - def distribution_sizes(self): - """ - Returns the distribution of sizes of the hyperedges in the hypergraph. - - Returns - ------- - collections.Counter - Distribution of sizes of the hyperedges in the hypergraph. - """ - from collections import Counter - - return dict(Counter(self.get_sizes())) - - def get_orders(self): - """Returns the list of orders of the hyperedges in the hypergraph. - - Returns - ------- - list - List of orders of the hyperedges in the hypergraph. - - """ - return [len(edge) - 1 for edge in self._edge_list.keys()] - - def is_weighted(self): - """ - Check if the hypergraph is weighted. + return list( + [ + self._reverse_edge_list[edge_id] + for edge_id in self._adj[node] + if len(self._reverse_edge_list[edge_id]) - 1 == order + ] + ) - Returns - ------- - bool - True if the hypergraph is weighted, False otherwise. - """ - return self._weighted + # ============================================================================= + # Hypergraph-Specific Utility Methods + # ============================================================================= - def is_uniform(self): + def _restructure_query_edge(self, k: Tuple[Tuple, Any]): """ - Check if the hypergraph is uniform, i.e. all hyperedges have the same size. - - Returns - ------- - bool - True if the hypergraph is uniform, False otherwise. + An implementation-specific helper for modifying a query edge + prior to metadata retrieval. """ - uniform = True - sz = None - for edge in self._edge_list: - if sz is None: - sz = len(edge) - else: - if len(edge) != sz: - uniform = False - break - return uniform + return tuple(sorted(k)) - # Adj And Subhypergraph - def get_adj_dict(self): - return self._adj + def add_empty_edge(self, name, metadata): + """Add an empty edge with metadata.""" + if name not in self._empty_edges: + self._empty_edges[name] = metadata + else: + raise ValueError("Edge {} already in hypergraph.".format(name)) - def set_adj_dict(self, adj): - self._adj = adj + # ============================================================================= + # Subgraph Methods + # ============================================================================= def subhypergraph(self, nodes: list): """ @@ -817,7 +411,7 @@ def subhypergraph(self, nodes: list): if self._weighted: h.add_edge( edge, - weight=self._edge_list[edge], + weight=self.get_weight(edge), metadata=self.get_edge_metadata(edge), ) else: @@ -827,33 +421,14 @@ def subhypergraph(self, nodes: list): def subhypergraph_by_orders( self, orders: list = None, sizes: list = None, keep_nodes=True ): - """Return a subhypergraph induced by the edges of the specified orders. - - Parameters - ---------- - orders : list, optional - List of orders of the edges to be included in the subhypergraph. If None, the sizes parameter should be specified. - sizes : list, optional - List of sizes of the edges to be included in the subhypergraph. If None, the orders parameter should be specified. - keep_nodes : bool, optional - If True, the nodes of the original hypergraph are kept in the subhypergraph. If False, only the edges are kept. Default is True. - - Returns - ------- - Hypergraph - Subhypergraph induced by the edges of the specified orders. - - Raises - ------ - ValueError - If both orders and sizes are None or if both orders and sizes are specified. - """ + """Return a subhypergraph induced by the edges of the specified orders.""" if orders is None and sizes is None: raise ValueError( "At least one between orders and sizes should be specified" ) if orders is not None and sizes is not None: raise ValueError("Order and size cannot be both specified.") + h = Hypergraph(weighted=self.is_weighted()) if keep_nodes: h.add_nodes(node_list=list(self.get_nodes())) @@ -861,266 +436,75 @@ def subhypergraph_by_orders( h.set_node_metadata(node, self.get_node_metadata(node)) if sizes is None: - sizes = [] - for order in orders: - sizes.append(order + 1) + sizes = [order + 1 for order in orders] for size in sizes: edges = self.get_edges(size=size) for edge in edges: if h.is_weighted(): h.add_edge( - edge, self.get_weight(edge), self.get_edge_metadata(edge) + edge, + self.get_weight(edge), + self.get_edge_metadata(edge) ) else: - h.add_edge(edge, metadata=self.get_edge_metadata(edge)) + h.add_edge( + edge, + metadata=self.get_edge_metadata(edge) + ) return h - # Degree - def degree(self, node, order=None, size=None): - from hypergraphx.measures.degree import degree - - return degree(self, node, order=order, size=size) - - def degree_sequence(self, order=None, size=None): - from hypergraphx.measures.degree import degree_sequence - - return degree_sequence(self, order=order, size=size) - - def degree_distribution(self, order=None, size=None): - from hypergraphx.measures.degree import degree_distribution - - return degree_distribution(self, order=order, size=size) - - # Connected Components - def is_connected(self, size=None, order=None): - from hypergraphx.utils.cc import is_connected - - return is_connected(self, size=size, order=order) - - def connected_components(self, size=None, order=None): - from hypergraphx.utils.cc import connected_components - - return connected_components(self, size=size, order=order) - - def node_connected_component(self, node, size=None, order=None): - from hypergraphx.utils.cc import node_connected_component - - return node_connected_component(self, node, size=size, order=order) - - def num_connected_components(self, size=None, order=None): - from hypergraphx.utils.cc import num_connected_components - - return num_connected_components(self, size=size, order=order) - - def largest_component(self, size=None, order=None): - from hypergraphx.utils.cc import largest_component - - return largest_component(self, size=size, order=order) - def subhypergraph_largest_component(self, size=None, order=None): """ Returns a subhypergraph induced by the nodes in the largest component of the hypergraph. - - Parameters - ---------- - size: int, optional - The size of the hyperedges to consider - order: int, optional - The order of the hyperedges to consider - - Returns - ------- - Hypergraph - Subhypergraph induced by the nodes in the largest component of the hypergraph. """ nodes = self.largest_component(size=size, order=order) return self.subhypergraph(nodes) - def largest_component_size(self, size=None, order=None): - from hypergraphx.utils.cc import largest_component_size - - return largest_component_size(self, size=size, order=order) - - # Matrix - def binary_incidence_matrix(self, return_mapping: bool = False): - from hypergraphx.linalg import binary_incidence_matrix - - return binary_incidence_matrix(self, return_mapping) - - def incidence_matrix(self, return_mapping: bool = False): - from hypergraphx.linalg import incidence_matrix - - return incidence_matrix(self, return_mapping) - - def adjacency_matrix(self, return_mapping: bool = False): - from hypergraphx.linalg import adjacency_matrix - - return adjacency_matrix(self, return_mapping) - - # Utils - def isolated_nodes(self, size=None, order=None): - from hypergraphx.utils.cc import isolated_nodes - - return isolated_nodes(self, size=size, order=order) - - def is_isolated(self, node, size=None, order=None): - from hypergraphx.utils.cc import is_isolated - - return is_isolated(self, node, size=size, order=order) - - def dual_random_walk_adjacency(self, return_mapping: bool = False): - from hypergraphx.linalg import dual_random_walk_adjacency - - return dual_random_walk_adjacency(self, return_mapping) - - def adjacency_factor(self, t: int = 0): - from hypergraphx.linalg import adjacency_factor - - return adjacency_factor(self, t) + # ============================================================================= + # Projections and Transformations + # ============================================================================= def to_line_graph(self, distance="intersection", s: int = 1, weighted=False): from hypergraphx.representations.projections import line_graph - return line_graph(self, distance, s, weighted) - # Metadata - def set_hypergraph_metadata(self, metadata): - self._hypergraph_metadata = metadata - - def get_hypergraph_metadata(self): - return self._hypergraph_metadata - - def set_node_metadata(self, node, metadata): - if node not in self._adj: - raise ValueError("Node {} not in hypergraph.".format(node)) - self._node_metadata[node] = metadata - - def get_node_metadata(self, node): - if node not in self._adj: - raise ValueError("Node {} not in hypergraph.".format(node)) - return self._node_metadata[node] - - def get_all_nodes_metadata(self): - return self._node_metadata - - def set_edge_metadata(self, edge, metadata): - edge = tuple(sorted(edge)) - if edge not in self._edge_list: - raise ValueError("Edge {} not in hypergraph.".format(edge)) - self._edge_metadata[self._edge_list[edge]] = metadata - - def get_edge_metadata(self, edge): - edge = tuple(sorted(edge)) - if edge not in self._edge_list: - raise ValueError("Edge {} not in hypergraph.".format(edge)) - return self._edge_metadata[self._edge_list[edge]] - - def get_all_edges_metadata(self): - return self._edge_metadata + # ============================================================================= + # Incidence Metadata (specific implementation for undirected hypergraphs) + # ============================================================================= def set_incidence_metadata(self, edge, node, metadata): - if tuple(sorted(edge)) not in self._edge_list: + """Set incidence metadata for a specific edge-node pair.""" + edge = self._canon_edge(edge) + if edge not in self._edge_list: raise ValueError("Edge {} not in hypergraph.".format(edge)) self._incidences_metadata[(edge, node)] = metadata def get_incidence_metadata(self, edge, node): - if tuple(sorted(edge)) not in self._edge_list: + """Get incidence metadata for a specific edge-node pair.""" + edge = self._canon_edge(edge) + if edge not in self._edge_list: raise ValueError("Edge {} not in hypergraph.".format(edge)) - return self._incidences_metadata[(edge, node)] + return self._incidences_metadata.get((edge, node), {}) def get_all_incidences_metadata(self): + """Get all incidence metadata.""" return {k: v for k, v in self._incidences_metadata.items()} - def set_attr_to_hypergraph_metadata(self, field, value): - self._hypergraph_metadata[field] = value - - def set_attr_to_node_metadata(self, node, field, value): - if node not in self._node_metadata: - raise ValueError("Node {} not in hypergraph.".format(node)) - self._node_metadata[node][field] = value - - def set_attr_to_edge_metadata(self, edge, field, value): - edge = tuple(sorted(edge)) - if edge not in self._edge_list: - raise ValueError("Edge {} not in hypergraph.".format(edge)) - self._edge_metadata[self._edge_list[edge]][field] = value - - def remove_attr_from_node_metadata(self, node, field): - if node not in self._node_metadata: - raise ValueError("Node {} not in hypergraph.".format(node)) - del self._node_metadata[node][field] - - def remove_attr_from_edge_metadata(self, edge, field): - edge = tuple(sorted(edge)) - if edge not in self._edge_metadata: - raise ValueError("Edge {} not in hypergraph.".format(edge)) - del self._edge_metadata[self._edge_list[edge]][field] + # ============================================================================= + # Utility Methods + # ============================================================================= - # Basic Functions def clear(self): - self._edge_list.clear() - self._adj.clear() - self._weights.clear() - self._hypergraph_metadata.clear() - self._incidences_metadata.clear() - self._node_metadata.clear() - self._edge_metadata.clear() + """Clear all data from the hypergraph.""" + super().clear() # Calls IUndirectedHypergraph.clear() which calls IHypergraph.clear() self._empty_edges.clear() - self._reverse_edge_list.clear() - - def copy(self): - """ - Returns a copy of the hypergraph. - - Returns - ------- - Hypergraph - A copy of the hypergraph. - """ - return copy.deepcopy(self) - - def __str__(self): - """ - Returns a string representation of the hypergraph. - - Returns - ------- - str - A string representation of the hypergraph. - """ - title = "Hypergraph with {} nodes and {} edges.\n".format( - self.num_nodes(), self.num_edges() - ) - details = "Distribution of hyperedge sizes: {}".format( - self.distribution_sizes() - ) - return title + details - - def __len__(self): - """ - Returns the number of edges in the hypergraph. - - Returns - ------- - int - The number of edges in the hypergraph. - """ - return len(self._edge_list) - def __iter__(self): - """ - Returns an iterator over the edges in the hypergraph. + # ============================================================================= + # Serialization Support + # ============================================================================= - Returns - ------- - iterator - An iterator over the edges in the hypergraph. - """ - return iter(self._edge_list.items()) - - # Data Structure Extra def expose_data_structures(self): """ Expose the internal data structures of the hypergraph for serialization. @@ -1130,18 +514,12 @@ def expose_data_structures(self): dict A dictionary containing all internal attributes of the hypergraph. """ - return { + base_data = super().expose_data_structures() + base_data.update({ "type": "Hypergraph", - "_weighted": self._weighted, - "_adj": self._adj, - "_edge_list": self._edge_list, - "_weights": self._weights, - "hypergraph_metadata": self._hypergraph_metadata, - "node_metadata": self._node_metadata, - "edge_metadata": self._edge_metadata, - "reverse_edge_list": self._reverse_edge_list, - "next_edge_id": self._next_edge_id, - } + "empty_edges": self._empty_edges, + }) + return base_data def populate_from_dict(self, data): """ @@ -1152,17 +530,8 @@ def populate_from_dict(self, data): data : dict A dictionary containing the attributes to populate the hypergraph. """ - self._weighted = data.get("_weighted", False) - self._adj = data.get("_adj", {}) - self._edge_list = data.get("_edge_list", {}) - self._weights = data.get("_weights", {}) - self._hypergraph_metadata = data.get("hypergraph_metadata", {}) - self._incidences_metadata = data.get("incidences_metadata", {}) - self._node_metadata = data.get("node_metadata", {}) - self._edge_metadata = data.get("edge_metadata", {}) + super().populate_from_dict(data) self._empty_edges = data.get("empty_edges", {}) - self._reverse_edge_list = data.get("reverse_edge_list", {}) - self._next_edge_id = data.get("next_edge_id", 0) def expose_attributes_for_hashing(self): """ @@ -1181,7 +550,7 @@ def expose_attributes_for_hashing(self): { "nodes": sorted_edge, "weight": self._weights.get(edge_id, 1), - "metadata": self._edge_metadata.get(edge_id, {}), + "metadata": self.get_edge_metadata(edge) } ) @@ -1195,17 +564,4 @@ def expose_attributes_for_hashing(self): "hypergraph_metadata": self._hypergraph_metadata, "edges": edges, "nodes": nodes, - } - - def get_mapping(self): - """ - Map the nodes of the hypergraph to integers in [0, n_nodes). - - Returns - ------- - LabelEncoder - The mapping. - """ - encoder = LabelEncoder() - encoder.fit(self.get_nodes()) - return encoder + } \ No newline at end of file diff --git a/hypergraphx/core/multiplex_hypergraph.py b/hypergraphx/core/multiplex_hypergraph.py index cc8e91f..aa14e6f 100644 --- a/hypergraphx/core/multiplex_hypergraph.py +++ b/hypergraphx/core/multiplex_hypergraph.py @@ -1,36 +1,28 @@ -from hypergraphx import Hypergraph - - -def _canon_edge(edge): - edge = tuple(edge) - - if len(edge) == 2: - if isinstance(edge[0], tuple) and isinstance(edge[1], tuple): - # Sort the inner tuples and return - return (tuple(sorted(edge[0])), tuple(sorted(edge[1]))) - elif not isinstance(edge[0], tuple) and not isinstance(edge[1], tuple): - # Sort the edge itself if it contains IDs (non-tuple elements) - return tuple(sorted(edge)) +from typing import Tuple, Any, List, Dict, Set, Optional, Union +from collections import Counter - return tuple(sorted(edge)) +from hypergraphx.core.i_undirected_hypergraph import IUndirectedHypergraph +from hypergraphx import Hypergraph -class MultiplexHypergraph: +class MultiplexHypergraph(IUndirectedHypergraph): """ A Multiplex Hypergraph is a hypergraph where hyperedges are organized into multiple layers. - Each layer share the same node-set and represents a specific context or relationship between nodes, and hyperedges can + Each layer shares the same node-set and represents a specific context or relationship between nodes, and hyperedges can have weights and metadata specific to their layer. + + This implementation inherits from IUndirectedHypergraph to leverage common functionality. """ def __init__( self, - edge_list=None, - edge_layer=None, - weighted=False, - weights=None, - hypergraph_metadata=None, - node_metadata=None, - edge_metadata=None, + edge_list: Optional[List]=None, + edge_layer: Optional[List]=None, + weighted: bool = False, + weights: Optional[List[int]]=None, + hypergraph_metadata: Optional[Dict] = None, + node_metadata: Optional[Dict] = None, + edge_metadata: Optional[List[Dict]] = None ): """ Initialize a Multiplex Hypergraph with optional edges, layers, weights, and metadata. @@ -60,27 +52,23 @@ def __init__( If `edge_list` and `edge_layer` have mismatched lengths. If `edge_list` contains improperly formatted edges when `edge_layer` is None. """ - # Initialize hypergraph metadata - self._hypergraph_metadata = hypergraph_metadata or {} - self._hypergraph_metadata.update( - {"weighted": weighted, "type": "MultiplexHypergraph"} + # Update hypergraph metadata with multiplex-specific info + multiplex_metadata = hypergraph_metadata or {} + multiplex_metadata.update({"type": "MultiplexHypergraph"}) + + # Call parent constructor + super().__init__( + edge_list=None, # We'll handle edge_list ourselves + weighted=weighted, + weights=None, # We'll handle weights ourselves + hypergraph_metadata=multiplex_metadata, + node_metadata=node_metadata, + edge_metadata=None # We'll handle edge_metadata ourselves ) - - # Initialize core attributes - self._node_metadata = {} - self._edge_metadata = {} - self._weighted = weighted - self._weights = {} - self._edge_list = {} - self._adj = {} - self._reverse_edge_list = {} - self._next_edge_id = 0 + + # Initialize multiplex-specific attributes self._existing_layers = set() - - # Add node metadata if provided - if node_metadata: - for node, metadata in node_metadata.items(): - self.add_node(node, metadata=metadata) + self._incidence_metadata = {} # Store incidence metadata # Handle edge and layer consistency if edge_list is not None and edge_layer is None: @@ -98,159 +86,228 @@ def __init__( raise ValueError("Edge list and edge layer must have the same length.") self.add_edges( edge_list, - edge_layer=edge_layer, weights=weights, metadata=edge_metadata, + edge_layer=edge_layer, ) - def get_adj_dict(self): - return self._adj - - def set_adj_dict(self, adj_dict): - self._adj = adj_dict - - def get_incident_edges(self, node): - if node not in self._adj: - raise ValueError("Node {} not in hypergraph.".format(node)) - return [self._reverse_edge_list[e_id] for e_id in self._adj[node]] - - def degree(self, node, order=None, size=None): - from hypergraphx.measures.degree import degree - - return degree(self, node, order=order, size=size) - - def degree_sequence(self, order=None, size=None): - from hypergraphx.measures.degree import degree_sequence + # ============================================================================= + # Implementation of Abstract Methods from IUndirectedHypergraph + # ============================================================================= - return degree_sequence(self, order=order, size=size) - - def get_edge_metadata(self, edge, layer): - edge = tuple(sorted(edge)) - k = (edge, layer) - if k not in self._edge_list: - raise ValueError("Edge {} not in hypergraph.".format(edge)) - return self._edge_metadata[self._edge_list[k]] - - def is_weighted(self): - return self._weighted - - def get_edge_list(self): - return self._edge_list - - def set_edge_list(self, edge_list): - self._edge_list = edge_list - - def get_existing_layers(self): - return self._existing_layers - - def set_existing_layers(self, existing_layers): - self._existing_layers = existing_layers - - def get_nodes(self, metadata=False): - if metadata: - return self._node_metadata + def _add_edge_implementation(self, edge, weight, metadata, **kwargs): + """ + Implementation of abstract method for adding a single edge. + + Parameters + ---------- + edge : tuple + The edge to add. + weight : float, optional + The weight of the edge. + metadata : dict, optional + The metadata of the edge. + **kwargs + Must contain 'edge_layer' parameter for MultiplexHypergraph. + """ + edge_layer = kwargs.get('edge_layer') + if edge_layer is None: + raise ValueError("edge_layer must be provided for MultiplexHypergraph") + + # This method is called for each individual edge, so edge_layer should be a single layer + if isinstance(edge_layer, list): + raise ValueError("_add_edge_implementation expects a single layer, not a list") + + self.add_edge(edge, edge_layer, weight=weight, metadata=metadata) + + def _extract_nodes_from_edge(self, edge) -> List: + """ + Extract node list from an edge representation. + For MultiplexHypergraph, edges are tuples of (nodes, layer). + + Parameters + ---------- + edge : tuple + The edge representation ((nodes...), layer). + + Returns + ------- + list + List of nodes in the edge. + """ + if isinstance(edge, tuple) and len(edge) == 2: + nodes, layer = edge + return list(nodes) else: - return list(self._node_metadata.keys()) + # Handle case where edge might be just nodes + return list(edge) - def add_node(self, node, metadata=None): + def _get_edge_size(self, edge_key) -> int: """ - Add a node to the hypergraph. If the node is already in the hypergraph, nothing happens. - + Get the size of an edge given its key representation. + For MultiplexHypergraph, edge keys are ((nodes...), layer). + Parameters ---------- - node : object - The node to add. - + edge_key : tuple + The edge key representation ((nodes...), layer). + Returns ------- - None + int + Size of the edge. """ - if metadata is None: - metadata = {} - if node not in self._adj: - self._adj[node] = [] - self._node_metadata[node] = {} - if self._node_metadata[node] == {}: - self._node_metadata[node] = metadata + if isinstance(edge_key, tuple) and len(edge_key) == 2: + nodes, layer = edge_key + return len(nodes) + else: + return len(edge_key) - def add_nodes(self, node_list: list, node_metadata=None): + # ============================================================================= + # Incidence Metadata Methods Implementation + # ============================================================================= + + def get_all_incidences_metadata(self) -> Dict: """ - Add a list of nodes to the hypergraph. + Get all incidence metadata for the hypergraph. + + Returns + ------- + dict + Dictionary mapping (node, edge_key) tuples to their incidence metadata. + """ + return self._incidence_metadata.copy() + def get_incidence_metadata(self, node: Any, edge: Tuple, layer: str = None) -> Dict: + """ + Get the incidence metadata for a specific node-edge pair. + Parameters ---------- - node_list : list - The list of nodes to add. - + node : object + The node in the incidence. + edge : tuple + The edge nodes or full edge representation. + layer : str, optional + The layer of the edge. Required if edge format doesn't include layer. + Returns ------- - None + dict + The incidence metadata dictionary. """ - for node in node_list: - try: - self.add_node( - node, node_metadata[node] if node_metadata is not None else None - ) - except KeyError: - raise ValueError( - "The metadata dictionary must contain an entry for each node in the node list." - ) - - def add_edges(self, edge_list, edge_layer, weights=None, metadata=None): - """Add a list of hyperedges to the hypergraph. If a hyperedge is already in the hypergraph, its weight is updated. + # Determine the edge key format + if layer is None: + if isinstance(edge, tuple) and len(edge) == 2 and isinstance(edge[1], str): + nodes, layer = edge + edge_key = (self._canon_edge(nodes), layer) + else: + raise ValueError("Layer must be provided or edge must be in format ((nodes...), layer)") + else: + edge_key = (self._canon_edge(edge), layer) + + incidence_key = (node, edge_key) + return self._incidence_metadata.get(incidence_key, {}) + def set_incidence_metadata(self, node: Any, edge: Tuple, metadata: Dict, layer: str = None) -> None: + """ + Set the incidence metadata for a specific node-edge pair. + Parameters ---------- - edge_list : list - The list of hyperedges to add. - - edge_layer : list - The list of layers to which the hyperedges belong. - - weights : list, optional - The list of weights of the hyperedges. If the hypergraph is weighted, this must be provided. - - metadata : list, optional - The list of metadata of the hyperedges. + node : object + The node in the incidence. + edge : tuple + The edge nodes or full edge representation. + metadata : dict + The metadata dictionary to set. + layer : str, optional + The layer of the edge. Required if edge format doesn't include layer. + """ + # Determine the edge key format + if layer is None: + if isinstance(edge, tuple) and len(edge) == 2 and isinstance(edge[1], str): + nodes, layer = edge + edge_key = (self._canon_edge(nodes), layer) + else: + raise ValueError("Layer must be provided or edge must be in format ((nodes...), layer)") + else: + edge_key = (self._canon_edge(edge), layer) + + # Verify the edge exists in the hypergraph + if edge_key not in self._edge_list: + raise ValueError(f"Edge {edge_key} not in hypergraph.") + + # Verify the node is part of the edge + edge_nodes, _ = edge_key + if node not in edge_nodes: + raise ValueError(f"Node {node} is not part of edge {edge_key}.") + + incidence_key = (node, edge_key) + self._incidence_metadata[incidence_key] = metadata + + # ============================================================================= + # MultiplexHypergraph-Specific Node Management Implementation + # ============================================================================= + + def remove_node(self, node: Any, keep_edges: bool = False) -> None: + """ + Remove a node from the multiplex hypergraph. - Returns - ------- - None + Parameters + ---------- + node : object + The node to remove. + keep_edges : bool, optional + If True, edges incident to the node are kept but updated to exclude the node. + If False, edges incident to the node are removed entirely. Default is False. Raises ------ ValueError - If the hypergraph is weighted and no weights are provided or if the hypergraph is not weighted and weights are provided. - + If the node is not in the hypergraph. """ - if weights is not None and not self._weighted: - print( - "Warning: weights are provided but the hypergraph is not weighted. The hypergraph will be weighted." - ) - self._weighted = True + if node not in self._adj: + raise ValueError(f"Node {node} not in hypergraph.") - if self._weighted and weights is not None: - if len(set(edge_list)) != len(list(edge_list)): - raise ValueError( - "If weights are provided, the edge list must not contain repeated edges." - ) - if len(list(edge_list)) != len(list(weights)): - raise ValueError("The number of edges and weights must be the same.") + edges_to_process = list(self._adj[node]) - i = 0 - if edge_list is not None: - for edge in edge_list: - self.add_edge( - edge, - edge_layer[i], - weight=( - weights[i] if self._weighted and weights is not None else None - ), - metadata=metadata[i] if metadata is not None else None, - ) - i += 1 + # Clean up incidence metadata for this node + keys_to_remove = [key for key in self._incidence_metadata.keys() if key[0] == node] + for key in keys_to_remove: + del self._incidence_metadata[key] + + if keep_edges: + for edge_id in edges_to_process: + edge, layer = self._reverse_edge_list[edge_id] + updated_edge = tuple(n for n in edge if n != node) + + # Get current metadata and weight before removing + current_weight = self._weights.get(edge_id, 1) + current_metadata = self.get_edge_metadata(edge, layer) + + self.remove_edge((edge, layer)) + if updated_edge: + self.add_edge( + updated_edge, + layer, + weight=current_weight, + metadata=current_metadata, + ) + else: + for edge_id in edges_to_process: + edge, layer = self._reverse_edge_list[edge_id] + self.remove_edge((edge, layer)) + + del self._adj[node] + if node in self._node_metadata: + del self._node_metadata[node] + + # ============================================================================= + # MultiplexHypergraph-Specific Edge Management Implementation + # ============================================================================= - def add_edge(self, edge, layer, weight=None, metadata=None): + def add_edge(self, edge, layer, weight=None, metadata=None) -> None: """Add a hyperedge to the hypergraph. If the hyperedge is already in the hypergraph, its weight is updated. Parameters @@ -283,9 +340,8 @@ def add_edge(self, edge, layer, weight=None, metadata=None): self._existing_layers.add(layer) - edge = _canon_edge(edge) + edge = self._canon_edge(edge) k = (edge, layer) - order = len(edge) - 1 if k not in self._edge_list: e_id = self._next_edge_id @@ -301,141 +357,365 @@ def add_edge(self, edge, layer, weight=None, metadata=None): if metadata is None: metadata = {} - self._edge_metadata[e_id] = metadata + self._edge_metadata[k] = metadata for node in edge: self.add_node(node) for node in edge: - self._adj[node].append(e_id) + if e_id not in self._adj[node]: + self._adj[node].append(e_id) - def remove_edge(self, edge): + def add_edges(self, edge_list, weights=None, metadata=None, edge_layer=None, **kwargs) -> None: + """Add a list of hyperedges to the hypergraph. + + Parameters + ---------- + edge_list : list + The list of hyperedges to add. + weights : list, optional + The list of weights of the hyperedges. + metadata : list, optional + The list of metadata of the hyperedges. + edge_layer : list + The list of layers to which the hyperedges belong. + **kwargs + Additional parameters. + """ + if edge_layer is None: + raise ValueError("edge_layer must be provided for MultiplexHypergraph") + + # Validate lengths + if len(edge_list) != len(edge_layer): + raise ValueError("edge_list and edge_layer must have the same length") + + if weights is not None and len(edge_list) != len(weights): + raise ValueError("edge_list and weights must have the same length") + + if metadata is not None and len(edge_list) != len(metadata): + raise ValueError("edge_list and metadata must have the same length") + + # Process each edge individually with its corresponding layer + for i, edge in enumerate(edge_list): + weight = weights[i] if weights is not None else None + edge_metadata = metadata[i] if metadata is not None else None + layer = edge_layer[i] + + # Call _add_edge_implementation with individual layer + self._add_edge_implementation(edge, weight, edge_metadata, edge_layer=layer) + + def remove_edge(self, edge, layer=None) -> None: """ Remove an edge from the multiplex hypergraph. Parameters ---------- edge : tuple - The edge to remove. Should be of the form ((nodes...), layer). + The edge to remove. Can be either the edge nodes or ((nodes...), layer). + layer : str, optional + The layer of the edge. Required if edge is just the nodes. Raises ------ ValueError If the edge is not in the hypergraph. """ - edge = _canon_edge(edge) - if edge not in self._edge_list: - raise ValueError(f"Edge {edge} not in hypergraph.") + # Handle both formats: edge as (nodes, layer) or separate edge and layer + if layer is None: + if isinstance(edge, tuple) and len(edge) == 2 and isinstance(edge[1], str): + nodes, layer = edge + edge_key = (self._canon_edge(nodes), layer) + else: + raise ValueError("Layer must be provided or edge must be in format ((nodes...), layer)") + else: + edge_key = (self._canon_edge(edge), layer) + + if edge_key not in self._edge_list: + raise ValueError(f"Edge {edge_key} not in hypergraph.") - edge_id = self._edge_list[edge] + edge_id = self._edge_list[edge_key] + + # Clean up incidence metadata for this edge + keys_to_remove = [key for key in self._incidence_metadata.keys() if key[1] == edge_key] + for key in keys_to_remove: + del self._incidence_metadata[key] del self._reverse_edge_list[edge_id] if edge_id in self._weights: del self._weights[edge_id] - if edge_id in self._edge_metadata: - del self._edge_metadata[edge_id] + if edge_key in self._edge_metadata: + del self._edge_metadata[edge_key] - nodes, layer = edge + nodes, layer = edge_key for node in nodes: if edge_id in self._adj[node]: self._adj[node].remove(edge_id) - del self._edge_list[edge] + del self._edge_list[edge_key] - def remove_node(self, node, keep_edges=False): + def get_edges(self, metadata: bool = False, layer: str = None): """ - Remove a node from the multiplex hypergraph. + Get edges from the hypergraph. + + Parameters + ---------- + metadata : bool, optional + If True, return edge metadata dictionary. If False, return list of edges. + layer : str, optional + If provided, only return edges from the specified layer. + + Returns + ------- + list or dict + List of edges or dictionary of edge metadata. + """ + if layer is not None: + # Filter edges by layer + filtered_edges = {k: v for k, v in self._edge_list.items() if k[1] == layer} + if metadata: + return { + self._reverse_edge_list[v]: self._edge_metadata[k] + for k, v in filtered_edges.items() + } + else: + return list(filtered_edges.keys()) + else: + if metadata: + return { + self._reverse_edge_list[v]: self._edge_metadata[k] + for k, v in self._edge_list.items() + } + else: + return list(self._edge_list.keys()) + + def get_incident_edges(self, node: Any, order: int = None, size: int = None) -> List[Tuple]: + """ + Get the incident edges of a node. Parameters ---------- node : object - The node to remove. - keep_edges : bool, optional - If True, edges incident to the node are kept but updated to exclude the node. - If False, edges incident to the node are removed entirely. Default is False. + The node of interest. + order : int, optional + The order of the hyperedges to consider. If None, all hyperedges are considered. + size : int, optional + The size of the hyperedges to consider. If None, all hyperedges are considered. - Raises - ------ - ValueError - If the node is not in the hypergraph. + Returns + ------- + list + The list of incident edges. """ + if order is not None and size is not None: + raise ValueError("Cannot specify both order and size") + if order is None and size is None: + target_size = None + elif order is not None: + target_size = order + 1 + else: + target_size = size + if node not in self._adj: - raise ValueError(f"Node {node} not in hypergraph.") + raise ValueError("Node {} not in hypergraph.".format(node)) - edges_to_process = list(self._adj[node]) + incident_edges = [] + for edge_id in self._adj[node]: + edge, layer = self._reverse_edge_list[edge_id] + if target_size is None or len(edge) == target_size: + incident_edges.append((edge, layer)) - if keep_edges: - for edge_id in edges_to_process: - edge, layer = self._reverse_edge_list[edge_id] - updated_edge = tuple(n for n in edge if n != node) + return incident_edges - self.remove_edge((edge, layer)) - if updated_edge: - self.add_edge( - updated_edge, - layer, - weight=self._weights.get(edge_id, 1), - metadata=self._edge_metadata.get(edge_id, {}), - ) - else: - for edge_id in edges_to_process: - edge, layer = self._reverse_edge_list[edge_id] - self.remove_edge((edge, layer)) + # ============================================================================= + # MultiplexHypergraph-Specific Weight Management + # ============================================================================= - del self._adj[node] - if node in self._node_metadata: - del self._node_metadata[node] + def get_weight(self, edge, layer=None): + """Returns the weight of the specified edge. + + Parameters + ---------- + edge : tuple + The edge to get the weight of. + layer : str, optional + The layer of the edge. Required if edge format doesn't include layer. - def get_edges(self, metadata=False): - if metadata: - return { - self._reverse_edge_list[k]: self._edge_metadata[k] - for k in self._edge_metadata.keys() - } + Returns + ------- + float + Weight of the specified edge. + """ + if layer is None: + if isinstance(edge, tuple) and len(edge) == 2 and isinstance(edge[1], str): + nodes, layer = edge + k = (self._canon_edge(nodes), layer) + else: + raise ValueError("Layer must be provided or edge must be in format ((nodes...), layer)") else: - return list(self._edge_list.keys()) + k = (self._canon_edge(edge), layer) - def get_weight(self, edge, layer): - edge = _canon_edge(edge) - k = (edge, layer) if k not in self._edge_list: raise ValueError("Edge {} not in hypergraph.".format(k)) else: return self._weights[self._edge_list[k]] - def set_weight(self, edge, layer, weight): + def set_weight(self, edge, layer, weight) -> None: + """Sets the weight of the specified edge. + + Parameters + ---------- + edge : tuple + The edge to set the weight of. + layer : str + The layer of the edge. + weight : float + The weight to set. + + Returns + ------- + None + + Raises + ------ + ValueError + If the edge is not in the hypergraph. + """ if not self._weighted and weight != 1: raise ValueError( "If the hypergraph is not weighted, weight can be 1 or None." ) - k = (_canon_edge(edge), layer) + k = (self._canon_edge(edge), layer) + if k not in self._edge_list: raise ValueError("Edge {} not in hypergraph.".format(edge)) else: self._weights[self._edge_list[k]] = weight + def get_weights(self, order=None, size=None, up_to=False, asdict=False, layer=None): + """Returns the list of weights of the edges in the hypergraph. + + Parameters + ---------- + order : int, optional + Order of the edges to get the weights of. + size : int, optional + Size of the edges to get the weights of. + up_to : bool, optional + If True, it returns the list of weights of the edges of order smaller or equal to the specified order. + asdict : bool, optional + If True, return as dictionary mapping edges to weights. + layer : str, optional + If provided, only return weights for edges in the specified layer. + + Returns + ------- + list or dict + List or dictionary of weights of the edges in the hypergraph. + + Raises + ------ + ValueError + If both order and size are specified. + """ + if order is not None and size is not None: + raise ValueError("Cannot specify both order and size") + + if order is None and size is None: + target_size = None + elif order is not None: + target_size = order + 1 + else: + target_size = size + + weights = [] + weight_dict = {} + + for edge_key, edge_id in self._edge_list.items(): + nodes, edge_layer = edge_key + + # Filter by layer if specified + if layer is not None and edge_layer != layer: + continue + + edge_size = len(nodes) + + # Filter by size/order + if target_size is not None: + if up_to and edge_size > target_size: + continue + elif not up_to and edge_size != target_size: + continue + + weight = self._weights.get(edge_id, 1) + weights.append(weight) + if asdict: + weight_dict[edge_key] = weight + + return weight_dict if asdict else weights + + # ============================================================================= + # MultiplexHypergraph-Specific Utility Methods + # ============================================================================= + + def _canon_edge(self, edge: Tuple) -> Tuple: + """ + Gets the canonical form of an edge by sorting its components. + For MultiplexHypergraph, handles both simple edges and complex edge structures. + """ + edge = tuple(edge) + + if len(edge) == 2: + if isinstance(edge[0], tuple) and isinstance(edge[1], tuple): + # Sort the inner tuples and return + return (tuple(sorted(edge[0])), tuple(sorted(edge[1]))) + elif not isinstance(edge[0], tuple) and not isinstance(edge[1], tuple): + # Sort the edge itself if it contains IDs (non-tuple elements) + return tuple(sorted(edge)) + + return tuple(sorted(edge)) + + def _restructure_query_edge(self, k: Tuple[Tuple, Any], layer=None): + """ + An implementation-specific helper for modifying a query edge + prior to metadata retrieval. + """ + if layer is not None: + return (k, layer) + return k + + def get_existing_layers(self): + """Get the set of existing layers.""" + return self._existing_layers + + def set_existing_layers(self, existing_layers): + """Set the existing layers.""" + self._existing_layers = existing_layers + + # ============================================================================= + # MultiplexHypergraph-Specific Methods + # ============================================================================= + def set_dataset_metadata(self, metadata): + """Set dataset-level metadata.""" self._hypergraph_metadata["multiplex_metadata"] = metadata def get_dataset_metadata(self): - return self._hypergraph_metadata["multiplex_metadata"] + """Get dataset-level metadata.""" + return self._hypergraph_metadata.get("multiplex_metadata", {}) def set_layer_metadata(self, layer_name, metadata): + """Set metadata for a specific layer.""" if layer_name not in self._hypergraph_metadata: self._hypergraph_metadata[layer_name] = {} self._hypergraph_metadata[layer_name] = metadata def get_layer_metadata(self, layer_name): - return self._hypergraph_metadata[layer_name] - - def get_hypergraph_metadata(self): - return self._hypergraph_metadata - - def set_hypergraph_metadata(self, metadata): - self._hypergraph_metadata = metadata + """Get metadata for a specific layer.""" + return self._hypergraph_metadata.get(layer_name, {}) def aggregated_hypergraph(self): + """Create an aggregated hypergraph combining all layers.""" h = Hypergraph( weighted=self._weighted, hypergraph_metadata=self._hypergraph_metadata ) @@ -450,32 +730,21 @@ def aggregated_hypergraph(self): ) return h - def set_attr_to_hypergraph_metadata(self, field, value): - self._hypergraph_metadata[field] = value + # ============================================================================= + # Utility Methods + # ============================================================================= - def set_attr_to_node_metadata(self, node, field, value): - if node not in self._node_metadata: - raise ValueError("Node {} not in hypergraph.".format(node)) - self._node_metadata[node][field] = value + def clear(self): + """Clear all data from the hypergraph.""" + super().clear() + self._existing_layers.clear() + self._incidence_metadata.clear() - def set_attr_to_edge_metadata(self, edge, layer, field, value): - edge = _canon_edge(edge) - if edge not in self._edge_metadata: - raise ValueError("Edge {} not in hypergraph.".format(edge)) - self._edge_metadata[self._edge_list[(edge, layer)]][field] = value - - def remove_attr_from_node_metadata(self, node, field): - if node not in self._node_metadata: - raise ValueError("Node {} not in hypergraph.".format(node)) - del self._node_metadata[node][field] + # ============================================================================= + # Serialization Support + # ============================================================================= - def remove_attr_from_edge_metadata(self, edge, layer, field): - edge = _canon_edge(edge) - if edge not in self._edge_metadata: - raise ValueError("Edge {} not in hypergraph.".format(edge)) - del self._edge_metadata[self._edge_list[(edge, layer)]][field] - - def expose_data_structures(self): + def expose_data_structures(self) -> Dict: """ Expose the internal data structures of the multiplex hypergraph for serialization. @@ -484,21 +753,15 @@ def expose_data_structures(self): dict A dictionary containing all internal attributes of the multiplex hypergraph. """ - return { + base_data = super().expose_data_structures() + base_data.update({ "type": "MultiplexHypergraph", - "hypergraph_metadata": self._hypergraph_metadata, - "node_metadata": self._node_metadata, - "edge_metadata": self._edge_metadata, - "_weighted": self._weighted, - "_weights": self._weights, - "_edge_list": self._edge_list, - "_adj": self._adj, - "reverse_edge_list": self._reverse_edge_list, - "next_edge_id": self._next_edge_id, "existing_layers": self._existing_layers, - } + "incidence_metadata": self._incidence_metadata, + }) + return base_data - def populate_from_dict(self, data): + def populate_from_dict(self, data: Dict) -> None: """ Populate the attributes of the multiplex hypergraph from a dictionary. @@ -507,18 +770,11 @@ def populate_from_dict(self, data): data : dict A dictionary containing the attributes to populate the hypergraph. """ - self._hypergraph_metadata = data.get("hypergraph_metadata", {}) - self._node_metadata = data.get("node_metadata", {}) - self._edge_metadata = data.get("edge_metadata", {}) - self._weighted = data.get("_weighted", False) - self._weights = data.get("_weights", {}) - self._edge_list = data.get("_edge_list", {}) - self._adj = data.get("_adj", {}) - self._reverse_edge_list = data.get("reverse_edge_list", {}) - self._next_edge_id = data.get("next_edge_id", 0) + super().populate_from_dict(data) self._existing_layers = data.get("existing_layers", set()) + self._incidence_metadata = data.get("incidence_metadata", {}) - def expose_attributes_for_hashing(self): + def expose_attributes_for_hashing(self) -> dict: """ Expose relevant attributes for hashing specific to MultiplexHypergraph. @@ -535,7 +791,7 @@ def expose_attributes_for_hashing(self): { "nodes": edge, "weight": self._weights.get(edge_id, 1), - "metadata": self._edge_metadata.get(edge_id, {}), + "metadata": self.get_edge_metadata(edge=edge[0], layer=edge[1]), } ) @@ -549,4 +805,5 @@ def expose_attributes_for_hashing(self): "hypergraph_metadata": self._hypergraph_metadata, "edges": edges, "nodes": nodes, - } + "incidence_metadata": self._incidence_metadata, + } \ No newline at end of file diff --git a/hypergraphx/core/temporal_hypergraph.py b/hypergraphx/core/temporal_hypergraph.py index 7525581..ca5f7df 100644 --- a/hypergraphx/core/temporal_hypergraph.py +++ b/hypergraphx/core/temporal_hypergraph.py @@ -1,26 +1,16 @@ import copy import math +from typing import Tuple, Any, List, Dict, Optional, Union +from collections import Counter from sklearn.preprocessing import LabelEncoder from hypergraphx import Hypergraph - - -def _canon_edge(edge): - edge = tuple(edge) - - if len(edge) == 2: - if isinstance(edge[0], tuple) and isinstance(edge[1], tuple): - # Sort the inner tuples and return - return (tuple(sorted(edge[0])), tuple(sorted(edge[1]))) - elif not isinstance(edge[0], tuple) and not isinstance(edge[1], tuple): - # Sort the edge itself if it contains IDs (non-tuple elements) - return tuple(sorted(edge)) - - return tuple(sorted(edge)) +from hypergraphx.core.i_undirected_hypergraph import IUndirectedHypergraph def _get_size(edge): + """Get the size of an edge.""" if len(edge) == 2 and isinstance(edge[0], tuple) and isinstance(edge[1], tuple): return len(edge[0]) + len(edge[1]) else: @@ -28,17 +18,19 @@ def _get_size(edge): def _get_order(edge): + """Get the order of an edge.""" return _get_size(edge) - 1 def _get_nodes(edge): + """Get all nodes from an edge.""" if len(edge) == 2 and isinstance(edge[0], tuple) and isinstance(edge[1], tuple): return list(edge[0]) + list(edge[1]) else: return list(edge) -class TemporalHypergraph: +class TemporalHypergraph(IUndirectedHypergraph): """ A Temporal Hypergraph is a hypergraph where each hyperedge is associated with a specific timestamp. Temporal hypergraphs are useful for modeling systems where interactions between nodes change over time, such as social networks, @@ -47,13 +39,13 @@ class TemporalHypergraph: def __init__( self, - edge_list=None, - time_list=None, - weighted=False, - weights=None, - hypergraph_metadata=None, - node_metadata=None, - edge_metadata=None, + edge_list: Optional[List]=None, + time_list: Optional[List]=None, + weighted: bool = False, + weights: Optional[List[int]]=None, + hypergraph_metadata: Optional[Dict] = None, + node_metadata: Optional[Dict] = None, + edge_metadata: Optional[List[Dict]] = None ): """ Initialize a Temporal Hypergraph with optional edges, times, weights, and metadata. @@ -85,28 +77,19 @@ def __init__( If `edge_list` contains improperly formatted edges when `time_list` is None. If `time_list` is provided without `edge_list`. """ - # Initialize hypergraph metadata - self._hypergraph_metadata = hypergraph_metadata or {} - self._hypergraph_metadata.update( - {"weighted": weighted, "type": "TemporalHypergraph"} + # Initialize base class with temporal-specific metadata + temporal_metadata = hypergraph_metadata or {} + temporal_metadata.update({"type": "TemporalHypergraph"}) + + super().__init__( + edge_list=None, # We'll handle edge addition separately + weighted=weighted, + weights=None, + hypergraph_metadata=temporal_metadata, + node_metadata=node_metadata, + edge_metadata=None # We'll handle this separately ) - # Initialize core attributes - self._weighted = weighted - self._weights = {} - self._adj = {} - self._edge_list = {} - self._incidences_metadata = {} - self._node_metadata = {} - self._edge_metadata = {} - self._reverse_edge_list = {} - self._next_edge_id = 0 - - # Add node metadata if provided - if node_metadata: - for node, metadata in node_metadata.items(): - self.add_node(node, metadata=metadata) - # Handle edge and time list consistency if edge_list is not None and time_list is None: # Extract times from the edge list if time information is embedded @@ -126,45 +109,81 @@ def __init__( if len(edge_list) != len(time_list): raise ValueError("Edge list and time list must have the same length.") self.add_edges( - edge_list, time_list, weights=weights, metadata=edge_metadata + edge_list, + time_list, + weights=weights, + metadata=edge_metadata + ) + + # ============================================================================= + # Implementation of abstract methods from IUndirectedHypergraph + # ============================================================================= + + def _add_edge_implementation(self, edge, weight, metadata, time=None, **kwargs): + """Implementation of abstract method for adding a single edge.""" + if time is None: + raise ValueError("Time must be provided for temporal hypergraph edges.") + + if not isinstance(time, int): + raise TypeError("Time must be an integer") + + if not self._weighted and weight is not None and weight != 1: + raise ValueError( + "If the hypergraph is not weighted, weight can be 1 or None." ) + if weight is None: + weight = 1 + + if time < 0: + raise ValueError("Time must be a positive integer") + + _edge = self._canon_edge(edge) + temporal_edge = (time, _edge) + + if temporal_edge not in self._edge_list: + e_id = self._next_edge_id + self._reverse_edge_list[e_id] = temporal_edge + self._edge_list[temporal_edge] = e_id + self._next_edge_id += 1 + self._weights[e_id] = weight + elif temporal_edge in self._edge_list and self._weighted: + self._weights[self._edge_list[temporal_edge]] += weight + + e_id = self._edge_list[temporal_edge] - # Node - def add_node(self, node, metadata=None): if metadata is None: metadata = {} - if node not in self._node_metadata: - self._adj[node] = [] - self._node_metadata[node] = {} - if self._node_metadata[node] == {}: - self._node_metadata[node] = metadata - - def add_nodes(self, node_list: list, metadata=None): - for node in node_list: - try: - self.add_node(node, metadata[node] if metadata is not None else None) - except KeyError: - raise ValueError( - "The metadata dictionary must contain an entry for each node in the node list." - ) + self._edge_metadata[temporal_edge] = metadata - def remove_node(self, node, keep_edges=False): - """ - Remove a node from the temporal hypergraph. + nodes = _get_nodes(_edge) + for node in nodes: + self.add_node(node) - Parameters - ---------- - node : object - The node to remove. - keep_edges : bool, optional - If True, edges incident to the node are kept but updated to exclude the node. - If False, edges incident to the node are removed entirely. Default is False. + for node in nodes: + if e_id not in self._adj[node]: + self._adj[node].append(e_id) + + def _extract_nodes_from_edge(self, edge) -> List: + """Extract node list from a temporal edge representation.""" + if isinstance(edge, tuple) and len(edge) == 2: + # Temporal edge format: (time, edge_nodes) + time, edge_nodes = edge + return _get_nodes(edge_nodes) + else: + # Fallback for direct edge nodes + return _get_nodes(edge) + + def _get_edge_size(self, edge_key) -> int: + """Get the size of an edge given its temporal key representation.""" + if isinstance(edge_key, tuple) and len(edge_key) == 2: + # Temporal edge format: (time, edge_nodes) + time, edge_nodes = edge_key + return _get_size(edge_nodes) + else: + return _get_size(edge_key) - Raises - ------ - ValueError - If the node is not in the hypergraph. - """ + def remove_node(self, node: Any, keep_edges: bool = False) -> None: + """Remove a node from the temporal hypergraph.""" if node not in self._adj: raise ValueError(f"Node {node} not in hypergraph.") @@ -175,172 +194,35 @@ def remove_node(self, node, keep_edges=False): time, edge = self._reverse_edge_list[edge_id] updated_edge = tuple(n for n in edge if n != node) - self.remove_edge((time, edge)) + self.remove_edge(edge, time) if updated_edge: self.add_edge( updated_edge, time, weight=self._weights.get(edge_id, 1), - metadata=self._edge_metadata.get(edge_id, {}), + metadata=self.get_edge_metadata(edge=edge, time=time), ) else: for edge_id in edges_to_process: time, edge = self._reverse_edge_list[edge_id] - self.remove_edge((time, edge)) + self.remove_edge(edge, time) del self._adj[node] if node in self._node_metadata: del self._node_metadata[node] - def remove_nodes(self, node_list, keep_edges=False): - """ - Remove a list of nodes from the hypergraph. - - Parameters - ---------- - node_list : list - The list of nodes to remove. - - keep_edges : bool, optional - If True, the edges incident to the nodes are kept, but the nodes are removed from the edges. If False, the edges incident to the nodes are removed. Default is False. - - Returns - ------- - None - - Raises - ------ - KeyError - If any of the nodes is not in the hypergraph. - """ - for node in node_list: - self.remove_node(node, keep_edges=keep_edges) - - def get_nodes(self, metadata=False): - if metadata: - return self._node_metadata - return list(self._node_metadata.keys()) - - def check_node(self, node): - """Checks if the specified node is in the hypergraph. - - Parameters - ---------- - node : Object - The node to check. - - Returns - ------- - bool - True if the node is in the hypergraph, False otherwise. - - """ - return node in self._adj - - def get_neighbors(self, node, order: int = None, size: int = None): - """ - Get the neighbors of a node in the hypergraph. - - Parameters - ---------- - node : object - The node of interest. - order : int - The order of the hyperedges to consider. - size : int - The size of the hyperedges to consider. - - Returns - ------- - set - The neighbors of the node. - - Raises - ------ - ValueError - If order and size are both specified or neither are specified. - """ - if node not in self._adj: - raise ValueError("Node {} not in hypergraph.".format(node)) - if order is not None and size is not None: - raise ValueError("Order and size cannot be both specified.") - if order is None and size is None: - neigh = set() - edges = self.get_incident_edges(node) - for edge in edges: - neigh.update(edge[1]) - if node in neigh: - neigh.remove(node) - return neigh - else: - if order is None: - order = size - 1 - neigh = set() - edges = self.get_incident_edges(node, order=order) - for edge in edges: - neigh.update(edge[1]) - if node in neigh: - neigh.remove(node) - return neigh - - def get_incident_edges(self, node, order: int = None, size: int = None): - """ - Get the incident hyperedges of a node in the hypergraph. - - Parameters - ---------- - node : object - The node of interest. - order : int - The order of the hyperedges to consider. - size : int - The size of the hyperedges to consider. - - Returns - ------- - list - The incident hyperedges of the node. - - Raises - ------ - ValueError - If the node is not in the hypergraph. - - """ - if node not in self._adj: - raise ValueError("Node {} not in hypergraph.".format(node)) - if order is not None and size is not None: - raise ValueError("Order and size cannot be both specified.") - if order is None and size is None: - return list( - [self._reverse_edge_list[edge_id] for edge_id in self._adj[node]] - ) - else: - if order is None: - order = size - 1 - return list( - [ - self._reverse_edge_list[edge_id] - for edge_id in self._adj[node] - if len(self._reverse_edge_list[edge_id][1]) - 1 == order - ] - ) - - # Edge - def add_edge(self, edge, time, weight=None, metadata=None): + def add_edge(self, edge, time: int, weight=None, metadata=None) -> None: """ Add an edge to the temporal hypergraph. If the edge already exists, the weight is updated. Parameters ---------- edge : tuple - The edge to add. If the hypergraph is undirected, should be a tuple. - If the hypergraph is directed, should be a tuple of two tuples. + The edge to add. time: int The time at which the edge occurs. weight: float, optional The weight of the edge. Default is None. - metadata: dict, optional Metadata for the edge. Default is an empty dictionary. @@ -351,68 +233,93 @@ def add_edge(self, edge, time, weight=None, metadata=None): ValueError If the hypergraph is not weighted and weight is not None or 1. """ - if not isinstance(time, int): - raise TypeError("Time must be an integer") - - if not self._weighted and weight is not None and weight != 1: - raise ValueError( - "If the hypergraph is not weighted, weight can be 1 or None." - ) - if weight is None: - weight = 1 - - t = time - - if t < 0: - raise ValueError("Time must be a positive integer") - - _edge = _canon_edge(edge) - edge = (t, _edge) - - if edge not in self._edge_list: - e_id = self._next_edge_id - self._reverse_edge_list[e_id] = edge - self._edge_list[edge] = e_id - self._next_edge_id += 1 - self._weights[e_id] = weight - elif edge in self._edge_list and self._weighted: - self._weights[self._edge_list[edge]] += weight + self._add_edge_implementation(edge, weight, metadata, time=time) - e_id = self._edge_list[edge] + def remove_edge(self, edge, time: int) -> None: + """Remove an edge from the temporal hypergraph.""" + _edge = self._canon_edge(edge) + temporal_edge = (time, _edge) + + if temporal_edge not in self._edge_list: + raise ValueError(f"Edge {temporal_edge} not in hypergraph.") + + edge_id = self._edge_list[temporal_edge] - if metadata is None: - metadata = {} - self._edge_metadata[e_id] = metadata + # Remove edge from reverse lookup and metadata + del self._reverse_edge_list[edge_id] + if edge_id in self._weights: + del self._weights[edge_id] + if temporal_edge in self._edge_metadata.keys(): + del self._edge_metadata[temporal_edge] + # Remove from adjacency lists nodes = _get_nodes(_edge) for node in nodes: - self.add_node(node) + if edge_id in self._adj[node]: + self._adj[node].remove(edge_id) - for node in nodes: - self._adj[node].append(e_id) + del self._edge_list[temporal_edge] - def add_edges(self, edge_list, time_list, weights=None, metadata=None): - """ - Add multiple edges to the temporal hypergraph. + def get_edges( + self, + time_window=None, + order=None, + size=None, + up_to=False, + metadata=False, + ): + """Get the edges in the temporal hypergraph.""" + if order is not None and size is not None: + raise ValueError("Order and size cannot be both specified.") - Parameters - ---------- - edge_list: list - A list of edges to add. - time_list: list - A list of times corresponding to each edge in `edge_list`. - weights: list, optional - A list of weights for each edge in `edge_list`. Must be provided if the hypergraph is weighted. - metadata: list, optional - A list of metadata dictionaries for each edge in `edge_list`. + edges = [] + if time_window is None: + edges = list(self._edge_list.keys()) + elif isinstance(time_window, tuple) and len(time_window) == 2: + for _t, _edge in sorted(self._edge_list.keys()): + if time_window[0] <= _t < time_window[1]: + edges.append((_t, _edge)) + else: + raise ValueError("Time window must be a tuple of length 2 or None") + + if order is not None or size is not None: + if size is not None: + order = size - 1 + if not up_to: + edges = [edge for edge in edges if len(edge[1]) - 1 == order] + else: + edges = [edge for edge in edges if len(edge[1]) - 1 <= order] + + return ( + edges + if not metadata + else {edge: self.get_edge_metadata(edge=edge[1], time=edge[0]) for edge in edges} + ) - Raises - ------ - TypeError - If `edge_list` and `time_list` are not lists. - ValueError - If `edge_list` and `time_list` have mismatched lengths. - """ + def get_incident_edges(self, node, order: int = None, size: int = None) -> List[Tuple]: + """Get the incident hyperedges of a node in the hypergraph.""" + if node not in self._adj: + raise ValueError("Node {} not in hypergraph.".format(node)) + if order is not None and size is not None: + raise ValueError("Order and size cannot be both specified.") + + if order is None and size is None: + return [self._reverse_edge_list[edge_id] for edge_id in self._adj[node]] + else: + if order is None: + order = size - 1 + return [ + self._reverse_edge_list[edge_id] + for edge_id in self._adj[node] + if len(self._reverse_edge_list[edge_id][1]) - 1 == order + ] + + # ============================================================================= + # Temporal-specific edge management methods + # ============================================================================= + + def add_edges(self, edge_list, time_list, weights=None, metadata=None) -> None: + """Add multiple edges to the temporal hypergraph.""" if not isinstance(edge_list, list) or not isinstance(time_list, list): raise TypeError("Edge list and time list must be lists") @@ -433,158 +340,151 @@ def add_edges(self, edge_list, time_list, weights=None, metadata=None): if len(list(edge_list)) != len(list(weights)): raise ValueError("The number of edges and weights must be the same.") - i = 0 - if edge_list is not None: - for edge in edge_list: - self.add_edge( - edge, - time_list[i], - weight=( - weights[i] if self._weighted and weights is not None else None - ), - metadata=metadata[i] if metadata is not None else None, - ) - i += 1 - - def remove_edge(self, edge, time): - """ - Remove an edge from the temporal hypergraph. - - Parameters - ---------- - edge : tuple - The edge to remove. - time : int - The time at which the edge occurs. + for i, edge in enumerate(edge_list): + self.add_edge( + edge, + time_list[i], + weight=( + weights[i] if self._weighted and weights is not None else None + ), + metadata=metadata[i] if metadata is not None else None, + ) - Raises - ------ - ValueError - If the edge is not in the hypergraph. - """ - edge = _canon_edge(edge) - edge = (time, edge) - if edge not in self._edge_list: - raise ValueError(f"Edge {edge} not in hypergraph.") - edge_id = self._edge_list[edge] + def remove_edges(self, edge_list) -> None: + """Remove a list of edges from the hypergraph.""" + for edge in edge_list: + if isinstance(edge, tuple) and len(edge) == 2: + time, edge_nodes = edge + self.remove_edge(edge_nodes, time) + else: + raise ValueError("Edge must be a tuple of (time, edge_nodes)") - # Remove edge from reverse lookup and metadata - del self._reverse_edge_list[edge_id] - if edge_id in self._weights: - del self._weights[edge_id] - if edge_id in self._edge_metadata: - del self._edge_metadata[edge_id] + # ============================================================================= + # Override base class methods for temporal-specific behavior + # ============================================================================= - time, nodes = edge - for node in nodes: - if edge_id in self._adj[node]: - self._adj[node].remove(edge_id) + def get_neighbors(self, node, order: int = None, size: int = None): + """Get the neighbors of a node in the hypergraph.""" + if node not in self._adj: + raise ValueError("Node {} not in hypergraph.".format(node)) + if order is not None and size is not None: + raise ValueError("Order and size cannot be both specified.") + + if order is None and size is None: + neigh = set() + edges = self.get_incident_edges(node) + for edge in edges: + neigh.update(_get_nodes(edge[1])) + neigh.discard(node) + return neigh + else: + if order is None: + order = size - 1 + neigh = set() + edges = self.get_incident_edges(node, order=order) + for edge in edges: + neigh.update(_get_nodes(edge[1])) + neigh.discard(node) + return neigh - del self._edge_list[edge] + def num_edges(self, order: int = None, size: int = None, up_to: bool = False) -> int: + """Get the number of edges in the hypergraph.""" + if order is not None and size is not None: + raise ValueError("Order and size cannot be both specified.") - def remove_edges(self, edge_list): - """ - Remove a list of edges from the hypergraph. + if order is None and size is None: + return len(self._edge_list) + else: + if size is not None: + order = size - 1 + count = 0 + for edge_key in self._edge_list: + edge_size = self._get_edge_size(edge_key) + edge_order = edge_size - 1 + if not up_to: + if edge_order == order: + count += 1 + else: + if edge_order <= order: + count += 1 + return count + + def get_sizes(self) -> List[int]: + """Get the size of each edge in the hypergraph.""" + return [_get_size(edge[1]) for edge in self._edge_list.keys()] - Parameters - ---------- - edge_list : list - The list of edges to remove. + def is_uniform(self) -> bool: + """Check if the hypergraph is uniform.""" + if not self._edge_list: + return True + + sizes = self.get_sizes() + return len(set(sizes)) <= 1 - Returns - ------- - None + def get_weights(self, order=None, size=None, up_to=False, asdict=False): + """Get weights of edges in the hypergraph.""" + w = None + if order is not None and size is not None: + raise ValueError("Order and size cannot be both specified.") + + if order is None and size is None: + w = { + edge: self._weights[self._edge_list[edge]] for edge in self.get_edges() + } - Raises - ------ - KeyError - """ - for edge in edge_list: - self.remove_edge(edge) + if size is not None: + order = size - 1 - def get_edge_list(self): - return self._edge_list + if w is None: + w = { + edge: self._weights[self._edge_list[edge]] + for edge in self.get_edges(order=order, up_to=up_to) + } - def set_edge_list(self, edge_list): - self._edge_list = edge_list + if asdict: + return w + else: + return list(w.values()) - def check_edge(self, edge, time): - """Checks if the specified edge is in the hypergraph. + # ============================================================================= + # Weight Management (Use base class with temporal edge format) + # ============================================================================= + + def get_weight(self, edge, time: int): + """Get the weight of an edge at a specific time.""" + return super().get_weight(edge, time) - Parameters - ---------- - edge : tuple - The edge to check. - time : int - The time to check. - Returns - ------- - bool - True if the edge is in the hypergraph, False otherwise. + def set_weight(self, edge, time: int, weight) -> None: + """Set the weight of an edge at a specific time.""" + super().set_weight(edge, weight, time) - """ - edge = _canon_edge(edge) - k = (time, edge) - return k in self._edge_list + # ============================================================================= + # Temporal-specific methods + # ============================================================================= - def get_edges( - self, - time_window=None, - order=None, - size=None, - up_to=False, - # subhypergraph = False, - # keep_isolated_nodes=False, - metadata=False, - ): - """ - Get the edges in the temporal hypergraph. If a time window is provided, only edges within the window are returned. + def get_times_for_edge(self, edge): + """Get the times at which a specific set of nodes forms a hyperedge.""" + edge = self._canon_edge(edge) + times = [] + for time, _edge in self._edge_list.keys(): + if _edge == edge: + times.append(time) + return times - Parameters - ---------- - time_window: tuple, optional - A tuple of two integers representing the start and end times of the window. - size: int, optional - The size of the hyperedges to consider - order: int, optional - The order of the hyperedges to consider - up_to: bool, optional - metadata: bool, optional - If True, return edge metadata. Default is False. - - Returns - ------- - list - A list of edges in the hypergraph. - """ - if order is not None and size is not None: - raise ValueError("Order and size cannot be both specified.") - # if not subhypergraph and keep_isolated_nodes: - # raise ValueError("Cannot keep nodes if not returning subhypergraphs.") + def min_time(self): + """Get the minimum time in the hypergraph.""" + if not self._edge_list: + return None + return min(edge[0] for edge in self._edge_list.keys()) - edges = [] - if time_window is None: - edges = list(self._edge_list.keys()) - elif isinstance(time_window, tuple) and len(time_window) == 2: - for _t, _edge in list(sorted(self._edge_list.keys())): - if time_window[0] <= _t < time_window[1]: - edges.append((_t, _edge)) - else: - raise ValueError("Time window must be a tuple of length 2 or None") - if order is not None or size is not None: - if size is not None: - order = size - 1 - if not up_to: - edges = [edge for edge in edges if len(edge[1]) - 1 == order] - else: - edges = [edge for edge in edges if len(edge[1]) - 1 <= order] - return ( - edges - if not metadata - else {edge: self.get_edge_metadata(edge[1], edge[0]) for edge in edges} - ) + def max_time(self): + """Get the maximum time in the hypergraph.""" + if not self._edge_list: + return None + return max(edge[0] for edge in self._edge_list.keys()) - def aggregate(self, time_window): + def aggregate(self, time_window: int): + """Aggregate edges within time windows.""" if not isinstance(time_window, int) or time_window <= 0: raise TypeError("Time window must be a positive integer") @@ -622,7 +522,7 @@ def aggregate(self, time_window): for time, edge_nodes in edges_in_window: Hypergraph_t.add_edge( edge_nodes, - metadata=self.get_edge_metadata(edge_nodes, time), + metadata=self.get_edge_metadata(edge=edge_nodes, time=time), weight=self.get_weight(edge_nodes, time), ) @@ -641,317 +541,10 @@ def aggregate(self, time_window): return aggregated - def get_times_for_edge(self, edge): - """ - Get the times at which a specific set of nodes forms a hyperedge in the hypergraph. - - Parameters - ---------- - edge: tuple - The set of nodes forming the hyperedge. - - Returns - ------- - times: list - A list of times at which the hyperedge occurs. - """ - edge = _canon_edge(edge) - times = [] - for time, _edge in self._edge_list.keys(): - if _edge == edge: - times.append(time) - return times - - # Weight - def set_weight(self, edge, time, weight): - edge = _canon_edge(edge) - if not self._weighted and weight != 1: - raise ValueError( - "If the hypergraph is not weighted, weight can be 1 or None." - ) - if (time, edge) not in self._edge_list: - raise ValueError("Edge {} not in hypergraph.".format(edge)) - e_id = self._edge_list[(time, edge)] - self._weights[e_id] = weight - - def get_weight(self, edge, time): - edge = _canon_edge(edge) - if (time, edge) not in self._edge_list: - raise ValueError("Edge {} not in hypergraph.".format(edge)) - e_id = self._edge_list[(time, edge)] - return self._weights[e_id] - - def get_weights(self, order=None, size=None, up_to=False, asdict=False): - """Returns the list of weights of the edges in the hypergraph. If order is specified, it returns the list of weights of the edges of the specified order. - If size is specified, it returns the list of weights of the edges of the specified size. If both order and size are specified, it raises a ValueError. - If up_to is True, it returns the list of weights of the edges of order smaller or equal to the specified order. - - Parameters - ---------- - order : int, optional - Order of the edges to get the weights of. - - size : int, optional - Size of the edges to get the weights of. - - up_to : bool, optional - If True, it returns the list of weights of the edges of order smaller or equal to the specified order. Default is False. - - asdict : bool, optional - Returns - ------- - list - List of weights of the edges in the hypergraph. - - Raises - ------ - ValueError - If both order and size are specified. - - """ - w = None - if order is not None and size is not None: - raise ValueError("Order and size cannot be both specified.") - if order is None and size is None: - w = { - edge: self._weights[self._edge_list[edge]] for edge in self.get_edges() - } - - if size is not None: - order = size - 1 - - if w is None: - w = { - edge: self._weights[self._edge_list[edge]] - for edge in self.get_edges(order=order, up_to=up_to) - } - - if asdict: - return w - else: - return list(w.values()) - - # Info - def max_order(self): - """ - Returns the maximum order of the hypergraph. - - Returns - ------- - int - Maximum order of the hypergraph. - """ - return self.max_size() - 1 - - def max_size(self): - """ - Returns the maximum size of the hypergraph. - - Returns - ------- - int - Maximum size of the hypergraph. - """ - return max(self.get_sizes()) - - def num_nodes(self): - """ - Returns the number of nodes in the hypergraph. - - Returns - ------- - int - Number of nodes in the hypergraph. - """ - return len(list(self.get_nodes())) - - def num_edges(self, order=None, size=None, up_to=False): - """Returns the number of edges in the hypergraph. If order is specified, it returns the number of edges of the specified order. - If size is specified, it returns the number of edges of the specified size. If both order and size are specified, it raises a ValueError. - If up_to is True, it returns the number of edges of order smaller or equal to the specified order. - - Parameters - ---------- - order : int, optional - Order of the edges to count. - size : int, optional - Size of the edges to count. - up_to : bool, optional - If True, it returns the number of edges of order smaller or equal to the specified order. Default is False. - - Returns - ------- - int - Number of edges in the hypergraph. - """ - if order is not None and size is not None: - raise ValueError("Order and size cannot be both specified.") - - if order is None and size is None: - return len(self._edge_list) - else: - if size is not None: - order = size - 1 - if not up_to: - s = 0 - for edge in self._edge_list: - if len(edge) - 1 == order: - s += 1 - return s - else: - s = 0 - for edge in self._edge_list: - if len(edge) - 1 <= order: - s += 1 - return s - - def distribution_sizes(self): - """ - Returns the distribution of sizes of the hyperedges in the hypergraph. - - Returns - ------- - collections.Counter - Distribution of sizes of the hyperedges in the hypergraph. - """ - from collections import Counter - - return dict(Counter(self.get_sizes())) - - def get_sizes(self): - """ - Get the size of each edge in the hypergraph. - - Returns - ------- - list - A list of integers representing the size of each edge. - """ - return [_get_size(edge[1]) for edge in self._edge_list.keys()] - - def get_orders(self): - """ - Get the order of each edge in the hypergraph. - - Returns - ------- - list - A list of integers representing the order of each edge. - """ - return [_get_order(edge[1]) for edge in self._edge_list.keys()] - - def is_weighted(self): - """ - Check if the hypergraph is weighted. - - Returns - ------- - bool - True if the hypergraph is weighted, False otherwise. - """ - return self._weighted - - def is_uniform(self): - """ - Check if the hypergraph is uniform, i.e. all hyperedges have the same size. - - Returns - ------- - bool - True if the hypergraph is uniform, False otherwise. - """ - uniform = True - sz = None - for edge in self._edge_list: - if sz is None: - sz = len(edge[1]) - else: - if len(edge[1]) != sz: - uniform = False - break - return uniform - - def min_time(self): - min = math.inf - for edge in self._edge_list: - if min > edge[0]: - min = edge[0] - return min - - def max_time(self): - max = -math.inf - for edge in self._edge_list: - if max < edge[0]: - max = edge[0] - return max - - # Adj - def get_adj_dict(self): - return self._adj - - def set_adj_dict(self, adj_dict): - self._adj = adj_dict - - # Degree - def degree(self, node, order=None, size=None): - from hypergraphx.measures.degree import degree - - return degree(self, node, order=order, size=size) - - def degree_sequence(self, order=None, size=None): - from hypergraphx.measures.degree import degree_sequence - - return degree_sequence(self, order=order, size=size) - - def degree_distribution(self, order=None, size=None): - from hypergraphx.measures.degree import degree_distribution - - return degree_distribution(self, order=order, size=size) - - # Utils - def isolated_nodes(self, size=None, order=None): - from hypergraphx.utils.cc import isolated_nodes - - return isolated_nodes(self, size=size, order=order) - - def is_isolated(self, node, size=None, order=None): - from hypergraphx.utils.cc import is_isolated - - return is_isolated(self, node, size=size, order=order) - - def temporal_adjacency_matrix(self, return_mapping: bool = False): - from hypergraphx.linalg import temporal_adjacency_matrix - - return temporal_adjacency_matrix(self, return_mapping) - - def annealed_adjacency_matrix(self, return_mapping: bool = False): - from hypergraphx.linalg import annealed_adjacency_matrix - - return annealed_adjacency_matrix(self, return_mapping) - - def adjacency_factor(self, t: int = 0): - from hypergraphx.linalg import adjacency_factor - - return adjacency_factor(self, t) - def subhypergraph( self, time_window=None, add_all_nodes: bool = False ) -> dict[int, Hypergraph]: - """ - Create an hypergraph for each time of the Temporal Hypergraph. - Parameters - ---------- - time_window : tuple[int,int]|None, optional - Give the time window (a,b), only the times inside the interval [a,b) will be considered. - If not specified all the times will be considered. - add_all_nodes : bool, optional - If True, the hypergraphs will have all the nodes of the Temporal Hypergraph even if they are not present - in their corresponding time. - Returns - ------- - dict: dict[int, Hypergraph] - A dictionary where the keys are the time and the values are the hypergraphs - """ + """Create a hypergraph for each time of the Temporal Hypergraph.""" edges = self.get_edges() res = dict() if time_window is None: @@ -968,203 +561,83 @@ def subhypergraph( if add_all_nodes: for node in self.get_nodes(): for k, v in res.items(): - if v.check_node(node): + if not v.check_node(node): v.add_node(node) return res - # Metadata - def set_hypergraph_metadata(self, metadata): - self._hypergraph_metadata = metadata - - def get_hypergraph_metadata(self): - return self._hypergraph_metadata - - def set_node_metadata(self, node, metadata): - if node not in self._node_metadata: - raise ValueError("Node {} not in hypergraph.".format(node)) - self._node_metadata[node] = metadata - - def get_node_metadata(self, node): - if node not in self._node_metadata: - raise ValueError("Node {} not in hypergraph.".format(node)) - return self._node_metadata[node] - - def get_all_nodes_metadata(self): - return self._node_metadata - - def set_edge_metadata(self, edge, time, metadata): - edge = _canon_edge(edge) - k = (time, edge) - if k not in self._edge_list: - raise ValueError("Edge {} not in hypergraph.".format(edge)) - e_id = self._edge_list[k] - self._edge_metadata[e_id] = metadata - - def get_edge_metadata(self, edge, time): - edge = _canon_edge(edge) - k = (time, edge) + # ============================================================================= + # Metadata Management (Use base class with temporal edge format) + # ============================================================================= + + def get_incidence_metadata(self, edge, node, time: int = None): + """Get incidence metadata for a specific edge-node pair.""" + edge = self._canon_edge(edge) + k = (time, edge) if time is not None else edge if k not in self._edge_list: raise ValueError("Edge {} not in hypergraph.".format(edge)) - e_id = self._edge_list[k] - return self._edge_metadata[e_id] + return self._incidences_metadata.get((k, node), {}) - def get_all_edges_metadata(self): - return self._edge_metadata - - def set_incidence_metadata(self, edge, time, node, metadata): - edge = _canon_edge(edge) - k = (time, edge) + def set_incidence_metadata(self, edge, node, metadata, time: int = None): + """Set incidence metadata for a specific edge-node pair.""" + edge = self._canon_edge(edge) + k = (time, edge) if time is not None else edge if k not in self._edge_list: raise ValueError("Edge {} not in hypergraph.".format(edge)) self._incidences_metadata[(k, node)] = metadata - def get_incidence_metadata(self, edge, time, node): - edge = _canon_edge(edge) - k = (time, edge) - if k not in self._edge_list: - raise ValueError("Edge {} not in hypergraph.".format(edge)) - return self._incidences_metadata[(k, node)] - def get_all_incidences_metadata(self): + """Get all incidence metadata.""" return {k: v for k, v in self._incidences_metadata.items()} - def set_attr_to_hypergraph_metadata(self, field, value): - self._hypergraph_metadata[field] = value - - def set_attr_to_node_metadata(self, node, field, value): - if node not in self._node_metadata: - raise ValueError("Node {} not in hypergraph.".format(node)) - self._node_metadata[node][field] = value + def _restructure_query_edge(self, k: Tuple[Tuple, Any], time: int): + """Helper for modifying a query edge prior to metadata retrieval.""" + return (time, k) - def set_attr_to_edge_metadata(self, edge, time, field, value): - _edge = _canon_edge(edge) - if edge not in self._edge_metadata: - raise ValueError("Edge {} not in hypergraph.".format(edge)) - self._edge_metadata[self._edge_list[(time, edge)]][field] = value + # ============================================================================= + # Utility Methods (Override temporal-specific canonical edge handling) + # ============================================================================= + + def _canon_edge(self, edge: Tuple) -> Tuple: + """Canonical form of an edge - sorts inner tuples.""" + edge = tuple(edge) - def remove_attr_from_node_metadata(self, node, field): - if node not in self._node_metadata: - raise ValueError("Node {} not in hypergraph.".format(node)) - del self._node_metadata[node][field] + if len(edge) == 2: + if isinstance(edge[0], tuple) and isinstance(edge[1], tuple): + # Sort the inner tuples and return + return (tuple(sorted(edge[0])), tuple(sorted(edge[1]))) + elif not isinstance(edge[0], tuple) and not isinstance(edge[1], tuple): + # Sort the edge itself if it contains IDs (non-tuple elements) + return tuple(sorted(edge)) - def remove_attr_from_edge_metadata(self, edge, time, field): - _edge = _canon_edge(edge) - if edge not in self._edge_metadata: - raise ValueError("Edge {} not in hypergraph.".format(edge)) - del self._edge_metadata[self._edge_list[(time, edge)]][field] - - # Basic Functions - def clear(self): - self._edge_list.clear() - self._adj.clear() - self._weights.clear() - self._hypergraph_metadata.clear() - self._node_metadata.clear() - self._edge_metadata.clear() - self._reverse_edge_list.clear() - - def copy(self): - """ - Returns a copy of the hypergraph. + return tuple(sorted(edge)) - Returns - ------- - Hypergraph - A copy of the hypergraph. - """ - return copy.deepcopy(self) - - def __str__(self): - """ - Returns a string representation of the hypergraph. - - Returns - ------- - str - A string representation of the hypergraph. - """ - title = "Hypergraph with {} nodes and {} edges.\n".format( - self.num_nodes(), self.num_edges() - ) - details = "Distribution of hyperedge sizes: {}".format( - self.distribution_sizes() - ) - return title + details + # ============================================================================= + # Matrix operations (inherited from base class) + # ============================================================================= - def __len__(self): - """ - Returns the number of edges in the hypergraph. - - Returns - ------- - int - The number of edges in the hypergraph. - """ - return len(self._edge_list) - - def __iter__(self): - """ - Returns an iterator over the edges in the hypergraph. - - Returns - ------- - iterator - An iterator over the edges in the hypergraph. - """ - return iter(self._edge_list.items()) - - # Data Structure Extra - def expose_data_structures(self): - """ - Expose the internal data structures of the temporal hypergraph for serialization. - - Returns - ------- - dict - A dictionary containing all internal attributes of the temporal hypergraph. - """ - return { - "type": "TemporalHypergraph", - "hypergraph_metadata": self._hypergraph_metadata, - "_weighted": self._weighted, - "_weights": self._weights, - "_adj": self._adj, - "_edge_list": self._edge_list, - "node_metadata": self._node_metadata, - "edge_metadata": self._edge_metadata, - "reverse_edge_list": self._reverse_edge_list, - "next_edge_id": self._next_edge_id, - } - - def populate_from_dict(self, data): - """ - Populate the attributes of the temporal hypergraph from a dictionary. + def temporal_adjacency_matrix(self, return_mapping: bool = False): + """Get the temporal adjacency matrix.""" + from hypergraphx.linalg import temporal_adjacency_matrix + return temporal_adjacency_matrix(self, return_mapping) - Parameters - ---------- - data : dict - A dictionary containing the attributes to populate the hypergraph. - """ - self._hypergraph_metadata = data.get("hypergraph_metadata", {}) - self._weighted = data.get("_weighted", False) - self._weights = data.get("_weights", {}) - self._adj = data.get("_adj", {}) - self._edge_list = data.get("_edge_list", {}) - self._node_metadata = data.get("node_metadata", {}) - self._edge_metadata = data.get("edge_metadata", {}) - self._reverse_edge_list = data.get("reverse_edge_list", {}) - self._next_edge_id = data.get("next_edge_id", 0) - - def expose_attributes_for_hashing(self): - """ - Expose relevant attributes for hashing specific to TemporalHypergraph. + def annealed_adjacency_matrix(self, return_mapping: bool = False): + """Get the annealed adjacency matrix.""" + from hypergraphx.linalg import annealed_adjacency_matrix + return annealed_adjacency_matrix(self, return_mapping) - Returns - ------- - dict - A dictionary containing key attributes. - """ + # ============================================================================= + # Serialization Support (Override base class methods) + # ============================================================================= + + def expose_data_structures(self) -> Dict: + """Expose the internal data structures for serialization.""" + base_data = super().expose_data_structures() + base_data["type"] = "TemporalHypergraph" + return base_data + + def expose_attributes_for_hashing(self) -> dict: + """Expose relevant attributes for hashing.""" edges = [] for edge in sorted(self._edge_list.keys()): edge = (edge[0], tuple(sorted(edge[1]))) @@ -1173,7 +646,7 @@ def expose_attributes_for_hashing(self): { "nodes": edge, "weight": self._weights.get(edge_id, 1), - "metadata": self._edge_metadata.get(edge_id, {}), + "metadata": self.get_edge_metadata(edge=edge[1], time=edge[0]), } ) @@ -1187,17 +660,4 @@ def expose_attributes_for_hashing(self): "hypergraph_metadata": self._hypergraph_metadata, "edges": edges, "nodes": nodes, - } - - def get_mapping(self): - """ - Map the nodes of the hypergraph to integers in [0, n_nodes). - - Returns - ------- - LabelEncoder - The mapping. - """ - encoder = LabelEncoder() - encoder.fit(self.get_nodes()) - return encoder + } \ No newline at end of file