|
| 1 | +import numpy as np |
| 2 | +import pandas as pd |
| 3 | +import anndata as ad |
| 4 | +from statsmodels.stats.multitest import multipletests |
| 5 | +from typing import Literal |
| 6 | + |
def compute_empirical_pvals(
    data_real: pd.DataFrame,
    data_shuffled: pd.DataFrame,
    value_col: str = "z_value",
    adata: ad.AnnData | None = None,
    adata_count_col: str | None = None,
    pval_adj_method: str | None = None,
    group_col: str | None = None,
    two_sided: bool = True,
    bias_correction: bool = True,
    winsor: float | None = None,
    method: Literal["default", "tnull_fixed0"] = "default",
    n_quantiles: int | None = None,
):
    """
    Compute empirical p-values for real data based on null distribution from shuffled data.

    Parameters
    ----------
    data_real : pd.DataFrame
        DataFrame containing real test statistics.
    data_shuffled : pd.DataFrame
        DataFrame containing null test statistics.
    value_col : str, default "z_value"
        Column name for test statistics in both DataFrames.
    adata : AnnData, optional
        AnnData object for computing expression quantiles if n_quantiles is specified.
    adata_count_col : str, optional
        Column in adata.var to use for quantile computation. If None, mean
        expression is computed from adata.X.
    pval_adj_method : str or None, optional
        Method for multiple testing correction (e.g., 'fdr_bh'). If None,
        no adjustment is performed.
    group_col : str or None, optional
        Column name to group by when computing p-values. If None, compute
        p-values globally.
    two_sided : bool, default True
        If True, compute two-sided p-values; otherwise one-sided.
    bias_correction : bool, default True
        If True, apply bias correction in empirical p-value calculation.
    winsor : float in (0,0.5) or None, optional
        If specified, winsorize null statistics at these quantiles before fitting.
    method : str, default "default"
        Method for p-value computation. Options are "default" or "tnull_fixed0".
    n_quantiles : int or None, optional
        If specified, compute expression quantiles and use them as group_col.

    Returns
    -------
    pandas.Series or np.ndarray
        (Adjusted) p-values aligned to the rows of data_real.

    Raises
    ------
    ValueError
        If `method` is unknown, or `n_quantiles` is given without `adata`.
    """
    if method == "default":
        pval_func = empirical_pvals_from_null
    elif method == "tnull_fixed0":
        pval_func = empirical_pvals_from_tnull_fixed0
    else:
        raise ValueError(f"Unknown method: {method}")

    if n_quantiles is not None:
        # ValueError instead of assert: asserts are stripped under `python -O`.
        if adata is None:
            raise ValueError("adata must be provided when n_quantiles is specified.")
        # Bug fix: `adata_count_col` was previously ignored (counts_col=None was
        # hard-coded), so a user-supplied count column never reached
        # compute_quantiles.
        expression_quantiles = compute_quantiles(
            adata=adata,
            counts_col=adata_count_col,
            n_quantiles=n_quantiles,
        )
        data_real = data_real.merge(expression_quantiles, on="gene", how="left")
        data_shuffled = data_shuffled.merge(expression_quantiles, on="gene", how="left")
        group_col = "mean_quantile"

    if group_col is None:
        # Single pooled null for all rows.
        pvals = pval_func(
            null_z=data_shuffled[value_col].values,
            real_z=data_real[value_col].values,
            two_sided=two_sided,
            bias_correction=bias_correction,
            winsor=winsor,
        )
        if pval_adj_method is not None:
            rej, pval_adj, _, _ = multipletests(pvals, alpha=0.05, method=pval_adj_method)
            return pval_adj
        else:
            return pvals
    else:
        # Per-group nulls: each group of real stats is scored against the
        # shuffled stats from the same group (x.name is the group label).
        pvals = data_real.groupby(group_col)[value_col].transform(
            lambda x: pval_func(
                null_z=data_shuffled.query(f"{group_col} == @x.name")[value_col],
                real_z=x,
                two_sided=two_sided,
                bias_correction=bias_correction,
                winsor=winsor,
            )
        )
        if pval_adj_method is not None:
            # Adjust within each group separately.
            pval_df = data_real[[group_col]].assign(pval=pvals)
            pval_adj = pval_df.groupby(group_col)["pval"].transform(
                lambda x: multipletests(x, alpha=0.05, method=pval_adj_method)[1]
            )
            return pval_adj
        else:
            return pvals
| 103 | + |
| 104 | + |
| 105 | +def empirical_pvals_from_tnull_fixed0( |
| 106 | + null_z, |
| 107 | + real_z, |
| 108 | + two_sided: bool = True, |
| 109 | + winsor: float | None = None, |
| 110 | + return_params: bool = False, |
| 111 | + **kwargs, |
| 112 | +): |
| 113 | + """ |
| 114 | + Fit a Student-t(df, scale) with location fixed at 0 to null z-values, |
| 115 | + then compute parametric (empirical) p-values for real z-values. |
| 116 | +
|
| 117 | + Parameters |
| 118 | + ---------- |
| 119 | + null_z : array-like or pandas.Series |
| 120 | + Null z-values (1D). |
| 121 | + real_z : array-like or pandas.Series |
| 122 | + Observed z-values to evaluate. |
| 123 | + two_sided : bool, default True |
| 124 | + If True, two-sided p = 2 * sf(|z|); otherwise one-sided upper tail. |
| 125 | + winsor : float in (0,0.5), optional |
| 126 | + Winsorize null_z at quantiles (winsor, 1-winsor) before fitting. |
| 127 | + return_params : bool, default False |
| 128 | + If True, return (pvals, params_dict). |
| 129 | +
|
| 130 | + Returns |
| 131 | + ------- |
| 132 | + pvals : Series or ndarray |
| 133 | + Empirical p-values under fitted t(df, loc=0, scale). |
| 134 | + params : dict, optional |
| 135 | + {'df': df, 'scale': scale, 'n_null': B} |
| 136 | + """ |
| 137 | + from scipy import stats |
| 138 | + |
| 139 | + # --- clean nulls |
| 140 | + null = pd.Series(null_z, dtype=float).replace([np.inf, -np.inf], np.nan).dropna() |
| 141 | + if null.size == 0: |
| 142 | + raise ValueError("No valid null statistics.") |
| 143 | + B = null.size |
| 144 | + |
| 145 | + # Optional winsorization for stability |
| 146 | + if winsor is not None: |
| 147 | + if not (0 < winsor < 0.5): |
| 148 | + raise ValueError("winsor must be in (0,0.5)") |
| 149 | + q_lo, q_hi = null.quantile([winsor, 1 - winsor]) |
| 150 | + null = null.clip(q_lo, q_hi) |
| 151 | + |
| 152 | + # --- fit Student-t with loc fixed at 0 |
| 153 | + df_hat, loc_hat, scale_hat = stats.t.fit(null.values, floc=0.0) |
| 154 | + # loc_hat will be 0.0 by construction |
| 155 | + |
| 156 | + # --- prep real values |
| 157 | + if isinstance(real_z, pd.Series): |
| 158 | + real = real_z.astype(float).replace([np.inf, -np.inf], np.nan) |
| 159 | + real_index = real.index |
| 160 | + real = real.values |
| 161 | + return_series = True |
| 162 | + else: |
| 163 | + real = np.asarray(real_z, dtype=float) |
| 164 | + real_index = None |
| 165 | + return_series = False |
| 166 | + |
| 167 | + # --- compute p-values |
| 168 | + z_std = real / scale_hat |
| 169 | + if two_sided: |
| 170 | + p = 2.0 * stats.t.sf(np.abs(z_std), df_hat) |
| 171 | + else: |
| 172 | + p = stats.t.sf(z_std, df_hat) |
| 173 | + p = np.clip(p, 0.0, 1.0) |
| 174 | + |
| 175 | + if return_series: |
| 176 | + p = pd.Series(p, index=real_index, name="p_tnull") |
| 177 | + |
| 178 | + if return_params: |
| 179 | + params = {"df": float(df_hat), "scale": float(scale_hat), "n_null": int(B)} |
| 180 | + return p, params |
| 181 | + return p |
| 182 | + |
| 183 | + |
def empirical_pvals_from_null(
    null_z,
    real_z,
    two_sided: bool = True,
    bias_correction: bool = True,
    **kwargs,
):
    """
    Compute empirical p-values from a pooled null of z-like statistics.

    Parameters
    ----------
    null_z : array-like or pandas.Series
        1-D collection of null test statistics (e.g., from shuffled data).
    real_z : array-like or pandas.Series
        1-D collection of observed test statistics to evaluate.
    two_sided : bool, default True
        If True, uses |z| (two-sided). If False, uses one-sided (upper tail).
    bias_correction : bool, default True
        If True, uses (r+1)/(B+1). If False, uses r/B.

    Returns
    -------
    pvals : pandas.Series or np.ndarray
        Empirical p-values aligned to real_z (Series preserves index).
        NaN observed statistics yield NaN p-values.

    Raises
    ------
    ValueError
        If no finite null statistics are provided.
    """
    # Convert & clean: drop non-finite null statistics.
    null = pd.Series(null_z).astype(float).replace([np.inf, -np.inf], np.nan).dropna().values
    if null.size == 0:
        raise ValueError("No valid null statistics provided.")

    if isinstance(real_z, pd.Series):
        real = real_z.astype(float).replace([np.inf, -np.inf], np.nan)
        real_index = real.index
        real = real.values
        return_series = True
    else:
        real = np.asarray(real_z, dtype=float)
        real_index = None
        return_series = False

    # Transform for sidedness
    if two_sided:
        null_t = np.abs(null)
        real_t = np.abs(real)
    else:
        null_t = null
        real_t = real

    # Sort null once (ascending)
    null_sorted = np.sort(null_t)
    B = null_sorted.size

    # For each real stat, count null >= real (upper tail):
    # searchsorted with 'left' gives the index of the first value >= real_t,
    # so the exceedance count is r = B - idx.
    idx = np.searchsorted(null_sorted, real_t, side="left")
    r = B - idx  # exceedance counts

    if bias_correction:
        p = (r + 1.0) / (B + 1.0)
    else:
        # B > 0 is guaranteed by the emptiness check above.
        p = r / B

    # Bug fix: NaN observed statistics sort past the end of the null in
    # searchsorted, giving r == 0 and hence the SMALLEST possible p-value.
    # Propagate NaN instead (consistent with empirical_pvals_from_tnull_fixed0).
    p = np.where(np.isnan(real_t), np.nan, p)

    # Preserve index/type if input was a Series
    if return_series:
        return pd.Series(p, index=real_index, name="empirical_p")
    return p
| 253 | + |
| 254 | + |
def compute_quantiles(adata, counts_col=None, gene_name_col="gene", n_quantiles=10):
    """
    Assign each gene in adata.var to an expression quantile bin.

    If counts_col is None, the per-gene mean of adata.X is computed and
    written into adata.var['mean_expression'] (note: mutates adata.var),
    then used for binning.

    Returns a DataFrame with columns [gene_name_col, 'mean_quantile'],
    where quantile labels run from 1 to n_quantiles.
    """
    if counts_col is None:
        print("computing mean counts for quantile assignment and outputting to 'mean_expression' column")
        per_gene_mean = adata.X.sum(axis=0) / adata.n_obs
        # Sparse matrices return np.matrix from .sum(); flatten to 1-D.
        if isinstance(per_gene_mean, np.matrix):
            per_gene_mean = np.asarray(per_gene_mean).squeeze()
        adata.var["mean_expression"] = per_gene_mean
        counts_col = "mean_expression"

    genes = adata.var.reset_index(names=[gene_name_col])
    # qcut yields 0-based codes; shift to 1-based quantile labels.
    genes["mean_quantile"] = pd.qcut(genes[counts_col], n_quantiles, labels=False) + 1
    return genes[[gene_name_col, "mean_quantile"]]
| 268 | + |
| 269 | + |
0 commit comments