Source code for pygpc.GPC

import numpy as np
import fastmat as fm
import scipy.stats
import scipy.linalg
import copy
import h5py
import time
import random
import uuid
import sys
from sklearn import linear_model
from scipy.signal import savgol_filter
from .misc import get_cartesian_product
from .misc import display_fancy_bar
from .misc import nrmsd
from .misc import mat2ten
from .misc import ten2mat
# iprint is used for progress output throughout this module but was missing from the
# extracted listing; it is assumed to live in .misc like the other helpers
from .misc import iprint
from .pygpc_extensions import create_gpc_matrix_cpu
from .pygpc_extensions import create_gpc_matrix_omp
from .ValidationSet import *
from .Computation import *
from .Grid import *


try:
    from .pygpc_extensions_cuda import create_gpc_matrix_cuda
except ImportError:
    pass


class GPC(object):
    """
    General gPC base class

    Parameters
    ----------
    problem : Problem class instance
        GPC Problem under investigation
    options : dict
        Options of gPC algorithm
    validation : ValidationSet object (optional)
        Object containing a set of validation points and corresponding solutions. Can be used
        to validate the gpc approximation, setting options["error_type"]="nrmsd".
        - grid: Grid object containing the validation points (grid.coords, grid.coords_norm)
        - results: ndarray [n_grid x n_out] results

    Attributes
    ----------
    problem : Problem class instance
        GPC Problem under investigation
    basis : Basis class instance
        Basis of the gPC including BasisFunctions
    grid : Grid class instance
        Grid of the derived gPC approximation
    validation : ValidationSet object (optional)
        Object containing a set of validation points and corresponding solutions. Can be used
        to validate the gpc approximation, setting options["error_type"]="nrmsd".
        - grid: Grid object containing the validation points (grid.coords, grid.coords_norm)
        - results: ndarray [n_grid x n_out] results
    gpc_matrix : ndarray of float [N_samples x N_poly]
        Generalized polynomial chaos matrix
    gpc_matrix_gradient : ndarray of float [N_samples * dim x N_poly]
        Derivative of the generalized polynomial chaos matrix
    matrix_inv : ndarray of float [N_poly (+ N_gradient) x N_samples]
        Pseudoinverse of the generalized polynomial chaos matrix (with or without gradient)
    p_matrix : ndarray of float [dim_red x dim]
        Projection matrix to reduce the number of effective dimensions (\\eta = p_matrix * \\xi)
    p_matrix_norm : ndarray of float [dim_red]
        Maximal possible length of the new axes in \\eta space. Since the projected variables
        are modelled in the normalized space between [-1, 1], the transformed coordinates need
        to be scaled.
    nan_elm : ndarray of int
        Indices of NaN elements of model output
    gpc_matrix_coords_id : list of UUID4()
        UUID4() IDs of the grid points the gPC matrix was derived with
    gpc_matrix_b_id : list of UUID4()
        UUID4() IDs of the basis functions the gPC matrix was derived with
    n_basis : int or list of int
        Number of basis functions (for iterative solvers, this is a list of its history)
    n_grid : int or list of int
        Number of grid points (for iterative solvers, this is a list of its history)
    solver : str
        Default solver to determine the gPC coefficients (can be chosen during GPC.solve)
        - 'Moore-Penrose' ... Pseudoinverse of gPC matrix (SGPC.Reg, EGPC)
        - 'OMP' ... Orthogonal Matching Pursuit, sparse recovery approach (SGPC.Reg, EGPC)
        - 'LarsLasso' ... Least-Angle Regression using Lasso model (SGPC.Reg, EGPC)
        - 'NumInt' ... Numerical integration, spectral projection (SGPC.Quad)
    verbose : bool
        Boolean value that determines whether to print out the progress to the standard output
    fn_results : string, optional, default=None
        If provided, model evaluations are saved in the fn_results.hdf5 file and the gpc object
        in the fn_results.pkl file
    relative_error_loocv : list of float
        Relative error of the leave-one-out cross-validation
    relative_error_nrmsd : list of float
        Normalized root mean square deviation between model and gpc approximation
    options : dict
        Options of gPC algorithm
    """

    def __init__(self, problem, options, validation=None):
        """
        Constructor; initializes GPC class
        """
        # objects
        self.problem = problem
        self.problem_original = None
        self.basis = None
        self.grid = None
        self.validation = validation

        # arrays
        self.gpc_matrix = None
        self.gpc_matrix_gradient = None
        self.matrix_inv = None
        self.p_matrix = None
        self.p_matrix_norm = None
        self.nan_elm = []
        self.gpc_matrix_coords_id = None
        self.gpc_matrix_b_id = None
        self.gpc_matrix_gradient_coords_id = None
        self.gpc_matrix_gradient_b_id = None
        self.n_basis = []
        self.n_grid = []
        self.relative_error_nrmsd = []
        self.relative_error_loocv = []
        self.error = []
        self.n_out = []
        self.gradient_idx = None

        # options
        if options is not None:
            if "gradient_enhanced" not in options.keys():
                options["gradient_enhanced"] = False

            if "fn_results" not in options.keys():
                options["fn_results"] = None

            if "matlab_model" not in options.keys():
                options["matlab_model"] = False

            if "backend" not in options.keys():
                options["backend"] = "omp"

            self.gradient = options["gradient_enhanced"]
            self.fn_results = options["fn_results"]
            self.matlab_model = options["matlab_model"]
            self.backend = options["backend"]
        else:
            self.gradient = None
            self.fn_results = None
            self.matlab_model = False
            self.backend = "omp"

        self.solver = None
        self.settings = None
        self.verbose = True
        self.options = options
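    # Example (a minimal usage sketch; ``MyModel`` and the parameter definitions are
    # hypothetical -- in practice the Problem, Basis and Grid objects are usually set up
    # by the Algorithm classes rather than by hand):
    #
    #     import pygpc
    #     from collections import OrderedDict
    #
    #     parameters = OrderedDict()
    #     parameters["x1"] = pygpc.Beta(pdf_shape=[1, 1], pdf_limits=[-1, 1])
    #     problem = pygpc.Problem(model=MyModel(), parameters=parameters)
    #     options = {"solver": "Moore-Penrose", "settings": None, "error_type": "nrmsd"}
    #     gpc = pygpc.GPC(problem=problem, options=options)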
    def init_gpc_matrix(self, gradient_idx=None):
        """
        Sets self.gpc_matrix and self.gpc_matrix_gradient with given self.basis and self.grid

        Parameters
        ----------
        gradient_idx : ndarray of int [gradient_results.shape[0]]
            Indices of grid points where the gradient in gradient_results is provided
        """
        if self.gradient_idx is None or gradient_idx is not None:
            self.gradient_idx = gradient_idx

        self.gpc_matrix = self.create_gpc_matrix(b=self.basis.b,
                                                 x=self.grid.coords_norm,
                                                 gradient=False)
        self.n_grid.append(self.gpc_matrix.shape[0])
        self.n_basis.append(self.gpc_matrix.shape[1])
        self.gpc_matrix_coords_id = copy.deepcopy(self.grid.coords_id)
        self.gpc_matrix_b_id = copy.deepcopy(self.basis.b_id)

        if self.gradient and self.gradient_idx is not None:
            self.gpc_matrix_gradient = self.create_gpc_matrix(b=self.basis.b,
                                                              x=self.grid.coords_norm,
                                                              gradient=True)
            self.gpc_matrix_gradient = ten2mat(self.gpc_matrix_gradient)
            self.gpc_matrix_gradient_coords_id = copy.deepcopy(self.grid.coords_id)
            self.gpc_matrix_gradient_b_id = copy.deepcopy(self.basis.b_id)
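    # Example (a minimal sketch; assumes ``gpc.basis`` and ``gpc.grid`` have been assigned,
    # e.g. by one of the Algorithm classes):
    #
    #     gpc.init_gpc_matrix()
    #     # gpc.gpc_matrix now has shape [n_grid x n_basis]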
    def create_gpc_matrix(self, b, x, gradient=False, gradient_idx=None, weighted=False,
                          verbose=False):
        """
        Construct the gPC matrix or its derivative.

        Parameters
        ----------
        b : list of BasisFunction object instances [n_basis][n_dim]
            Parameter-wise basis function objects used in gPC (Basis.b).
            Multiplying all elements in a row at location xi = (x1, x2, ..., x_dim) yields
            the global basis function.
        x : ndarray of float [n_x x n_dim]
            Coordinates of x = (x1, x2, ..., x_dim) where the rows of the gPC matrix are
            evaluated (normalized [-1, 1])
        gradient : bool, optional, default: False
            Determine gradient gPC matrix.
        gradient_idx : ndarray of int [gradient_results.shape[0]]
            Indices of grid points where the gradient in gradient_results is provided
        weighted : bool, optional, default: False
            Weight gPC matrix with (row 2-norm)^-1
        verbose : bool, optional, default: False
            Boolean value that determines whether to print out the progress to the
            standard output

        Returns
        -------
        gpc_matrix : ndarray of float [n_x x n_basis (x dim)]
            GPC matrix where the columns correspond to the basis functions and the rows to
            the sample coordinates. If gradient_idx != None, the gradient is returned at the
            specified sample coordinates point by point.
        """
        # overwrite self.gradient_idx if it is provided
        if gradient_idx is not None:
            self.gradient_idx = gradient_idx

        iprint('Constructing gPC matrix...', verbose=verbose, tab=0)

        # Python backend
        if self.backend == "python":
            if not gradient:
                gpc_matrix = np.ones([x.shape[0], len(b)])

                for i_basis in range(len(b)):
                    for i_dim in range(self.problem.dim):
                        gpc_matrix[:, i_basis] *= b[i_basis][i_dim](x[:, i_dim])
            else:
                gpc_matrix = np.ones([len(self.gradient_idx), len(b), self.problem.dim])

                for i_dim_gradient in range(self.problem.dim):
                    for i_basis in range(len(b)):
                        for i_dim in range(self.problem.dim):
                            if i_dim == i_dim_gradient:
                                derivative = True
                            else:
                                derivative = False

                            gpc_matrix[:, i_basis, i_dim_gradient] *= \
                                b[i_basis][i_dim](x[self.gradient_idx, i_dim],
                                                  derivative=derivative)

        # CPU backend (CPU single core)
        elif self.backend == "cpu":
            if not gradient:
                # the third dimension is important and should not be removed,
                # otherwise the code could produce undefined behaviour
                gpc_matrix = np.empty([x.shape[0], len(b), 1])
                create_gpc_matrix_cpu(x, self.basis.b_array, gpc_matrix)
                gpc_matrix = np.squeeze(gpc_matrix)
            else:
                gpc_matrix = np.empty([len(self.gradient_idx), len(b), self.problem.dim])
                create_gpc_matrix_cpu(x[self.gradient_idx, :], self.basis.b_array_grad,
                                      gpc_matrix)

        # OpenMP backend (CPU multi core)
        elif self.backend == "omp":
            if not gradient:
                # the third dimension is important and should not be removed,
                # otherwise the code could produce undefined behaviour
                gpc_matrix = np.empty([x.shape[0], len(b), 1])
                create_gpc_matrix_omp(x, self.basis.b_array, gpc_matrix)
                gpc_matrix = np.squeeze(gpc_matrix)
            else:
                gpc_matrix = np.empty([len(self.gradient_idx), len(b), self.problem.dim])
                create_gpc_matrix_omp(x[self.gradient_idx, :], self.basis.b_array_grad,
                                      gpc_matrix)

        # CUDA backend (GPU multi core)
        elif self.backend == "cuda":
            try:
                from .pygpc_extensions_cuda import create_gpc_matrix_cuda
            except ImportError:
                raise NotImplementedError("The CUDA-extension is not installed. "
                                          "Use the build script to install.")
            else:
                if not gradient:
                    # the third dimension is important and should not be removed,
                    # otherwise the code could produce undefined behaviour
                    gpc_matrix = np.empty([x.shape[0], len(b), 1])
                    create_gpc_matrix_cuda(x, self.basis.b_array, gpc_matrix)
                    gpc_matrix = np.squeeze(gpc_matrix)
                else:
                    gpc_matrix = np.empty([len(self.gradient_idx), len(b), self.problem.dim])
                    create_gpc_matrix_cuda(x[self.gradient_idx, :], self.basis.b_array_grad,
                                           gpc_matrix)

        else:
            raise NotImplementedError

        if gpc_matrix.ndim == 1 and x.shape[0] == 1:
            gpc_matrix = gpc_matrix[np.newaxis, :]
        elif gpc_matrix.ndim == 1 and self.basis.n_basis == 1:
            gpc_matrix = gpc_matrix[:, np.newaxis]

        if weighted:
            w = np.diag(1 / np.linalg.norm(gpc_matrix, axis=1))
            gpc_matrix = np.matmul(w, gpc_matrix)

        return gpc_matrix
    def get_loocv(self, coeffs, results, gradient_results=None, error_norm="relative"):
        """
        Perform leave-one-out cross validation of the gPC approximation and add the error
        value to self.relative_error_loocv. The loocv error is calculated analytically after
        eq. (35) in [1] but omitting the "1 - " term, i.e. it corresponds to 1 - Q^2.

        relative_error_loocv = GPC.get_loocv(coeffs, results)

        .. math::
           \\epsilon_{LOOCV} = \\frac{\\frac{1}{N}\\sum_{i=1}^N \\left( \\frac{y(\\xi_i) -
           \\hat{y}(\\xi_i)}{1 - h_i} \\right)^2}{\\frac{1}{N-1}\\sum_{i=1}^N
           \\left( y(\\xi_i) - \\bar{y} \\right)^2}

        with

        .. math::
           \\mathbf{h} = \\mathrm{diag}(\\mathbf{\\Psi} (\\mathbf{\\Psi}^T
           \\mathbf{\\Psi})^{-1} \\mathbf{\\Psi}^T)

        Parameters
        ----------
        coeffs : ndarray of float [n_basis x n_out]
            GPC coefficients
        results : ndarray of float [n_grid x n_out]
            Results from n_grid simulations with n_out output quantities
        error_norm : str, optional, default="relative"
            Decide if error is determined "relative" or "absolute"
        gradient_results : ndarray of float [n_grid x n_out x dim], optional, default: None
            Gradient of results in original parameter space (tensor)

        Returns
        -------
        relative_error_loocv : float
            Relative mean error of leave-one-out cross validation

        Notes
        -----
        .. [1] Blatman, G., & Sudret, B. (2010). An adaptive algorithm to build up sparse
           polynomial chaos expansions for stochastic finite element analysis.
           Probabilistic Engineering Mechanics, 25(2), 183-197.
        """
        # if self.options["gradient_enhanced"]:
        #     matrix = np.vstack((self.gpc_matrix, self.gpc_matrix_gradient))
        #
        #     # transform gradient of results in case of projection
        #     if self.p_matrix is not None:
        #         gradient_results = np.matmul(gradient_results,
        #                                      self.p_matrix.transpose() *
        #                                      self.p_matrix_norm[np.newaxis, :])
        #
        #     results_complete = np.vstack((results, ten2mat(gradient_results)))
        # else:
        #     matrix = self.gpc_matrix
        #     results_complete = results

        # Analytical error estimation in case of overdetermined systems
        # if matrix.shape[0] > 2 * matrix.shape[1]:
        #     # determine Psi (Psi^T Psi)^-1 Psi^T
        #     h = np.matmul(np.matmul(matrix,
        #                             np.linalg.inv(np.matmul(matrix.transpose(), matrix))),
        #                   matrix.transpose())
        #
        #     # determine loocv error
        #     err = np.mean(((results_complete - np.matmul(matrix, coeffs)) /
        #                    (1 - np.diag(h))[:, np.newaxis]) ** 2, axis=0)
        #
        #     if error_norm == "relative":
        #         norm = np.var(results_complete, axis=0, ddof=1)
        #     else:
        #         norm = 1.
        #
        #     # normalize
        #     relative_error_loocv = np.mean(err / norm)
        # else:

        # perform manual loocv without gradient
        matrix = self.gpc_matrix
        results_complete = results

        # number of cross validations to perform (capped below by the number of grid points)
        n_loocv = 25
        n_loocv_points = np.min((results_complete.shape[0], n_loocv))

        # make list of indices, which are randomly sampled
        loocv_point_idx = random.sample(list(range(results_complete.shape[0])), n_loocv_points)

        start = time.time()
        relative_error = np.zeros(n_loocv_points)

        for i in range(n_loocv_points):
            # get mask of eliminated row
            mask = np.arange(results_complete.shape[0]) != loocv_point_idx[i]

            # determine gpc coefficients (this takes a lot of time for large problems)
            coeffs_loo = self.solve(results=results_complete[mask, :],
                                    solver=self.options["solver"],
                                    matrix=matrix[mask, :],
                                    settings=self.options["settings"],
                                    verbose=False)

            sim_results_temp = results_complete[loocv_point_idx[i], :]

            if error_norm == "relative":
                norm = scipy.linalg.norm(sim_results_temp)
            else:
                norm = 1.

            relative_error[i] = scipy.linalg.norm(
                sim_results_temp - np.matmul(matrix[loocv_point_idx[i], :], coeffs_loo)) / norm

            display_fancy_bar("LOOCV", int(i + 1), int(n_loocv_points))

        # store result in relative_error_loocv
        relative_error_loocv = np.mean(relative_error)

        iprint("LOOCV computation time: {} sec".format(time.time() - start),
               tab=0, verbose=True)

        return relative_error_loocv
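    # Example (a minimal sketch; ``gpc`` is assumed to be a fitted GPC instance whose
    # ``options`` contain "solver" and "settings", and ``results`` the [n_grid x n_out]
    # model output the coefficients were computed from):
    #
    #     coeffs = gpc.solve(results=results, solver="Moore-Penrose", settings=None)
    #     err_loocv = gpc.get_loocv(coeffs=coeffs, results=results, error_norm="relative")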
    def validate(self, coeffs, results=None, gradient_results=None, qoi_idx=None):
        """
        Validate gPC approximation using the ValidationSet object contained in the Problem
        object. Determines the normalized root mean square deviation between the gpc
        approximation and the original model. Skips this step if no validation set is present.

        Parameters
        ----------
        coeffs : ndarray of float [n_coeffs x n_out]
            GPC coefficients
        results : ndarray of float [n_grid x n_out]
            Results from n_grid simulations with n_out output quantities
        gradient_results : ndarray of float [n_grid x n_out x dim], optional, default: None
            Gradient of results in original parameter space (tensor)
        qoi_idx : int, optional, default: None
            Index of QOI to validate (if None, all QOI are considered)

        Returns
        -------
        error : float
            Estimated difference between gPC approximation and original model
        """
        if qoi_idx is None:
            qoi_idx = np.arange(0, results.shape[1])

        # determine QOIs with NaN in results and exclude them from validation
        non_nan_mask = np.where(np.all(~np.isnan(results), axis=0))[0]
        n_nan = results.shape[1] - non_nan_mask.size

        if n_nan > 0:
            iprint("In {}/{} output quantities NaN's were found.".format(n_nan,
                                                                         results.shape[1]),
                   tab=0, verbose=self.options["verbose"])

        results = results[:, non_nan_mask]

        if gradient_results is not None:
            gradient_results = gradient_results[:, non_nan_mask, :]

        # always determine nrmsd if a validation set is present
        if isinstance(self.validation, ValidationSet):
            gpc_results = self.get_approximation(coeffs, self.validation.grid.coords_norm,
                                                 output_idx=None)

            if gpc_results.ndim == 1:
                gpc_results = gpc_results[:, np.newaxis]

            if self.validation.results[:, qoi_idx].ndim == 1:
                validation_results = self.validation.results[:, qoi_idx][:, np.newaxis]
            else:
                validation_results = self.validation.results[:, qoi_idx]

            self.relative_error_nrmsd.append(float(np.mean(
                nrmsd(gpc_results,
                      validation_results,
                      error_norm=self.options["error_norm"],
                      x_axis=False))))

        if self.options["error_type"] == "nrmsd":
            self.error.append(self.relative_error_nrmsd[-1])

        elif self.options["error_type"] == "loocv":
            self.relative_error_loocv.append(self.get_loocv(
                coeffs=coeffs,
                results=results,
                gradient_results=gradient_results,
                error_norm=self.options["error_norm"]))
            self.error.append(self.relative_error_loocv[-1])

        elif self.options["error_type"] is None:
            self.error.append(None)

        return self.error[-1]
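    # Example (a minimal sketch; assumes ``gpc.validation`` was set, e.g. via
    # ``gpc.create_validation_set(...)``, and that ``options`` contain "error_type" and
    # "error_norm"):
    #
    #     gpc.options["error_type"] = "nrmsd"
    #     gpc.options["error_norm"] = "relative"
    #     err = gpc.validate(coeffs=coeffs, results=results)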
    def get_pdf(self, coeffs, n_samples, output_idx=None, filter=True, return_samples=False):
        """
        Determine the estimated pdfs of the output quantities

        pdf_x, pdf_y = SGPC.get_pdf(coeffs, n_samples, output_idx=None)

        Parameters
        ----------
        coeffs : ndarray of float [n_coeffs x n_out]
            GPC coefficients
        n_samples : int
            Number of samples used to estimate output pdfs
        output_idx : ndarray [1 x n_out], optional, default=None
            Index of output quantities to consider (if output_idx=None, all output
            quantities are considered)
        filter : bool, optional, default: True
            Use savgol_filter to smooth probability density
        return_samples : bool, optional, default: False
            Additionally returns input and output samples with which the pdfs were computed

        Returns
        -------
        pdf_x : ndarray of float [100 x n_out]
            x-coordinates of output pdfs of output quantities
        pdf_y : ndarray of float [100 x n_out]
            y-coordinates of output pdfs (probability density of output quantity)
        samples_in : ndarray of float [n_samples x dim] (optional)
            Input samples (if return_samples=True)
        samples_out : ndarray of float [n_samples x n_out] (optional)
            Output samples (if return_samples=True)
        """
        # handle (N,) arrays
        if len(coeffs.shape) == 1:
            coeffs = coeffs[:, np.newaxis]

        # if output index array is not provided, determine pdfs of all outputs
        if output_idx is None:
            output_idx = np.arange(0, coeffs.shape[1])

        n_out = np.asarray(output_idx).size

        # sample gPC expansion
        samples_in, samples_out = self.get_samples(n_samples=n_samples, coeffs=coeffs,
                                                   output_idx=output_idx)

        # estimate probability densities from histograms (100 bins)
        pdf_x = np.zeros([100, n_out])
        pdf_y = np.zeros([100, n_out])

        for i_out in range(n_out):
            pdf_y[:, i_out], tmp = np.histogram(samples_out[:, i_out], bins=100, density=True)
            pdf_x[:, i_out] = (tmp[1:] + tmp[0:-1]) / 2.

            if filter:
                pdf_y[:, i_out] = savgol_filter(pdf_y[:, i_out], 51, 5)

            # alternative: kernel density estimate using a Gaussian kernel
            # kde = scipy.stats.gaussian_kde(samples_out[:, i_out],
            #                                bw_method=0.1 / samples_out[:, i_out].std(ddof=1))
            # pdf_x[:, i_out] = np.linspace(samples_out[:, i_out].min(),
            #                               samples_out[:, i_out].max(), 100)
            # pdf_y[:, i_out] = kde(pdf_x[:, i_out])

        if return_samples:
            return pdf_x, pdf_y, samples_in, samples_out
        else:
            return pdf_x, pdf_y
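    # Example (a minimal sketch; ``gpc`` and ``coeffs`` as above):
    #
    #     pdf_x, pdf_y = gpc.get_pdf(coeffs=coeffs, n_samples=10000, output_idx=[0])
    #     # pdf_x[:, 0] holds the 100 bin centers, pdf_y[:, 0] the smoothed density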
    def get_samples(self, coeffs, n_samples, output_idx=None):
        """
        Randomly sample the gPC expansion.

        x, pce = SGPC.get_samples(coeffs, n_samples, output_idx=None)

        Parameters
        ----------
        coeffs : ndarray of float [n_basis x n_out]
            GPC coefficients
        n_samples : int
            Number of random samples drawn from the respective input pdfs.
        output_idx : ndarray of int [1 x n_out], optional, default=None
            Index of output quantities to consider.

        Returns
        -------
        x : ndarray of float [n_samples x dim]
            Generated samples in normalized coordinates [-1, 1].
        pce : ndarray of float [n_samples x n_out]
            GPC approximation at points x.
        """
        # seed the random number generator
        np.random.seed()

        if self.p_matrix is not None:
            problem = self.problem_original
        else:
            problem = self.problem

        # generate temporary grid with random samples for each random input variable
        # [n_samples x dim]
        grid = Random(parameters_random=problem.parameters_random,
                      n_grid=n_samples)

        # if output index list is not provided, sample all gpc outputs
        if output_idx is None:
            n_out = 1 if coeffs.ndim == 1 else coeffs.shape[1]
            output_idx = np.arange(n_out)
            # output_idx = output_idx[np.newaxis, :]

        pce = self.get_approximation(coeffs=coeffs, x=grid.coords_norm, output_idx=output_idx)

        return grid.coords_norm, pce
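    # Example (a minimal sketch; ``gpc`` and ``coeffs`` as above):
    #
    #     x, pce = gpc.get_samples(coeffs=coeffs, n_samples=1000)
    #     # x: [1000 x dim] random points in [-1, 1], pce: [1000 x n_out] surrogate values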
    def get_approximation(self, coeffs, x, output_idx=None):
        """
        Calculates the gPC approximation in points with output_idx and normalized parameters
        xi (interval: [-1, 1]).

        pce = GPC.get_approximation(coeffs, x, output_idx=None)

        Parameters
        ----------
        coeffs : ndarray of float [n_basis x n_out]
            GPC coefficients for each output variable
        x : ndarray of float [n_x x n_dim]
            Coordinates of x = (x1, x2, ..., x_dim) where the rows of the gPC matrix are
            evaluated (normalized [-1, 1]). The coordinates will be transformed in case of
            projected gPC.
        output_idx : ndarray of int [n_out], optional, default=None
            Indices of output quantities to consider (default: all).

        Returns
        -------
        pce : ndarray of float [n_x x n_out]
            GPC approximation at normalized coordinates x.
        """
        if len(x.shape) == 1:
            x = x[:, np.newaxis]

        # crop coordinates to gPC boundaries (values outside do not yield meaningful values)
        for i_dim, key in enumerate(list(self.problem.parameters_random.keys())):
            xmin = self.problem.parameters_random[key].pdf_limits_norm[0]
            xmax = self.problem.parameters_random[key].pdf_limits_norm[1]
            x[x[:, i_dim] < xmin, i_dim] = xmin
            x[x[:, i_dim] > xmax, i_dim] = xmax

        if output_idx is not None:
            # convert to 1d array
            output_idx = np.asarray(output_idx).flatten().astype(int)

            # crop coeffs array if output index is specified
            coeffs = coeffs[:, output_idx]

        if coeffs.ndim == 1:
            coeffs = coeffs[:, np.newaxis]

        # transform variables from xi to eta space if gpc model is reduced
        if self.p_matrix is not None:
            x = np.matmul(x, self.p_matrix.transpose() / self.p_matrix_norm[np.newaxis, :])

        if self.backend in ('python', 'cpu', 'omp'):
            # determine gPC matrix at coordinates x
            gpc_matrix = self.create_gpc_matrix(self.basis.b, x, gradient=False)

            # multiply with gPC coeffs
            pce = np.matmul(gpc_matrix, coeffs)

        elif self.backend == "cuda":
            try:
                from .pygpc_extensions_cuda import get_approximation_cuda
            except ImportError:
                raise NotImplementedError("The CUDA-extension is not installed. "
                                          "Use the build script to install.")
            else:
                pce = np.empty([x.shape[0], coeffs.shape[1]])
                get_approximation_cuda(x, self.basis.b_array, coeffs, pce)

        else:
            raise NotImplementedError

        return pce
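    # The returned surrogate is pce[i, k] = sum_j Psi[i, j] * coeffs[j, k], i.e. a plain
    # matrix product of the gPC matrix evaluated at x with the coefficient matrix.
    # Example (a minimal sketch; ``gpc`` and ``coeffs`` as above):
    #
    #     import numpy as np
    #     x = np.array([[0.1], [0.5]])        # two evaluation points, one random variable
    #     pce = gpc.get_approximation(coeffs=coeffs, x=x)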
    def replace_gpc_matrix_samples(self, idx, seed=None):
        """
        Replace distinct sample points from the gPC matrix with new ones.

        GPC.replace_gpc_matrix_samples(idx, seed=None)

        Parameters
        ----------
        idx : ndarray of int [n_samples]
            Array of grid indices of grid.coords[idx, :] which are going to be replaced
            (rows of the gPC matrix will be replaced by new ones)
        seed : float, optional, default=None
            Random seeding point
        """
        # generate new grid points
        new_grid_points = Random(parameters_random=self.problem.parameters_random,
                                 n_grid=idx.size,
                                 seed=seed)

        # replace old grid points
        self.grid.coords[idx, :] = new_grid_points.coords
        self.grid.coords_norm[idx, :] = new_grid_points.coords_norm

        # replace old IDs of grid points with new ones
        for i in idx:
            self.grid.coords_id[i] = uuid.uuid4()
            self.gpc_matrix_coords_id[i] = copy.deepcopy(self.grid.coords_id[i])

        # determine new rows of gpc matrix and overwrite rows of gpc matrix
        self.gpc_matrix[idx, :] = self.create_gpc_matrix(b=self.basis.b,
                                                         x=new_grid_points.coords_norm,
                                                         gradient=False)
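    # Example (a minimal sketch; replaces the first two grid points of a fitted ``gpc``
    # with fresh random samples):
    #
    #     import numpy as np
    #     gpc.replace_gpc_matrix_samples(idx=np.array([0, 1]), seed=42)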
    def update_gpc_matrix(self):
        """
        Update gPC matrix and gPC matrix gradient according to the existing self.grid and
        self.basis.

        Call this method when self.gpc_matrix does not fit to the self.grid and self.basis
        objects anymore. The old gPC matrix with its self.gpc_matrix_b_id and
        self.gpc_matrix_coords_id is compared to self.basis.b_id and self.grid.coords_id.
        New rows and columns are computed when differences are found.
        """
        self._update_gpc_matrix(gradient=False)

        if self.gradient:
            self._update_gpc_matrix(gradient=True)
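    # Example (a minimal sketch; after basis functions or grid points were added elsewhere,
    # only the missing rows and columns of the gPC matrix are recomputed):
    #
    #     gpc.update_gpc_matrix()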
    def _update_gpc_matrix(self, gradient=False):
        """
        Update gPC matrix or gPC gradient matrix
        """
        if self.backend == "python":
            # initialize updated matrix and variables
            if gradient:
                # reshape gpc gradient matrix from 2D to 3D representation
                # [n_grid x n_basis x n_dim]
                matrix = mat2ten(mat=self.gpc_matrix_gradient, incr=self.problem.dim)
                matrix_updated = np.zeros((len(self.gradient_idx), len(self.basis.b_id),
                                           self.problem.dim))
                coords_id = self.gpc_matrix_gradient_coords_id[self.gradient_idx]
                coords_id_ref = self.grid.coords_gradient_id[self.gradient_idx]
                b_id = self.gpc_matrix_gradient_b_id
                b_id_ref = self.basis.b_id
                coords_norm = self.grid.coords_norm[self.gradient_idx]
                ge_str = "(gradient)"
            else:
                matrix = self.gpc_matrix
                matrix_updated = np.zeros((len(self.grid.coords_id), len(self.basis.b_id)))
                coords_id = self.gpc_matrix_coords_id
                coords_id_ref = self.grid.coords_id
                b_id = self.gpc_matrix_b_id
                b_id_ref = self.basis.b_id
                coords_norm = self.grid.coords_norm
                ge_str = ""

            # # determine indices of new basis functions and grid points
            # idx_coords_new = [i for i, _id in enumerate(self.grid.coords_id)
            #                   if _id not in self.gpc_matrix_coords_id]
            # idx_basis_new = [i for i, _id in enumerate(self.basis.b_id)
            #                  if _id not in self.gpc_matrix_b_id]

            # determine indices of old grid points in updated gpc matrix
            idx_coords_old = np.empty(len(coords_id)) * np.nan

            for i, coords_id_old in enumerate(coords_id):
                for j, coords_id_new in enumerate(coords_id_ref):
                    if coords_id_old == coords_id_new:
                        idx_coords_old[i] = j
                        break

            # determine indices of old basis functions in updated gpc matrix
            idx_b_old = np.empty(len(b_id)) * np.nan

            for i, b_id_old in enumerate(b_id):
                for j, b_id_new in enumerate(b_id_ref):
                    if b_id_old == b_id_new:
                        idx_b_old[i] = j
                        break

            # filter out non-existent rows and columns
            matrix = matrix[~np.isnan(idx_coords_old), :, ]
            matrix = matrix[:, ~np.isnan(idx_b_old), ]
            idx_coords_old = idx_coords_old[~np.isnan(idx_coords_old)].astype(int)
            idx_b_old = idx_b_old[~np.isnan(idx_b_old)].astype(int)

            # indices of new coords and basis in updated gpc matrix
            # (values have to be computed there)
            idx_coords_new = np.array(list(set(np.arange(len(coords_id_ref))) -
                                           set(idx_coords_old))).astype(int)
            idx_b_new = np.array(list(set(np.arange(len(b_id_ref))) -
                                      set(idx_b_old))).astype(int)

            # write old results at correct location in updated gpc matrix
            idx = get_cartesian_product([idx_coords_old, idx_b_old]).astype(int)
            idx_row = np.reshape(idx[:, 0], matrix.shape[:2]).astype(int)
            idx_col = np.reshape(idx[:, 1], matrix.shape[:2]).astype(int)

            matrix_updated[idx_row, idx_col, ] = matrix

            # determine new columns (new basis functions) with old grid
            idx = get_cartesian_product([idx_coords_old, idx_b_new]).astype(int)

            if idx.any():
                iprint('Adding {} columns to gPC matrix {}...'.format(idx_b_new.size, ge_str),
                       tab=0, verbose=True)
                idx_row = np.reshape(idx[:, 0],
                                     (idx_coords_old.size, idx_b_new.size)).astype(int)
                idx_col = np.reshape(idx[:, 1],
                                     (idx_coords_old.size, idx_b_new.size)).astype(int)

                matrix_updated[idx_row, idx_col, ] = \
                    self.create_gpc_matrix(b=[self.basis.b[i] for i in idx_b_new],
                                           x=coords_norm[idx_coords_old, :],
                                           gradient=gradient,
                                           verbose=False)

            # determine new rows (new grid points) with all basis functions
            idx = get_cartesian_product([idx_coords_new,
                                         np.arange(len(self.basis.b))]).astype(int)

            if idx.any():
                iprint('Adding {} rows to gPC matrix {}...'.format(idx_coords_new.size, ge_str),
                       tab=0, verbose=True)
                idx_row = np.reshape(idx[:, 0],
                                     (idx_coords_new.size, len(self.basis.b))).astype(int)
                idx_col = np.reshape(idx[:, 1],
                                     (idx_coords_new.size, len(self.basis.b))).astype(int)

                matrix_updated[idx_row, idx_col, ] = \
                    self.create_gpc_matrix(b=self.basis.b,
                                           x=coords_norm[idx_coords_new, :],
                                           gradient=gradient,
                                           verbose=False)

            # overwrite old attributes and append new sizes
            if gradient:
                # reshape from 3D to 2D
                self.gpc_matrix_gradient = ten2mat(matrix_updated)
                self.gpc_matrix_gradient_coords_id = copy.deepcopy(self.grid.coords_id)
                self.gpc_matrix_gradient_b_id = copy.deepcopy(self.basis.b_id)
            else:
                self.gpc_matrix = matrix_updated
                self.gpc_matrix_coords_id = copy.deepcopy(self.grid.coords_id)
                self.gpc_matrix_b_id = copy.deepcopy(self.basis.b_id)

            self.n_grid.append(self.gpc_matrix.shape[0])
            self.n_basis.append(self.gpc_matrix.shape[1])

        else:
            self.init_gpc_matrix()
    def save_gpc_matrix_hdf5(self, hdf5_path_gpc_matrix=None,
                             hdf5_path_gpc_matrix_gradient=None):
        """
        Save the gPC matrix and the gPC gradient matrix in the .hdf5 file
        <"fn_results" + ".hdf5"> under the keys "gpc_matrix" and "gpc_matrix_gradient".
        If the matrices are already present, check for equality and save only the appended
        rows and columns.

        Parameters
        ----------
        hdf5_path_gpc_matrix : str
            Path in the .hdf5 file where the gPC matrix is saved
        hdf5_path_gpc_matrix_gradient : str
            Path in the .hdf5 file where the gPC gradient matrix is saved
        """
        if hdf5_path_gpc_matrix is None:
            hdf5_path_gpc_matrix = "gpc_matrix"

        if hdf5_path_gpc_matrix_gradient is None:
            hdf5_path_gpc_matrix_gradient = "gpc_matrix_gradient"

        with h5py.File(self.fn_results + ".hdf5", "a") as f:
            try:
                # write gpc matrix
                gpc_matrix_hdf5 = f[hdf5_path_gpc_matrix][:]
                n_rows_hdf5 = gpc_matrix_hdf5.shape[0]
                n_cols_hdf5 = gpc_matrix_hdf5.shape[1]
                n_rows_current = self.gpc_matrix.shape[0]
                n_cols_current = self.gpc_matrix.shape[1]

                # save only new rows and cols if current matrix > saved matrix
                if n_rows_current >= n_rows_hdf5 and n_cols_current >= n_cols_hdf5 and \
                        (self.gpc_matrix[0:n_rows_hdf5, 0:n_cols_hdf5] == gpc_matrix_hdf5).all():
                    # resize dataset and save new columns and rows
                    f[hdf5_path_gpc_matrix].resize(self.gpc_matrix.shape[1], axis=1)
                    f[hdf5_path_gpc_matrix][:, n_cols_hdf5:] = \
                        self.gpc_matrix[0:n_rows_hdf5, n_cols_hdf5:]

                    f[hdf5_path_gpc_matrix].resize(self.gpc_matrix.shape[0], axis=0)
                    f[hdf5_path_gpc_matrix][n_rows_hdf5:, :] = self.gpc_matrix[n_rows_hdf5:, :]
                else:
                    del f[hdf5_path_gpc_matrix]
                    f.create_dataset(hdf5_path_gpc_matrix,
                                     (self.gpc_matrix.shape[0], self.gpc_matrix.shape[1]),
                                     maxshape=(None, None),
                                     dtype="float64",
                                     data=self.gpc_matrix)

                # write gpc gradient matrix if available
                if self.gpc_matrix_gradient is not None:
                    gpc_matrix_gradient_hdf5 = f[hdf5_path_gpc_matrix_gradient][:]
                    n_rows_hdf5 = gpc_matrix_gradient_hdf5.shape[0]
                    n_cols_hdf5 = gpc_matrix_gradient_hdf5.shape[1]
                    n_rows_current = self.gpc_matrix_gradient.shape[0]
                    n_cols_current = self.gpc_matrix_gradient.shape[1]

                    # save only new rows and cols if current matrix > saved matrix
                    if n_rows_current >= n_rows_hdf5 and n_cols_current >= n_cols_hdf5 and \
                            (self.gpc_matrix_gradient[0:n_rows_hdf5, 0:n_cols_hdf5] ==
                             gpc_matrix_gradient_hdf5).all():
                        # resize dataset and save new columns and rows
                        f[hdf5_path_gpc_matrix_gradient].resize(
                            self.gpc_matrix_gradient.shape[1], axis=1)
                        f[hdf5_path_gpc_matrix_gradient][:, n_cols_hdf5:] = \
                            self.gpc_matrix_gradient[0:n_rows_hdf5, n_cols_hdf5:]

                        f[hdf5_path_gpc_matrix_gradient].resize(
                            self.gpc_matrix_gradient.shape[0], axis=0)
                        f[hdf5_path_gpc_matrix_gradient][n_rows_hdf5:, :] = \
                            self.gpc_matrix_gradient[n_rows_hdf5:, :]
                    else:
                        del f[hdf5_path_gpc_matrix_gradient]
                        f.create_dataset(hdf5_path_gpc_matrix_gradient,
                                         (self.gpc_matrix_gradient.shape[0],
                                          self.gpc_matrix_gradient.shape[1]),
                                         maxshape=(None, None),
                                         dtype="float64",
                                         data=self.gpc_matrix_gradient)

            except KeyError:
                # save whole matrix if not existent
                f.create_dataset(hdf5_path_gpc_matrix,
                                 (self.gpc_matrix.shape[0], self.gpc_matrix.shape[1]),
                                 maxshape=(None, None),
                                 dtype="float64",
                                 data=self.gpc_matrix)

                # save whole gradient matrix if not existent and available
                if self.gpc_matrix_gradient is not None:
                    f.create_dataset(hdf5_path_gpc_matrix_gradient,
                                     (self.gpc_matrix_gradient.shape[0],
                                      self.gpc_matrix_gradient.shape[1]),
                                     maxshape=(None, None),
                                     dtype="float64",
                                     data=self.gpc_matrix_gradient)
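    # Example (a minimal sketch; assumes ``gpc.fn_results`` was set, e.g. to "results/gpc"):
    #
    #     gpc.save_gpc_matrix_hdf5()
    #     # read it back:
    #     import h5py
    #     with h5py.File(gpc.fn_results + ".hdf5", "r") as f:
    #         gpc_matrix = f["gpc_matrix"][:]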
    def solve(self, results, gradient_results=None, solver=None, settings=None, matrix=None,
              verbose=False):
        """
        Determines the gPC coefficients

        Parameters
        ----------
        results : ndarray of float [n_grid x n_out]
            Results from simulations with n_out output quantities
        gradient_results : ndarray of float [n_gradient x n_out x dim], optional, default: None
            Gradient of results in original parameter space in specific grid points
        solver : str
            Solver to determine the gPC coefficients
            - 'Moore-Penrose' ... Pseudoinverse of gPC matrix (SGPC.Reg, EGPC)
            - 'OMP' ... Orthogonal Matching Pursuit, sparse recovery approach (SGPC.Reg, EGPC)
            - 'LarsLasso' ... Least-Angle Regression using Lasso model (SGPC.Reg, EGPC)
            - 'NumInt' ... Numerical integration, spectral projection (SGPC.Quad)
        settings : dict
            Solver settings
            - 'Moore-Penrose' ... None
            - 'OMP' ... {"n_coeffs_sparse": int} Number of gPC coefficients != 0 or
              {"sparsity": float 0...1}
            - 'LarsLasso' ... {"alpha": float 0...1} Regularization parameter
            - 'NumInt' ... None
        matrix : ndarray of float, optional, default: self.gpc_matrix or
                 [self.gpc_matrix, self.gpc_matrix_gradient]
            Matrix to invert. Depending on the gradient_enhanced option, this matrix consists
            of the standard gPC matrix and its derivatives.
        verbose : bool
            Boolean value that determines whether to print out the progress to the
            standard output

        Returns
        -------
        coeffs : ndarray of float [n_coeffs x n_out]
            gPC coefficients
        """
        ge_str = ""

        if matrix is None:
            matrix = self.gpc_matrix

            if self.gradient is False:
                matrix = self.gpc_matrix
                ge_str = ""
            else:
                if not solver == 'NumInt':
                    if self.gpc_matrix_gradient is not None:
                        matrix = np.vstack((self.gpc_matrix, self.gpc_matrix_gradient))
                    else:
                        matrix = self.gpc_matrix
                    ge_str = "(gradient enhanced)"
                else:
                    Warning("Gradient enhanced version not applicable in case of numerical "
                            "integration (quadrature).")

        # use default solver if not specified
        if solver is None:
            solver = self.solver

        # use default solver settings if not specified
        if settings is None:
            settings = self.settings

        iprint("Determine gPC coefficients using '{}' solver {}...".format(solver, ge_str),
               tab=0, verbose=verbose)

        # construct results array
        if not solver == 'NumInt' and gradient_results is not None:
            # transform gradient of results according to projection
            if self.p_matrix is not None:
                gradient_results = np.matmul(gradient_results,
                                             self.p_matrix.transpose() *
                                             self.p_matrix_norm[np.newaxis, :])

            results_complete = np.vstack((results, ten2mat(gradient_results)))
        else:
            results_complete = results

        self.coherence_matrix = matrix

        if isinstance(self.grid, CO) or (isinstance(self.grid, L1) and
                                         "D" not in self.grid.criterion):
            w = np.diag(1 / np.linalg.norm(matrix, axis=1))
            matrix = np.matmul(w, matrix)
            results_complete = np.matmul(w, results_complete)

        #################
        # Moore-Penrose #
        #################
        if solver == 'Moore-Penrose':
            # determine pseudoinverse of gPC matrix
            self.matrix_inv = np.linalg.pinv(matrix)

            try:
                coeffs = np.matmul(self.matrix_inv, results_complete)
            except ValueError:
                raise AttributeError("Please check the format of the parameter sim_results: "
                                     "[n_grid (* dim) x n_out] np.ndarray.")

        ###############################
        # Orthogonal Matching Pursuit #
        ###############################
        elif solver == 'OMP':
            # transform gPC matrix to fastmat format
            matrix_fm = fm.Matrix(matrix)

            if results_complete.ndim == 1:
                results_complete = results_complete[:, np.newaxis]

            # determine gPC-coefficients of extended basis using OMP
            if "n_coeffs_sparse" in settings.keys():
                n_coeffs_sparse = int(settings["n_coeffs_sparse"])
            elif "sparsity" in settings.keys():
                n_coeffs_sparse = int(np.ceil(matrix.shape[1] * settings["sparsity"]))
            else:
                raise AttributeError("Please specify 'n_coeffs_sparse' or 'sparsity' in the "
                                     "solver settings dictionary!")

            coeffs = fm.algs.OMP(matrix_fm, results_complete, n_coeffs_sparse)

        ################################
        # Least-Angle Regression Lasso #
        ################################
        elif solver == 'LarsLasso':
            if results_complete.ndim == 1:
                results_complete = results_complete[:, np.newaxis]

            # determine gPC-coefficients of extended basis using LarsLasso
            # from sklearn.preprocessing import normalize
            # matrix_norm = normalize(matrix, axis=0, return_norm=False)
            #
            # reg = linear_model.LassoLars(alpha=settings["alpha"], fit_intercept=False,
            #                              normalize=False)
            # reg.fit(matrix_norm, results_complete)
            # coeffs = reg.coef_

            reg = linear_model.LassoLars(alpha=settings["alpha"], fit_intercept=False,
                                         normalize=False)
            reg.fit(matrix, results_complete)
            coeffs = reg.coef_

            if coeffs.ndim == 1:
                coeffs = coeffs[:, np.newaxis]
            else:
                coeffs = coeffs.transpose()

        #########################
        # Numerical Integration #
        #########################
        elif solver == 'NumInt':
            # check if quadrature rule (grid) fits to the probability density function (pdf)
            grid_pdf_fit = True

            for i_p, p in enumerate(self.problem.parameters_random):
                if self.problem.parameters_random[p].pdf_type == 'beta':
                    if not (self.grid.grid_type[i_p] == 'jacobi'):
                        grid_pdf_fit = False
                        break
                elif self.problem.parameters_random[p].pdf_type in ['norm', 'normal']:
                    if not (self.grid.grid_type[i_p] == 'hermite'):
                        grid_pdf_fit = False
                        break

            # if not, calculate joint pdf
            if not grid_pdf_fit:
                joint_pdf = np.ones(self.grid.coords_norm.shape)

                for i_p, p in enumerate(self.problem.parameters_random):
                    joint_pdf[:, i_p] = \
                        self.problem.parameters_random[p].pdf_norm(
                            x=self.grid.coords_norm[:, i_p])

                joint_pdf = np.array([np.prod(joint_pdf, axis=1)]).transpose()

                # weight sim_results with the joint pdf
                results_complete = results_complete * joint_pdf * 2 ** self.problem.dim

            # scale rows of gpc matrix with quadrature weights
            matrix_weighted = np.matmul(np.diag(self.grid.weights), matrix)

            # determine gpc coefficients [n_coeffs x n_output]
            coeffs = np.matmul(results_complete.transpose(), matrix_weighted).transpose()

        else:
            raise AttributeError("Unknown solver: '{}'!".format(solver))

        return coeffs
    def create_validation_set(self, n_samples, n_cpu=1):
        """
        Creates a ValidationSet instance (calls the model)

        Parameters
        ----------
        n_samples : int
            Number of sampling points contained in the validation set
        n_cpu : int
            Number of parallel function evaluations to evaluate the validation set
            (n_cpu=0 assumes that the model is capable of evaluating all grid points
            in parallel)
        """
        # create set of validation points
        if self.problem_original is not None:
            problem = self.problem_original
        else:
            problem = self.problem

        grid = Random(parameters_random=problem.parameters_random,
                      n_grid=n_samples,
                      options={"seed": self.options["seed"]})

        # evaluate original model at grid points
        com = Computation(n_cpu=n_cpu, matlab_model=self.matlab_model)
        results = com.run(model=problem.model, problem=problem, coords=grid.coords)

        if results.ndim == 1:
            results = results[:, np.newaxis]

        self.validation = ValidationSet(grid=grid, results=results)
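    # Example (a minimal sketch; runs the original model at 1000 random points and stores
    # the ValidationSet on the instance, so gpc.validate(...) can report the nrmsd):
    #
    #     gpc.options["seed"] = 1
    #     gpc.create_validation_set(n_samples=1000, n_cpu=4)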