import numpy as np
import fastmat as fm
import scipy.stats
import copy
import h5py
import time
import random
from sklearn import linear_model
from .misc import get_cartesian_product
from .misc import get_gradient_idx_domain
from .misc import display_fancy_bar
from .misc import nrmsd
from .misc import mat2ten
from .misc import ten2mat
from .misc import increment_basis
from .Gradient import get_gradient
from .ValidationSet import *
from .Computation import *
from .Classifier import *
from .Grid import *
from .SGPC import *
[docs]
class MEGPC(object):
"""
General Multi-Element gPC base class
Parameters
----------
problem: Problem class instance
GPC Problem under investigation
options : dict
Options of gPC algorithm
validation: ValidationSet object (optional)
Object containing a set of validation points and corresponding solutions. Can be used
to validate gpc approximation setting options["error_type"]="nrmsd".
- grid: Grid object containing the validation points (grid.coords, grid.coords_norm)
- results: ndarray [n_grid x n_out] results
Attributes
----------
problem: Problem class instance
GPC Problem under investigation
grid: Grid class instance
Grid of the derived gPC approximation
validation: ValidationSet object (optional)
Object containing a set of validation points and corresponding solutions. Can be used
to validate gpc approximation setting options["error_type"]="nrmsd".
- grid: Grid object containing the validation points (grid.coords, grid.coords_norm)
- results: ndarray [n_grid x n_out] results
n_grid: int or list of int
Number of grid points (for iterative solvers, this is a list of its history)
solver: str
Default solver to determine the gPC coefficients (can be chosen during GPC.solve)
- 'Moore-Penrose' ... Pseudoinverse of gPC matrix (SGPC.Reg, EGPC)
- 'OMP' ... Orthogonal Matching Pursuit, sparse recovery approach (SGPC.Reg, EGPC)
- 'LarsLasso' ... {"alpha": float 0...1} Regularization parameter
- 'NumInt' ... Numerical integration, spectral projection (SGPC.Quad)
verbose: bool
boolean value to determine if to print out the progress into the standard output
fn_results : string, optional, default=None
If provided, model evaluations are saved in fn_results.hdf5 file and gpc object in fn_results.pkl file
options : dict
Options of gPC algorithm
"""
def __init__(self, problem, options, validation=None):
"""
Constructor; Initializes MEGPC class
"""
# objects
self.problem = problem
# self.sub_problems = None
self.grid = None
self.validation = validation
self.gpc = None
self.classifier = None
self.domains = None
# arrays
self.n_grid = []
self.n_out = []
self.n_gpc = None
self.relative_error_loocv = []
self.relative_error_nrmsd = []
self.error = []
self.gradient_idx = None
# options
self.gradient = options["gradient_enhanced"]
self.solver = None
self.settings = None
self.verbose = True
if "fn_results" not in options.keys():
options["fn_results"] = None
self.fn_results = options["fn_results"]
self.options = options
self.matlab_model = options["matlab_model"]
[docs]
def init_classifier(self, coords, results, algorithm, options):
"""
Initializes Classifier object in MEGPC class
Parameters
----------
coords : ndarray of float [n_grid, n_dim]
Set of n_grid parameter combinations
results : ndarray [n_grid x n_out]
Results of the model evaluation
algorithm : str, optional, default: "learning"
Algorithm to classify grid points
- "learning" ... 2-step procedure with unsupervised and supervised learning
- ...
options : dict, optional, default=None
Classifier options
"""
self.classifier = Classifier(coords=coords,
results=results,
algorithm=algorithm,
options=options)
self.domains = self.classifier.domains
self.n_gpc = len(np.unique(self.domains))
[docs]
def update_classifier(self, coords, results):
"""
Updates self.classifier and keeps the existing class labels
Parameters
----------
coords : ndarray of float [n_grid, n_dim]
Set of n_grid parameter combinations
results : ndarray [n_grid x n_out]
Results of the model evaluation
"""
self.classifier.update(coords=coords, results=results)
self.domains = self.classifier.domains
self.n_gpc = len(np.unique(self.domains))
[docs]
def add_sub_gpc(self, problem, order, order_max, order_max_norm, interaction_order,
interaction_order_current, options, domain, validation=None):
"""
Add sub-gPC
"""
if self.gpc is None:
if self.n_gpc is not None:
self.gpc = [None for _ in range(self.n_gpc)]
else:
self.gpc = [0 for _ in range(domain)]
elif len(self.gpc) < domain:
self.gpc = self.gpc + [None for _ in range(domain - len(self.gpc))]
# create sub-gpc objects
self.gpc[domain] = Reg(problem=problem,
order=order,
order_max=order_max,
order_max_norm=order_max_norm,
interaction_order=interaction_order,
interaction_order_current=interaction_order_current,
options=options,
validation=validation)
[docs]
def init_gpc_matrices(self):
"""
Sets self.gpc_matrix with given self.basis and self.grid
The gradient_idx of the sub-gPCs are already assigned in assign_grids()
"""
for gpc in self.gpc:
gpc.init_gpc_matrix()
[docs]
def assign_grids(self, gradient_idx=None):
"""
Assign sub-grids to sub-gPCs
(including transformation in case of projection and gradient_idx)
Parameters
----------
gradient_idx : ndarray of int [gradient_results.shape[0]]
Indices of grid points where the gradient in gradient_results is provided
"""
self.gradient_idx = gradient_idx
# update domain indices if grid points were added
if len(self.domains) != self.grid.coords_norm.shape[0]:
self.domains = self.classifier.predict(self.grid.coords_norm)
for d in np.unique(self.domains):
coords = self.grid.coords[self.domains == d, :]
coords_norm = self.grid.coords_norm[self.domains == d, :]
coords_id = np.array(self.grid.coords_id)[self.domains == d].tolist()
# transform variables of original grid to reduced parameter space
if self.gpc[d].p_matrix is not None:
coords = np.matmul(coords, self.gpc[d].p_matrix.transpose())
coords_norm = np.matmul(coords_norm, self.gpc[d].p_matrix.transpose() /
self.gpc[d].p_matrix_norm[np.newaxis, :])
if self.grid.coords_gradient is not None:
coords_gradient = self.grid.coords_gradient[self.domains == d, :, :]
coords_gradient_norm = self.grid.coords_gradient_norm[self.domains == d, :, :]
coords_gradient_id = np.array(self.grid.coords_gradient_id)[self.domains == d].tolist()
else:
coords_gradient = None
coords_gradient_norm = None
coords_gradient_id = None
self.gpc[d].grid = Grid(parameters_random=self.problem.parameters_random,
coords=coords,
coords_norm=coords_norm,
coords_gradient=coords_gradient,
coords_gradient_norm=coords_gradient_norm,
coords_id=coords_id,
coords_gradient_id=coords_gradient_id)
# assign gradient_idx for sub-gPCs
if self.gradient_idx is not None:
self.gpc[d].gradient_idx = get_gradient_idx_domain(domains=self.domains,
d=d,
gradient_idx=self.gradient_idx)
[docs]
def loocv(self, results, error_norm="relative", domain=None):
"""
Perform leave-one-out cross validation of gPC approximation and add error value to self.relative_error_loocv.
The loocv error is calculated analytically after eq. (35) in [1] but omitting the "1 - " term, i.e. it
corresponds to 1 - Q^2.
relative_error_loocv = GPC.loocv(sim_results, coeffs)
.. math::
\\epsilon_{LOOCV} = \\frac{\\frac{1}{N}\sum_{i=1}^N \\left( \\frac{y(\\xi_i) - \hat{y}(\\xi_i)}{1-h_i}
\\right)^2}{\\frac{1}{N-1}\sum_{i=1}^N \\left( y(\\xi_i) - \\bar{y} \\right)^2}
with
.. math::
\\mathbf{h} = \mathrm{diag}(\\mathbf{\\Psi} (\\mathbf{\\Psi}^T \\mathbf{\\Psi})^{-1} \\mathbf{\\Psi}^T)
Parameters
----------
results : ndarray of float [n_grid x n_out]
Results from n_grid simulations with n_out output quantities
error_norm : str, optional, default="relative"
Decide if error is determined "relative" or "absolute"
domain : int, optional, default: None
Determine error in specified domain only. Default: None (all domains)
Returns
-------
relative_error_loocv : float
Relative mean error of leave one out cross validation
Notes
-----
.. [1] Blatman, G., & Sudret, B. (2010). An adaptive algorithm to build up sparse polynomial chaos expansions
for stochastic finite element analysis. Probabilistic Engineering Mechanics, 25(2), 183-197.
"""
n_loocv = 25
if domain is not None:
results_domain = copy.deepcopy(results[self.domains == domain, :])
domain_idx = copy.deepcopy(domain)
else:
results_domain = copy.deepcopy(results)
# define number of performed cross validations (max 25)
n_loocv_points = np.min((results_domain.shape[0], n_loocv))
# make list of indices, which are randomly sampled (this index is w.r.t. to all points if domain is None)
loocv_point_idx = random.sample(list(range(results_domain.shape[0])), n_loocv_points)
start = time.time()
relative_error = np.zeros(n_loocv_points)
for i in range(n_loocv_points):
if domain is None:
# determine domain of loocv point
domain_idx = int(self.classifier.predict(self.grid.coords_norm[loocv_point_idx[i]][np.newaxis, :]))
results_domain = results[self.domains == domain_idx, ]
# determine row in sub-gPC matrix of loocv point
loocv_point_idx_domain = np.sum(np.array(self.domains == domain_idx)[0:loocv_point_idx[i]])
# get mask of eliminated row
mask = np.arange(results_domain.shape[0]) != loocv_point_idx_domain
# select right gpc matrix
matrix = self.gpc[domain_idx].gpc_matrix
# determine gpc coefficients (this takes a lot of time for large problems)
coeffs_loo = self.gpc[domain_idx].solve(results=results_domain[mask, :],
solver=self.options["solver"],
matrix=matrix[mask, :],
settings=self.options["settings"],
verbose=False)
sim_results_temp = results_domain[loocv_point_idx_domain, :]
if error_norm == "relative":
norm = scipy.linalg.norm(sim_results_temp)
else:
norm = 1.
# determine error
relative_error[i] = scipy.linalg.norm(sim_results_temp - np.matmul(matrix[loocv_point_idx_domain, :],
coeffs_loo)) \
/ norm
display_fancy_bar("LOOCV", int(i + 1), int(n_loocv_points))
# store result in relative_error_loocv
relative_error_loocv = np.mean(relative_error)
iprint("LOOCV computation time: {} sec".format(time.time() - start), tab=0, verbose=True)
return relative_error_loocv
[docs]
def validate(self, coeffs, results=None, gradient_results=None, domain=None, output_idx=None):
"""
Validate gPC approximation using the ValidationSet object contained in the Problem object.
Determines the normalized root mean square deviation between the gpc approximation and the
original model. Skips this step if no validation set is present
Parameters
----------
coeffs: list of ndarray of float [n_gpc][n_coeffs x n_out]
GPC coefficients
results: ndarray of float [n_grid x n_out]
Results from n_grid simulations with n_out output quantities
gradient_results : ndarray of float [n_grid x n_out x dim], optional, default: None
Gradient of results in original parameter space (tensor)
domain : int, optional, default: None
Determine error in specified domain only. Default: None (all domains)
output_idx : int or list of int
Index of the QOI the provided coeffs and results are referring to. The correct QOI will be
selected from the validation set in case of nrmsd error.
Returns
-------
error: float
Estimated difference between gPC approximation and original model
"""
if output_idx is None:
output_idx = np.arange(results.shape[1])
if type(output_idx) is list:
output_idx = np.array(output_idx)
if type(output_idx) is not np.ndarray:
output_idx = np.array([output_idx])
if domain is None:
domain_idx = np.arange(len(coeffs))
else:
domain_idx = domain
# Determine QOIs with NaN in results and exclude them from validation
non_nan_mask = np.where(np.all(~np.isnan(results), axis=0))[0]
n_nan = results.shape[1] - non_nan_mask.size
if n_nan > 0:
iprint("In {}/{} output quantities NaN's were found.".format(n_nan, results.shape[1]),
tab=0, verbose=self.options["verbose"])
results = results[:, non_nan_mask]
# always determine nrmsd if a validation set is present
if isinstance(self.validation, ValidationSet):
if domain is None:
mask_domain = np.ones(self.validation.grid.coords_norm.shape[0]).astype(bool)
gpc_results = self.get_approximation(coeffs, self.validation.grid.coords_norm, output_idx=None)
else:
mask_domain = self.classifier.predict(self.validation.grid.coords_norm) == domain
coords_domain = self.validation.grid.coords_norm[mask_domain, ]
gpc_results = self.gpc[domain].get_approximation(coeffs[domain],
coords_domain,
output_idx=None)
if gpc_results.ndim == 1:
gpc_results = gpc_results[:, np.newaxis]
validation_results_passed = self.validation.results[np.argwhere(mask_domain), output_idx]
if validation_results_passed.ndim == 1:
validation_results_passed = validation_results_passed[:, np.newaxis]
error_nrmsd = float(np.mean(nrmsd(gpc_results,
validation_results_passed,
error_norm=self.options["error_norm"],
x_axis=False)))
if domain is None:
self.relative_error_nrmsd.append(error_nrmsd)
if self.options["error_type"] == "nrmsd":
if domain is None:
self.error.append(self.relative_error_nrmsd[-1])
elif self.options["error_type"] == "loocv":
error_loocv = self.loocv(results=results,
error_norm=self.options["error_norm"],
domain=domain)
if domain is None:
self.relative_error_loocv.append(error_loocv)
self.error.append(self.relative_error_loocv[-1])
if domain is None:
return self.error[-1]
else:
if self.options["error_type"] == "nrmsd":
return error_nrmsd
elif self.options["error_type"] == "loocv":
return error_loocv
[docs]
def get_pdf(self, coeffs, n_samples, output_idx=None):
""" Determine the estimated pdfs of the output quantities
pdf_x, pdf_y = MEGPC.get_pdf(coeffs, n_samples, output_idx=None)
Parameters
----------
coeffs: list of ndarray of float [n_gpc][n_coeffs x n_out]
GPC coefficients
n_samples: int
Number of samples used to estimate output pdfs
output_idx: ndarray, optional, default=None [1 x n_out]
Index of output quantities to consider (if output_idx=None, all output quantities are considered)
Returns
-------
pdf_x: ndarray of float [100 x n_out]
x-coordinates of output pdfs of output quantities
pdf_y: ndarray of float [100 x n_out]
y-coordinates of output pdfs (probability density of output quantity)
"""
# handle (N,) arrays
if len(coeffs[0].shape) == 1:
n_out = 1
else:
n_out = coeffs[0].shape[1]
# if output index array is not provided, determine pdfs of all outputs
if output_idx is None:
output_idx = np.linspace(0, n_out - 1, n_out)
output_idx = output_idx[np.newaxis, :]
# sample gPC expansion
samples_in, samples_out = self.get_samples(n_samples=n_samples, coeffs=coeffs, output_idx=output_idx)
# determine kernel density estimates using Gaussian kernel
pdf_x = np.zeros([100, n_out])
pdf_y = np.zeros([100, n_out])
for i_out in range(n_out):
pdf_y[:, i_out], tmp = np.histogram(samples_out, bins=100, density=True)
pdf_x[:, i_out] = (tmp[1:] + tmp[0:-1])/2.
# kde = scipy.stats.gaussian_kde(samples_out[:, i_out], bw_method=0.1 / samples_out[:, i_out].std(ddof=1))
# pdf_y[:, i_out] = kde(pdf_x[:, i_out])
# pdf_x[:, i_out] = np.linspace(samples_out[:, i_out].min(), samples_out[:, i_out].max(), 100)
return pdf_x, pdf_y
[docs]
def get_samples(self, coeffs, n_samples, output_idx=None):
"""
Randomly sample gPC expansion.
x, pce = SGPC.get_pdf_mc(n_samples, coeffs, output_idx=None)
Parameters
----------
coeffs: list of ndarray of float [n_gpc][n_basis x n_out]
GPC coefficients for each sub-domain
n_samples: int
Number of random samples drawn from the respective input pdfs.
output_idx: ndarray of int [1 x n_out] optional, default=None
Index of output quantities to consider.
Returns
-------
x: ndarray of float [n_samples x dim]
Generated samples in normalized coordinates [-1, 1]. (original parameter space)
pce: ndarray of float [n_samples x n_out]
GPC approximation at points x.
"""
# seed the random numbers generator
np.random.seed()
# generate temporary grid with random samples for each random input variable [n_samples x dim]
grid = Random(parameters_random=self.problem.parameters_random,
n_grid=n_samples,
options=None)
# if output index list is not provided, sample all gpc outputs
if output_idx is None:
n_out = 1 if coeffs[0].ndim == 1 else coeffs[0].shape[1]
output_idx = np.arange(n_out)
pce = self.get_approximation(coeffs=coeffs, x=grid.coords_norm, output_idx=output_idx)
return grid.coords_norm, pce
[docs]
def get_approximation(self, coeffs, x, output_idx=None):
"""
Calculates the gPC approximation in points with output_idx and normalized parameters xi (interval: [-1, 1]).
pce = MEGPC.get_approximation(coeffs, x, output_idx=None)
Parameters
----------
coeffs: list of ndarray of float [n_gpc][n_basis x n_out]
GPC coefficients for each output variable of each sub-domain
x: ndarray of float [n_x x n_dim]
Normalized coordinates, where the gPC approximation is calculated (original parameter space)
output_idx: ndarray of int, optional, default=None [n_out]
Indices of output quantities to consider (Default: all).
Returns
-------
pce: ndarray of float [n_x x n_out]
GPC approximation at normalized coordinates x.
"""
if type(output_idx) is list:
output_idx = np.array(output_idx)
elif type(output_idx) != np.ndarray and output_idx is not None:
output_idx = np.array([output_idx])
else:
if type(coeffs) is list:
output_idx = np.arange(coeffs[0].shape[1])
else:
output_idx = np.arange(coeffs.shape[1])
pce = np.zeros((x.shape[0], len(output_idx)))
# get classes of grid-points
domains = self.classifier.predict(x)
# determine gPC approximation for sub-domains
for d in np.unique(domains):
pce[domains == d, :] = self.gpc[d].get_approximation(coeffs=coeffs[d],
x=x[(domains == d).flatten(), :],
output_idx=output_idx)
return pce
[docs]
def update_gpc_matrices(self, gradient=False):
"""
Update gPC matrix according to existing self.grid and self.basis.
Call this method when self.gpc_matrix does not fit to self.grid and self.basis objects anymore
The old gPC matrix with their self.gpc_matrix_b_id and self.gpc_matrix_coords_id is compared
to self.basis.b_id and self.grid.coords_id. New rows and columns are computed when differences are found.
"""
for i, gpc in enumerate(self.gpc):
gpc.update_gpc_matrix(gradient=gradient)
[docs]
def save_gpc_matrices_hdf5(self):
"""
Save gPC matrix and gPC gradient matrix in .hdf5 file <"fn_results" + ".hdf5"> under the key "gpc_matrix/dom_x"
and "gpc_matrix_gradient/dom_x". If matrices are already present, check for equality and save only appended
rows and columns.
"""
for i, gpc in enumerate(self.gpc):
gpc.save_gpc_matrix_hdf5(hdf5_path_gpc_matrix="gpc_matrix/dom_" + str(i),
hdf5_path_gpc_matrix_gradient="gpc_matrix_gradient/dom_" + str(i))
[docs]
def solve(self, results, gradient_results=None, solver=None, settings=None, verbose=False):
"""
Determines gPC coefficients of sub-gPCs
Parameters
----------
results : ndarray of float [n_grid x n_out]
Results from simulations with n_out output quantities
gradient_results : ndarray of float [n_gradient x n_out x dim], optional, default: None
Gradient of results in original parameter space in specific grid points
solver : str
Solver to determine the gPC coefficients
- 'Moore-Penrose' ... Pseudoinverse of gPC matrix (SGPC.Reg, EGPC)
- 'OMP' ... Orthogonal Matching Pursuit, sparse recovery approach (SGPC.Reg, EGPC)
- 'LarsLasso' ... Least-Angle Regression using Lasso model (SGPC.Reg, EGPC)
- 'NumInt' ... Numerical integration, spectral projection (SGPC.Quad)
settings : dict
Solver settings
- 'Moore-Penrose' ... None
- 'OMP' ... {"n_coeffs_sparse": int} Number of gPC coefficients != 0 or "sparsity": float 0...1
- 'LarsLasso' ... {"alpha": float 0...1} Regularization parameter
- 'NumInt' ... None
verbose : bool
boolean value to determine if to print out the progress into the standard output
Returns
-------
coeffs: list of ndarray of float [n_gpc][n_coeffs x n_out]
gPC coefficients
"""
# use default solver if not specified
if solver is None:
solver = self.solver
# use default solver settings if not specified
if solver is None:
settings = self.settings
coeffs = [0 for _ in range(self.n_gpc)]
# determine coeffs of sub-gPCs
for d in np.unique(self.domains):
if gradient_results is not None:
gradient_results_passed = gradient_results[self.domains[self.gradient_idx] == d, :, :]
else:
gradient_results_passed = None
coeffs[d] = self.gpc[d].solve(results=results[self.domains == d, :],
gradient_results=gradient_results_passed,
solver=solver,
settings=settings,
verbose=verbose)
return coeffs
# def extract_domain(self, data, domain):
# """
# Extract data from dataset of specified domain
#
# Parameters
# ----------
# data : ndarray of float [n_data x m]
# Dataset
# domain : int
# Domain index to extract
# """
# mask_results = self.domains == domain
#
# # determine mask
# if self.gpc[domain].gpc_matrix_gradient is not None:
# mask_gradient = np.zeros((self.grid.coords_norm.shape[0], 1, self.problem.dim)).astype(bool)
# mask_gradient[mask_results, :, :] = True
# mask = np.vstack((mask_results[:, np.newaxis], ten2mat(mask_gradient)))
#
# else:
# mask = np.zeros((data.shape[0], 1)).astype(bool)
# mask[mask_results, :] = True
#
# return data[mask.flatten(), :]
[docs]
def create_validation_set(self, n_samples, n_cpu=1, gradient=False):
"""
Creates a ValidationSet instance (calls the model)
Parameters
----------
n_samples : int
Number of sampling points contained in the validation set
n_cpu : int
Number of parallel function evaluations to evaluate validation set (n_cpu=0 assumes that the
model is capable to evaluate all grid points in parallel)
gradient : bool, optional, default: False
Determine gradient of results in each grid points
"""
# create set of validation points
n_samples = n_samples
grid = Random(parameters_random=self.problem.parameters_random,
n_grid=n_samples,
options={"seed": self.options["seed"]})
# Evaluate original model at grid points
com = Computation(n_cpu=n_cpu, matlab_model=self.matlab_model)
results = com.run(model=self.problem.model, problem=self.problem, coords=grid.coords)
if results.ndim == 1:
results = results[:, np.newaxis]
# Determine gradient of results at grid points
if gradient:
gradient_results, gradient_idx = get_gradient(model=self.problem.model,
problem=self.problem,
grid=grid,
results=results,
com=com,
method="FD_fwd",
gradient_results_present=None,
gradient_idx_skip=None,
i_iter=None,
i_subiter=None,
print_func_time=False,
dx=1e-3,
distance_weight=None)
else:
gradient_results = None
gradient_idx = None
self.validation = ValidationSet(grid=grid,
results=results,
gradient_results=gradient_results,
gradient_idx=gradient_idx)
[docs]
@staticmethod
def get_mean(samples):
"""
Calculate the expected value.
mean = MEGPC.get_mean(samples)
Parameters
----------
samples : ndarray of float [n_x x n_out], optional, default: None
Model evaluations from MEGPC approximation
Returns
-------
mean: ndarray of float [1 x n_out]
Expected value of output quantities
"""
mean = np.mean(samples, axis=0)
mean = mean[np.newaxis, :]
return mean
[docs]
@staticmethod
def get_std(samples=None):
"""
Calculate the standard deviation.
std = MEGPC.get_std(samples)
Parameters
----------
samples : ndarray of float [n_samples x n_out], optional, default: None
Model evaluations from MEGPC approximation
Returns
-------
std: ndarray of float [1 x n_out]
Standard deviation of output quantities
"""
std = np.std(samples, axis=0)
std = std[np.newaxis, :]
return std
# noinspection PyTypeChecker
[docs]
def get_sobol_indices(self, coeffs, n_samples=1e4):
"""
Calculate the available sobol indices from the gPC coefficients by sampling up to second order.
sobol, sobol_idx, sobol_idx_bool = MEGPC.get_sobol_indices(coeffs, n_samples=1e4)
Parameters
----------
coeffs: list of ndarray of float [n_gpc][n_basis x n_out]
GPC coefficients
n_samples : int, optional, default: 1e4
Number of samples to determine Sobol indices by sampling. The efficient number of samples
increases to n_samples * (2*dim + 2) in Saltelli's Sobol sampling sequence.
Returns
-------
sobol: ndarray of float [n_sobol x n_out]
Normalized Sobol indices w.r.t. total variance
sobol_idx: list of ndarray of int [n_sobol x (n_sobol_included)]
Parameter combinations in rows of sobol.
sobol_idx_bool: ndarray of bool [n_sobol x dim]
Boolean mask which contains unique multi indices.
Notes
-----
.. [1] Sobol, I. M. (2001). "Global sensitivity indices for nonlinear
mathematical models and their Monte Carlo estimates." Mathematics
and Computers in Simulation, 55(1-3):271-280,
doi:10.1016/S0378-4754(00)00270-6.
.. [2] Saltelli, A. (2002). "Making best use of model evaluations to
compute sensitivity indices." Computer Physics Communications,
145(2):280-297, doi:10.1016/S0010-4655(02)00280-1.
.. [3] Saltelli, A., P. Annoni, I. Azzini, F. Campolongo, M. Ratto, and
S. Tarantola (2010). "Variance based sensitivity analysis of model
output. Design and estimator for the total sensitivity index."
Computer Physics Communications, 181(2):259-270,
doi:10.1016/j.cpc.2009.09.018.
"""
# iprint("Determining Sobol indices...", tab=0)
dim = self.problem.dim
problem_original = self.problem
# generate uniform distributed sobol sequence (parameter space [0, 1])
coords_norm_01 = saltelli_sampling(n_samples=n_samples, dim=dim, calc_second_order=True)
coords_norm = np.zeros(coords_norm_01.shape)
# transform to respective input pdfs using inverse cdfs
for i_key, key in enumerate(problem_original.parameters_random.keys()):
coords_norm[:, i_key] = problem_original.parameters_random[key].icdf(coords_norm_01[:, i_key])
# run model evaluations
res = self.get_approximation(coeffs=coeffs, x=coords_norm)
# determine sobol indices
sobol, sobol_idx, sobol_idx_bool = get_sobol_indices_saltelli(y=res,
dim=dim,
calc_second_order=True,
num_resamples=100,
conf_level=0.95)
# sort
idx = np.flip(np.argsort(sobol[:, 0], axis=0))
sobol = sobol[idx, :]
sobol_idx = [sobol_idx[i] for i in idx]
sobol_idx_bool = sobol_idx_bool[idx, :]
return sobol, sobol_idx, sobol_idx_bool
# noinspection PyTypeChecker
[docs]
def get_global_sens(self, coeffs, n_samples=1e5):
"""
Determine the global derivative based sensitivity coefficients after Xiu (2009) [1].
global_sens = MEGPC.get_global_sens(coeffs, n_samples=1e5)
Parameters
----------
coeffs: list of ndarray of float [n_gpc][n_basis x n_out], optional, default: None
GPC coefficients
n_samples : int, optional, default: 1e4
Number of samples
Returns
-------
global_sens: ndarray [dim x n_out]
Global derivative based sensitivity coefficients
Notes
-----
.. [1] D. Xiu, Fast Numerical Methods for Stochastic Computations: A Review,
Commun. Comput. Phys., 5 (2009), pp. 242-272 eq. (3.14) page 255
"""
# generate sample coordinates (original parameter space)
grid = Random(parameters_random=self.problem.parameters_random,
n_grid=n_samples,
options=None)
local_sens = self.get_local_sens(coeffs, grid.coords_norm)
# average the results and reshape [dim x n_out]
global_sens = np.mean(local_sens, axis=0).transpose()
return global_sens
# noinspection PyTypeChecker
[docs]
def get_local_sens(self, coeffs, x=None):
"""
Determine the local derivative based sensitivity coefficients in the point of interest x
(normalized coordinates [-1, 1]).
local_sens = MEGPC.calc_localsens(coeffs, x)
Parameters
----------
coeffs: list of ndarray of float [n_gpc][n_basis x n_out]
GPC coefficients
x: ndarray of float [n_points x dim], optional, default: center of parameter space
Points in variable space to evaluate local sensitivity in (normalized coordinates [-1, 1])
(original parameter space)
Returns
-------
local_sens: ndarray [n_points x n_out x dim]
Local sensitivity of output quantities in point x
"""
if x is None:
x = np.zeros(self.problem.dim)[np.newaxis, :]
# classify coordinates
domains = self.classifier.predict(x)
local_sens = np.zeros((x.shape[0], coeffs[0].shape[1], self.problem.dim))
for d in np.unique(domains):
# project coordinate to reduced parameter space if necessary
if self.gpc[d].p_matrix is not None:
x_passed = np.matmul(x[domains == d, :], self.gpc[d].p_matrix.transpose() /
self.gpc[d].p_matrix_norm[np.newaxis, :])
else:
x_passed = x[domains == d, :]
# construct gPC gradient matrix [n_samples x n_basis x dim(_red)]
gpc_matrix_gradient = self.gpc[d].create_gpc_matrix(b=self.gpc[d].basis.b,
x=x_passed,
gradient=True,
gradient_idx=np.arange(x_passed.shape[0]))
local_sens_domain = np.matmul(gpc_matrix_gradient.transpose(2, 0, 1), coeffs[d]).transpose(1, 2, 0)
# project the gradient back to the original space if necessary
if self.gpc[d].p_matrix is not None:
local_sens_domain = np.matmul(local_sens_domain, self.gpc[d].p_matrix /
self.gpc[d].p_matrix_norm[:, np.newaxis])
local_sens[domains == d, :, :] = local_sens_domain
return local_sens