Source code for SALib.analyze.delta

from typing import Optional, Dict
from scipy.stats import norm, gaussian_kde, rankdata

import numpy as np
import pandas as pd

from . import common_args
from ..util import read_param_file, ResultDict

import warnings
import json


[docs] class AnalysisError(Exception): def __init__(self, cmd, note=None): self.cmd = cmd self.note = note or cmd super().__init__(self.note) warnings.warn(self.cmd)
[docs] def exception_handler(e, name, Ysize): if isinstance(e, AnalysisError): return e.note elif isinstance(e, np.linalg.LinAlgError): raise ValueError( f"ERROR [{name}]: Singular matrix - likely due to degenerate input or small sample size (N={Ysize})" ) from e else: warnings.warn(f"Unhandled exception: {type(e).__name__} - {e}") return str(e)
[docs] def custom_warning_formatter(message, category, filename, lineno, line=None): return f"{message}\n"
[docs] def analyze( problem: Dict, X: np.ndarray, Y: np.ndarray, num_resamples: int = 100, conf_level: float = 0.95, print_to_console: bool = False, seed: Optional[int] = None, y_resamples: Optional[int] = None, method: str = "all", bootstrap_savedf: Optional[str] = None, bins_specs: Dict = {}, ) -> Dict: """Perform Delta Moment-Independent Analysis on model outputs. Returns a dictionary with keys: - 'delta_balanced' - 'delta_balanced_conf' - 'delta_raw' - 'delta_raw_conf' - 'delta_step' - 'delta_step_conf' - 'S1' - 'S1_conf' - 'names' where each entry is a list of size D (the number of parameters) containing the indices in the same order as the input file and 'names'. The different delta configurations/methods 'balanced', 'step', 'raw', their corresponding confidence scores (*_conf), and other keys in returned dictionary: - 'names' input column/feature names in X - 'notes' Notes and insights on analysis, warnings, errors, etc. - 'balanced' Divides the input into bins during bootstrapping (default 10) and divides bootstrap subset equally among those bins - 'step' Generates all entries in the input into 1 (X>0) or 0 (X<=0) and bootstrap subset is approximately 50% - 'raw' Bootstraps the input column as is, with random bootstrap sampling with replacement - 'S1' Sobol' first indices, on raw dataset with no bootstrap manipulations (random, with replacement) Notes ----- Compatible with: all samplers Interpretaion: 'balanced' tells us how important variation in that input feature is on the output feature. - Answers the question: "How important is input feature X on output Y?" 'step' tells us how important that input feature as active (versus inactive) is on the output feature. - Answers the question: "What is the impact of input feature X when it is greater than zero/active, versus inactive, regardless of its actual value?" 'raw' tells us how much the input feature affects variation in the output feature, specifically in this dataset and its composition. This score may be skewed or biased toward frequently occurring values in the dataset. (eg. if most values of the input feature lie between 15-20 in your dataset, then the delta score will likely primarily be biased toward influence within that range, rather than its total influence over its entire input range) - Answers the question: "How important is input feature X, and how prominent is its influence specifically in my dataset?" - Is a valid interpretation for real-world scenario, if the input data is representative of the real-world sample and variability without inconsistent or incomplete sampling (rare for real-world data capture) 'notes' contains flags, warnings, or error messages. - Will raise a flag if 'balanced' and 'raw' delta scores differ by >0.1. This suggests that the input dataset is likely skewed or biased with frequently occuring values in a specific range. - Raises errors if binning cannot be executed, if there aren't enough/any zeroes/>0s present to do step analysis, etc. - Guides user with results interpretation Examples -------- >>> X = latin.sample(problem, 1000) >>> Y = Ishigami.evaluate(X) >>> Si = delta.analyze(problem, X, Y, print_to_console=True) Parameters ---------- problem : dict The problem definition X: numpy.matrix A NumPy matrix containing the model inputs Y : numpy.array A NumPy array containing the model outputs num_resamples : int The number of resamples when computing confidence intervals (default 100) conf_level : float The confidence interval level (default 0.95) print_to_console : bool Print results directly to console (default False) y_resamples : int, optional Number of samples to use when resampling (bootstrap) (default None) method : {"all", "delta", "sobol"}, optional Whether to compute "delta", "sobol" or both ("all") indices (default "all") bootstrap_savedf : str, optional User inputs path or filename if they want to save a bootstrap sample to inspect subset composition bins_specs : dict, optional Dict with parameter title as key and bin_input as entry. If bin_input is an int, number of bins. If bin_input is a list, it specifies bin boundaries ( number of bins = len(list)+1 ) Default is 10 bins equally distributed across input feature range References ---------- 1. Borgonovo, E. (2007). "A new uncertainty importance measure." Reliability Engineering & System Safety, 92(6):771-784, doi:10.1016/j.ress.2006.04.015. 2. Plischke, E., E. Borgonovo, and C. L. Smith (2013). "Global sensitivity measures from given data." European Journal of Operational Research, 226(3):536-550, doi:10.1016/j.ejor.2012.11.047. """ if seed: np.random.seed(seed) valid_methods = {"delta", "sobol", "all"} warnings.formatwarning = custom_warning_formatter warnings.simplefilter("once") if X.shape[0] != Y.shape[0]: raise RuntimeError("Input X and output Y must have the same number of rows") if not 0 < conf_level < 1: raise RuntimeError("Confidence level must be between 0-1.") if method not in valid_methods: raise ValueError(f"Method must be one of {valid_methods}") delta_warn_threshold = 0.1 D = problem["num_vars"] if y_resamples is None: y_resamples = Y.size methods = ["delta", "sobol"] if method == "all" else [method] if bootstrap_savedf is not None: if not bootstrap_savedf.endswith(".parquet"): bootstrap_savedf = bootstrap_savedf + ".parquet" if not y_resamples <= Y.size: raise RuntimeError( "y_resamples must be less than or equal to the total number of samples" ) # equal frequency partition exp = 2.0 / (7.0 + np.tanh((1500.0 - y_resamples) / 500.0)) M = int(np.round(min(int(np.ceil(y_resamples**exp)), 48))) m = np.linspace(0, y_resamples, M + 1) Ygrid = np.linspace(np.min(Y), np.max(Y), 100) # S keys = [] if "delta" in methods: keys += [ "delta_balanced", "delta_balanced_conf", "delta_raw", "delta_raw_conf", "delta_step", "delta_step_conf", ] if "sobol" in methods: keys += ["S1", "S1_conf"] S = ResultDict((k, np.full(D, np.nan)) for k in keys) nameslist = problem["names"] if any( any(col.endswith(s) for s in ["_raw", "_balanced", "_step", "_conf"]) for col in nameslist ): raise ValueError( "Forbidden column name suffix: columns may not end with ['_raw', '_balanced', '_step', '_conf'] " ) keys += ["notes"] S["names"] = problem["names"] notes = [] bootstrap_matrix = {} for i in range(D): name = problem["names"][i] X_i = X[:, i] min_class_size = max(int(len(X_i) * 0.005), 10) bininfo = bins_specs.get(name, None) bin_edges, note = check_specified_bininfo( bininfo=bininfo, Xmin=X_i.min(), Xmax=X_i.max(), paramname=name ) try: if "delta" in methods: for mode in ["balanced", "raw", "step"]: if mode == "step": zeroes = np.sum(X_i <= 0) nonzeroes = np.sum(X_i > 0) if zeroes < min_class_size or nonzeroes < min_class_size: S["delta_step"][i] = np.nan S["delta_step_conf"][i] = np.nan note += f"Step: too few samples. {zeroes} (<=0), {nonzeroes} (>0); " continue S[f"delta_{mode}"][i], S[f"delta_{mode}_conf"][i], boot_idx = ( bias_reduced_delta( Y, Ygrid, X_i, m, num_resamples, conf_level, y_resamples, mode, bin_edges, name, min_class_size, ) ) if bootstrap_savedf is not None: bootstrap_matrix[f"{name}_{mode}"] = ( X_i[boot_idx] if len(boot_idx) == y_resamples else np.full(y_resamples, np.nan) ) diff = abs(S["delta_balanced"][i] - S["delta_raw"][i]) if diff > delta_warn_threshold: note += f"Raw delta differs from balanced by {diff:.2f}; " warnings.warn( f"[{name}] Potential Bias Notice: Raw delta score differs from balanced delta by {diff:.2f}. Potential dataset bias, take care in interpretation." ) if "sobol" in methods: ind = np.random.randint(Y.size, size=y_resamples) S["S1"][i] = sobol_first(Y[ind], X_i[ind], m) S["S1_conf"][i] = sobol_first_conf( Y, X_i, m, num_resamples, conf_level, y_resamples ) notes.append(note.strip() if note else "") except Exception as e: note = exception_handler(e, name, Y.size) notes.append(note) continue S["notes"] = notes if print_to_console: # Merge dictionaries summary_dict = {"names": problem["names"]} | S df_summary = pd.DataFrame(summary_dict) print(df_summary.to_string(index=False)) if bootstrap_savedf is not None: df_boot = pd.DataFrame(bootstrap_matrix) try: df_boot.to_parquet(bootstrap_savedf, engine="pyarrow", compression="snappy") except Exception as e: warnings.warn( f"WARNING: Failed to write bootstrap file to {bootstrap_savedf}. Reason: {e}" ) return S
[docs] def check_specified_bininfo(bininfo, Xmin, Xmax, paramname): """Validate user-specified bin edges or number of bins; fallback to default if invalid.""" defaultbins = 10 if bininfo is None: return np.linspace(Xmin, Xmax, defaultbins + 1), "" elif isinstance(bininfo, int): return np.linspace(Xmin, Xmax, bininfo + 1), "" elif isinstance(bininfo, (list, np.ndarray)): bin_edges = np.array(bininfo) if not np.issubdtype(bin_edges.dtype, np.number): warnings.warn( f"[{paramname}] Input Error: Bin value error - Custom bin list only contain numeric values. Resorting to default binning ({defaultbins} bins)." ) return ( np.linspace(Xmin, Xmax, defaultbins + 1), f"Custom bin list had non-numeric values. Used default {defaultbins} bins; ", ) elif np.any(bin_edges < Xmin) or np.any(bin_edges > Xmax): warnings.warn( f"[{paramname}] Input Error: Bin boundary error - Custom bin boundaries must be within the X range. Resorting to default binning ({defaultbins} bins)." ) return ( np.linspace(Xmin, Xmax, defaultbins + 1), f"Custom bin bounds out of X range. Used default {defaultbins} bins; ", ) elif not np.all(np.diff(bin_edges) > 0): warnings.warn( f"[{paramname}] Input Error: Bin edge error - Custom bin edges must be ascending. Resorting to default binning ({defaultbins} bins)." ) return ( np.linspace(Xmin, Xmax, defaultbins + 1), f"Custom bin edges not ascending. Used default {defaultbins} bins; ", ) return np.concatenate(([Xmin - 1e-9], bin_edges, [Xmax + 1e-9])), "" else: warnings.warn( f"[{paramname}] Input Error: Invalid custom bin dtype - Must be int, list or None. Resorting to default binning ({defaultbins} bins)." ) return ( np.linspace(Xmin, Xmax, defaultbins + 1), f"Custom bin invalid dtype. Used default {defaultbins} bins; ", )
[docs] def calc_delta(Y, Ygrid, X, m): """Plischke et al. (2013) delta index estimator (eqn 26) for d_hat.""" N = len(Y) fy = gaussian_kde(Y, bw_method="silverman")(Ygrid) xr = rankdata(X, method="ordinal") d_hat = 0.0 total_bins = len(m) - 1 num_skipped = 0 for j in range(total_bins): ix = np.where((xr > m[j]) & (xr <= m[j + 1]))[0] nm = len(ix) if nm == 0: # Skip empty bins. Even if it doesn't throw an error, # can't estimate local distribution for gaussian_kde from empty data continue Y_ix = Y[ix] if np.ptp(Y_ix) != 0.0: fyc = gaussian_kde(Y_ix, bw_method="silverman")(Ygrid) fy_ = np.abs(fy - fyc) else: num_skipped += 1 continue d_hat += (nm / (2 * N)) * np.trapezoid(fy_, Ygrid) if num_skipped / total_bins > 0.35: warnings.warn( f"{num_skipped}/{total_bins} bins skipped (>35%)" "This may indicate under-sampling or poor binning." "Try increasing sample size (y_resamples) or decreasing the number of bins (M)." ) return d_hat
[docs] def bias_reduced_delta( Y, Ygrid, X, m, num_resamples, conf_level, y_resamples, mode, bin_edges, paramname, min_class_size, ): """Plischke et al. 2013 bias reduction technique (eqn 30)""" d = np.empty(num_resamples) ind = obtain_resampled_subset( X, y_resamples, mode, bin_edges, paramname, min_class_size, warn_print=True ) d_hat = calc_delta(Y[ind], Ygrid, X[ind], m) try: r = [ obtain_resampled_subset( X, y_resamples, mode, bin_edges, paramname, min_class_size, warn_print=False, ) for _ in range(num_resamples) ] except ValueError as e: raise RuntimeError(f"Bootstrap error: [{paramname}][{mode}]: {e}") from e for i, r_i in enumerate(r): d[i] = calc_delta(Y[r_i], Ygrid, X[r_i], m) d = 2.0 * d_hat - d return (d.mean(), norm.ppf(0.5 + conf_level / 2) * d.std(ddof=1), ind)
[docs] def obtain_bins_indices(bin_edges, X, min_class_size): """ Returns indices for all specified bins, ensuring that all bins are at least minimum_class_size """ init_indices = [] for i in range(len(bin_edges) - 1): if i == 0: idx = np.where((X >= bin_edges[i]) & (X <= bin_edges[i + 1]))[0] else: idx = np.where((X > bin_edges[i]) & (X <= bin_edges[i + 1]))[0] init_indices.append(idx) final_indices = [] # for the case where some bins are underpopulated i = 0 while i < len(init_indices): current_bin = init_indices[i] while len(current_bin) < min_class_size and i + 1 < len(init_indices): current_bin = np.concatenate((current_bin, init_indices[i + 1])) i += 1 final_indices.append(current_bin) i += 1 return init_indices, final_indices
[docs] def obtain_resampled_subset( X, y_resamples, mode, bin_edges, paramname, min_class_size, warn_print=False ): """Returns subset indices for X""" N = len(X) X = np.asarray(X) if X.max() == X.min(): raise ValueError( f"[{paramname}] has constant values, cannot bin or calculate delta.", "Cannot bin or calc delta from constants; ", ) if mode == "raw": return np.random.randint(N, size=y_resamples) elif mode in ["balanced", "step"]: indices = [] X_select = X binselectionsize = y_resamples if mode == "step": zeroes = np.where(X <= 0)[0] nonzeroes = np.where(X > 0)[0] n_zeroes = int(0.5 * y_resamples) binselectionsize = y_resamples - n_zeroes min_class_size = min_class_size // 2 chosen_zeroes = np.random.choice(zeroes, size=n_zeroes, replace=True) indices.append(chosen_zeroes) X_select = X[nonzeroes] bin_edges, _ = check_specified_bininfo( bininfo=5, Xmin=X_select.min(), Xmax=X_select.max(), paramname=paramname ) init_indices, final_indices = obtain_bins_indices( bin_edges, X_select, min_class_size ) n_bins = len(final_indices) n_per_bin = binselectionsize // n_bins remaining = binselectionsize - (n_per_bin * n_bins) if n_bins < 2: raise AnalysisError( f"[{paramname}][{mode}] Only one bin remains. Revisit whether processing method is suitable for this parameter, or increase dataset size or spread. Highly biased input.", f"[{mode}] Single valid bin. Insufficient spread in feature, highly biased input; ", ) if n_bins != len(init_indices): if warn_print: warnings.warn( f"[{paramname}][{mode}] Bin Merge Notice: Final no. bins is {len(final_indices)}. Min samples per bin: {min_class_size}" ) if n_per_bin < min_class_size: raise ValueError( f"[{paramname}][{mode}] Dataset size error: Dataset is not large enough for number of bins.", f"[{mode}]: Dataset too small for no. bins; ", ) for i, bin_idx in enumerate(final_indices): if len(bin_idx) == 0: continue binsize = n_per_bin + (1 if i < remaining else 0) sampled = np.random.choice(bin_idx, size=binsize, replace=True) if mode == "step": sampled = nonzeroes[sampled] # type: ignore indices.append(sampled) return np.concatenate(indices) else: raise ValueError( f"{paramname} Issue with mode {mode} suffix. \ This line should never execute." )
[docs] def sobol_first(Y, X, m): # Pre-process to catch constant array # see: https://github.com/numpy/numpy/issues/9631 if np.ptp(Y) == 0.0: # Catch constant results # If Y does not change then it is not sensitive to anything... return 0.0 xr = rankdata(X, method="ordinal") Vi = 0 N = len(Y) Y_mean = Y.mean() for j in range(len(m) - 1): ix = np.where((xr > m[j]) & (xr <= m[j + 1]))[0] nm = len(ix) Vi += (nm / N) * ((Y[ix].mean() - Y_mean) ** 2) return Vi / np.var(Y)
[docs] def sobol_first_conf(Y, X, m, num_resamples, conf_level, y_resamples): s = np.zeros(num_resamples) N = len(Y) r = np.random.choice( N, size=(num_resamples, y_resamples), replace=True ) # bootstrap-like behaviour, on raw vals without removing input bias for i in range(num_resamples): r_i = r[i, :] s[i] = sobol_first(Y[r_i], X[r_i], m) return norm.ppf(0.5 + conf_level / 2) * s.std(ddof=1)
[docs] def cli_parse(parser): parser.add_argument( "-X", "--model-input-file", type=str, required=True, default=None, help="Model input file", ) parser.add_argument( "-r", "--resamples", type=int, required=False, default=10, help="Number of bootstrap resamples for \ Sobol confidence intervals", ) parser.add_argument( "-m", "--method", type=str, required=False, default="all", help="Method to compute sensitivities \ 'delta', 'sobol' or 'all'", ) parser.add_argument( "--y_resamples", type=int, required=False, default=None, help="Number of samples to use when \ resampling (bootstrap)", ) parser.add_argument( "--bootstrap_savedf", type=str, help="Optional path to save bootstrap \ feature values", ) parser.add_argument( "--bins_specs", type=str, default=None, help="Optional: JSON string or path to a \ JSON file specifying bin info per \ input feature", ) return parser
[docs] def cli_action(args): problem = read_param_file(args.paramfile) Y = np.loadtxt( args.model_output_file, delimiter=args.delimiter, usecols=(args.column,) ) X = np.loadtxt(args.model_input_file, delimiter=args.delimiter, ndmin=2) if len(X.shape) == 1: X = X.reshape((len(X), 1)) try: if args.bins_specs is None: bins_specs = {} elif args.bins_specs.strip().startswith("{"): bins_specs = json.loads(args.bins_specs) else: with open(args.bins_specs) as f: bins_specs = json.load(f) except (json.JSONDecodeError, FileNotFoundError) as e: raise ValueError( f"Failed to parse bins_specs. Must be a JSON string or \ file path. Error: {e}" ) analyze( problem, X, Y, num_resamples=args.resamples, print_to_console=True, seed=args.seed, method=args.method, y_resamples=args.y_resamples, bootstrap_savedf=args.bootstrap_savedf, bins_specs=bins_specs, )
if __name__ == "__main__": common_args.run_cli(cli_parse, cli_action)