import warnings
import importlib
from types import MethodType
from functools import partial, wraps
from multiprocess import Pool, cpu_count
import numpy as np
import SALib.sample as samplers
import SALib.analyze as analyzers
from .util_funcs import (  # noqa: F401, E402
avail_approaches,
_define_problem_with_groups,
)
from .results import ResultDict
ptqdm_available = True
try:
from p_tqdm import p_imap
except ImportError:
ptqdm_available = False
try:
from pathos.pp import ParallelPythonPool as pp_Pool
pathos_available = True
except ImportError:
pathos_available = False
__all__ = ["ProblemSpec"]
class ProblemSpec(dict):
"""Dictionary-like object representing an SALib Problem specification.
Attributes
----------
samples : np.ndarray of generated samples
results : np.ndarray of evaluation results (i.e., model outputs)
analysis : np.ndarray or dict of sensitivity indices
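
Examples
--------
A minimal sketch of the intended method-chaining workflow. The
two-parameter problem and toy model below are illustrative
assumptions, as are the dynamically attached `sample_latin` and
`analyze_rbd_fast` helpers (available when the corresponding
SALib sampler/analyzer modules are importable):

>>> import numpy as np
>>> sp = ProblemSpec({
...     "names": ["x1", "x2"],
...     "bounds": [[-1.0, 1.0], [-1.0, 1.0]],
... })
>>> (sp.sample_latin(100)
...    .evaluate(lambda X: X[:, 0] + np.sin(X[:, 1]))
...    .analyze_rbd_fast())  # doctest: +SKIP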
"""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
_check_spec_attributes(self)
self._samples = None
self._results = None
self._analysis = None
self["num_vars"] = len(self["names"])
# Fill in the "groups" entry from parameter names when it is absent
self.update(_define_problem_with_groups(self))
self._add_samplers()
self._add_analyzers()
@property
def samples(self):
return self._samples
@samples.setter
def samples(self, vals):
cols = vals.shape[1]
if cols != self["num_vars"]:
msg = "Mismatched sample size: Expected "
msg += "{} cols, got {}".format(self["num_vars"], cols)
raise ValueError(msg)
self._samples = vals
# Clear results to avoid confusion
self._results = None
@property
def results(self):
return self._results
@results.setter
def results(self, vals):
val_shape = vals.shape
if len(val_shape) == 1:
cols = 1
else:
cols = vals.shape[1]
out_cols = self.get("outputs", None)
if out_cols is None:
if cols == 1:
self["outputs"] = ["Y"]
else:
self["outputs"] = [f"Y{i}" for i in range(1, cols + 1)]
else:
if cols != len(self["outputs"]):
msg = "Mismatched number of results: Expected "
msg += "{} cols, got {}".format(len(self["outputs"]), cols)
raise ValueError(msg)
self._results = vals
@property
def analysis(self):
return self._analysis
def sample(self, func, *args, **kwargs):
"""Create sample using given function.
Parameters
----------
func : function,
Sampling method to use. The given function must accept the SALib
problem specification as the first parameter and return a numpy
array.
*args : list,
Additional arguments to be passed to `func`
**kwargs : dict,
Additional keyword arguments passed to `func`
Returns
-------
self : ProblemSpec object
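
Examples
--------
A sketch passing a SALib sampler directly; assumes the
`SALib.sample.latin` module is importable and `sp` is an
existing `ProblemSpec` instance:

>>> from SALib.sample import latin
>>> sp.sample(latin.sample, 256)  # doctest: +SKIP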
"""
# Clear model output and analysis results to avoid confusion
# especially if samples are forcibly changed...
self._analysis = None
self._results = None
self._samples = func(self, *args, **kwargs)
return self
def set_samples(self, samples: np.ndarray):
"""Set previous samples used."""
self.samples = samples
return self
def set_results(self, results: np.ndarray):
"""Set previously available model results."""
if self.samples is not None:
assert self.samples.shape[0] == results.shape[0], (
"Provided results array does not match the existing"
" number of samples!"
)
self.results = results
return self
def evaluate(self, func, *args, **kwargs):
"""Evaluate a given model.
Parameters
----------
func : function,
Model, or function that wraps a model, to be run/evaluated.
The provided function is required to accept a numpy array of
inputs as its first parameter and must return a numpy array of
results.
*args : list,
Additional arguments to be passed to `func`
nprocs : int,
If greater than 1, attempts to parallelize model evaluations
(see `evaluate_parallel`)
**kwargs : dict,
Additional keyword arguments passed to `func`
Returns
-------
self : ProblemSpec object
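
Examples
--------
A minimal sketch; the vectorized toy model and the existing
`ProblemSpec` instance `sp` are illustrative assumptions:

>>> import numpy as np
>>> def model(X):
...     # Accepts an (N, D) array of inputs, returns N results
...     return np.sum(X, axis=1)
>>> sp.evaluate(model)  # doctest: +SKIP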
"""
nprocs = kwargs.pop("nprocs", 1)
if nprocs > 1:
return self.evaluate_parallel(func, *args, nprocs=nprocs, **kwargs)
self.results = func(self._samples, *args, **kwargs)
return self
def evaluate_parallel(self, func, *args, nprocs=None, **kwargs):
"""Evaluate model locally in parallel.
All detected processors will be used if `nprocs` is not specified.
Parameters
----------
func : function,
Model, or function that wraps a model, to be run in parallel.
The provided function needs to accept a numpy array of inputs as
its first parameter and must return a numpy array of results.
nprocs : int,
Number of processors to use.
Capped to the number of available processors.
*args : list,
Additional arguments to be passed to `func`
**kwargs : dict,
Additional keyword arguments passed to `func`
Returns
-------
self : ProblemSpec object
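
Examples
--------
A sketch assuming `model` is a picklable function that accepts
an (N, D) numpy array of inputs (see `evaluate` above):

>>> sp.evaluate_parallel(model, nprocs=4)  # doctest: +SKIP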
"""
warnings.warn(
"Parallel evaluation is an experimental feature and may not work."
)
if self._samples is None:
raise RuntimeError("Sampling not yet conducted")
max_procs = cpu_count()
if nprocs is None:
nprocs = max_procs
else:
if nprocs > max_procs:
warnings.warn(
f"{nprocs} processors requested but only {max_procs} found."
)
nprocs = min(max_procs, nprocs)
# Create wrapped partial function to allow passing of additional args
tmp_f = self._wrap_func(func, *args, **kwargs)
# Split samples into roughly equal chunks, one per processor
chunks = np.array_split(self._samples, int(nprocs), axis=0)
if ptqdm_available:
# Display progress bar if available
res = p_imap(tmp_f, chunks, num_cpus=nprocs)
else:
with Pool(nprocs) as pool:
res = list(pool.imap(tmp_f, chunks))
self.results = self._collect_results(res)
return self
def evaluate_distributed(
self, func, *args, nprocs=1, servers=None, verbose=False, **kwargs
):
"""Distribute model evaluation across a cluster.
Usage Conditions:
* The provided function needs to accept a numpy array of inputs as
its first parameter
* The provided function must return a numpy array of results
Parameters
----------
func : function,
Model, or function that wraps a model, to be run in parallel
nprocs : int,
Number of processors to use for each node. Defaults to 1.
servers : list[str] or None,
IP addresses or alias for each server/node to use.
verbose : bool,
Display job execution statistics. Defaults to False.
*args : list,
Additional arguments to be passed to `func`
**kwargs : dict,
Additional keyword arguments passed to `func`
Returns
-------
self : ProblemSpec object
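
Examples
--------
A sketch only; the server addresses are placeholders, `pathos`
must be installed, and `model` is as in `evaluate` above:

>>> sp.evaluate_distributed(
...     model, nprocs=2, servers=["node1:5653", "node2:5653"]
... )  # doctest: +SKIP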
"""
if not pathos_available:
raise RuntimeError(
"Pathos is required to run in distributed mode. Please install"
" with `pip install pathos` or"
" `conda install pathos -c conda-forge`"
)
if verbose:
from pathos.parallel import stats
warnings.warn(
"Distributed evaluation is an untested experimental feature and"
" may not work."
)
workers = pp_Pool(nprocs, servers=servers)
# Split samples into roughly equal chunks across all worker processes
n_nodes = len(servers) if servers else 1
chunks = np.array_split(self._samples, int(nprocs) * n_nodes, axis=0)
tmp_f = self._wrap_func(func)
res = list(workers.map(tmp_f, chunks))
self.results = self._collect_results(res)
if verbose:
print(stats(), "\n")
workers.clear()
return self
def analyze(self, func, *args, **kwargs):
"""Analyze sampled results using given function.
Parameters
----------
func : function,
Analysis method to use. The provided function must accept the
problem specification as the first parameter, X values if needed,
Y values, and return a numpy array.
*args : list,
Additional arguments to be passed to `func`
nprocs : int,
If specified, attempts to parallelize the analysis across
outputs (see `analyze_parallel`)
**kwargs : dict,
Additional keyword arguments passed to `func`
Returns
-------
self : ProblemSpec object
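
Examples
--------
A sketch passing a SALib analyzer directly; assumes the
`SALib.analyze.sobol` module is importable, `sp` holds results
from a compatible sampling scheme, and evaluation has been run:

>>> from SALib.analyze import sobol
>>> sp.analyze(sobol.analyze)  # doctest: +SKIP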
"""
if self["num_vars"] == 1 or (self["groups"] and len("groups") == 1):
msg = (
"There is only a single parameter or group defined. There is "
"no point in conducting sensitivity analysis as any and all"
"effect(s) will be mapped to the single parameter/group."
)
raise ValueError(msg)
if "nprocs" in kwargs:
# Call parallel method instead
return self.analyze_parallel(func, *args, **kwargs)
if self._results is None:
raise RuntimeError("Model not yet evaluated")
if "X" in func.__code__.co_varnames:
# enforce passing of X if expected
func = partial(func, *args, X=self._samples, **kwargs)
else:
func = partial(func, *args, **kwargs)
out_cols = self.get("outputs", None)
if out_cols is None:
if len(self._results.shape) == 1:
self["outputs"] = ["Y"]
else:
num_cols = self._results.shape[1]
self["outputs"] = [f"Y{i}" for i in range(1, num_cols + 1)]
if len(self["outputs"]) > 1:
self._analysis = {}
for i, out in enumerate(self["outputs"]):
self._analysis[out] = func(self, Y=self._results[:, i])
else:
self._analysis = func(self, Y=self._results)
return self
def analyze_parallel(self, func, *args, nprocs=None, **kwargs):
"""Analyze sampled results using the given function in parallel.
Parameters
----------
func : function,
Analysis method to use. The provided function must accept the
problem specification as the first parameter, X values if needed,
Y values, and return a numpy array.
*args : list,
Additional arguments to be passed to `func`
nprocs : int,
Number of processors to use.
Capped to the number of outputs or available processors.
**kwargs : dict,
Additional keyword arguments passed to `func`
Returns
-------
self : ProblemSpec object
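
Examples
--------
A sketch assuming multiple outputs and the `SALib.analyze.sobol`
module (see `analyze` above):

>>> from SALib.analyze import sobol
>>> sp.analyze_parallel(sobol.analyze, nprocs=2)  # doctest: +SKIP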
"""
warnings.warn("Parallel analysis is an experimental feature and may not work.")
if self._results is None:
raise RuntimeError("Model not yet evaluated")
if "X" in func.__code__.co_varnames:
# enforce passing of X if expected
func = partial(func, *args, X=self._samples, **kwargs)
else:
func = partial(func, *args, **kwargs)
out_cols = self.get("outputs", None)
if out_cols is None:
if len(self._results.shape) == 1:
self["outputs"] = ["Y"]
else:
num_cols = self._results.shape[1]
self["outputs"] = [f"Y{i}" for i in range(1, num_cols + 1)]
# Cap number of processors used
Yn = len(self["outputs"])
if Yn == 1:
# Only single output, cannot parallelize
warnings.warn(
f"Analysis was not parallelized: {nprocs} processors requested"
" for 1 output."
)
res = func(self, Y=self._results)
else:
max_procs = cpu_count()
if nprocs is None:
nprocs = max_procs
else:
nprocs = min(Yn, nprocs, max_procs)
if ptqdm_available:
# Display progress bar if available
res = p_imap(
lambda y: func(self, Y=y),
[self._results[:, i] for i in range(Yn)],
num_cpus=nprocs,
)
else:
with Pool(nprocs) as pool:
res = list(
pool.imap(
lambda y: func(self, Y=y),
[self._results[:, i] for i in range(Yn)],
)
)
# Assign by output name if more than 1 output, otherwise
# attach directly
if Yn > 1:
self._analysis = {}
for out, Si in zip(self["outputs"], list(res)):
self._analysis[out] = Si
else:
self._analysis = res
return self
def to_df(self):
"""Convert results to Pandas DataFrame."""
an_res = self._analysis
if isinstance(an_res, ResultDict):
return an_res.to_df()
elif isinstance(an_res, dict):
# case where analysis result is a dict of ResultDicts
return [an.to_df() for an in list(an_res.values())]
raise RuntimeError("Analysis not yet conducted")
def plot(self, **kwargs):
"""Plot results as a bar chart.
Returns
-------
axes : matplotlib axes object
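
Examples
--------
Assuming analysis has already been conducted on `sp`:

>>> axes = sp.plot()  # doctest: +SKIP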
"""
if self._analysis is None:
raise RuntimeError("Analysis not yet conducted")
num_rows = len(self["outputs"])
if num_rows == 1:
return self._analysis.plot(**kwargs)
# Lazily import matplotlib only if it has not been imported already
try:
plt  # noqa: F821
except NameError:
import matplotlib.pyplot as plt
num_cols = 1
fk = list(self._analysis.keys())[0]
if isinstance(self._analysis[fk].to_df(), (list, tuple)):
# have to divide by 2 to account for CI columns
num_cols = len(self._analysis[fk]) // 2
p_width = max(num_cols * 3, 5)
p_height = max(num_rows * 3, 6)
_, axes = plt.subplots(
num_rows, num_cols, sharey=True, figsize=(p_width, p_height)
)
for res, ax in zip(self._analysis, axes):
self._analysis[res].plot(ax=ax, **kwargs)
try:
ax[0].set_title(res)
except TypeError:
ax.set_title(res)
plt.tight_layout()
return axes
def heatmap(
self, metric: str = None, index: str = None, title: str = None, ax=None
):
"""Plot results as a heatmap.
Parameters
----------
metric : str or None,
Name of the output to analyze (displays all if `None`)
index : str or None,
Name of the index to plot, dependent on the analysis that was
conducted (ST, S1, etc.; displays all if `None`)
title : str,
Title of plot to use (defaults to the same as `metric`)
ax : axes object,
matplotlib axes object to use for the plot.
Creates a new figure if not provided.
Returns
-------
ax : matplotlib axes object
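
Examples
--------
A sketch; the output name "Y1" and index "ST" below are
illustrative and depend on the analysis that was conducted:

>>> sp.heatmap(metric="Y1", index="ST")  # doctest: +SKIP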
"""
from SALib.plotting.heatmap import heatmap # type: ignore
return heatmap(self, metric, index, title, ax)
def _wrap_func(self, func, *args, **kwargs):
# Create wrapped partial function to allow passing of additional args
tmp_f = func
if (len(args) > 0) or (len(kwargs) > 0):
tmp_f = lambda x: func(x, *args, **kwargs) # noqa
return tmp_f
def _collect_results(self, res):
res_shape = res[0].shape
if len(res_shape) > 1:
res_shape = (len(self._samples), *res_shape[1:])
else:
res_shape = len(self._samples)
final_res = np.empty(res_shape)
# Collect results
# Cannot enumerate over this as the length
# of individual results may vary
i = 0
for r in res:
r_len = len(r)
final_res[i : i + r_len] = r
i += r_len
return final_res
def _method_creator(self, func, method):
"""Generate convenience methods for specified `method`."""
@wraps(func)
def modfunc(self, *args, **kwargs):
return getattr(self, method)(func, *args, **kwargs)
return modfunc
def _add_samplers(self):
"""Dynamically add available SALib samplers as ProblemSpec methods."""
for sampler in avail_approaches(samplers):
func = getattr(
importlib.import_module("SALib.sample.{}".format(sampler)), "sample"
)
method_name = "sample_{}".format(sampler.replace("_sampler", ""))
self.__setattr__(
method_name, MethodType(self._method_creator(func, "sample"), self)
)
def _add_analyzers(self):
"""Dynamically add available SALib analyzers as ProblemSpec methods."""
for analyzer in avail_approaches(analyzers):
func = getattr(
importlib.import_module("SALib.analyze.{}".format(analyzer)), "analyze"
)
method_name = "analyze_{}".format(analyzer.replace("_analyzer", ""))
self.__setattr__(
method_name, MethodType(self._method_creator(func, "analyze"), self)
)
def _repr_pretty_(self, p, cycle):
p.text(str(self) if not cycle else "...")
def __str__(self):
rep = ""
if self._samples is not None:
arr_shape = self._samples.shape
if len(arr_shape) == 1:
arr_shape = (arr_shape[0], 1)
nr, nx = arr_shape
rep += (
"Samples:\n" f"\t{nx} parameters: {self['names']}\n" f"\t{nr} samples\n"
)
if self._results is not None:
arr_shape = self._results.shape
if len(arr_shape) == 1:
arr_shape = (arr_shape[0], 1)
nr, ny = arr_shape
rep += (
"Outputs:\n"
f"\t{ny} outputs: {self['outputs']}\n"
f"\t{nr} evaluations\n"
)
if self._analysis is not None:
rep += "Analysis:\n"
an_res = self._analysis
allowed_types = (list, tuple)
if isinstance(an_res, ResultDict):
an_res = an_res.to_df()
if not isinstance(an_res, allowed_types):
rep += f"{an_res}\n"
else:
for df in an_res:
rep += f"{df}\n"
elif isinstance(an_res, dict):
for res_name in an_res:
rep += "{}:\n".format(res_name)
dfs = an_res[res_name].to_df()
if isinstance(dfs, allowed_types):
for df in dfs:
rep += f"{df}:\n"
else:
rep += f"{dfs}:\n"
if len(rep) == 0:
rep = (
"ProblemSpec does not currently contain any samples, "
"evaluations or results."
)
return rep
def _check_spec_attributes(spec: ProblemSpec):
assert "names" in spec, "Names not defined"
assert "bounds" in spec, "Bounds not defined"
assert len(spec["bounds"]) == len(
spec["names"]
), f"""Number of bounds do not match number of names
Number of names:
{len(spec['names'])} | {spec['names']}
----------------
Number of bounds: {len(spec['bounds'])}
"""