from types import MethodType
from warnings import warn
from scipy.stats import norm
import numpy as np
import pandas as pd
from . import common_args
from ..util import read_param_file, ResultDict, extract_group_names
from multiprocessing import Pool, cpu_count
from functools import partial
from itertools import combinations, zip_longest
# Warning text emitted by the estimators below when the model outputs (or the
# resampled subset being processed) are all identical, which makes the
# variance-based Sobol' estimators degenerate (zero variance in the divisor).
CONST_RESULT_MSG = (
    "Constant values encountered, indicating model evaluations "
    "(or subset of evaluations) produced identical values."
)
def analyze(
    problem,
    Y,
    calc_second_order=True,
    num_resamples=100,
    conf_level=0.95,
    print_to_console=False,
    parallel=False,
    n_processors=None,
    keep_resamples=False,
    seed=None,
):
    """Perform Sobol Analysis on model outputs.

    Returns a dictionary with keys 'S1', 'S1_conf', 'ST', and 'ST_conf', where
    each entry is a list of size D (the number of parameters) containing the
    indices in the same order as the parameter file. If calc_second_order is
    True, the dictionary also contains keys 'S2' and 'S2_conf'.

    There are several approaches to estimating sensitivity indices.
    The general approach is described in [1]. The implementation offered here
    follows [2] for first and total order indices, whereas estimation of
    second order sensitivities follows [3]. A noteworthy point is the
    improvement to reduce error rates in sensitivity estimation is introduced
    in [4].

    Notes
    -----
    Compatible with:
        `saltelli` : :func:`SALib.sample.saltelli.sample`
        `sobol` : :func:`SALib.sample.sobol.sample`

    Examples
    --------
    >>> X = saltelli.sample(problem, 512)
    >>> Y = Ishigami.evaluate(X)
    >>> Si = sobol.analyze(problem, Y, print_to_console=True)

    Parameters
    ----------
    problem : dict
        The problem definition
    Y : numpy.array
        A NumPy array containing the model outputs
    calc_second_order : bool
        Calculate second-order sensitivities (default True)
    num_resamples : int
        The number of resamples (default 100)
    conf_level : float
        The confidence interval level (default 0.95)
    print_to_console : bool
        Print results directly to console (default False)
    parallel : bool
        Perform analysis in parallel if True
    n_processors : int
        Number of parallel processes (only used if parallel is True)
    keep_resamples : bool
        Whether or not to store intermediate resampling results (default False)
    seed : int
        Seed to generate a random number

    References
    ----------
    1. Sobol, I. M. (2001).
       Global sensitivity indices for nonlinear
       mathematical models and their Monte Carlo estimates.
       Mathematics and Computers in Simulation, 55(1-3):271-280,
       doi:10.1016/S0378-4754(00)00270-6.

    2. Saltelli, A., P. Annoni, I. Azzini, F. Campolongo, M. Ratto, and
       S. Tarantola (2010).
       Variance based sensitivity analysis of model output. Design and
       estimator for the total sensitivity index.
       Computer Physics Communications, 181(2):259-270,
       doi:10.1016/j.cpc.2009.09.018.

    3. Saltelli, A. (2002).
       Making best use of model evaluations to compute sensitivity indices.
       Computer Physics Communications, 145(2):280-297
       doi:10.1016/S0010-4655(02)00280-1.

    4. Sobol', I. M., Tarantola, S., Gatelli, D., Kucherenko, S. S., &
       Mauntz, W. (2007).
       Estimating the approximation error when fixing unessential factors in
       global sensitivity analysis.
       Reliability Engineering & System Safety, 92(7), 957-960.
       https://doi.org/10.1016/j.ress.2006.07.001
    """
    if seed is not None:
        # Seed the bootstrap RNG so confidence intervals are reproducible.
        # Compare against None (not truthiness) so that seed=0 is honored
        # instead of silently falling through to the unseeded legacy RNG.
        rng = np.random.default_rng(seed).integers
    else:
        rng = np.random.randint

    # Determine if groups are defined and adjust the number of rows in the
    # cross-sampled matrix accordingly.
    _, D = extract_group_names(problem)

    if calc_second_order and Y.size % (2 * D + 2) == 0:
        N = int(Y.size / (2 * D + 2))
    elif not calc_second_order and Y.size % (D + 2) == 0:
        N = int(Y.size / (D + 2))
    else:
        raise RuntimeError(
            """
        Incorrect number of samples in model output file.
        Confirm that calc_second_order matches option used during sampling."""
        )

    if not 0 < conf_level < 1:
        raise RuntimeError("Confidence level must be between 0-1.")

    # Normalize the model output.
    # Estimates of the Sobol' indices can be biased for non-centered outputs
    # so we center here by normalizing with the standard deviation.
    # Other approaches opt to subtract the mean.
    Y = (Y - Y.mean()) / Y.std()

    A, B, AB, BA = separate_output_values(Y, D, N, calc_second_order)
    r = rng(N, size=(N, num_resamples))
    Z = norm.ppf(0.5 + conf_level / 2)

    if not parallel:
        S = create_Si_dict(D, num_resamples, keep_resamples, calc_second_order)

        # The resampled A/B matrices and their spread do not depend on the
        # parameter index j, so build them once instead of once per loop
        # iteration (the original recomputed them for every j).
        A_r = A[r]
        B_r = B[r]
        var_diff = np.ptp(np.r_[A_r, B_r])

        for j in range(D):
            # First order (+conf.)
            S["S1"][j] = first_order(A, AB[:, j], B)
            S1_conf_j = first_order(A_r, AB[r, j], B_r)

            if keep_resamples:
                S["S1_conf_all"][:, j] = S1_conf_j

            if var_diff != 0.0:
                S["S1_conf"][j] = Z * S1_conf_j.std(ddof=1)
            else:
                # Degenerate (constant) resamples: no meaningful spread.
                S["S1_conf"][j] = 0.0

            # Total order (+conf.)
            S["ST"][j] = total_order(A, AB[:, j], B)
            ST_conf_j = total_order(A_r, AB[r, j], B_r)

            if keep_resamples:
                S["ST_conf_all"][:, j] = ST_conf_j

            if var_diff != 0.0:
                S["ST_conf"][j] = Z * ST_conf_j.std(ddof=1)
            else:
                S["ST_conf"][j] = 0.0

        # Second order (+conf.) fills only the upper triangle (j < k).
        if calc_second_order:
            for j in range(D):
                for k in range(j + 1, D):
                    S["S2"][j, k] = second_order(
                        A, AB[:, j], AB[:, k], BA[:, j], B
                    )
                    S["S2_conf"][j, k] = Z * second_order(
                        A_r, AB[r, j], AB[r, k], BA[r, j], B_r
                    ).std(ddof=1)
    else:
        tasks, n_processors = create_task_list(D, calc_second_order, n_processors)
        func = partial(sobol_parallel, Z, A, AB, BA, B, r)
        # Context-manage the pool so workers are cleaned up even if a task
        # raises; close/join before collecting to drain all results.
        with Pool(n_processors) as pool:
            S_list = pool.map_async(func, tasks)
            pool.close()
            pool.join()
            results = S_list.get()
        S = Si_list_to_dict(
            results, D, num_resamples, keep_resamples, calc_second_order
        )

    # Add problem context and attach the DataFrame conversion method so the
    # returned ResultDict is self-describing.
    S.problem = problem
    S.to_df = MethodType(to_df, S)

    # Print results to console
    if print_to_console:
        for df in S.to_df():
            print(df)

    return S
def first_order(A, AB, B):
    """
    First order estimator following Saltelli et al. 2010 CPC, normalized by
    sample variance
    """
    combined = np.r_[A, B]
    # A zero range means every evaluation was identical; the variance in the
    # denominator would be zero, so report a null index instead.
    if np.ptp(combined) == 0:
        warn(CONST_RESULT_MSG)
        return np.array([0.0])

    numerator = np.mean(B * (AB - A), axis=0)
    return numerator / np.var(combined, axis=0)
def total_order(A, AB, B):
    """
    Total order estimator following Saltelli et al. 2010 CPC, normalized by
    sample variance
    """
    combined = np.r_[A, B]
    # Constant outputs would give a zero-variance denominator; bail out with
    # a null index in that degenerate case.
    if np.ptp(combined) == 0:
        warn(CONST_RESULT_MSG)
        return np.array([0.0])

    half_mean_sq_diff = 0.5 * np.mean((A - AB) ** 2, axis=0)
    return half_mean_sq_diff / np.var(combined, axis=0)
def second_order(A, ABj, ABk, BAj, B):
    """Second order estimator following Saltelli 2002"""
    combined = np.r_[A, B]
    # Degenerate case: identical evaluations give zero variance below.
    if np.ptp(combined) == 0:
        warn(CONST_RESULT_MSG)
        return np.array([0.0])

    # Joint effect of parameters j and k, normalized by total variance
    Vjk = np.mean(BAj * ABk - A * B, axis=0) / np.var(combined, axis=0)
    # Remove the individual first-order effects to isolate the interaction
    return Vjk - first_order(A, ABj, B) - first_order(A, ABk, B)
def create_Si_dict(
    D: int, num_resamples: int, keep_resamples: bool, calc_second_order: bool
):
    """Initialize an empty ResultDict to hold the sensitivity indices."""
    S = ResultDict()
    # First/total order indices and their confidence bounds: one value per
    # parameter, filled in later.
    for key in ("S1", "S1_conf", "ST", "ST_conf"):
        S[key] = np.zeros(D)

    if keep_resamples:
        # Storage for every bootstrap resample estimate (rows) per parameter
        # (columns).
        S["S1_conf_all"] = np.zeros((num_resamples, D))
        S["ST_conf_all"] = np.zeros((num_resamples, D))

    if calc_second_order:
        # Only the upper triangle (j < k) is populated; NaN marks the rest.
        S["S2"] = np.full((D, D), np.nan)
        S["S2_conf"] = np.full((D, D), np.nan)

    return S
def separate_output_values(Y, D, N, calc_second_order):
    """Slice the flat model-output vector into its A, B, AB (and BA) blocks.

    The Saltelli/Sobol sampling schemes interleave the evaluations of the
    base matrices A and B with the cross-sampled rows, so each block is
    recovered by strided slicing.
    """
    stride = 2 * D + 2 if calc_second_order else D + 2

    A = Y[0::stride]
    B = Y[(stride - 1)::stride]

    AB = np.zeros((N, D))
    BA = np.zeros((N, D)) if calc_second_order else None
    for col in range(D):
        AB[:, col] = Y[(col + 1)::stride]
        if calc_second_order:
            BA[:, col] = Y[(col + 1 + D)::stride]

    return A, B, AB, BA
def sobol_parallel(Z, A, AB, BA, B, r, tasks):
    """Compute the Sobol' indices requested in ``tasks``.

    Worker function for the multiprocessing pool: each task is a
    ``(key, j, k)`` triple produced by :func:`create_task_list`, and the
    result is returned as ``[key, j, k, value]`` so :func:`Si_list_to_dict`
    can place it.
    """
    sobol_indices = []
    for d, j, k in tasks:
        if d == "S1":
            s = first_order(A, AB[:, j], B)
        elif d == "S1_conf":
            s = Z * first_order(A[r], AB[r, j], B[r]).std(ddof=1)
        elif d == "ST":
            s = total_order(A, AB[:, j], B)
        elif d == "ST_conf":
            s = Z * total_order(A[r], AB[r, j], B[r]).std(ddof=1)
        elif d == "S2":
            s = second_order(A, AB[:, j], AB[:, k], BA[:, j], B)
        elif d == "S2_conf":
            s = Z * second_order(A[r], AB[r, j], AB[r, k], BA[r, j], B[r]).std(ddof=1)
        else:
            # Previously an unknown key silently reused the previous task's
            # result (or raised NameError on the first task); fail loudly.
            raise ValueError("Unknown sensitivity index key: {}".format(d))
        sobol_indices.append([d, j, k, s])
    return sobol_indices
def create_task_list(D, calc_second_order, n_processors):
    """
    Create list with one entry (key, parameter 1, parameter 2) per sobol
    index (+conf.). This is used to supply parallel tasks to
    multiprocessing.Pool
    """
    tasks_first_order = [
        [key, j, None] for j in range(D) for key in ("S1", "S1_conf", "ST", "ST_conf")
    ]

    # Second order (+conf.) tasks cover the upper triangle of parameter pairs
    tasks_second_order = []
    if calc_second_order:
        tasks_second_order = [
            [key, j, k]
            for j in range(D)
            for k in range(j + 1, D)
            for key in ("S2", "S2_conf")
        ]

    if n_processors is None:
        total_tasks = len(tasks_first_order) + len(tasks_second_order)
        n_processors = min(cpu_count(), total_tasks)

    if not calc_second_order:
        tasks = np.array_split(tasks_first_order, n_processors)
    else:
        # Interleave the (reversed) first-order tasks with the second-order
        # ones to balance the load, then split into n_processors sublists.
        interleaved = [
            task
            for pair in zip_longest(tasks_first_order[::-1], tasks_second_order)
            for task in pair
            if task is not None
        ]
        tasks = np.array_split(interleaved, n_processors)

    return tasks, n_processors
def Si_list_to_dict(
    S_list, D: int, num_resamples: int, keep_resamples: bool, calc_second_order: bool
):
    """Convert the parallel output into the regular dict format for
    printing/returning.

    Each worker returns a list of ``[key, j, k, value]`` entries (see
    :func:`sobol_parallel`); this flattens them and places each value in the
    pre-allocated result arrays.
    """
    S = create_Si_dict(D, num_resamples, keep_resamples, calc_second_order)

    # Flatten the per-worker lists into one list of entries.
    # (The original loop variable shadowed the builtin `list`.)
    entries = [entry for worker_results in S_list for entry in worker_results]

    for key, j, k, value in entries:
        if k is None:
            # First/total order (+conf.): 1-D, indexed by parameter j
            S[key][j] = value
        else:
            # Second order (+conf.): 2-D, indexed by the pair (j, k)
            S[key][j, k] = value

    return S
def Si_to_pandas_dict(S_dict):
    """Convert Si information into Pandas DataFrame compatible dict.

    Examples
    --------
    >>> X = saltelli.sample(problem, 512)
    >>> Y = Ishigami.evaluate(X)
    >>> Si = sobol.analyze(problem, Y, print_to_console=True)
    >>> T_Si, first_Si, (idx, second_Si) = sobol.Si_to_pandas_dict(Si, problem)

    Parameters
    ----------
    S_dict : ResultDict
        Sobol sensitivity indices

    See Also
    --------
    Si_list_to_dict

    Returns
    -------
    tuple : of total, first, and second order sensitivities.
        Total and first order are dicts.
        Second order sensitivities contain a tuple of parameter name
        combinations for use as the DataFrame index and second order
        sensitivities.
        If no second order indices found, then returns tuple of
        (None, None)
    """
    total_order = {"ST": S_dict["ST"], "ST_conf": S_dict["ST_conf"]}
    first_order = {"S1": S_dict["S1"], "S1_conf": S_dict["S1_conf"]}

    idx = None
    second_order = None
    if "S2" in S_dict:
        names, _ = extract_group_names(S_dict.problem)
        # With more than two factors, index by every pair; with exactly two,
        # the single pair is just the name list itself.
        if len(names) > 2:
            idx = list(combinations(names, 2))
        else:
            idx = (names,)

        second_order = {
            "S2": [
                S_dict["S2"][names.index(pair[0]), names.index(pair[1])]
                for pair in idx
            ],
            "S2_conf": [
                S_dict["S2_conf"][names.index(pair[0]), names.index(pair[1])]
                for pair in idx
            ],
        }

    return total_order, first_order, (idx, second_order)
def to_df(self):
    """Conversion method to Pandas DataFrame. To be attached to ResultDict.

    Returns
    -------
    List : of Pandas DataFrames in order of Total, First, Second

    Examples
    --------
    >>> Si = sobol.analyze(problem, Y, print_to_console=True)
    >>> total_Si, first_Si, second_Si = Si.to_df()
    """
    total, first, (idx, second) = Si_to_pandas_dict(self)
    names, _ = extract_group_names(self.problem)

    frames = [
        pd.DataFrame(total, index=names),
        pd.DataFrame(first, index=names),
    ]
    # Second-order results are only present when calc_second_order was used
    if second:
        frames.append(pd.DataFrame(second, index=idx))
    return frames
def cli_parse(parser):
    """Register the Sobol-analysis command line options on ``parser``."""
    parser.add_argument(
        "--max-order",
        type=int,
        required=False,
        default=2,
        choices=[1, 2],
        help="Maximum order of sensitivity indices to calculate",
    )
    parser.add_argument(
        "-r",
        "--resamples",
        type=int,
        required=False,
        default=100,
        help="Number of bootstrap resamples for Sobol confidence intervals",
    )
    parser.add_argument(
        "--parallel",
        action="store_true",
        help="Makes use of parallelization.",
        dest="parallel",
    )
    parser.add_argument(
        "--processors",
        type=int,
        required=False,
        default=None,
        help="Number of processors to be used with the parallel option.",
        dest="n_processors",
    )
    return parser
def cli_action(args):
    """Run a Sobol analysis from parsed command-line arguments."""
    spec = read_param_file(args.paramfile)
    outputs = np.loadtxt(
        args.model_output_file,
        delimiter=args.delimiter,
        usecols=(args.column,),
    )
    # Second-order indices are computed only when the sampling supported them
    wants_second_order = args.max_order == 2
    analyze(
        spec,
        outputs,
        wants_second_order,
        num_resamples=args.resamples,
        print_to_console=True,
        parallel=args.parallel,
        n_processors=args.n_processors,
        seed=args.seed,
    )
# Allow this module to be executed directly as a command-line tool.
if __name__ == "__main__":
    common_args.run_cli(cli_parse, cli_action)