diff --git a/python/cugraph-pyg/cugraph_pyg/data/graph_store.py b/python/cugraph-pyg/cugraph_pyg/data/graph_store.py index ca41425c..db882b52 100644 --- a/python/cugraph-pyg/cugraph_pyg/data/graph_store.py +++ b/python/cugraph-pyg/cugraph_pyg/data/graph_store.py @@ -1,7 +1,8 @@ -# SPDX-FileCopyrightText: Copyright (c) 2024-2025, NVIDIA CORPORATION. +# SPDX-FileCopyrightText: Copyright (c) 2024-2026, NVIDIA CORPORATION. # SPDX-License-Identifier: Apache-2.0 import os +import warnings import numpy as np import cupy @@ -15,7 +16,7 @@ from cugraph_pyg.tensor import DistTensor, DistMatrix from cugraph_pyg.tensor.utils import has_nvlink_network, is_empty -from typing import Union, Optional, List, Dict, Tuple +from typing import Union, Optional, List, Dict, Tuple, Callable # cudf is an optional dependency. It is only imported here for typing. cudf = import_optional("cudf") @@ -70,7 +71,7 @@ def __clear_graph(self): self.__graph = None self.__vertex_offsets = None self.__weight_attr = None - self.__etime_attr = None + self.__time_attr = None self.__numeric_edge_types = None def _put_edge_index( @@ -317,19 +318,27 @@ def _vertex_offset_array(self) -> "torch.Tensor": def is_homogeneous(self) -> bool: return len(self._vertex_offsets) == 1 - def _set_etime_attr(self, attr: Tuple["torch_geometric.data.FeatureStore", str]): - if attr != self.__etime_attr: + def _set_time_attr(self, attr: Tuple["torch_geometric.data.FeatureStore", str]): + if attr != self.__time_attr: weight_attr = self.__weight_attr self.__clear_graph() - self.__etime_attr = attr + self.__time_attr = attr self.__weight_attr = weight_attr def _set_weight_attr(self, attr: Tuple["torch_geometric.data.FeatureStore", str]): if attr != self.__weight_attr: - etime_attr = self.__etime_attr + time_attr = self.__time_attr self.__clear_graph() self.__weight_attr = attr - self.__etime_attr = etime_attr + self.__time_attr = time_attr + + def _get_ntime_func( + self, + ) -> Optional[Callable[[str, "torch.Tensor"], "torch.Tensor"]]: + if self.__time_attr is None: + return None + feature_store, attr_name = self.__time_attr + return lambda node_type, node_id: feature_store[node_type, attr_name][node_id] def __get_etime_tensor( self, @@ -337,7 +346,7 @@ def __get_etime_tensor( start_offsets: "torch.Tensor", num_edges_t: "torch.Tensor", ): - feature_store, attr_name = self.__etime_attr + feature_store, attr_name = self.__time_attr etimes = [] for i, et in enumerate(sorted_keys): ix = torch.arange( @@ -498,7 +507,12 @@ def __get_edgelist(self): sorted_keys, start_offsets.cpu(), num_edges_t.cpu() ).cuda() - if self.__etime_attr is not None: + if self.__time_attr is not None: + warnings.warn( + "cuGraph-PyG currently supports only edge-based temporal sampling." + " Node times (if present) can still be used for negative sampling." + ) + # TODO if node times are present, do node-based temporal sampling instead. d["etime"] = self.__get_etime_tensor( sorted_keys, start_offsets.cpu(), num_edges_t.cpu() ).cuda() diff --git a/python/cugraph-pyg/cugraph_pyg/examples/movielens_mnmg.py b/python/cugraph-pyg/cugraph_pyg/examples/movielens_mnmg.py index a8b2bcd7..b08dacd8 100644 --- a/python/cugraph-pyg/cugraph_pyg/examples/movielens_mnmg.py +++ b/python/cugraph-pyg/cugraph_pyg/examples/movielens_mnmg.py @@ -1,4 +1,4 @@ -# SPDX-FileCopyrightText: Copyright (c) 2025, NVIDIA CORPORATION. +# SPDX-FileCopyrightText: Copyright (c) 2025-2026, NVIDIA CORPORATION. # SPDX-License-Identifier: Apache-2.0 import os @@ -92,6 +92,12 @@ def cugraph_pyg_from_heterodata(data): feature_store["user", "x", None] = data["user"].x feature_store["movie", "x", None] = data["movie"].x + feature_store[("user", "rates", "movie"), "time", None] = data[ + "user", "rates", "movie" + ].time + feature_store[("movie", "rev_rates", "user"), "time", None] = data[ + "user", "rates", "movie" + ].time return feature_store, graph_store @@ -136,7 +142,18 @@ def preprocess_and_partition(data, edge_path, features_path, meta_path): fx, os.path.join(movie_path, f"rank={r}.pt"), ) - + time_path = os.path.join(features_path, "time") + os.makedirs( + time_path, + exist_ok=True, + ) + for r, time in enumerate( + torch.tensor_split(data["user", "movie"].time, world_size) + ): + torch.save( + time, + os.path.join(time_path, f"rank={r}.pt"), + ) print("Writing metadata...") meta = { "num_nodes": { @@ -190,6 +207,10 @@ def load_partitions(edge_path, features_path, meta_path): ], dim=1, ) + data["user", "rates", "movie"].time = torch.load( + os.path.join(features_path, "time", f"rank={rank}.pt"), + weights_only=True, + ) label_dict = { "train": torch.randperm(ei["train"].shape[1]), @@ -398,8 +419,19 @@ def test(test_loader, model): feature_store, graph_store = cugraph_pyg_from_heterodata(data) eli_train = data["user", "rates", "movie"].edge_index[:, label_dict["train"]] eli_test = data["user", "rates", "movie"].edge_index[:, label_dict["test"]] + time_train = data["user", "rates", "movie"].time[label_dict["train"]] num_nodes = {"user": data["user"].num_nodes, "movie": data["movie"].num_nodes} + # Set node times to 0 + feature_store["user", "time", None] = torch.tensor_split( + torch.zeros(data["user"].num_nodes, dtype=torch.int64, device=device), + world_size, + )[global_rank] + feature_store["movie", "time", None] = torch.tensor_split( + torch.zeros(data["movie"].num_nodes, dtype=torch.int64, device=device), + world_size, + )[global_rank] + # Extract feature dimensions num_features = { "user": data["user"].x.shape[-1] if data["user"].x is not None else 1, @@ -417,17 +449,16 @@ def test(test_loader, model): ("movie", "rev_rates", "user"): [5, 5, 5], }, batch_size=256, - # time_attr='time', shuffle=True, drop_last=True, - # temporal_strategy='last', ) from cugraph_pyg.loader import LinkNeighborLoader train_loader = LinkNeighborLoader( edge_label_index=(("user", "rates", "movie"), eli_train), - # edge_label_time=time[train_index] - 1, # No leakage. + edge_label_time=time_train - 1, # No leakage. + time_attr="time", neg_sampling=dict(mode="binary", amount=2), **kwargs, ) diff --git a/python/cugraph-pyg/cugraph_pyg/loader/link_neighbor_loader.py b/python/cugraph-pyg/cugraph_pyg/loader/link_neighbor_loader.py index d0783c3f..33aa44b6 100644 --- a/python/cugraph-pyg/cugraph_pyg/loader/link_neighbor_loader.py +++ b/python/cugraph-pyg/cugraph_pyg/loader/link_neighbor_loader.py @@ -1,4 +1,4 @@ -# SPDX-FileCopyrightText: Copyright (c) 2024-2025, NVIDIA CORPORATION. +# SPDX-FileCopyrightText: Copyright (c) 2024-2026, NVIDIA CORPORATION. # SPDX-License-Identifier: Apache-2.0 import warnings @@ -129,10 +129,10 @@ def __init__( all workers. If not provided, it will be automatically calculated. See cugraph_pyg.sampler.BaseDistributedSampler. - temporal_comparison: str (optional, default='monotonically decreasing') + temporal_comparison: str (optional, default='monotonically_decreasing') The comparison operator for temporal sampling - ('strictly increasing', 'monotonically increasing', - 'strictly decreasing', 'monotonically decreasing', 'last'). + ('strictly_increasing', 'monotonically_increasing', + 'strictly_decreasing', 'monotonically_decreasing', 'last'). Note that this should be 'last' for temporal_strategy='last'. See cugraph_pyg.sampler.BaseDistributedSampler. **kwargs @@ -142,7 +142,7 @@ def __init__( subgraph_type = torch_geometric.sampler.base.SubgraphType(subgraph_type) if temporal_comparison is None: - temporal_comparison = "monotonically decreasing" + temporal_comparison = "monotonically_decreasing" if not directed: subgraph_type = torch_geometric.sampler.base.SubgraphType.induced @@ -182,7 +182,7 @@ def __init__( is_temporal = (edge_label_time is not None) and (time_attr is not None) - if (edge_label_time is None) != (time_attr is None): + if not is_temporal and (edge_label_time is not None or time_attr is not None): warnings.warn( "Edge-based temporal sampling requires that both edge_label_time and time_attr are provided. Defaulting to non-temporal sampling." ) @@ -190,7 +190,7 @@ def __init__( if weight_attr is not None: graph_store._set_weight_attr((feature_store, weight_attr)) if is_temporal: - graph_store._set_etime_attr((feature_store, time_attr)) + graph_store._set_time_attr((feature_store, time_attr)) if isinstance(num_neighbors, dict): sorted_keys, _, _ = graph_store._numeric_edge_types diff --git a/python/cugraph-pyg/cugraph_pyg/loader/neighbor_loader.py b/python/cugraph-pyg/cugraph_pyg/loader/neighbor_loader.py index f0a1b0b4..a81adce1 100644 --- a/python/cugraph-pyg/cugraph_pyg/loader/neighbor_loader.py +++ b/python/cugraph-pyg/cugraph_pyg/loader/neighbor_loader.py @@ -1,4 +1,4 @@ -# SPDX-FileCopyrightText: Copyright (c) 2024-2025, NVIDIA CORPORATION. +# SPDX-FileCopyrightText: Copyright (c) 2024-2026, NVIDIA CORPORATION. # SPDX-License-Identifier: Apache-2.0 import warnings @@ -122,10 +122,10 @@ def __init__( all workers. If not provided, it will be automatically calculated. See cugraph_pyg.sampler.BaseDistributedSampler. - temporal_comparison: str (optional, default='monotonically decreasing') + temporal_comparison: str (optional, default='monotonically_decreasing') The comparison operator for temporal sampling - ('strictly increasing', 'monotonically increasing', - 'strictly decreasing', 'monotonically decreasing', 'last'). + ('strictly_increasing', 'monotonically_increasing', + 'strictly_decreasing', 'monotonically_decreasing', 'last'). Note that this should be 'last' for temporal_strategy='last'. See cugraph_pyg.sampler.BaseDistributedSampler. **kwargs @@ -135,7 +135,7 @@ def __init__( subgraph_type = torch_geometric.sampler.base.SubgraphType(subgraph_type) if temporal_comparison is None: - temporal_comparison = "monotonically decreasing" + temporal_comparison = "monotonically_decreasing" if not directed: subgraph_type = torch_geometric.sampler.base.SubgraphType.induced @@ -176,7 +176,7 @@ def __init__( is_temporal = time_attr is not None if is_temporal: - graph_store._set_etime_attr((feature_store, time_attr)) + graph_store._set_time_attr((feature_store, time_attr)) if input_time is None: input_type, input_nodes, _ = ( diff --git a/python/cugraph-pyg/cugraph_pyg/sampler/distributed_sampler.py b/python/cugraph-pyg/cugraph_pyg/sampler/distributed_sampler.py index 364986ba..00eb1144 100644 --- a/python/cugraph-pyg/cugraph_pyg/sampler/distributed_sampler.py +++ b/python/cugraph-pyg/cugraph_pyg/sampler/distributed_sampler.py @@ -1,4 +1,4 @@ -# SPDX-FileCopyrightText: Copyright (c) 2024-2025, NVIDIA CORPORATION. +# SPDX-FileCopyrightText: Copyright (c) 2024-2026, NVIDIA CORPORATION. # SPDX-License-Identifier: Apache-2.0 import warnings @@ -556,9 +556,25 @@ def __sample_from_edges_func( leftover_time = leftover_time[lyi] lz = torch.sort(lyi)[1] - leftover_seeds, lui = leftover_seeds.unique_consecutive(return_inverse=True) if leftover_time is not None: - leftover_time = leftover_time[lui] + if leftover_seeds.numel() == 0: + assert leftover_time.numel() == 0, ( + "Leftover time should be empty if leftover seeds are empty" + ) + leftover_seeds_unique_mask = torch.tensor( + [], device="cuda", dtype=torch.bool + ) + else: + leftover_seeds_unique_mask = torch.concat( + [ + torch.tensor([True], device="cuda"), + leftover_seeds[1:] != leftover_seeds[:-1], + ] + ) + leftover_seeds, lui = leftover_seeds.unique_consecutive(return_inverse=True) + leftover_time = leftover_time[leftover_seeds_unique_mask] + else: + leftover_seeds, lui = leftover_seeds.unique_consecutive(return_inverse=True) leftover_inv = lui[lz] if leftover_seeds.numel() > 0: diff --git a/python/cugraph-pyg/cugraph_pyg/sampler/sampler.py b/python/cugraph-pyg/cugraph_pyg/sampler/sampler.py index 26f7cbd7..9199edea 100644 --- a/python/cugraph-pyg/cugraph_pyg/sampler/sampler.py +++ b/python/cugraph-pyg/cugraph_pyg/sampler/sampler.py @@ -1,8 +1,10 @@ -# SPDX-FileCopyrightText: Copyright (c) 2024-2025, NVIDIA CORPORATION. +# SPDX-FileCopyrightText: Copyright (c) 2024-2026, NVIDIA CORPORATION. # SPDX-License-Identifier: Apache-2.0 from typing import Optional, Iterator, Union, Dict, Tuple, List +from math import ceil + from cugraph_pyg.utils.imports import import_optional from cugraph_pyg.sampler.distributed_sampler import DistributedNeighborSampler @@ -797,7 +799,7 @@ def sample_from_nodes( def sample_from_edges( self, index: "torch_geometric.sampler.EdgeSamplerInput", - neg_sampling: Optional["torch_geometric.sampler.NegativeSampling"], + neg_sampling: Optional["torch_geometric.sampler.NegativeSampling"] = None, **kwargs, ) -> Iterator[ Union[ @@ -808,10 +810,14 @@ def sample_from_edges( src = index.row dst = index.col input_id = index.input_id + input_time = index.time + + # TODO ensure this is handled correctly when disjoint sampling is implemented. + node_time = self.__graph_store._get_ntime_func() + neg_batch_size = 0 if neg_sampling: # Sample every negative subset at once. - # TODO handle temporal sampling (node_time) src_neg, dst_neg = neg_sample( self.__graph_store, index.row, @@ -819,8 +825,8 @@ def sample_from_edges( index.input_type, self.__batch_size, neg_sampling, - None, # src_time, - None, # src_node_time, + index.time, + node_time, ) if neg_sampling.is_binary(): src, _ = neg_cat(src.cuda(), src_neg, self.__batch_size) @@ -834,6 +840,13 @@ def sample_from_edges( src, _ = neg_cat(scu, scu[per], self.__batch_size) dst, neg_batch_size = neg_cat(dst.cuda(), dst_neg, self.__batch_size) + if node_time is not None and input_time is not None: + input_time, _ = neg_cat( + input_time.repeat_interleave(int(ceil(neg_sampling.amount))).cuda(), + input_time.cuda(), + self.__batch_size, + ) + # Concatenate -1s so the input id tensor lines up and can # be processed by the dist sampler. # When loading the output batch, '-1' will be dropped. @@ -858,7 +871,7 @@ def sample_from_edges( reader = self.__sampler.sample_from_edges( torch.stack([src, dst]), # reverse of usual convention input_id=input_id, - input_time=index.time, + input_time=input_time, input_label=index.label, batch_size=self.__batch_size + neg_batch_size, metadata=metadata, diff --git a/python/cugraph-pyg/cugraph_pyg/sampler/sampler_utils.py b/python/cugraph-pyg/cugraph_pyg/sampler/sampler_utils.py index 168eb015..f8bb1f6e 100644 --- a/python/cugraph-pyg/cugraph_pyg/sampler/sampler_utils.py +++ b/python/cugraph-pyg/cugraph_pyg/sampler/sampler_utils.py @@ -1,8 +1,8 @@ -# SPDX-FileCopyrightText: Copyright (c) 2022-2025, NVIDIA CORPORATION. +# SPDX-FileCopyrightText: Copyright (c) 2022-2026, NVIDIA CORPORATION. # SPDX-License-Identifier: Apache-2.0 - -from typing import Tuple, Optional, Dict, Union +import warnings +from typing import Tuple, Optional, Dict, Union, Callable from math import ceil @@ -65,6 +65,33 @@ def filter_cugraph_pyg_store( return data +def _call_plc_negative_sampling( + graph_store, + num_neg, + vertices, + src_weight, + dst_weight, + remove_duplicates=False, + remove_false_negatives=False, + exact_number_of_samples=False, +): + result_dict = pylibcugraph.negative_sampling( + graph_store._resource_handle, + graph_store._graph, + num_neg, + vertices=None if vertices is None else cupy.asarray(vertices), + src_bias=None if src_weight is None else cupy.asarray(src_weight), + dst_bias=None if dst_weight is None else cupy.asarray(dst_weight), + remove_duplicates=remove_duplicates, + remove_false_negatives=remove_false_negatives, + exact_number_of_samples=exact_number_of_samples, + do_expensive_check=False, + ) + src_neg = torch.as_tensor(result_dict["sources"], device="cuda")[:num_neg] + dst_neg = torch.as_tensor(result_dict["destinations"], device="cuda")[:num_neg] + return src_neg, dst_neg + + def neg_sample( graph_store: GraphStore, seed_src: "torch.Tensor", @@ -72,9 +99,10 @@ def neg_sample( input_type: Tuple[str, str, str], batch_size: int, neg_sampling: "torch_geometric.sampler.NegativeSampling", - time: "torch.Tensor", - node_time: "torch.Tensor", + seed_time: Optional["torch.Tensor"] = None, + node_time_func: Callable[[str, "torch.Tensor"], "torch.Tensor"] = None, ) -> Tuple["torch.Tensor", "torch.Tensor"]: + # TODO Add support for remove_duplicates, remove_false_negatives (rapidsai/cugraph-gnn#378) try: # Compatibility for PyG 2.5 src_weight = neg_sampling.src_weight @@ -155,46 +183,133 @@ def neg_sample( else: vertices = torch.arange(num_src_nodes, dtype=torch.int64, device="cuda") - if node_time is None: - result_dict = pylibcugraph.negative_sampling( - graph_store._resource_handle, - graph_store._graph, - num_neg, - vertices=None if vertices is None else cupy.asarray(vertices), - src_bias=None if src_weight is None else cupy.asarray(src_weight), - dst_bias=None if dst_weight is None else cupy.asarray(dst_weight), - remove_duplicates=False, - remove_false_negatives=False, - exact_number_of_samples=True, - do_expensive_check=False, + src_neg, dst_neg = _call_plc_negative_sampling( + graph_store, num_neg, vertices, src_weight, dst_weight + ) + + # TODO modifiy the C API so this condition is impossible + if src_neg.numel() < num_neg: + num_gen = num_neg - src_neg.numel() + src_neg = torch.concat( + [ + src_neg, + torch.randint( + 0, src_neg.max(), (num_gen,), device="cuda", dtype=torch.int64 + ), + ] + ) + dst_neg = torch.concat( + [ + dst_neg, + torch.randint( + 0, dst_neg.max(), (num_gen,), device="cuda", dtype=torch.int64 + ), + ] ) - src_neg = torch.as_tensor(result_dict["sources"], device="cuda")[:num_neg] - dst_neg = torch.as_tensor(result_dict["destinations"], device="cuda")[:num_neg] + if node_time_func is not None: + if seed_time is None: + warnings.warn( + "seed_time is None, temporal negative sampling will not be performed" + ) + else: + # Temporal negative sampling - node_time must be <= seed_time + # Seed time is both src/dst time in the PyG API. + # TODO maybe handle this in the C API? + num_neg_per_pos = int(ceil(neg_sampling.amount)) + seed_time = ( + seed_time.view(1, -1).expand(num_neg_per_pos, -1).flatten().cuda() + ) - # TODO modifiy the C API so this condition is impossible - if src_neg.numel() < num_neg: - num_gen = num_neg - src_neg.numel() - src_neg = torch.concat( - [ - src_neg, - torch.randint( - 0, src_neg.max(), (num_gen,), device="cuda", dtype=torch.int64 - ), + # For homogeneous graphs, input_type is None, so get the single node type + if graph_store.is_homogeneous: + node_type = list(graph_store._vertex_offsets.keys())[0] + node_offset = graph_store._vertex_offsets[node_type] + src_node_type = dst_node_type = node_type + src_node_offset = dst_node_offset = node_offset + else: + src_node_type = input_type[0] + dst_node_type = input_type[2] + src_node_offset = graph_store._vertex_offsets[src_node_type] + dst_node_offset = graph_store._vertex_offsets[dst_node_type] + + src_node_time = node_time_func(src_node_type, src_neg - src_node_offset) + dst_node_time = node_time_func(dst_node_type, dst_neg - dst_node_offset) + + target_samples = src_neg.numel() + valid_mask = (src_node_time <= seed_time) & (dst_node_time <= seed_time) + src_neg = src_neg[valid_mask] + dst_neg = dst_neg[valid_mask] + seed_time = seed_time[~valid_mask] + + # Matches the PyG API, attempts 5 times. + for _ in range(5): + diff = target_samples - src_neg.numel() + assert diff == seed_time.numel(), ( + "Diff should be equal to shape of seed_time." + ) + if diff <= 0: + break + src_neg_p, dst_neg_p = _call_plc_negative_sampling( + graph_store, diff, vertices, src_weight, dst_weight + ) + + src_time_p = node_time_func(src_node_type, src_neg_p - src_node_offset) + dst_time_p = node_time_func(dst_node_type, dst_neg_p - dst_node_offset) + + valid_mask = (src_time_p <= seed_time) & (dst_time_p <= seed_time) + src_neg_p = src_neg_p[valid_mask] + dst_neg_p = dst_neg_p[valid_mask] + src_neg = torch.concat([src_neg, src_neg_p]) + dst_neg = torch.concat([dst_neg, dst_neg_p]) + seed_time = seed_time[~valid_mask] + + if src_neg.numel() == 0: + # Generate subsample of pseudo-negative edges to avoid edge case where no negative edges are generated. + # In the next step, these will be used to choose the earlist occuring node for src/dst. + subsample_size = int(ceil(target_samples**0.5)) + src_neg = torch.randint( + src_node_offset, + src_node_offset + num_src_nodes, + (subsample_size,), + device="cuda", + dtype=torch.int64, + ) + dst_neg = torch.randint( + dst_node_offset, + dst_node_offset + num_dst_nodes, + (subsample_size,), + device="cuda", + dtype=torch.int64, + ) + diff = target_samples + else: + diff = target_samples - src_neg.numel() + if diff > 0: + # Select the earliest occuring node for src/dst and + # broadcast it to the invalid indices. + # Again, this matches the PyG API. + src_neg_p, dst_neg_p = _call_plc_negative_sampling( + graph_store, diff, vertices, src_weight, dst_weight + ) + + src_time_p = node_time_func(src_node_type, src_neg_p - src_node_offset) + invalid_src = src_time_p > seed_time + src_neg_p[invalid_src] = src_neg[ + node_time_func(src_node_type, src_neg - src_node_offset).argmin() ] - ) - dst_neg = torch.concat( - [ - dst_neg, - torch.randint( - 0, dst_neg.max(), (num_gen,), device="cuda", dtype=torch.int64 - ), + + dst_time_p = node_time_func(dst_node_type, dst_neg_p - dst_node_offset) + invalid_dst = dst_time_p > seed_time + dst_neg_p[invalid_dst] = dst_neg[ + node_time_func(dst_node_type, dst_neg - dst_node_offset).argmin() ] - ) - return src_neg, dst_neg - raise NotImplementedError( - "Temporal negative sampling is currently unimplemented in cuGraph-PyG" - ) + src_neg = torch.concat([src_neg, src_neg_p]) + dst_neg = torch.concat([dst_neg, dst_neg_p]) + + # The returned negative edges already have offsetted vertex IDs, + # and are valid input for the pylibcugraph sampler. + return src_neg, dst_neg def neg_cat( diff --git a/python/cugraph-pyg/cugraph_pyg/tests/loader/test_neighbor_loader.py b/python/cugraph-pyg/cugraph_pyg/tests/loader/test_neighbor_loader.py index 79d66355..c078ee8f 100644 --- a/python/cugraph-pyg/cugraph_pyg/tests/loader/test_neighbor_loader.py +++ b/python/cugraph-pyg/cugraph_pyg/tests/loader/test_neighbor_loader.py @@ -1,4 +1,4 @@ -# SPDX-FileCopyrightText: Copyright (c) 2024-2025, NVIDIA CORPORATION. +# SPDX-FileCopyrightText: Copyright (c) 2024-2026, NVIDIA CORPORATION. # SPDX-License-Identifier: Apache-2.0 import pytest @@ -919,3 +919,254 @@ def test_neighbor_loader_temporal_linkpred_heterogeneous(single_pytorch_worker, assert out["author", "writes", "paper"].num_sampled_edges.tolist() == [2, 2, 0] # FIXME resolve issues with num_sampled_nodes + + +@pytest.mark.skipif(isinstance(torch, MissingModule), reason="torch not available") +@pytest.mark.parametrize("batch_size", [1, 2]) +@pytest.mark.parametrize("neg_sampling_mode", ["binary", "triplet"]) +@pytest.mark.sg +def test_link_neighbor_loader_temporal_negative_sampling_homogeneous( + single_pytorch_worker, batch_size, neg_sampling_mode +): + """ + Test temporal negative sampling for homogeneous graphs. + This test ensures that: + 1. Negative samples are generated respecting temporal constraints + 2. Negative sample nodes have timestamps <= edge_label_time + 3. Both positive and negative samples are present in the output + """ + # Create a homogeneous temporal graph with paper-paper citations + src_cite = torch.tensor([3, 2, 1, 2, 3, 4, 0]) # paper + dst_cite = torch.tensor([2, 1, 0, 0, 1, 2, 1]) # paper + tme_cite = torch.tensor([5, 6, 7, 3, 4, 8, 2]) # edge timestamps + + num_papers = 5 + + # Create node timestamps (papers are created/published at specific times) + node_time = torch.tensor([0, 1, 2, 3, 4]) # paper timestamps + + graph_store = GraphStore() + feature_store = FeatureStore() + + # Add paper-paper citations + graph_store[("paper", "cites", "paper"), "coo", False, (num_papers, num_papers)] = [ + dst_cite, + src_cite, + ] + + # Add time attributes for both edges and nodes + feature_store[("paper", "cites", "paper"), "time", None] = tme_cite + feature_store["paper", "time", None] = node_time + + # Create edge label index for link prediction + # We'll test prediction for a subset of edges + edge_label_index = torch.tensor([[3, 2], [2, 1]]) + edge_label_time = torch.tensor([10, 10]) # Future time for prediction + + # Configure negative sampling + if neg_sampling_mode == "binary": + neg_sampling = torch_geometric.sampler.NegativeSampling("binary", amount=2.0) + else: + neg_sampling = torch_geometric.sampler.NegativeSampling("triplet", amount=2.0) + + loader = cugraph_pyg.loader.LinkNeighborLoader( + (feature_store, graph_store), + num_neighbors=[2, 2], + batch_size=batch_size, + edge_label_index=edge_label_index, + edge_label_time=edge_label_time, + time_attr="time", + neg_sampling=neg_sampling, + shuffle=False, + ) + + # Test that the loader produces batches with proper temporal negative sampling + total_pos = total_neg = 0 + for i, batch in enumerate(loader): + # Check that we have edge labels + assert hasattr(batch, "edge_label") + assert hasattr(batch, "edge_label_index") + + # Should have both positive (1.0) and negative (0.0) labels + edge_labels = batch.edge_label + assert torch.any(edge_labels == 1.0), "Should have positive labels" + assert torch.any(edge_labels == 0.0), "Should have negative labels" + + # Verify negative sampling ratio + num_pos = (edge_labels == 1.0).sum().item() + num_neg = (edge_labels == 0.0).sum().item() + + total_pos += num_pos + total_neg += num_neg + + # Verify that edge label index has the correct shape + edge_label_idx = batch.edge_label_index + assert edge_label_idx.shape[0] == 2 + assert edge_label_idx.shape[1] == len(edge_labels) + + # For temporal negative sampling, verify that the negative sample nodes + # have timestamps that respect temporal constraints + # Negative samples should only use nodes that existed before edge_label_time + neg_mask = edge_labels == 0.0 + if neg_mask.sum() > 0: + neg_src = edge_label_idx[0, neg_mask] + neg_dst = edge_label_idx[1, neg_mask] + + # Convert to original node IDs and check their timestamps + src_node_ids = batch.n_id[neg_src.cpu()].cpu() + dst_node_ids = batch.n_id[neg_dst.cpu()].cpu() + + # All negative sample nodes should have timestamps <= edge_label_time + # for the corresponding batch element + for j in range(len(neg_src)): + src_id = src_node_ids[j].item() + dst_id = dst_node_ids[j].item() + # Node timestamps should be <= edge prediction time + assert node_time[src_id] <= edge_label_time[i * batch_size].item() + assert node_time[dst_id] <= edge_label_time[i * batch_size].item() + + assert total_neg == 2 * total_pos, ( + f"Expected 2:1 negative:positive ratio, got {total_neg}:{total_pos}" + ) + + # Verify we processed all batches + assert i >= 0 + + +@pytest.mark.skipif(isinstance(torch, MissingModule), reason="torch not available") +@pytest.mark.parametrize("batch_size", [1, 2]) +@pytest.mark.parametrize("neg_sampling_mode", ["binary", "triplet"]) +@pytest.mark.sg +def test_link_neighbor_loader_temporal_negative_sampling_heterogeneous( + single_pytorch_worker, batch_size, neg_sampling_mode +): + """ + Test temporal negative sampling for heterogeneous graphs. + This test ensures that temporal constraints are properly enforced + across different node and edge types. + """ + # Create a heterogeneous graph with papers, authors, and citations + src_cite = torch.tensor([3, 2, 1, 2]) # paper + dst_cite = torch.tensor([2, 1, 0, 0]) # paper + tme_cite = torch.tensor([5, 6, 7, 3]) # edge timestamps + + src_author = torch.tensor([3, 2, 2, 1, 3, 2, 0]) # paper + dst_author = torch.tensor([0, 0, 1, 1, 2, 2, 2]) # author + tme_author = torch.tensor([4, 3, 5, 2, 7, 6, 1]) # edge timestamps + + num_papers = 4 + num_authors = 3 + + # Create node timestamps + paper_time = torch.tensor([0, 1, 2, 3]) # paper timestamps + author_time = torch.tensor([0, 1, 2]) # author timestamps + + graph_store = GraphStore() + feature_store = FeatureStore() + + # Add edges + graph_store[("paper", "cites", "paper"), "coo", False, (num_papers, num_papers)] = [ + dst_cite, + src_cite, + ] + graph_store[ + ("author", "writes", "paper"), "coo", False, (num_authors, num_papers) + ] = [ + dst_author, + src_author, + ] + + # Add time attributes for edges + feature_store[("paper", "cites", "paper"), "time", None] = tme_cite + feature_store[("author", "writes", "paper"), "time", None] = tme_author + + # Add time attributes for nodes + feature_store["paper", "time", None] = paper_time + feature_store["author", "time", None] = author_time + + # Create edge label index for author-paper relationships + edge_label_index = torch.stack( + [ + torch.tensor([0, 1, 2]), # authors + torch.tensor([3, 2, 1]), # papers + ] + ) + edge_label_time = torch.tensor([8, 8, 8]) # Future time for prediction + + # Configure negative sampling + if neg_sampling_mode == "binary": + neg_sampling = torch_geometric.sampler.NegativeSampling("binary", amount=2.0) + else: + neg_sampling = torch_geometric.sampler.NegativeSampling("triplet", amount=2.0) + + loader = cugraph_pyg.loader.LinkNeighborLoader( + (feature_store, graph_store), + num_neighbors={ + ("paper", "cites", "paper"): [2, 2], + ("author", "writes", "paper"): [2, 2], + }, + batch_size=batch_size, + edge_label_index=(("author", "writes", "paper"), edge_label_index), + edge_label_time=edge_label_time, + time_attr="time", + neg_sampling=neg_sampling, + shuffle=False, + ) + + # Test that the loader produces batches with proper temporal negative sampling + total_pos = total_neg = 0 + for i, batch in enumerate(loader): + # Check that we have the expected edge types + assert "author" in batch.node_types + assert "paper" in batch.node_types + + # Check edge label structure + assert [("author", "writes", "paper")] == list( + batch.edge_label_index_dict.keys() + ) + assert [("author", "writes", "paper")] == list(batch.edge_label_dict.keys()) + + # Should have both positive and negative labels + edge_labels = batch["author", "writes", "paper"].edge_label + assert torch.any(edge_labels == 1.0), "Should have positive labels" + assert torch.any(edge_labels == 0.0), "Should have negative labels" + + # Verify negative sampling ratio + num_pos = (edge_labels == 1.0).sum().item() + num_neg = (edge_labels == 0.0).sum().item() + total_pos += num_pos + total_neg += num_neg + + # Verify edge label index shape + edge_label_idx = batch["author", "writes", "paper"].edge_label_index + assert edge_label_idx.shape[0] == 2 + assert edge_label_idx.shape[1] == len(edge_labels) + + # Verify temporal constraints for negative samples + neg_mask = edge_labels == 0.0 + if neg_mask.sum() > 0: + neg_src = edge_label_idx[0, neg_mask] + neg_dst = edge_label_idx[1, neg_mask] + + # Convert to original node IDs + author_node_ids = batch["author"].n_id[neg_src.cpu()].cpu() + paper_node_ids = batch["paper"].n_id[neg_dst.cpu()].cpu() + + # All negative sample nodes should have timestamps <= edge_label_time + for j in range(len(neg_src)): + author_id = author_node_ids[j].item() + paper_id = paper_node_ids[j].item() + # Node timestamps should be <= edge prediction time + assert author_time[author_id] <= edge_label_time[i * batch_size].item() + assert paper_time[paper_id] <= edge_label_time[i * batch_size].item() + + # Verify that node IDs are valid + assert batch["author"].n_id.numel() > 0 + assert batch["paper"].n_id.numel() > 0 + + assert total_neg == 2 * total_pos, ( + f"Expected 2:1 negative:positive ratio, got {total_neg}:{total_pos}" + ) + + # Verify we processed all batches + assert i >= 0