diff --git a/causallearn/utils/FastKCI/FastKCI.py b/causallearn/utils/FastKCI/FastKCI.py
new file mode 100644
index 0000000..f5fcc50
--- /dev/null
+++ b/causallearn/utils/FastKCI/FastKCI.py
@@ -0,0 +1,534 @@
+from causallearn.utils.KCI.KCI import GaussianKernel, Kernel
+import numpy as np
+from numpy.linalg import eigh
+import scipy.stats as stats
+from scipy.special import logsumexp
+from sklearn.preprocessing import LabelEncoder
+from joblib import Parallel, delayed
+from sklearn.gaussian_process import GaussianProcessRegressor
+from sklearn.gaussian_process.kernels import ConstantKernel, WhiteKernel, RBF
+import warnings
+
+
+class FastKCI_CInd(object):
+    """
+    Python implementation of a sped-up version of the Kernel-based Conditional Independence (KCI) test.
+    Conditional version.
+
+    References
+    ----------
+    [1] K. Zhang, J. Peters, D. Janzing, and B. Schölkopf,
+    "A kernel-based conditional independence test and application in causal discovery", In UAI 2011.
+    [2] M. Zhang, and S. Williamson,
+    "Embarrassingly Parallel Inference for Gaussian Processes", In JMLR 20 (2019)
+    [3] O. Schacht, and B. Huang
+    "FastKCI: A fast Kernel-based Conditional Independence test with application to causal discovery",
+    Working Paper.
+
+    """
+    def __init__(self, K=10, J=8, alpha=500, epsilon=1e-3, eig_thresh=1e-6, use_gp=False):
+        """
+        Initialize the FastKCI_CInd object.
+
+        Parameters
+        ----------
+        K: Number of Gaussians that are assumed to be in the mixture.
+        J: Number of independent repetitions.
+        alpha: Parameter for the Dirichlet distribution.
+        epsilon: Penalty for the matrix ridge regression.
+        eig_thresh: Threshold for eigenvalues.
+        use_gp: Whether to use Gaussian Process Regression to determine the kernel widths.
+        """
+        self.K = K
+        self.J = J
+        self.alpha = alpha
+        self.epsilon = epsilon
+        self.eig_thresh = eig_thresh
+        self.use_gp = use_gp
+        self.nullss = 5000
+
+    def compute_pvalue(self, data_x=None, data_y=None, data_z=None):
+        """
+        Main function: compute the p value of H_0: X is independent of Y given Z.
+
+        Parameters
+        ----------
+        data_x: input data for x (nxd1 array)
+        data_y: input data for y (nxd2 array)
+        data_z: input data for z (nxd3 array)
+
+        Returns
+        _________
+        pvalue: p value (scalar)
+        test_stat: test statistic (scalar)
+        """
+        self.data = [data_x, data_y]
+        self.data_z = data_z
+        self.n = data_x.shape[0]
+
+        self.Z_proposal = Parallel(n_jobs=-1)(delayed(self.partition_data)() for i in range(self.J))
+        block_res = Parallel(n_jobs=-1)(delayed(self.pvalue_onblocks)(self.Z_proposal[i]) for i in range(self.J))
+        test_stat, null_samples, log_likelihood = zip(*block_res)
+
+        log_likelihood = np.array(log_likelihood)
+        self.all_null_samples = np.vstack(null_samples)
+        self.all_p = np.array([np.sum(self.all_null_samples[i,] > test_stat[i]) / float(self.nullss) for i in range(self.J)])
+        self.prob_weights = np.around(np.exp(log_likelihood-logsumexp(log_likelihood)), 6)
+        self.all_test_stats = np.array(test_stat)
+        self.test_stat = np.average(np.array(test_stat), weights=self.prob_weights)
+        self.null_samples = np.average(null_samples, axis=0, weights=self.prob_weights)
+        self.pvalue = np.sum(self.null_samples > self.test_stat) / float(self.nullss)
+
+        return self.pvalue, self.test_stat
+
+    def partition_data(self):
+        """
+        Partitions data into K Gaussian clusters by sampling cluster assignments from a random Gaussian mixture model on Z.
+ + Returns + _________ + Z: Latent Gaussian that was drawn for each observation (nx1 array) + prob_Z: Log-Likelihood of that random assignment (scalar) + """ + Z_mean = self.data_z.mean(axis=0) + Z_sd = self.data_z.std(axis=0) + mu_k = np.random.normal(Z_mean, Z_sd, size=(self.K, self.data_z.shape[1])) + sigma_k = np.eye(self.data_z.shape[1]) + pi_j = np.random.dirichlet([self.alpha]*self.K) + ll = np.tile(np.log(pi_j), (self.n, 1)) + for k in range(self.K): + ll[:, k] += stats.multivariate_normal.logpdf(self.data_z, mu_k[k, :], cov=sigma_k, allow_singular=True) + Z = np.array([np.random.multinomial(1, np.exp(ll[n, :]-logsumexp(ll[n, :]))).argmax() for n in range(self.n)]) + le = LabelEncoder() + Z = le.fit_transform(Z) + return Z + + def pvalue_onblocks(self, Z_proposal): + """ + Calculate p value on given partitions of the data. + + Parameters + ---------- + Z_proposal: partition of the data into K clusters (nxd1 array) + + Returns + _________ + test_stat: test statistic (scalar) + null_samples: bootstrapped sampled of the null distribution (self.nullssx1 array) + log_likelihood: estimated probability P(X,Y|Z,k) + """ + unique_Z_j = np.unique(Z_proposal) + test_stat = 0 + log_likelihood = 0 + null_samples = np.zeros((1, self.nullss)) + for k in unique_Z_j: + K_mask = (Z_proposal == k) + X_k = np.copy(self.data[0][K_mask]) + Y_k = np.copy(self.data[1][K_mask]) + Z_k = np.copy(self.data_z[K_mask]) + if (Z_k.shape[0] < 6): # small blocks cause problems in GP + continue + Kx, Ky, Kzx, Kzy, epsilon_x, epsilon_y, likelihood_x, likelihood_y = self.kernel_matrix(X_k, Y_k, Z_k) + KxR, Rzx = Kernel.center_kernel_matrix_regression(Kx, Kzx, epsilon_x) + if epsilon_x != epsilon_y: + KyR, Rzy = Kernel.center_kernel_matrix_regression(Ky, Kzy, epsilon_y) + else: + KyR = Rzx.dot(Ky.dot(Rzx)) + test_stat += np.einsum('ij,ji->', KxR, KyR) + uu_prod, size_u = self.get_uuprod(KxR, KyR) + null_samples += self.null_sample_spectral(uu_prod, size_u, Kx.shape[0]) + log_likelihood += likelihood_x + likelihood_y + return test_stat, null_samples, log_likelihood + + def kernel_matrix(self, data_x, data_y, data_z): + """ + Calculates the Gaussian Kernel for given data inputs as well as the shared kernel. + + Returns + _________ + K: Kernel matrices (n_kxn_k array) + """ + kernel_obj = GaussianKernel() + kernel_obj.set_width_empirical_kci(data_z) + + data_x = stats.zscore(data_x, ddof=1, axis=0) + data_x[np.isnan(data_x)] = 0. + + data_y = stats.zscore(data_y, ddof=1, axis=0) + data_y[np.isnan(data_y)] = 0. + + data_z = stats.zscore(data_z, ddof=1, axis=0) + data_z[np.isnan(data_z)] = 0. 
+ + data_x = np.concatenate((data_x, 0.5 * data_z), axis=1) + + kernelX = GaussianKernel() + kernelX.set_width_empirical_kci(data_z) + kernelY = GaussianKernel() + kernelY.set_width_empirical_kci(data_z) + + Kx = kernelX.kernel(data_x) + Ky = kernelY.kernel(data_y) + + # centering kernel matrix + Kx = Kernel.center_kernel_matrix(Kx) + Ky = Kernel.center_kernel_matrix(Ky) + + if not self.use_gp: + kernelZ = GaussianKernel() + kernelZ.set_width_empirical_kci(data_z) + Kzx = kernelZ.kernel(data_z) + Kzx = Kernel.center_kernel_matrix(Kzx) + Kzy = Kzx + epsilon_x, epsilon_y = self.epsilon, self.epsilon + gpx = GaussianProcessRegressor() + with warnings.catch_warnings(): + warnings.filterwarnings("ignore", category=Warning) + # P(X|Z) + gpx.fit(X=data_z, y=data_x) + likelihood_x = gpx.log_marginal_likelihood_value_ + gpy = GaussianProcessRegressor() + with warnings.catch_warnings(): + warnings.filterwarnings("ignore", category=Warning) + # P(Y|X,Z) + gpy.fit(X=np.c_[data_x, data_z], y=data_y) + likelihood_y = gpy.log_marginal_likelihood_value_ + + else: + n, Dz = data_z.shape + + widthz = np.sqrt(1.0 / (kernelX.width * data_x.shape[1])) + + # Instantiate a Gaussian Process model for x + wx, vx = eigh(Kx) + topkx = int(np.max([np.min([400, np.floor(n / 4)]), np.min([n+1, 8])])) + idx = np.argsort(-wx) + wx = wx[idx] + vx = vx[:, idx] + wx = wx[0:topkx] + vx = vx[:, 0:topkx] + vx = vx[:, np.abs(wx) > np.abs(wx).max() * 1e-10] + wx = wx[np.abs(wx) > np.abs(wx).max() * 1e-10] + vx = 2 * np.sqrt(n) * vx.dot(np.diag(np.sqrt(wx))) / np.sqrt(wx[0]) + kernelx = ConstantKernel(1.0, (1e-3, 1e3)) * RBF(widthz * np.ones(Dz), (1e-2, 1e2)) \ + + WhiteKernel(0.1, (1e-10, 1e+1)) + gpx = GaussianProcessRegressor(kernel=kernelx) + # fit Gaussian process, including hyperparameter optimization + with warnings.catch_warnings(): + warnings.filterwarnings("ignore", category=Warning) + gpx.fit(data_z, vx) + + # construct Gaussian kernels according to learned hyperparameters + Kzx = gpx.kernel_.k1(data_z, data_z) + epsilon_x = np.exp(gpx.kernel_.theta[-1]) + likelihood_x = gpx.log_marginal_likelihood_value_ + + # Instantiate a Gaussian Process model for y + wy, vy = eigh(Ky) + topky = int(np.max([np.min([400, np.floor(n / 4)]), np.min([n+1, 8])])) + idy = np.argsort(-wy) + wy = wy[idy] + vy = vy[:, idy] + wy = wy[0:topky] + vy = vy[:, 0:topky] + vy = vy[:, np.abs(wy) > np.abs(wy).max() * 1e-10] + wy = wy[np.abs(wy) > np.abs(wy).max() * 1e-10] + vy = 2 * np.sqrt(n) * vy.dot(np.diag(np.sqrt(wy))) / np.sqrt(wy[0]) + kernely = ConstantKernel(1.0, (1e-3, 1e3)) * RBF(widthz * np.ones(Dz), (1e-2, 1e2)) \ + + WhiteKernel(0.1, (1e-10, 1e+1)) + gpy = GaussianProcessRegressor(kernel=kernely) + # fit Gaussian process, including hyperparameter optimization + with warnings.catch_warnings(): + warnings.filterwarnings("ignore", category=Warning) + gpy.fit(data_z, vy) + + # construct Gaussian kernels according to learned hyperparameters + Kzy = gpy.kernel_.k1(data_z, data_z) + epsilon_y = np.exp(gpy.kernel_.theta[-1]) + likelihood_y = gpy.log_marginal_likelihood_value_ + + return Kx, Ky, Kzx, Kzy, epsilon_x, epsilon_y, likelihood_x, likelihood_y + + def get_uuprod(self, Kx, Ky): + """ + Compute eigenvalues for null distribution estimation + + Parameters + ---------- + Kx: centralized kernel matrix for data_x (nxn) + Ky: centralized kernel matrix for data_y (nxn) + + Returns + _________ + uu_prod: product of the eigenvectors of Kx and Ky + size_u: number of produced eigenvectors + + """ + n_block = Kx.shape[0] + wx, vx = eigh(Kx) + wy, vy 
= eigh(Ky)
+        idx = np.argsort(-wx)
+        idy = np.argsort(-wy)
+        wx = wx[idx]
+        vx = vx[:, idx]
+        wy = wy[idy]
+        vy = vy[:, idy]
+        vx = vx[:, wx > np.max(wx) * self.eig_thresh]
+        wx = wx[wx > np.max(wx) * self.eig_thresh]
+        vy = vy[:, wy > np.max(wy) * self.eig_thresh]
+        wy = wy[wy > np.max(wy) * self.eig_thresh]
+        vx = vx.dot(np.diag(np.sqrt(wx)))
+        vy = vy.dot(np.diag(np.sqrt(wy)))
+
+        # calculate their product
+        num_eigx = vx.shape[1]
+        num_eigy = vy.shape[1]
+        size_u = num_eigx * num_eigy
+        uu = np.zeros((n_block, size_u))
+        for i in range(0, num_eigx):
+            for j in range(0, num_eigy):
+                uu[:, i * num_eigy + j] = vx[:, i] * vy[:, j]
+
+        if size_u > self.n:
+            uu_prod = uu.dot(uu.T)
+        else:
+            uu_prod = uu.T.dot(uu)
+
+        return uu_prod, size_u
+
+    def get_kappa(self, mean_appr, var_appr):
+        """
+        Get parameters for the approximated gamma distribution
+        Parameters
+        ----------
+        mean_appr, var_appr: mean and variance of the test statistic under the null
+
+        Returns
+        ----------
+        k_appr, theta_appr: approximated parameters of the gamma distribution
+
+        """
+        k_appr = mean_appr ** 2 / var_appr
+        theta_appr = var_appr / mean_appr
+        return k_appr, theta_appr
+
+    def null_sample_spectral(self, uu_prod, size_u, T):
+        """
+        Simulate data from null distribution
+
+        Parameters
+        ----------
+        uu_prod: product of the eigenvectors of Kx and Ky
+        size_u: number of produced eigenvectors
+        T: sample size
+
+        Returns
+        _________
+        null_dstr: samples from the null distribution
+
+        """
+        eig_uu = np.linalg.eigvalsh(uu_prod)
+        eig_uu = -np.sort(-eig_uu)
+        eig_uu = eig_uu[0:np.min((T, size_u))]
+        eig_uu = eig_uu[eig_uu > np.max(eig_uu) * self.eig_thresh]
+
+        f_rand = np.random.chisquare(1, (eig_uu.shape[0], self.nullss))
+        null_dstr = eig_uu.T.dot(f_rand)
+        return null_dstr
+
+
+class FastKCI_UInd(object):
+    """
+    Python implementation of a sped-up version of the Kernel-based Conditional Independence (KCI) test.
+    Unconditional version.
+
+    References
+    ----------
+    [1] K. Zhang, J. Peters, D. Janzing, and B. Schölkopf,
+    "A kernel-based conditional independence test and application in causal discovery," In UAI 2011.
+    [2] M. Zhang, and S. Williamson,
+    "Embarrassingly Parallel Inference for Gaussian Processes" In JMLR 20 (2019)
+    [3] O. Schacht, and B. Huang
+    "FastKCI: A fast Kernel-based Conditional Independence test with application to causal discovery",
+    Working Paper.
+    """
+    def __init__(self, K=10, J=8, alpha=500):
+        """
+        Construct the FastKCI_UInd model.
+
+        Parameters
+        ----------
+        K: Number of Gaussians that are assumed to be in the mixture.
+        J: Number of independent repetitions.
+        alpha: Parameter for the Dirichlet distribution.
+ """ + self.K = K + self.J = J + self.alpha = alpha + self.nullss = 5000 + self.eig_thresh = 1e-5 + + def compute_pvalue(self, data_x=None, data_y=None): + """ + Main function: compute the p value and return it together with the test statistic + + Parameters + ---------- + data_x: input data for x (nxd1 array) + data_y: input data for y (nxd2 array) + + Returns + _________ + pvalue: p value (scalar) + test_stat: test statistic (scalar) + """ + self.data_x = data_x + self.data_y = data_y + self.n = data_x.shape[0] + + Z_proposal = Parallel(n_jobs=-1)(delayed(self.partition_data)() for i in range(self.J)) + self.Z_proposal, self.prob_Y = zip(*Z_proposal) + block_res = Parallel(n_jobs=-1)(delayed(self.pvalue_onblocks)(self.Z_proposal[i]) for i in range(self.J)) + test_stat, null_samples, log_likelihood = zip(*block_res) + self.prob_weights = np.around(np.exp(log_likelihood-logsumexp(log_likelihood)), 6) + self.test_stat = np.average(np.array(test_stat), weights=self.prob_weights) + self.null_samples = np.average(null_samples, axis=0, weights=self.prob_weights) + self.pvalue = np.sum(self.null_samples > self.test_stat) / float(self.nullss) + + return self.pvalue, self.test_stat + + def partition_data(self): + """ + Partitions data into K Gaussians + + Returns + _________ + Z: Latent Gaussian that was drawn for each observation (nx1 array) + prob_Y: Log-Likelihood of that random assignment (scalar) + """ + Y_mean = self.data_y.mean(axis=0) + Y_sd = self.data_y.std(axis=0) + mu_k = np.random.normal(Y_mean, Y_sd, size=(self.K, self.data_y.shape[1])) + sigma_k = np.eye(self.data_y.shape[1]) + pi_j = np.random.dirichlet([self.alpha]*self.K) + ll = np.tile(np.log(pi_j), (self.n, 1)) + for k in range(self.K): + ll[:, k] += stats.multivariate_normal.logpdf(self.data_y, mu_k[k, :], cov=sigma_k, allow_singular=True) + Z = np.array([np.random.multinomial(1, np.exp(ll[n, :]-logsumexp(ll[n, :]))).argmax() for n in range(self.n)]) + prop_Y = np.take_along_axis(ll, Z[:, None], axis=1).sum() + le = LabelEncoder() + Z = le.fit_transform(Z) + return (Z, prop_Y) + + def pvalue_onblocks(self, Z_proposal): + unique_Z_j = np.unique(Z_proposal) + test_stat = 0 + log_likelihood = 0 + null_samples = np.zeros((1, self.nullss)) + for k in unique_Z_j: + K_mask = (Z_proposal == k) + X_k = np.copy(self.data_x[K_mask]) + Y_k = np.copy(self.data_y[K_mask]) + + Kx = self.kernel_matrix(X_k) + Ky = self.kernel_matrix(Y_k) + + v_stat, Kxc, Kyc = self.HSIC_V_statistic(Kx, Ky) + + null_samples += self.null_sample_spectral(Kxc, Kyc) + test_stat += v_stat + + gpx = GaussianProcessRegressor() + with warnings.catch_warnings(): + warnings.filterwarnings("ignore", category=Warning) + # P(X|Y) + gpx.fit(X=Y_k, y=X_k) + likelihood = gpx.log_marginal_likelihood_value_ + log_likelihood += likelihood + + return test_stat, null_samples, log_likelihood + + def kernel_matrix(self, data): + """ + Calculates the Gaussian Kernel for given data inputs. + + Returns + _________ + K: Kernel matrix (n_kxn_k array) + """ + kernel_obj = GaussianKernel() + kernel_obj.set_width_empirical_hsic(data) + + data = stats.zscore(data, ddof=1, axis=0) + data[np.isnan(data)] = 0. 
+
+        K = kernel_obj.kernel(data)
+
+        return K
+
+    def get_kappa(self, mean_appr, var_appr):
+        """
+        Get parameters for the approximated gamma distribution
+        Parameters
+        ----------
+        mean_appr: mean of the test statistic under the null
+        var_appr: variance of the test statistic under the null
+
+        Returns
+        _________
+        k_appr, theta_appr: approximated parameters of the gamma distribution
+        """
+        k_appr = mean_appr ** 2 / var_appr
+        theta_appr = var_appr / mean_appr
+        return k_appr, theta_appr
+
+    def null_sample_spectral(self, Kxc, Kyc):
+        """
+        Simulate data from null distribution
+
+        Parameters
+        ----------
+        Kxc: centralized kernel matrix for data_x (nxn)
+        Kyc: centralized kernel matrix for data_y (nxn)
+
+        Returns
+        _________
+        null_dstr: samples from the null distribution
+
+        """
+        T = Kxc.shape[0]
+        if T > 1000:
+            num_eig = int(np.floor(T / 2))
+        else:
+            num_eig = T
+        lambdax = np.linalg.eigvalsh(Kxc)
+        lambday = np.linalg.eigvalsh(Kyc)
+        lambdax = -np.sort(-lambdax)
+        lambday = -np.sort(-lambday)
+        lambdax = lambdax[0:num_eig]
+        lambday = lambday[0:num_eig]
+        lambda_prod = np.dot(lambdax.reshape(num_eig, 1), lambday.reshape(1, num_eig)).reshape(
+            (num_eig ** 2, 1))
+        lambda_prod = lambda_prod[lambda_prod > lambda_prod.max() * self.eig_thresh]
+        f_rand = np.random.chisquare(1, (lambda_prod.shape[0], self.nullss))
+        null_dstr = lambda_prod.T.dot(f_rand) / T
+        return null_dstr
+
+    def HSIC_V_statistic(self, Kx, Ky):
+        """
+        Compute V test statistic from kernel matrices Kx and Ky
+        Parameters
+        ----------
+        Kx: kernel matrix for data_x (nxn)
+        Ky: kernel matrix for data_y (nxn)
+
+        Returns
+        _________
+        V_stat: HSIC V statistic
+        Kxc: centralized kernel matrix for data_x (nxn)
+        Kyc: centralized kernel matrix for data_y (nxn)
+        """
+        Kxc = Kernel.center_kernel_matrix(Kx)
+        Kyc = Kernel.center_kernel_matrix(Ky)
+        V_stat = np.einsum('ij,ij->', Kxc, Kyc)
+        return V_stat, Kxc, Kyc
diff --git a/causallearn/utils/FastKCI/__init__.py b/causallearn/utils/FastKCI/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/causallearn/utils/RCIT/RCIT.py b/causallearn/utils/RCIT/RCIT.py
new file mode 100644
index 0000000..11608ed
--- /dev/null
+++ b/causallearn/utils/RCIT/RCIT.py
@@ -0,0 +1,403 @@
+import numpy as np
+import itertools
+from scipy.stats import chi2
+from scipy.linalg import pinv
+from momentchi2 import hbe, sw, lpb4
+from scipy.spatial.distance import pdist, squareform
+
+
+class RCIT(object):
+    """
+    Python implementation of the Randomized Conditional Independence Test (RCIT).
+    The original R implementation can be found at https://github.com/ericstrobl/RCIT/tree/master
+
+    References
+    ----------
+    [1] Strobl, E. V., Zhang, K., and Visweswaran, S. (2019). "Approximate kernel-based conditional
+    independence tests for fast non-parametric causal discovery." Journal of Causal Inference, 7(1), 20180017.
+    """
+    def __init__(self, approx="lpd4", num_f=100, num_f2=5, rcit=True):
+        """
+        Initialize the RCIT object.
+
+        Parameters
+        ----------
+        approx : str
+            Method for approximating the null distribution.
+            - "lpd4" for the Lindsay-Pilla-Basak method
+            - "hbe" for the Hall-Buckley-Eagleson method
+            - "gamma" for the Satterthwaite-Welch method
+            - "chi2" for a normalized chi-squared statistic
+            - "perm" for permutation testing
+            Default is "lpd4".
+        num_f : int
+            Number of features for conditioning set. Default is 100.
+        num_f2 : int
+            Number of features for non-conditioning sets. Default is 5.
+        rcit : bool
+            Whether to use RCIT or RCoT. Default is True.
+ """ + self.approx = approx + self.num_f = num_f + self.num_f2 = num_f2 + self.rcit = rcit + + def compute_pvalue(self, data_x, data_y, data_z): + """ + Compute the p value and return it together with the test statistic. + + Parameters + ---------- + data_x: input data for x (nxd1 array) + data_y: input data for y (nxd2 array) + data_z: input data for z (nxd3 array) + + Returns + ------- + p: p value + sta: test statistic + """ + d = data_z.shape[1] + r = data_x.shape[0] + r1 = 500 if (r > 500) else r + + data_x = (data_x - data_x.mean(axis=0)) / data_x.std(axis=0, ddof=1) + data_y = (data_y - data_y.mean(axis=0)) / data_y.std(axis=0, ddof=1) + data_z = (data_z - data_z.mean(axis=0)) / data_z.std(axis=0, ddof=1) + + if self.rcit: + data_y = np.column_stack((data_y, data_z)) + + sigma = dict() + for key, value in [("x", data_x), ("y", data_y), ("z", data_z)]: + distances = pdist(value[:r1, :], metric='euclidean') + flattened_distances = squareform(distances).ravel() + non_zero_distances = flattened_distances[flattened_distances != 0] + sigma[key] = np.median(non_zero_distances) + + four_z = self.random_fourier_features(data_z, num_f=self.num_f, sigma=sigma["z"]) + four_x = self.random_fourier_features(data_x, num_f=self.num_f2, sigma=sigma["x"]) + four_y = self.random_fourier_features(data_y, num_f=self.num_f2, sigma=sigma["y"]) + + f_x = (four_x - four_x.mean(axis=0)) / four_x.std(axis=0, ddof=1) + f_y = (four_y - four_y.mean(axis=0)) / four_y.std(axis=0, ddof=1) + f_z = (four_z - four_z.mean(axis=0)) / four_z.std(axis=0, ddof=1) + + Cxy = f_x.T @ f_y / (f_x.shape[0] - 1) + + Czz = np.cov(f_z, rowvar=False) + + regularized_Czz = Czz + np.eye(self.num_f) * 1e-10 + L = np.linalg.cholesky(regularized_Czz) + i_Czz = np.linalg.inv(L).T.dot(np.linalg.inv(L)) + + Cxz = f_x.T @ f_z / (f_x.shape[0] - 1) + Czy = f_z.T @ f_y / (f_z.shape[0] - 1) + + z_i_Czz = f_z @ i_Czz + e_x_z = z_i_Czz @ Cxz.T + e_y_z = z_i_Czz @ Czy + + res_x = f_x - e_x_z + res_y = f_y - e_y_z + + if self.num_f2 == 1: + self.approx = "hbe" + + if self.approx == "perm": + Cxy_z = self.matrix_cov(res_x, res_y) + sta = r * np.sum(Cxy_z**2) + + nperm = 1000 + + stas = [] + for _ in range(nperm): + perm = np.random.choice(np.arange(r), size=r, replace=False) + Cxy = self.matrix_cov(res_x[perm, ], res_y) + sta_p = r * np.sum(Cxy**2) + stas.append(sta_p) + + p = 1 - (np.sum(sta >= stas) / len(stas)) + + else: + Cxy_z = Cxy - Cxz @ i_Czz @ Czy + sta = r * np.sum(Cxy_z**2) + + d = list(itertools.product(range(f_x.shape[1]), range(f_y.shape[1]))) + res = np.array([res_x[:, idx_x] * res_y[:, idx_y] for idx_x, idx_y in d]).T + Cov = 1/r * res.T @ res + + if self.approx == "chi2": + i_Cov = pinv(Cov) + + sta = r * (np.dot(Cxy_z.flatten(), np.dot(i_Cov, Cxy_z.flatten()))) + p = 1 - chi2.cdf(sta, Cxy_z.size) + + else: + eigenvalues, eigenvectors = np.linalg.eigh(Cov) + eig_d = eigenvalues[eigenvalues > 0] + + if self.approx == "gamma": + p = 1 - sw(eig_d, sta) + + elif self.approx == "hbe": + p = 1 - hbe(eig_d, sta) + + elif self.approx == "lpd4": + try: + p = 1 - lpb4(eig_d, sta) + except Exception: + p = 1 - hbe(eig_d, sta) + if np.isnan(p): + p = 1 - hbe(eig_d, sta) + + if (p < 0): + p = 0 + + return p, sta + + def random_fourier_features(self, x, w=None, b=None, num_f=None, sigma=None): + """ + Generate random Fourier features. + + Parameters + ---------- + x : np.ndarray + Random variable x. + w : np.ndarray + RRandom coefficients. + b : np.ndarray + Random offsets. + num_f : int + Number of random Fourier features. 
+ sigma : float + Smooth parameter of RBF kernel. + + Returns + ------- + feat : np.ndarray + Random Fourier features. + """ + if num_f is None: + num_f = 25 + + r = x.shape[0] + c = x.shape[1] + + if ((sigma == 0) | (sigma is None)): + sigma = 1 + + if w is None: + w = (1/sigma) * np.random.normal(size=(num_f, c)) + b = np.tile(2*np.pi*np.random.uniform(size=(num_f, 1)), (1, r)) + + feat = np.sqrt(2) * (np.cos(w[0:num_f, 0:c] @ x.T + b[0:num_f, :])).T + + return feat + + def matrix_cov(self, mat_a, mat_b): + """ + Compute the covariance matrix between two matrices. + Equivalent to ``cov()`` between two matrices in R. + + Parameters + ---------- + mat_a : np.ndarray + First data matrix. + mat_b : np.ndarray + Second data matrix. + + Returns + ------- + mat_cov : np.ndarray + Covariance matrix. + """ + n_obs = mat_a.shape[0] + + assert mat_a.shape == mat_b.shape + mat_a = mat_a - mat_a.mean(axis=0) + mat_b = mat_b - mat_b.mean(axis=0) + + mat_cov = mat_a.T @ mat_b / (n_obs - 1) + + return mat_cov + + +class RIT(object): + """ + Python implementation of Randomized Independence Test (RIT) test. + The original R implementation can be found at https://github.com/ericstrobl/RCIT/tree/master + + References + ---------- + [1] Strobl, E. V., Zhang, K., and Visweswaran, S. (2019). "Approximate kernel-based conditional + independence tests for fast non-parametric causal discovery." Journal of Causal Inference, 7(1), 20180017. + """ + def __init__(self, approx="lpd4"): + """ + Initialize the RIT object. + + Parameters + ---------- + approx : str + Method for approximating the null distribution. + - "lpd4" for the Lindsay-Pilla-Basak method + - "hbe" for the Hall-Buckley-Eagleson method + - "gamma" for the Satterthwaite-Welch method + - "chi2" for a normalized chi-squared statistic + - "perm" for permutation testing + Default is "lpd4". + """ + self.approx = approx + + def compute_pvalue(self, data_x, data_y): + """ + Compute the p value and return it together with the test statistic. 
+ + Parameters + ---------- + data_x: input data for x (nxd1 array) + data_y: input data for y (nxd2 array) + data_z: input data for z (nxd3 array) + + Returns + ------- + p: p value + sta: test statistic + """ + r = data_x.shape[0] + r1 = 500 if (r > 500) else r + + data_x = (data_x - data_x.mean(axis=0)) / data_x.std(axis=0, ddof=1) + data_y = (data_y - data_y.mean(axis=0)) / data_y.std(axis=0, ddof=1) + + sigma = dict() + for key, value in [("x", data_x), ("y", data_y)]: + distances = pdist(value[:r1, :], metric='euclidean') + flattened_distances = squareform(distances).ravel() + non_zero_distances = flattened_distances[flattened_distances != 0] + sigma[key] = np.median(non_zero_distances) + + four_x = self.random_fourier_features(data_x, num_f=5, sigma=sigma["x"]) + four_y = self.random_fourier_features(data_y, num_f=5, sigma=sigma["y"]) + + f_x = (four_x - four_x.mean(axis=0)) / four_x.std(axis=0, ddof=1) + f_y = (four_y - four_y.mean(axis=0)) / four_y.std(axis=0, ddof=1) + + Cxy = self.matrix_cov(f_x, f_y) + sta = r * np.sum(Cxy**2) + + if self.approx == "perm": + nperm = 1000 + + stas = [] + for _ in range(nperm): + perm = np.random.choice(np.arange(r), size=r, replace=False) + Cxy = self.matrix_cov(f_x[perm, ], f_y) + sta_p = r * np.sum(Cxy**2) + stas.append(sta_p) + + p = 1 - (np.sum(sta >= stas) / len(stas)) + + else: + res_x = f_x - f_x.mean(axis=0) + res_y = f_y - f_y.mean(axis=0) + + d = list(itertools.product(range(f_x.shape[1]), range(f_y.shape[1]))) + res = np.array([res_x[:, idx_x] * res_y[:, idx_y] for idx_x, idx_y in d]).T + Cov = 1/r * res.T @ res + + if self.approx == "chi2": + i_Cov = pinv(Cov) + + sta = r * (np.dot(Cxy.flatten(), np.dot(i_Cov, Cxy.flatten()))) + p = 1 - chi2.cdf(sta, Cxy.size) + + else: + eigenvalues, eigenvectors = np.linalg.eigh(Cov) + eig_d = eigenvalues[eigenvalues > 0] + + if self.approx == "gamma": + p = 1 - sw(eig_d, sta) + + elif self.approx == "hbe": + p = 1 - hbe(eig_d, sta) + + elif self.approx == "lpd4": + try: + p = 1 - lpb4(eig_d, sta) + except Exception: + p = 1 - hbe(eig_d, sta) + if np.isnan(p): + p = 1 - hbe(eig_d, sta) + + if (p < 0): + p = 0 + + return p, sta + + def random_fourier_features(self, x, w=None, b=None, num_f=None, sigma=None): + """ + Generate random Fourier features. + + Parameters + ---------- + x : np.ndarray + Random variable x. + w : np.ndarray + RRandom coefficients. + b : np.ndarray + Random offsets. + num_f : int + Number of random Fourier features. + sigma : float + Smooth parameter of RBF kernel. + + Returns + ------- + feat : np.ndarray + Random Fourier features. + """ + if num_f is None: + num_f = 25 + + r = x.shape[0] + c = x.shape[1] + + if ((sigma == 0) | (sigma is None)): + sigma = 1 + + if w is None: + w = (1/sigma) * np.random.normal(size=(num_f, c)) + b = np.tile(2*np.pi*np.random.uniform(size=(num_f, 1)), (1, r)) + + feat = np.sqrt(2) * (np.cos(w[0:num_f, 0:c] @ x.T + b[0:num_f, :])).T + + return feat + + def matrix_cov(self, mat_a, mat_b): + """ + Compute the covariance matrix between two matrices. + Equivalent to ``cov()`` between two matrices in R. + + Parameters + ---------- + mat_a : np.ndarray + First data matrix. + mat_b : np.ndarray + Second data matrix. + + Returns + ------- + mat_cov : np.ndarray + Covariance matrix. 
+ """ + n_obs = mat_a.shape[0] + + assert mat_a.shape == mat_b.shape + mat_a = mat_a - mat_a.mean(axis=0) + mat_b = mat_b - mat_b.mean(axis=0) + + mat_cov = mat_a.T @ mat_b / (n_obs - 1) + + return mat_cov diff --git a/causallearn/utils/RCIT/__init__.py b/causallearn/utils/RCIT/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/causallearn/utils/cit.py b/causallearn/utils/cit.py index 8119dd7..fbc95e9 100644 --- a/causallearn/utils/cit.py +++ b/causallearn/utils/cit.py @@ -5,6 +5,9 @@ from scipy.stats import chi2, norm from causallearn.utils.KCI.KCI import KCI_CInd, KCI_UInd +from causallearn.utils.FastKCI.FastKCI import FastKCI_CInd, FastKCI_UInd +from causallearn.utils.RCIT.RCIT import RCIT as RCIT_CInd +from causallearn.utils.RCIT.RCIT import RIT as RCIT_UInd from causallearn.utils.PCUtils import Helper CONST_BINCOUNT_UNIQUE_THRESHOLD = 1e5 @@ -13,6 +16,8 @@ mv_fisherz = "mv_fisherz" mc_fisherz = "mc_fisherz" kci = "kci" +rcit = "rcit" +fastkci = "fastkci" chisq = "chisq" gsq = "gsq" d_separation = "d_separation" @@ -23,8 +28,8 @@ def CIT(data, method='fisherz', **kwargs): Parameters ---------- data: numpy.ndarray of shape (n_samples, n_features) - method: str, in ["fisherz", "mv_fisherz", "mc_fisherz", "kci", "chisq", "gsq"] - kwargs: placeholder for future arguments, or for KCI specific arguments now + method: str, in ["fisherz", "mv_fisherz", "mc_fisherz", "kci", "rcit", "fastkci", "chisq", "gsq"] + kwargs: placeholder for future arguments, or for KCI, FastKCI or RCIT specific arguments now TODO: utimately kwargs should be replaced by explicit named parameters. check https://github.com/cmu-phil/causal-learn/pull/62#discussion_r927239028 ''' @@ -32,6 +37,10 @@ def CIT(data, method='fisherz', **kwargs): return FisherZ(data, **kwargs) elif method == kci: return KCI(data, **kwargs) + elif method == fastkci: + return FastKCI(data, **kwargs) + elif method == rcit: + return RCIT(data, **kwargs) elif method in [chisq, gsq]: return Chisq_or_Gsq(data, method_name=method, **kwargs) elif method == mv_fisherz: @@ -43,6 +52,7 @@ def CIT(data, method='fisherz', **kwargs): else: raise ValueError("Unknown method: {}".format(method)) + class CIT_Base(object): # Base class for CIT, contains basic operations for input check and caching, etc. def __init__(self, data, cache_path=None, **kwargs): @@ -193,6 +203,50 @@ def __call__(self, X, Y, condition_set=None): self.pvalue_cache[cache_key] = p return p +class FastKCI(CIT_Base): + def __init__(self, data, **kwargs): + super().__init__(data, **kwargs) + kci_ui_kwargs = {k: v for k, v in kwargs.items() if k in + ['K', 'J', 'alpha']} + kci_ci_kwargs = {k: v for k, v in kwargs.items() if k in + ['K', 'J', 'alpha', 'use_gp']} + self.check_cache_method_consistent( + 'kci', hashlib.md5(json.dumps(kci_ci_kwargs, sort_keys=True).encode('utf-8')).hexdigest()) + self.assert_input_data_is_valid() + self.kci_ui = FastKCI_UInd(**kci_ui_kwargs) + self.kci_ci = FastKCI_CInd(**kci_ci_kwargs) + + def __call__(self, X, Y, condition_set=None): + # Kernel-based conditional independence test. 
+ Xs, Ys, condition_set, cache_key = self.get_formatted_XYZ_and_cachekey(X, Y, condition_set) + if cache_key in self.pvalue_cache: return self.pvalue_cache[cache_key] + p = self.kci_ui.compute_pvalue(self.data[:, Xs], self.data[:, Ys])[0] if len(condition_set) == 0 else \ + self.kci_ci.compute_pvalue(self.data[:, Xs], self.data[:, Ys], self.data[:, condition_set])[0] + self.pvalue_cache[cache_key] = p + return p + +class RCIT(CIT_Base): + def __init__(self, data, **kwargs): + super().__init__(data, **kwargs) + rit_kwargs = {k: v for k, v in kwargs.items() if k in + ['approx']} + rcit_kwargs = {k: v for k, v in kwargs.items() if k in + ['approx', 'num_f', 'num_f2', 'rcit']} + self.check_cache_method_consistent( + 'kci', hashlib.md5(json.dumps(rcit_kwargs, sort_keys=True).encode('utf-8')).hexdigest()) + self.assert_input_data_is_valid() + self.rit = RCIT_UInd(**rit_kwargs) + self.rcit = RCIT_CInd(**rcit_kwargs) + + def __call__(self, X, Y, condition_set=None): + # Kernel-based conditional independence test. + Xs, Ys, condition_set, cache_key = self.get_formatted_XYZ_and_cachekey(X, Y, condition_set) + if cache_key in self.pvalue_cache: return self.pvalue_cache[cache_key] + p = self.rit.compute_pvalue(self.data[:, Xs], self.data[:, Ys])[0] if len(condition_set) == 0 else \ + self.rcit.compute_pvalue(self.data[:, Xs], self.data[:, Ys], self.data[:, condition_set])[0] + self.pvalue_cache[cache_key] = p + return p + class Chisq_or_Gsq(CIT_Base): def __init__(self, data, method_name, **kwargs): def _unique(column): diff --git a/setup.py b/setup.py index f132a02..f40a536 100644 --- a/setup.py +++ b/setup.py @@ -22,7 +22,8 @@ 'matplotlib', 'networkx', 'pydot', - 'tqdm' + 'tqdm', + 'momentchi2' ], url='https://github.com/py-why/causal-learn', packages=setuptools.find_packages(), diff --git a/tests/TestCIT_FastKCI.py b/tests/TestCIT_FastKCI.py new file mode 100644 index 0000000..b8507f7 --- /dev/null +++ b/tests/TestCIT_FastKCI.py @@ -0,0 +1,36 @@ +import unittest + +import numpy as np + +import causallearn.utils.cit as cit + + +class TestCIT_FastKCI(unittest.TestCase): + def test_Gaussian_dist(self): + np.random.seed(10) + X = np.random.randn(1200, 1) + X_prime = np.random.randn(1200, 1) + Y = X + 0.5 * np.random.randn(1200, 1) + Z = Y + 0.5 * np.random.randn(1200, 1) + data = np.hstack((X, X_prime, Y, Z)) + + pvalue01 = [] + pvalue03 = [] + pvalue032 = [] + for K in [3, 10]: + for J in [8, 16]: + for use_gp in [True, False]: + cit_CIT = cit.CIT(data, 'fastkci', K=K, J=J, use_gp=use_gp) + pvalue01.append(round(cit_CIT(0, 1), 4)) + pvalue03.append(round(cit_CIT(0, 3), 4)) + pvalue032.append(round(cit_CIT(0, 3, {2}), 4)) + + pvalue01 = np.array(pvalue01) + pvalue03 = np.array(pvalue03) + pvalue032 = np.array(pvalue032) + self.assertTrue(np.all((0.0 <= pvalue01) & (pvalue01 <= 1.0)), + "pvalue01 contains invalid values") + self.assertTrue(np.all((0.0 <= pvalue03) & (pvalue03 <= 1.0)), + "pvalue03 contains invalid values") + self.assertTrue(np.all((0.0 <= pvalue032) & (pvalue032 <= 1.0)), + "pvalue032 contains invalid values") diff --git a/tests/TestCIT_RCIT.py b/tests/TestCIT_RCIT.py new file mode 100644 index 0000000..ef6406f --- /dev/null +++ b/tests/TestCIT_RCIT.py @@ -0,0 +1,38 @@ +import unittest + +import numpy as np + +import causallearn.utils.cit as cit + + +class TestCIT_RCIT(unittest.TestCase): + def test_Gaussian_dist(self): + np.random.seed(10) + X = np.random.randn(300, 1) + X_prime = np.random.randn(300, 1) + Y = X + 0.5 * np.random.randn(300, 1) + Z = Y + 0.5 * np.random.randn(300, 1) + 
data = np.hstack((X, X_prime, Y, Z)) + + pvalue01 = [] + pvalue03 = [] + pvalue032 = [] + for approx in ["lpd4", "hbe", "gamma", "chi2", "perm"]: + for num_f in [50, 100]: + for num_f2 in [5, 10]: + for rcit in [True, False]: + cit_CIT = cit.CIT(data, 'rcit', approx=approx, num_f=num_f, + num_f2=num_f2, rcit=rcit) + pvalue01.append(round(cit_CIT(0, 1), 4)) + pvalue03.append(round(cit_CIT(0, 3), 4)) + pvalue032.append(round(cit_CIT(0, 3, {2}), 4)) + + pvalue01 = np.array(pvalue01) + pvalue03 = np.array(pvalue03) + pvalue032 = np.array(pvalue032) + self.assertTrue(np.all((0.0 <= pvalue01) & (pvalue01 <= 1.0)), + "pvalue01 contains invalid values") + self.assertTrue(np.all((0.0 <= pvalue03) & (pvalue03 <= 1.0)), + "pvalue03 contains invalid values") + self.assertTrue(np.all((0.0 <= pvalue032) & (pvalue032 <= 1.0)), + "pvalue032 contains invalid values")
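
For reference, a minimal usage sketch of the two new tests through the public CIT interface, mirroring the unit tests above; the toy data and parameter values are illustrative and not part of the patch:

import numpy as np
from causallearn.utils.cit import CIT

# Toy chain X -> Y -> Z plus an independent X'
np.random.seed(0)
X = np.random.randn(600, 1)
X_prime = np.random.randn(600, 1)
Y = X + 0.5 * np.random.randn(600, 1)
Z = Y + 0.5 * np.random.randn(600, 1)
data = np.hstack((X, X_prime, Y, Z))

fastkci = CIT(data, "fastkci", K=10, J=8, use_gp=False)        # block-parallel KCI
rcit = CIT(data, "rcit", approx="lpd4", num_f=100, num_f2=5)   # random Fourier feature test

p_marg = fastkci(0, 1)    # X vs X': independent, expect a large p value
p_dep = fastkci(0, 3)     # X vs Z: dependent, expect a small p value
p_cond = rcit(0, 3, {2})  # X vs Z given Y: expect a large p value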