Source code for pygpc.Basis

import os
import uuid
import time
import numpy as np
import multiprocessing
import multiprocessing.pool
import matplotlib.pyplot as plt
from functools import partial
from .misc import get_multi_indices
# from mpl_toolkits.mplot3d import Axes3D
from .BasisFunction import *


class Basis:
    """
    Basis class of gPC

    Attributes
    ----------
    b : list of list of BasisFunction object instances [n_basis][n_dim]
        Parameter wise basis function objects used in gPC.
        Multiplying all elements in a row at location xi = (x1, x2, ..., x_dim) yields the global basis function.
    b_array : ndarray of float [n_poly_coeffs]
        Polynomial coefficients of basis functions
    b_array_grad : ndarray of float [n_poly_coeffs]
        Polynomial coefficients of the basis functions used for gradient evaluation
    b_id : list of UUID objects (version 4) [n_basis]
        Unique IDs of global basis functions
    b_norm : ndarray of float [n_basis x dim]
        Normalization factor of individual basis functions
    b_norm_basis : ndarray of float [n_basis x 1]
        Normalization factor of global basis functions
    dim : int
        Number of variables
    n_basis : int
        Total number of (global) basis functions
    multi_indices : ndarray [n_basis x dim]
        Multi-indices of polynomial basis functions
    """

    def __init__(self):
        """
        Constructor; initializes the Basis class
        """
        self.b = None
        self.b_array = None
        self.b_array_grad = None
        self.b_id = None
        self.b_norm = None
        self.b_norm_basis = None
        self.dim = None
        self.n_basis = 0
        self.multi_indices = None
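    # Layout sketch (illustration only, not part of pygpc): for dim = 2, the k-th global basis function is the
    # product psi_k(x1, x2) = phi_i(x1) * phi_j(x2), where [i, j] = self.multi_indices[k, :] and
    # self.b[k] = [phi_i, phi_j] holds the two univariate BasisFunction objects of that row.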
    def set_basis(self, i_basis, problem):
        """
        Worker function to initialize a global basis function (called by multiprocessing.pool).
        It also initializes polynomial basis coefficients for fast processing.
        Converts list of lists of basis into np.ndarray that can be processed on multi core systems.

        Parameters
        ----------
        i_basis : int
            Index of global basis function
        problem : Problem class instance
            gPC problem

        Returns
        -------
        b_ : list [n_dim]
            List containing the individual basis functions of the parameters
        b_a_ : ndarray of int
            Concatenated list of polynomial basis coefficients
        b_a_grad_ : ndarray of int
            Concatenated list of polynomial basis coefficients for gradient evaluation
        """
        b_ = [0 for _ in range(problem.dim)]
        b_a_ = []
        b_a_grad_ = []

        for i_dim, p in enumerate(problem.parameters_random):  # OrderedDict of RandomParameter objects
            b_[i_dim] = problem.parameters_random[p].init_basis_function(order=self.multi_indices[i_basis, i_dim])

        for i_dim in range(problem.dim):
            for i_dim_inner in range(problem.dim):
                if i_dim == 0:
                    b_a_ = b_a_ + [np.array([b_[i_dim_inner].fun.order]), b_[i_dim_inner].fun.c]

                if i_dim == i_dim_inner:
                    b_a_grad_ = b_a_grad_ + [np.array([b_[i_dim_inner].fun.deriv().order]),
                                             b_[i_dim_inner].fun.deriv().c]
                else:
                    b_a_grad_ = b_a_grad_ + [np.array([b_[i_dim_inner].fun.order]), b_[i_dim_inner].fun.c]

        b_a_ = np.concatenate(b_a_)
        b_a_grad_ = np.concatenate(b_a_grad_)

        return b_, b_a_, b_a_grad_
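    # Layout sketch of the flat arrays built above (illustration only): for each dimension d, b_a_ stores the
    # polynomial order followed by the coefficients of the univariate factor in that dimension (the .order, .c and
    # .deriv() attributes suggest a np.poly1d-like object). b_a_grad_ repeats this block dim times per basis
    # function, each time replacing exactly one factor by its derivative, so partial derivatives with respect to
    # every parameter can later be evaluated from a single flat array.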
    def init_basis_sgpc(self, problem, order, order_max, order_max_norm, interaction_order,
                        interaction_order_current=None):
        """
        Initializes basis functions for standard gPC.

        Parameters
        ----------
        problem : Problem object
            GPC Problem to analyze
        order : list of int [dim]
            Maximum individual expansion order
            Generates individual polynomials also if maximum expansion order in order_max is exceeded
        order_max : int
            Maximum global expansion order.
            The maximum expansion order considers the sum of the orders of combined polynomials together with the
            chosen norm "order_max_norm". Typically this norm is 1 such that the maximum order is the sum of all
            monomial orders.
        order_max_norm : float
            Norm for which the maximum global expansion order is defined [0, 1]. Values < 1 decrease the total
            number of polynomials in the expansion such that interaction terms are penalized more. This truncation
            scheme is also referred to as "hyperbolic polynomial chaos expansion", such that
            sum(a_i^q)^(1/q) <= p, where p is order_max and q is order_max_norm
            (for more details see eq. (27) in [1]).
        interaction_order : int
            Number of random variables, which can interact with each other
        interaction_order_current : int, optional, default: interaction_order
            Number of random variables currently interacting with respect to the highest order.
            (interaction_order_current <= interaction_order)
            The parameters for lower orders are all interacting with "interaction_order".

        Notes
        -----
        .. [1] Blatman, G., & Sudret, B. (2011). Adaptive sparse polynomial chaos expansion based on least angle
           regression. Journal of Computational Physics, 230(6), 2345-2367.

        .. math::
           \\begin{tabular}{l*{4}{c}}
           Polynomial Index & Dimension 1 & Dimension 2 & ... & Dimension M \\\\
           \\hline
           Basis 1 & [Order D1] & [Order D2] & \\vdots & [Order M] \\\\
           Basis 2 & [Order D1] & [Order D2] & \\vdots & [Order M] \\\\
           \\vdots  & [Order D1] & [Order D2] & \\vdots & [Order M] \\\\
           Basis N & [Order D1] & [Order D2] & \\vdots & [Order M] \\\\
           \\end{tabular}

        Adds Attributes:

        b : list of BasisFunction object instances [n_basis x n_dim]
            Parameter wise basis function objects used in gPC.
            Multiplying all elements in a row at location xi = (x1, x2, ..., x_dim) yields the global basis function.
        """
        self.dim = problem.dim

        assert self.dim == len(order), "gPC order does not fit to number of random variables"

        if self.dim == 1:
            self.multi_indices = np.linspace(0, order_max, order_max + 1, dtype=int)[:, np.newaxis]
        else:
            self.multi_indices = get_multi_indices(order=order,
                                                   order_max=order_max,
                                                   order_max_norm=order_max_norm,
                                                   interaction_order=interaction_order,
                                                   interaction_order_current=interaction_order_current)

        # get total number of basis functions
        self.n_basis = self.multi_indices.shape[0]

        # construct 2D list with BasisFunction objects and array with coefficients and
        # initialize array of basis coefficients
        workhorse_partial = partial(self.set_basis, problem=problem)

        with multiprocessing.Pool(multiprocessing.cpu_count()) as pool:
            out = pool.map(workhorse_partial, range(self.n_basis))

        self.b = [o[0] for o in out]
        self.b_array = np.concatenate([o[1] for o in out])
        self.b_array_grad = np.concatenate([o[2] for o in out])

        # This is the single core implementation:
        # self.b = [[0 for _ in range(self.dim)] for _ in range(self.n_basis)]
        #
        # for i_basis in range(self.n_basis):
        #     for i_dim, p in enumerate(problem.parameters_random):  # OrderedDict of RandomParameter objects
        #         self.b[i_basis][i_dim] = problem.parameters_random[p].init_basis_function(
        #             order=self.multi_indices[i_basis, i_dim])

        # Generate unique IDs of basis functions
        self.b_id = [uuid.uuid4() for _ in range(self.n_basis)]

        # initialize normalization factor (self.b_norm and self.b_norm_basis)
        self.init_basis_norm()
    def init_basis_norm(self):
        """
        Construct array of scaling factors self.b_norm [n_basis x dim] and self.b_norm_basis [n_basis x 1]
        to normalize basis functions <psi^2> = int(psi^2*p)dx
        """
        # read individual normalization factors from function objects
        self.b_norm = np.array([list(map(lambda x: x.fun_norm, _b)) for _b in self.b])

        # determine global normalization factor of basis function
        self.b_norm_basis = np.prod(self.b_norm, axis=1)
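    # Worked example of the normalization integral (standard result, independent of pygpc): for a uniform random
    # variable on [-1, 1] (pdf p(x) = 1/2) expanded in Legendre polynomials P_n,
    #   <P_n^2> = int_{-1}^{1} P_n(x)^2 * 1/2 dx = 1 / (2*n + 1),
    # e.g. n = 2 gives 1/5. The values actually used here are whatever the individual BasisFunction objects
    # report via their fun_norm attribute.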
    def set_basis_poly(self, order, order_max, order_max_norm, interaction_order, interaction_order_current,
                       problem):
        """
        Sets up polynomial basis self.b for given order, order_max_norm and interaction order.
        Adds only the basis functions, which are not yet included.

        Parameters
        ----------
        order : list of int
            Maximum individual expansion order
            Generates individual polynomials also if maximum expansion order in order_max is exceeded
        order_max : int
            Maximum global expansion order.
            The maximum expansion order considers the sum of the orders of combined polynomials together with the
            chosen norm "order_max_norm". Typically this norm is 1 such that the maximum order is the sum of all
            monomial orders.
        order_max_norm : float
            Norm for which the maximum global expansion order is defined [0, 1]. Values < 1 decrease the total
            number of polynomials in the expansion such that interaction terms are penalized more.
            sum(a_i^q)^(1/q) <= p, where p is order_max and q is order_max_norm
            (for more details see eq. (11) in [1]).
        interaction_order : int
            Number of random variables, which can interact with each other
            All polynomials are ignored, which have an interaction order greater than specified
        interaction_order_current : int, optional, default: interaction_order
            Number of random variables currently interacting with respect to the highest order.
            (interaction_order_current <= interaction_order)
            The parameters for lower orders are all interacting with interaction_order.
        problem : Problem class instance
            GPC Problem to analyze
        """
        b_added = None
        dim = len(order)

        # determine new possible set of basis functions for next main iteration
        multi_indices_all_new = get_multi_indices(order=order,
                                                  order_max=order_max,
                                                  order_max_norm=order_max_norm,
                                                  interaction_order=interaction_order,
                                                  interaction_order_current=interaction_order_current)

        # delete multi-indices, which are already present
        if self.b is not None:
            multi_indices_all_current = np.array([list(map(lambda x: x.p["i"], _b)) for _b in self.b])

            idx_old = np.hstack([np.where((multi_indices_all_current[i, :] == multi_indices_all_new).all(axis=1))
                                 for i in range(multi_indices_all_current.shape[0])])

            multi_indices_all_new = np.delete(multi_indices_all_new, idx_old, axis=0)

        if multi_indices_all_new.any():
            # construct 2D list with new BasisFunction objects
            b_added = [[0 for _ in range(dim)] for _ in range(multi_indices_all_new.shape[0])]

            for i_basis in range(multi_indices_all_new.shape[0]):
                for i_p, p in enumerate(problem.parameters_random):
                    b_added[i_basis][i_p] = problem.parameters_random[p].init_basis_function(
                        order=multi_indices_all_new[i_basis, i_p])

            # extend basis
            self.extend_basis(b_added)

        return b_added
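    # Numerical sketch of the duplicate filter above (illustration only): if self.b currently encodes the
    # multi-indices [[0, 0], [1, 0]] and the new candidate set is [[0, 0], [1, 0], [0, 1], [2, 0]], idx_old
    # collects the positions of the first two rows and np.delete leaves [[0, 1], [2, 0]], so only genuinely new
    # basis functions are instantiated and passed to extend_basis. The same filter is used in
    # add_basis_poly_by_order below.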
    def add_basis_poly_by_order(self, multi_indices, problem):
        """
        Adds polynomial basis self.b for given order, order_max_norm and interaction order.
        Adds only the basis functions, which are not yet included.

        Parameters
        ----------
        multi_indices : ndarray of int [n_basis_new, dim]
            Array containing the orders of the polynomials to be added to the basis
        problem : Problem class instance
            GPC Problem to analyze
        """
        b_added = None

        # delete multi-indices, which are already present
        if self.b is not None:
            multi_indices_all_current = np.array([list(map(lambda x: x.p["i"], _b)) for _b in self.b])

            idx_old = np.hstack([np.where((multi_indices_all_current[i, :] == multi_indices).all(axis=1))
                                 for i in range(multi_indices_all_current.shape[0])])

            multi_indices = np.delete(multi_indices, idx_old, axis=0)

        if multi_indices.any():
            # construct 2D list with new BasisFunction objects
            b_added = [[0 for _ in range(problem.dim)] for _ in range(multi_indices.shape[0])]

            for i_basis in range(multi_indices.shape[0]):
                for i_p, p in enumerate(problem.parameters_random):
                    b_added[i_basis][i_p] = problem.parameters_random[p].init_basis_function(
                        order=multi_indices[i_basis, i_p])

            # extend basis
            self.extend_basis(b_added)

        return b_added
    def extend_basis(self, b_added):
        """
        Extend set of basis functions. Skips basis functions, which are already present in self.b.

        Parameters
        ----------
        b_added : list of list of BasisFunction instances [n_b_added][dim]
            Individual BasisFunctions to add
        """
        if self.b is None:
            self.b = []

        if self.b_id is None:
            self.b_id = []

        # add b_added to b (check for duplicates) and generate IDs
        for i_row, _b in enumerate(b_added):
            if _b not in self.b:
                self.b.append(_b)
                self.b_id.append(uuid.uuid4())
                self.multi_indices = np.vstack((self.multi_indices,
                                                np.array([_b[i_dim].p["i"] for i_dim in range(self.dim)])))

        # update size
        self.n_basis = len(self.b)

        # update normalization factors
        self.init_basis_norm()

        # extend array of basis coefficients
        self.extend_basis_array(b_added)
    def init_basis_array(self):
        """
        Initialize polynomial basis coefficients for fast processing. Converts list of lists of self.b into
        np.ndarray that can be processed on multi core systems.
        """
        _b_array = []
        _b_array_grad = []

        for i_basis in range(self.n_basis):
            for i_dim_outer in range(self.dim):
                for i_dim_inner in range(self.dim):
                    if i_dim_outer == 0:
                        _b_array = _b_array + [np.array([self.b[i_basis][i_dim_inner].fun.order]),
                                               self.b[i_basis][i_dim_inner].fun.c]

                    if i_dim_outer == i_dim_inner:
                        _b_array_grad = _b_array_grad + [np.array([self.b[i_basis][i_dim_inner].fun.deriv().order]),
                                                         self.b[i_basis][i_dim_inner].fun.deriv().c]
                    else:
                        _b_array_grad = _b_array_grad + [np.array([self.b[i_basis][i_dim_inner].fun.order]),
                                                         self.b[i_basis][i_dim_inner].fun.c]

        self.b_array = np.concatenate(_b_array)
        self.b_array_grad = np.concatenate(_b_array_grad)
    def extend_basis_array(self, b_added):
        """
        Extends polynomial basis coefficients for fast processing. Converts list of lists of b_added into
        np.ndarray that can be processed on multi core systems.

        Parameters
        ----------
        b_added : list of list of BasisFunction instances [n_b_added][dim]
            Individual BasisFunctions to add
        """
        _b_array = []
        _b_array_grad = []

        for i_basis in range(len(b_added)):
            for i_dim_outer in range(self.dim):
                for i_dim_inner in range(self.dim):
                    if i_dim_outer == 0:
                        _b_array = _b_array + [np.array([b_added[i_basis][i_dim_inner].fun.order]),
                                               b_added[i_basis][i_dim_inner].fun.c]

                    if i_dim_outer == i_dim_inner:
                        _b_array_grad = _b_array_grad + [np.array([b_added[i_basis][i_dim_inner].fun.deriv().order]),
                                                         b_added[i_basis][i_dim_inner].fun.deriv().c]
                    else:
                        _b_array_grad = _b_array_grad + [np.array([b_added[i_basis][i_dim_inner].fun.order]),
                                                         b_added[i_basis][i_dim_inner].fun.c]

        if self.b_array is not None:
            self.b_array = np.hstack((self.b_array, np.concatenate(_b_array)))
        else:
            self.b_array = np.concatenate(_b_array)

        if self.b_array_grad is not None:
            self.b_array_grad = np.hstack((self.b_array_grad, np.concatenate(_b_array_grad)))
        else:
            self.b_array_grad = np.concatenate(_b_array_grad)
    def plot_basis(self, dims, fn_plot=None, dynamic_plot_update=False):
        """
        Generate 2D or 3D cube-plot of basis functions.

        Parameters
        ----------
        dims : list of int of length [2] or [3]
            Indices of parameters in gPC expansion to plot
        fn_plot : str, optional, default: None
            Filename of plot to save (with .png or .pdf extension)

        Returns
        -------
        <File> : *.png and *.pdf file
            Plot of basis functions
        """
        plt.rc('text', usetex=False)
        plt.rc('font', family='serif', size=14)

        multi_indices = np.array([list(map(lambda x: x.p["i"], _b)) for _b in self.b])

        fig = plt.figure(figsize=[6, 6])

        if len(dims) == 2:
            ax = fig.add_subplot(111)
        else:
            ax = fig.add_subplot(111, projection='3d')

        for i_poly in range(multi_indices.shape[0]):

            if len(dims) == 2:
                ax.scatter(multi_indices[i_poly, dims[0]],          # marker coordinates
                           multi_indices[i_poly, dims[1]],
                           c=np.array([[51, 153, 255]]) / 255.0,    # marker colour
                           marker="s",
                           s=450)                                   # marker size

                ax.set_xlabel("$x_1$", fontsize=18)
                ax.set_ylabel("$x_2$", fontsize=18)
                ax.set_xlim([-1, np.max(multi_indices) + 1])
                ax.set_ylim([-1, np.max(multi_indices) + 1])
                ax.set_xticklabels(range(np.max(multi_indices) + 1))
                ax.set_xticks(range(np.max(multi_indices) + 1))
                ax.set_yticklabels(range(np.max(multi_indices) + 1))
                ax.set_yticks(range(np.max(multi_indices) + 1))
                ax.set_aspect('equal', 'box')

            else:
                ax.bar3d(multi_indices[i_poly, dims[0]] - 0.4,      # lower corner coordinates
                         multi_indices[i_poly, dims[1]] - 0.4,
                         multi_indices[i_poly, dims[2]] - 0.4,
                         0.8, 0.8, 0.8,                             # width, depth and height
                         color=np.array([51, 153, 255]) / 255.0,    # bar colour
                         alpha=1)                                   # transparency of the bars

                ax.view_init(elev=30, azim=45)
                ax.set_xlabel("$x_1$", fontsize=18)
                ax.set_ylabel("$x_2$", fontsize=18)
                ax.set_zlabel("$x_3$", fontsize=18)
                ax.set_xlim([0, np.max(multi_indices) + 1])
                ax.set_ylim([0, np.max(multi_indices) + 1])
                ax.set_zlim([0, np.max(multi_indices) + 1])
                ax.set_xticklabels(range(np.max(multi_indices) + 1))
                ax.set_xticks(range(np.max(multi_indices) + 1))
                ax.set_yticklabels(range(np.max(multi_indices) + 1))
                ax.set_yticks(range(np.max(multi_indices) + 1))
                ax.set_zticklabels(range(np.max(multi_indices) + 1))
                ax.set_zticks(range(np.max(multi_indices) + 1))

        if fn_plot is not None:
            if os.path.splitext(fn_plot)[1] not in [".pdf", ".png"]:
                fn_plot = fn_plot + ".png"

            plt.savefig(fn_plot, dpi=600)
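
# ----------------------------------------------------------------------------------------------------------------
# Usage sketch (not part of pygpc; the Problem construction is assumed to exist elsewhere in the package): given a
# pygpc Problem instance "problem" with problem.dim random parameters, a standard basis is typically built and
# inspected like this:
#
#   basis = Basis()
#   basis.init_basis_sgpc(problem=problem,
#                         order=[3] * problem.dim,          # individual maximum orders
#                         order_max=3,                      # global maximum order
#                         order_max_norm=1.0,               # 1.0 = plain total-order truncation
#                         interaction_order=problem.dim)
#
#   print(basis.n_basis, basis.multi_indices.shape)
#   basis.plot_basis(dims=[0, 1], fn_plot="basis")          # writes basis.png (extension appended if missing)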