import copy
import multiprocessing.pool
import uuid
import warnings

import numpy as np
import scipy.stats
from scipy.optimize import minimize
from scipy.special import gamma
from tqdm import tqdm

from _functools import partial  # NOTE(review): private CPython module; public import below shadows it
from functools import partial

from .io import iprint
from .misc import average_cross_correlation_gram
from .misc import compute_chunks
from .misc import get_cartesian_product
from .misc import get_different_rows_from_matrices
from .misc import get_multi_indices
from .misc import mutual_coherence
from .misc import squared_exponential_kernel
from .misc import t_averaged_mutual_coherence
from .Quadrature import *
from .RandomParameter import Beta
from .RandomParameter import Norm

warnings.filterwarnings('ignore')
# [docs]
class Grid(object):
"""
Grid class
Parameters
----------
parameters_random : OrderedDict of RandomParameter instances
OrderedDict containing the RandomParameter instances the grids are generated for
weights: ndarray of float [n_grid x dim]
Weights of the grid (all)
coords: ndarray of float [n_grid x dim]
Denormalized coordinates xi
coords_norm: ndarray of float [n_grid x dim]
Normalized coordinates xi
coords_gradient: ndarray of float [n_grid x dim x dim]
Denormalized coordinates xi
coords_gradient_norm: ndarray of float [n_grid x dim x dim]
Normalized coordinates xi
coords_id: list of UUID objects (version 4) [n_grid]
Unique IDs of grid points
Attributes
----------
parameters_random : OrderedDict of RandomParameter instances
OrderedDict containing the RandomParameter instances the grids are generated for
_weights: ndarray of float [n_grid x dim]
Weights of the grid (all)
_coords: ndarray of float [n_grid x dim]
Denormalized coordinates xi
_coords_norm: ndarray of float [n_grid x dim]
Normalized coordinates xi
_domains: ndarray of float [n_grid]
Domain IDs of grid points for multi-element gPC
_coords_gradient: ndarray of float [n_grid x dim x dim]
Denormalized coordinates xi
_coords_gradient_norm: ndarray of float [n_grid x dim x dim]
Normalized coordinates xi
coords_id: list of UUID objects (version 4) [n_grid]
Unique IDs of grid points
n_grid: int
Total number of nodes in grid.
"""
def __init__(self, parameters_random, coords=None, coords_norm=None,
coords_gradient=None, coords_gradient_norm=None, coords_id=None, coords_gradient_id=None,
grid_pre=None):
"""
Constructor; Initialize Grid class
"""
self._coords = coords # Coordinates of gpc model calculation in the system space
self._coords_norm = coords_norm # Coordinates of gpc model calculation in the gpc space
self.coords_id = coords_id # Unique IDs of grid points
self.coords_gradient_id = coords_gradient_id # Unique IDs of grid gradient points
self._weights = None # Weights for numerical integration
self.parameters_random = parameters_random # OrderedDict of RandomParameter instances
self.dim = len(self.parameters_random) # Number of random variables
self._coords_gradient = coords_gradient # Shifted coordinates for gradient calculation in the system space
self._coords_gradient_norm = coords_gradient_norm # Normalized coordinates for gradient calculation
self.grid_pre = grid_pre # Previous grid the new grid is based on
if coords is not None:
self.n_grid = self.coords.shape[0] # Total number of grid points
if coords_gradient is not None:
self.n_grid_gradient = self.coords_gradient.shape[0] # Total number of grid points for gradient calculation
if coords_id is None and coords is not None:
self.coords_id = [uuid.uuid4() for _ in range(self.n_grid)]
self.n_grid = self._coords.shape[0]
if coords_gradient_id is None and coords_gradient is not None:
self.coords_gradient_id = [uuid.uuid4() for _ in range(self.n_grid)]
self.n_grid_gradient = self._coords_gradient.shape[0]
if coords is not None and coords_norm is None:
self.coords_norm = self.get_normalized_coordinates(self.coords)
@property
def coords(self):
return self._coords
@coords.setter
def coords(self, value):
if value.ndim == 1:
value = value[np.newaxis, :]
self._coords = value
if value is not None:
self.n_grid = self._coords.shape[0]
# Generate unique IDs of grid points
if self.coords_id is None:
self.coords_id = [uuid.uuid4() for _ in range(self.n_grid)]
@property
def coords_norm(self):
return self._coords_norm
@coords_norm.setter
def coords_norm(self, value):
if value.ndim == 1:
value = value[np.newaxis, :]
self._coords_norm = value
if value is not None:
self.n_grid = self._coords_norm.shape[0]
# Generate unique IDs of grid points
if self.coords_id is None:
self.coords_id = [uuid.uuid4() for _ in range(self.n_grid)]
@property
def coords_gradient(self):
return self._coords_gradient
@coords_gradient.setter
def coords_gradient(self, value):
assert value.ndim == 3, "Specify coords_gradient as 3D tensor of shape [n_grid x dim x dim]"
self._coords_gradient = value
if value is not None:
self.n_grid_gradient = self._coords_gradient.shape[0]
# Generate unique IDs of grid gradient points
if self.coords_gradient_id is None:
self.coords_gradient_id = [uuid.uuid4() for _ in range(self.n_grid_gradient)]
@property
def coords_gradient_norm(self):
return self._coords_gradient_norm
@coords_gradient_norm.setter
def coords_gradient_norm(self, value):
assert value.ndim == 3, "Specify coords_gradient_norm as 3D tensor of shape [n_grid x dim x dim]"
self._coords_gradient_norm = value
if value is not None:
self.n_grid_gradient = self._coords_gradient_norm.shape[0]
# Generate unique IDs of grid gradient points
if self.coords_gradient_id is None:
self.coords_gradient_id = [uuid.uuid4() for _ in range(self.n_grid_gradient)]
@property
def weights(self):
    """ndarray of float: quadrature weights associated with the grid points."""
    return self._weights

@weights.setter
def weights(self, value):
    # No validation performed; callers are expected to pass an array matching n_grid.
    self._weights = value
# [docs]
def get_denormalized_coordinates(self, coords_norm):
"""
Denormalize grid from normalized to original parameter space for simulations.
coords = Grid.get_denormalized_coordinates(coords_norm)
Parameters
----------
coords_norm: [N_samples x dim] np.ndarray
Normalized coordinates xi
Returns
-------
coords: [N_samples x dim] np.ndarray
Denormalized coordinates xi
"""
coords = np.zeros(coords_norm.shape)
for i_p, p in enumerate(self.parameters_random):
if self.parameters_random[p].pdf_type == "beta":
coords[:, i_p, ] = (coords_norm[:, i_p, ] + 1) / \
2 * (self.parameters_random[p].pdf_limits[1] -
self.parameters_random[p].pdf_limits[0]) \
+ self.parameters_random[p].pdf_limits[0]
if self.parameters_random[p].pdf_type in ["norm", "normal"]:
coords[:, i_p, ] = coords_norm[:, i_p, ] * self.parameters_random[p].pdf_shape[1] + \
self.parameters_random[p].pdf_shape[0]
if self.parameters_random[p].pdf_type in ["gamma"]:
coords[:, i_p, ] = coords_norm[:, i_p, ] / self.parameters_random[p].pdf_shape[1] + \
self.parameters_random[p].pdf_shape[2]
return coords
# [docs]
def get_normalized_coordinates(self, coords):
"""
Normalize grid from original parameter to normalized space for simulations.
coords_norm = Grid.get_normalized_coordinates(coords)
Parameters
----------
coords : [N_samples x dim] np.ndarray
Denormalized coordinates xi in original parameter space
Returns
-------
coords_norm : [N_samples x dim] np.ndarray
Normalized coordinates xi
"""
coords_norm = np.zeros(coords.shape)
for i_p, p in enumerate(self.parameters_random):
if self.parameters_random[p].pdf_type == "beta":
coords_norm[:, i_p] = (coords[:, i_p] - self.parameters_random[p].pdf_limits[0])
coords_norm[:, i_p] = coords_norm[:, i_p] / \
(self.parameters_random[p].pdf_limits[1] -
self.parameters_random[p].pdf_limits[0]) * \
2.0 - 1
if self.parameters_random[p].pdf_type in ["norm", "normal"]:
coords_norm[:, i_p] = (coords[:, i_p] - self.parameters_random[p].pdf_shape[0]) / \
self.parameters_random[p].pdf_shape[1]
if self.parameters_random[p].pdf_type in ["gamma"]:
coords_norm[:, i_p] = (coords[:, i_p] - self.parameters_random[p].pdf_shape[2]) * \
self.parameters_random[p].pdf_shape[1]
return coords_norm
# [docs]
def create_gradient_grid(self, delta=1e-3):
    """
    Creates new grid points to determine gradient of model function.
    Adds or updates self.coords_gradient, self.coords_gradient_norm and self.coords_gradient_id.

    Parameters
    ----------
    delta : float, optional, default: 1e-3
        Shift of grid-points along axes in normalized parameter space
    """
    # shift of gradient grid in normalized space;
    # delta becomes a [dim x dim] matrix with the per-axis shift on the diagonal
    delta = delta * np.eye(self.dim)

    # Create or update the gradient grid [n_grid x dim x dim]
    if self.coords_gradient is not None:
        # gradient grid exists already -> only append space for grid points added since the last call
        n_grid_gradient = self.coords_gradient_norm.shape[0]
        # np.vstack concatenates the 3D arrays along axis 0 (grid-point axis)
        self.coords_gradient = np.vstack((self.coords_gradient,
                                          np.zeros((self.n_grid-n_grid_gradient, self.dim, self.dim))))
        self.coords_gradient_norm = np.vstack((self.coords_gradient_norm,
                                               np.zeros((self.n_grid-n_grid_gradient, self.dim, self.dim))))
    else:
        n_grid_gradient = 0
        self.coords_gradient = np.zeros((self.n_grid, self.dim, self.dim))
        self.coords_gradient_norm = np.zeros((self.n_grid, self.dim, self.dim))

    if n_grid_gradient < self.coords_gradient.shape[0]:
        # shift the grid: the third axis indexes the shifted dimension, i.e.
        # [:, :, i_dim] holds the point with its i_dim-th coordinate reduced by delta
        for i_dim in range(self.dim):
            self.coords_gradient_norm[n_grid_gradient:, :, i_dim] = self.coords_norm[n_grid_gradient:, ] - \
                                                                    delta[i_dim, :]
        # determine coordinates in original parameter space
        self.coords_gradient[n_grid_gradient:, :, :] = \
            self.get_denormalized_coordinates(self.coords_gradient_norm[n_grid_gradient:, :, :])

    # total number of grid points
    # NOTE(review): this counts n_grid * dim shifted model evaluations, not array rows
    # (unlike the row count stored by the coords_gradient setter) — confirm downstream usage
    self.n_grid_gradient = self.coords_gradient.shape[0]*self.coords_gradient.shape[2]

    # Generate unique IDs of grid points [n_grid]; gradient points share IDs with their base points
    self.coords_gradient_id = copy.deepcopy(self.coords_id)
# [docs]
class TensorGrid(Grid):
    """
    Generate TensorGrid object instance.

    Parameters
    ----------
    parameters_random : OrderedDict of RandomParameter instances
        OrderedDict containing the RandomParameter instances the grids are generated for
    options : dict
        Grid options
        - options["grid_type"] ... list of str [dim]: type of grid
          ('jacobi', 'hermite', 'clenshaw_curtis'/'cc', 'fejer2', 'patterson')
        - options["n_dim"] ... list of int [dim]: number of nodes in each dimension
    coords : ndarray of float [n_grid_add x dim], optional
        Grid points to add (model space)
    coords_norm : ndarray of float [n_grid_add x dim], optional
        Grid points to add (normalized space)
    coords_gradient : ndarray of float [n_grid x dim x dim], optional
        Denormalized coordinates xi
    coords_gradient_norm : ndarray of float [n_grid x dim x dim], optional
        Normalized coordinates xi
    coords_id : list of UUID objects (version 4) [n_grid], optional
        Unique IDs of grid points
    coords_gradient_id : list of UUID objects (version 4) [n_grid], optional
        Unique IDs of grid points
    knots_dim_list : list of float [dim][n_knots], optional
        Knots of polynomials in each dimension
    weights_dim_list : list of float [dim][n_knots], optional
        Weights of polynomials in each dimension
    weights : ndarray of float [n_grid], optional
        Quadrature weights for each grid point

    Examples
    --------
    >>> import pygpc
    >>> pygpc.TensorGrid(parameters_random, options={"grid_type": ["hermite", "jacobi"], "n_dim": [5, 6]})

    Attributes
    ----------
    grid_type : [N_vars] list of str
        Type of quadrature used to construct the tensor grid
    n_dim : list of int [dim]
        Number of nodes in each dimension
    knots_dim_list : list of float [dim][n_knots]
        Knots of polynomials in each dimension
    weights_dim_list : list of float [dim][n_knots]
        Weights of polynomials in each dimension
    weights : ndarray of float [n_grid]
        Quadrature weights for each grid point
    """

    def __init__(self, parameters_random, options, coords=None, coords_norm=None,
                 coords_gradient=None, coords_gradient_norm=None, coords_id=None, coords_gradient_id=None,
                 knots_dim_list=None, weights_dim_list=None, weights=None):
        """
        Constructor; Initializes TensorGrid object instance; Generates grid
        """
        super(TensorGrid, self).__init__(parameters_random,
                                         coords=coords,
                                         coords_norm=coords_norm,
                                         coords_gradient=coords_gradient,
                                         coords_gradient_norm=coords_gradient_norm,
                                         coords_id=coords_id,
                                         coords_gradient_id=coords_gradient_id)
        self.options = options
        self.grid_type = options["grid_type"]
        self.n_dim = options["n_dim"]

        # if a complete grid was passed in, only store the precomputed pieces
        if coords is not None and coords_norm is not None:
            self.knots_dim_list = knots_dim_list
            self.weights_dim_list = weights_dim_list
            self.weights = weights
            return

        # get knots and weights of polynomials in each dimension
        self.knots_dim_list = []
        self.weights_dim_list = []

        for i_p, p in enumerate(self.parameters_random):
            # Jacobi polynomials
            if self.grid_type[i_p] == 'jacobi':
                knots, weights = get_quadrature_jacobi_1d(self.n_dim[i_p],
                                                          self.parameters_random[p].pdf_shape[0] - 1,
                                                          self.parameters_random[p].pdf_shape[1] - 1)
            # Hermite polynomials
            elif self.grid_type[i_p] == 'hermite':
                knots, weights = get_quadrature_hermite_1d(self.n_dim[i_p])
            # Clenshaw Curtis
            # BUGFIX: original tested `in ['clenshaw_curtis' or 'cc']`, which evaluates to
            # ['clenshaw_curtis'] only, so the 'cc' alias silently never matched
            elif self.grid_type[i_p] in ['clenshaw_curtis', 'cc']:
                knots, weights = get_quadrature_clenshaw_curtis_1d(self.n_dim[i_p])
            # Fejer type 2 (Clenshaw Curtis without boundary nodes)
            elif self.grid_type[i_p] == 'fejer2':
                knots, weights = get_quadrature_fejer2_1d(self.n_dim[i_p])
            # Gauss-Patterson (Nested Legendre rule)
            elif self.grid_type[i_p] == 'patterson':
                knots, weights = get_quadrature_patterson_1d(self.n_dim[i_p])
            else:
                # BUGFIX: the original constructed this exception without raising it and
                # silently continued with empty knots/weights
                raise AttributeError("Specified grid_type {} not implemented!".format(self.grid_type[i_p]))

            self.knots_dim_list.append(knots)
            self.weights_dim_list.append(weights)

        # combine coordinates to full tensor grid (all combinations)
        self.coords_norm = get_cartesian_product(self.knots_dim_list)

        # rescale normalized coordinates in case of normal distributions and non-hermite grids;
        # +- 1.960 * sigma covers 95 % of the probability mass
        for i_p, p in enumerate(self.parameters_random):
            if self.parameters_random[p].pdf_type in ["norm", "normal"] and not self.grid_type[i_p] == "hermite":
                self.coords_norm[:, i_p] = self.coords_norm[:, i_p] * 1.960

        # determine combined weights of Gauss quadrature
        self.weights = np.prod(get_cartesian_product(self.weights_dim_list), axis=1) / (2.0 ** self.dim)

        # denormalize grid to original parameter space
        self.coords = self.get_denormalized_coordinates(self.coords_norm)

        # Total number of nodes in grid
        self.n_grid = self.coords.shape[0]

        # Generate and append unique IDs of grid points
        self.coords_id = [uuid.uuid4() for _ in range(self.n_grid)]
# [docs]
class SparseGrid(Grid):
"""
SparseGrid object instance.
Parameters
----------
parameters_random : OrderedDict of RandomParameter instances
OrderedDict containing the RandomParameter instances the grids are generated for
options: dict
Grid parameters
- grid_type ([N_vars] list of str) ... Type of quadrature rule used to construct sparse grid
('jacobi', 'hermite', 'clenshaw_curtis', 'fejer2', 'patterson')
- level ([N_vars] list of int) ... Number of levels in each dimension
- level_max (int) ... Global combined level maximum
- interaction_order (int) ...Interaction order of parameters and grid, i.e. the grid points are lying
between this number of dimensions
- order_sequence_type (str) ... Type of order sequence ('lin', 'exp') common: 'exp'
- make_grid (boolean, optional, default=True) ... Boolean value to determine if to generate grid
during initialization
- verbose (bool, optional, default=True) ... Print output messages into stdout
coords : ndarray of float [n_grid_add x dim]
Grid points to add (model space)
coords_norm : ndarray of float [n_grid_add x dim]
Grid points to add (normalized space)
coords_gradient : ndarray of float [n_grid x dim x dim]
Denormalized coordinates xi
coords_gradient_norm : ndarray of float [n_grid x dim x dim]
Normalized coordinates xi
coords_id : list of UUID objects (version 4) [n_grid]
Unique IDs of grid points
coords_gradient_id : list of UUID objects (version 4) [n_grid]
Unique IDs of grid points
level_sequence: list of int
Integer sequence of levels
order_sequence: list of int
Integer sequence of polynomial order of levels
weights : ndarray of float [n_grid]
Quadrature weights for each grid point
Examples
--------
>>> import pygpc
>>> grid = pygpc.SparseGrid(parameters_random=parameters_random,
>>> options={"grid_type": ["jacobi", "jacobi"],
>>> "level": [3, 3],
>>> "level_max": 3,
>>> "interaction_order": 2,
>>> "order_sequence_type": "exp"})
Attributes
----------
grid_type: [N_vars] list of str
specify type of quadrature used to construct sparse grid ('jacobi', 'hermite', 'cc', 'fejer2')
level: [N_vars] list of int
number of levels in each dimension
level_max: int
global combined level maximum
level_sequence: list of int
list containing the levels
interaction_order: int
interaction order of parameters and grid, i.e. the grid points are lying between this number of dimensions
order_sequence_type: str
type of order sequence ('lin', 'exp') common: 'exp'
order_sequence: list of int
list containing the polynomial order of the levels
coords : ndarray of float [n_grid_add x dim]
Grid points to add (model space)
coords_norm : ndarray of float [n_grid_add x dim]
Grid points to add (normalized space)
coords_gradient : ndarray of float [n_grid x dim x dim]
Denormalized coordinates xi
coords_gradient_norm : ndarray of float [n_grid x dim x dim]
Normalized coordinates xi
coords_id : list of UUID objects (version 4) [n_grid]
Unique IDs of grid points
coords_gradient_id : list of UUID objects (version 4) [n_grid]
Unique IDs of grid points
weights : ndarray of float [n_grid]
Quadrature weights for each grid point
verbose: bool
boolean value to determine if to print out the progress into the standard output
"""
def __init__(self, parameters_random, options, coords=None, coords_norm=None,
             coords_gradient=None, coords_gradient_norm=None, coords_id=None, coords_gradient_id=None,
             level_sequence=None, order_sequence=None, weights=None):
    """
    Constructor; Initializes SparseGrid class; Generates grid

    Parameters
    ----------
    parameters_random : OrderedDict of RandomParameter instances
        OrderedDict containing the RandomParameter instances the grids are generated for
    options : dict
        Grid options ("grid_type", "level", "level_max", "interaction_order",
        "order_sequence_type" and optional "verbose")
    coords, coords_norm : ndarray of float [n_grid x dim], optional
        Existing grid points (model / normalized space); if both are given no grid is generated
    coords_gradient, coords_gradient_norm : ndarray of float [n_grid x dim x dim], optional
        Gradient grid points (model / normalized space)
    coords_id, coords_gradient_id : list of UUID objects (version 4), optional
        Unique IDs of (gradient) grid points
    level_sequence, order_sequence : list of int, optional
        Precomputed level / order sequences (used together with coords/coords_norm)
    weights : ndarray of float [n_grid], optional
        Precomputed quadrature weights
    """
    super(SparseGrid, self).__init__(parameters_random,
                                     coords=coords,
                                     coords_norm=coords_norm,
                                     coords_gradient=coords_gradient,
                                     coords_gradient_norm=coords_gradient_norm,
                                     coords_id=coords_id,
                                     coords_gradient_id=coords_gradient_id)

    self.grid_type = options["grid_type"]                      # Quadrature rule ('jacobi', 'hermite', 'clenshaw_curtis', 'fejer2')
    self.level = options["level"]                              # Number of levels in each dimension [dim x 1]
    self.level_max = options["level_max"]                      # Global combined level maximum
    self.interaction_order = options["interaction_order"]      # Interaction order of parameters and grid
    self.order_sequence_type = options["order_sequence_type"]  # Order sequence ('lin', 'exp' (common))
    self.level_sequence = []                                   # Integer sequence of levels
    self.order_sequence = []                                   # Integer sequence of polynomial order of levels

    # output while grid generation on/off
    # BUGFIX: the original only assigned self.verbose when the key was *absent*, leaving the
    # attribute undefined (AttributeError later on) whenever "verbose" was actually passed
    self.verbose = options.get("verbose", False)

    # Generate grid if not specified
    if coords is not None and coords_norm is not None:
        # grid passed in completely -> only store the precomputed pieces
        self.level_sequence = level_sequence
        self.order_sequence = order_sequence
        self.weights = weights
    else:
        # Grid is generated during initialization
        self.calc_multi_indices()
        self.calc_coords_weights()

        # Determine total number of grid points
        self.n_grid = self.coords.shape[0]

        # Generate unique IDs of grid points
        self.coords_id = [uuid.uuid4() for _ in range(self.n_grid)]
# [docs]
def calc_multi_indices(self):
    """
    Calculate the multi index list needed for the calculation of the SparseGrid.

    Fills self.level_sequence (levels per dimension) and self.order_sequence
    (quadrature order per level and dimension, derived from order_sequence_type).
    """
    for i_p, p in enumerate(self.parameters_random):
        # level sequence: fejer2 grids have no level 0 rule (no boundary nodes), so they start at level 1
        if self.grid_type[i_p] == 'fejer2':
            self.level_sequence.append([element for element in range(1, self.level[i_p] + 1)])
        else:
            self.level_sequence.append([element for element in range(self.level[i_p] + 1)])

        if self.order_sequence_type == 'exp':  # order = 2**level + 1
            if self.grid_type[i_p] == 'fejer2':  # start with order = 1 @ level = 1
                self.order_sequence.append((np.power(2, np.arange(1, self.level[i_p])) - 1).tolist())
                self.order_sequence[i_p][0] = 1
            elif self.grid_type[i_p] == 'patterson':  # start with order = 1 @ level = 0 [1,3,7,15,31,...]
                # NOTE(review): 2**k + 1 yields [2, 3, 5, 9, ...], while the comment above
                # suggests the nested Patterson sequence [1, 3, 7, 15, 31, ...] — confirm intent
                self.order_sequence.append((np.power(2, np.arange(0, self.level[i_p])) + 1).tolist())
            else:  # start with order = 1 @ level = 0
                self.order_sequence.append(
                    (2 ** np.linspace(0, self.level[i_p], self.level[i_p] + 1) + 1).tolist())
                self.order_sequence[i_p][0] = 1
        elif self.order_sequence_type == 'lin':  # order = level
            if self.grid_type[i_p] == 'fejer2':  # start with level = 1 @ order = 1
                self.order_sequence.append(np.linspace(1, self.level[i_p] + 1, self.level[i_p] + 1).tolist())
            elif self.grid_type[i_p] == 'patterson':  # linear order growth is not defined for the nested rule
                # NOTE(review): nothing is appended for this dimension, so order_sequence
                # loses its alignment with i_p for mixed grids — confirm this is unreachable
                iprint("Not possible in case of Gauss-Patterson grid.", tab=0, verbose=self.verbose)
            else:  # start with order = 1 @ level = 0
                self.order_sequence.append(np.linspace(1, self.level[i_p] + 1, self.level[i_p] + 1).tolist())
# [docs]
def calc_l_level(self):
    """
    Calculate the l-level needed for the Fejer grid type 2.

    l_level = calc_l_level()

    Returns
    -------
    l_level: np.ndarray
        Multi indices filtered by level capacity and interaction order
    """
    # fejer2 grids start at level 1 (there is no level-0 rule), so the multi-indices are shifted by +1
    if "fejer2" in self.grid_type:
        if self.dim == 1:
            l_level = np.array([np.linspace(1, self.level_max, self.level_max)]).transpose()
        else:
            # NOTE(review): called positionally here but with keywords in the branch below —
            # confirm both call styles match the signature of misc.get_multi_indices
            l_level = get_multi_indices(self.dim, self.level_max - self.dim)
            l_level = l_level + 1
    else:
        if self.dim == 1:
            l_level = np.array([np.linspace(0, self.level_max, self.level_max + 1)]).transpose()
        else:
            l_level = get_multi_indices(order=[self.level_max] * self.dim,
                                        order_max=self.level_max,
                                        interaction_order=self.dim,
                                        order_max_norm=1.,
                                        interaction_order_current=None)

    # filter out rows exceeding the individual level cap of each dimension
    for i_p in range(self.dim):
        l_level = l_level[l_level[:, i_p] <= self.level[i_p]]

    # Consider interaction order (filter out multi-indices exceeding it);
    # for fejer2 the "inactive" level is 1 instead of 0, hence the different threshold
    if self.interaction_order < self.dim:
        if any("fejer2" in s for s in self.grid_type):
            l_level = l_level[np.sum(l_level > 1, axis=1) <= self.interaction_order, :]
        else:
            l_level = l_level[np.sum(l_level > 0, axis=1) <= self.interaction_order, :]

    return l_level
# [docs]
def calc_grid(self):
    """
    Calculate a cubature lookup table for knots and weights.

    dl_k, dl_w = calc_grid()

    Returns
    -------
    dl_k: list of list of float
        Cubature lookup table for knots
    dl_w: list of list of float
        Cubature lookup table for weights
    """
    # make cubature lookup table for knots (dl_k) and weights (dl_w) [max(l) x dim]
    iprint("Generating difference grids...", tab=0, verbose=self.verbose)
    dl_k = [[0 for _ in range(self.dim)] for _ in range(int(np.amax(self.level) + 1))]
    dl_w = [[0 for _ in range(self.dim)] for _ in range(int(np.amax(self.level) + 1))]
    knots_l, weights_l, knots_l_1, weights_l_1 = 0, 0, 0, 0

    for i_p, p in enumerate(self.parameters_random):
        for i_level in self.level_sequence[i_p]:
            # 1D rule of the current level (knots_l / weights_l) and of the previous
            # level (knots_l_1 / weights_l_1), combined into a difference rule below

            # Jacobi polynomials
            if self.grid_type[i_p] == 'jacobi':
                knots_l, weights_l = get_quadrature_jacobi_1d(self.order_sequence[i_p][i_level],
                                                              self.parameters_random[p].pdf_shape[0] - 1,
                                                              self.parameters_random[p].pdf_shape[1] - 1)
                knots_l_1, weights_l_1 = get_quadrature_jacobi_1d(self.order_sequence[i_p][i_level - 1],
                                                                  self.parameters_random[p].pdf_shape[0] - 1,
                                                                  self.parameters_random[p].pdf_shape[1] - 1)

            # Hermite polynomials
            if self.grid_type[i_p] == 'hermite':
                knots_l, weights_l = get_quadrature_hermite_1d(
                    self.order_sequence[i_p][i_level])
                knots_l_1, weights_l_1 = get_quadrature_hermite_1d(
                    self.order_sequence[i_p][i_level - 1])

            # Gauss-Patterson
            if self.grid_type[i_p] == 'patterson':
                knots_l, weights_l = get_quadrature_patterson_1d(
                    self.order_sequence[i_p][i_level])
                knots_l_1, weights_l_1 = get_quadrature_patterson_1d(
                    self.order_sequence[i_p][i_level - 1])

            # Clenshaw Curtis
            if self.grid_type[i_p] == 'clenshaw_curtis':
                knots_l, weights_l = get_quadrature_clenshaw_curtis_1d(
                    self.order_sequence[i_p][i_level])
                knots_l_1, weights_l_1 = get_quadrature_clenshaw_curtis_1d(
                    self.order_sequence[i_p][i_level - 1])

            # Fejer type 2
            # NOTE(review): fejer2 indexes order_sequence with i_level-1 / i_level-2 because
            # its level sequence starts at 1 — confirm against calc_multi_indices
            if self.grid_type[i_p] == 'fejer2':
                knots_l, weights_l = get_quadrature_fejer2_1d(
                    self.order_sequence[i_p][i_level - 1])
                knots_l_1, weights_l_1 = get_quadrature_fejer2_1d(
                    self.order_sequence[i_p][i_level - 2])

            # the base level (0, or 1 for fejer2 which has no level 0) stores the rule itself;
            # higher levels store the difference rule: current knots/weights stacked with the
            # previous level's knots and *negated* weights
            if (i_level == 0 and not self.grid_type[i_p] == 'fejer2') or \
                    (i_level == 1 and (self.grid_type[i_p] == 'fejer2')):
                dl_k[i_level][i_p] = knots_l
                dl_w[i_level][i_p] = weights_l
            else:
                # noinspection PyTypeChecker
                dl_k[i_level][i_p] = np.hstack((knots_l, knots_l_1))
                # noinspection PyTypeChecker
                dl_w[i_level][i_p] = np.hstack((weights_l, -weights_l_1))

    return dl_k, dl_w
# [docs]
def calc_tensor_products(self):
    """
    Calculate the tensor products of the knots and the weights.

    dll_k, dll_w = calc_tensor_products()

    Returns
    -------
    dll_k: np.ndarray
        Tensor product of knots
    dll_w: np.ndarray
        Tensor product of weights
    """
    # build one sub-grid (tensor product) per row of the multi-index list "l"
    iprint("Generating sub-grids...", tab=0, verbose=self.verbose)
    dl_k, dl_w = self.calc_grid()
    l_level = self.calc_l_level()

    sub_knots = []
    sub_weights = []

    for row in l_level:
        # collect the 1D difference rules selected by this multi-index
        knots_1d = [np.asarray(dl_k[int(row[i_p])][i_p], dtype=float) for i_p in range(self.dim)]
        weights_1d = [np.asarray(dl_w[int(row[i_p])][i_p], dtype=float) for i_p in range(self.dim)]

        # tensor product of knots
        sub_knots.append(get_cartesian_product(knots_1d))

        # tensor product of weights
        sub_weights.append(np.prod(get_cartesian_product(weights_1d), axis=1))

    dll_w = np.hstack(sub_weights)
    dll_k = np.vstack(sub_knots)

    return dll_k, dll_w
# [docs]
def calc_coords_weights(self):
    """
    Determine coords and weights of sparse grid by generating, merging and subtracting sub-grids.

    Sets self.weights, self.coords_norm and self.coords in place (no return value).
    """
    # find similar points in grid and formulate Point list
    dll_k, dll_w = self.calc_tensor_products()
    # -1 marks knots that have not yet been assigned to a merged point
    point_number_list = np.zeros(dll_w.shape[0]) - 1
    point_no = 0
    epsilon_k = 1E-6  # tolerance below which two knots count as the same point
    coords_norm = []

    iprint("Merging sub-grids...", tab=0, verbose=self.verbose)

    # repeatedly take the first unassigned knot and give every knot within
    # epsilon_k (L1 distance) of it the same point number
    while any(point_number_list < 0):
        not_found = point_number_list < 0
        dll_k_nf = dll_k[not_found]
        point_temp = np.zeros(dll_k_nf.shape[0]) - 1
        point_temp[np.sum(np.abs(dll_k_nf - dll_k_nf[0]), axis=1) < epsilon_k] = point_no
        point_number_list[not_found] = point_temp
        point_no = point_no + 1
        coords_norm.append(dll_k_nf[0, :])

    coords_norm = np.array(coords_norm)
    point_number_list = np.asarray(point_number_list, dtype=int)

    # sum the signed sub-grid weights of all knots merged into the same point
    # (-999 is only a sentinel initial value; every entry is overwritten below)
    weights = np.zeros(np.amax(point_number_list) + 1) - 999
    for i_point in range(np.amax(point_number_list) + 1):
        weights[i_point] = np.sum(dll_w[point_number_list == i_point])

    # filter for very small weights
    iprint("Filter grid for very small weights...", tab=0, verbose=self.verbose)
    epsilon_w = 1E-8 / self.dim
    keep_point = np.abs(weights) > epsilon_w
    self.weights = weights[keep_point] / 2 ** self.dim
    coords_norm = coords_norm[keep_point]

    # rescale normalized coordinates in case of normal distributions and "fejer2" or "clenshaw_curtis" grids;
    # +- 1.960 * sigma covers 95 % of the probability mass
    for i_p, p in enumerate(self.parameters_random):
        if self.parameters_random[p].pdf_type in ["norm", "normal"] and (not(self.grid_type[i_p] == "hermite")):
            coords_norm[:, i_p] = coords_norm[:, i_p] * 1.960

    # denormalize grid to original parameter space
    iprint("Denormalizing grid for computations...", tab=0, verbose=self.verbose)
    self.coords_norm = coords_norm
    self.coords = self.get_denormalized_coordinates(coords_norm)
# [docs]
class RandomGrid(Grid):
"""
RandomGrid object
Parameters
----------
parameters_random : OrderedDict of RandomParameter instances
OrderedDict containing the RandomParameter instances the grids are generated for
n_grid: int
Number of random samples in grid
seed : float, optional, default=None
Seeding point to replicate random grid
options : dict, optional, default=None
RandomGrid options depending on the grid type
coords : ndarray of float [n_grid_add x dim]
Grid points to add (model space)
coords_norm : ndarray of float [n_grid_add x dim]
Grid points to add (normalized space)
coords_gradient : ndarray of float [n_grid x dim x dim]
Denormalized coordinates xi
coords_gradient_norm : ndarray of float [n_grid x dim x dim]
Normalized coordinates xi
coords_id : list of UUID objects (version 4) [n_grid]
Unique IDs of grid points
coords_gradient_id : list of UUID objects (version 4) [n_grid]
Unique IDs of grid points
Examples
--------
>>> import pygpc
>>> grid = pygpc.RandomGrid(parameters_random=parameters_random, n_grid=100, options=None)
Attributes
----------
parameters_random : OrderedDict of RandomParameter instances
OrderedDict containing the RandomParameter instances the grids are generated for
n_grid: int
Number of random samples in grid
seed : float, optional, default=None
Seeding point to replicate random grid
options : dict, optional, default=None
RandomGrid options depending on the grid type
coords : ndarray of float [n_grid_add x dim]
Grid points to add (model space)
coords_norm : ndarray of float [n_grid_add x dim]
Grid points to add (normalized space)
coords_gradient : ndarray of float [n_grid x dim x dim]
Denormalized coordinates xi
coords_gradient_norm : ndarray of float [n_grid x dim x dim]
Normalized coordinates xi
coords_id : list of UUID objects (version 4) [n_grid]
Unique IDs of grid points
coords_gradient_id : list of UUID objects (version 4) [n_grid]
Unique IDs of grid points
"""
def __init__(self, parameters_random, n_grid=None, options=None, coords=None, coords_norm=None,
             coords_gradient=None, coords_gradient_norm=None, coords_id=None, coords_gradient_id=None,
             grid_pre=None):
    """
    Constructor; Initializes RandomGrid instance; Generates grid

    Parameters
    ----------
    parameters_random : OrderedDict of RandomParameter instances
        OrderedDict containing the RandomParameter instances the grids are generated for
    n_grid : int, optional
        Number of random samples in grid
    options : dict, optional
        RandomGrid options; an optional "seed" entry makes the grid reproducible
    """
    super(RandomGrid, self).__init__(parameters_random,
                                     coords=coords,
                                     coords_norm=coords_norm,
                                     coords_gradient=coords_gradient,
                                     coords_gradient_norm=coords_gradient_norm,
                                     coords_id=coords_id,
                                     coords_gradient_id=coords_gradient_id,
                                     grid_pre=grid_pre)

    if n_grid is not None:
        self.n_grid = int(n_grid)

    # normalize options to a dict and pick up an optional seed
    self.options = dict() if options is None else options
    self.seed = self.options.get("seed", None)

    # Seed of random grid (if necessary to reproduce random grid);
    # NOTE: seeds numpy's *global* RNG
    np.random.seed(self.seed)
# [docs]
def resample(self, idx, classifier=None, domain=None, gradient=False, results=None, type=None):
    """
    Replace grid points specified by index. Modifies grid object in place.

    Parameters
    ----------
    idx : np.ndarray of int [n_grid_points_replace]
        Indices of grid points to replace by resampling.
    classifier : Classifier object, optional, default: None
        Classifier
    domain : int, optional, default: None
        Adds grid points only in specified domain (needs Classifier object including a predict() method)
    gradient : bool, optional, default: False
        Add corresponding gradient grid points
    results : np.ndarray of float [n_grid x n_qoi]
        Results computed so far before adding additional grid points
    type : str, optional, default: None
        Type of adding new grid points (parameter name shadows the builtin `type`;
        kept for API compatibility)
        - "GP": Gaussian process regression (points are added where the uncertainty of sampling is very high).
          Does only work for Random, LHS, and GP grids.
        - None: grid points are added according to the grid type.
    """
    # create temporary grid so this instance stays untouched until extension succeeded
    grid_tmp = copy.deepcopy(self)
    n_grid = self.n_grid

    # extend the temporary grid by the number of grid points to resample
    grid_tmp.extend_random_grid(n_grid_new=self.n_grid + len(idx),
                                classifier=classifier,
                                domain=domain,
                                gradient=gradient,
                                results=results,
                                type=type)

    # copy options back (extend_random_grid may have incremented the seed)
    self.options = copy.deepcopy(grid_tmp.options)

    # overwrite grid points to resample in original grid with the newly appended points
    self.coords[idx, ] = grid_tmp.coords[n_grid:, ]
    self.coords_norm[idx, ] = grid_tmp.coords_norm[n_grid:, ]

    # generate and replace unique IDs of new grid points
    for _idx in idx:
        self.coords_id[_idx] = uuid.uuid4()

    # rebuild the gradient grid from scratch if one exists
    # (internal arrays are cleared so create_gradient_grid regenerates everything)
    if self.coords_gradient is not None:
        self._coords_gradient = None
        self._coords_gradient_norm = None
        self.n_grid_gradient = None
        self.create_gradient_grid()
# [docs]
def delete(self, idx):
    """
    Delete grid points by index. Modifies grid object in place.

    Parameters
    ----------
    idx : int or list of int or np.ndarray of int
        Indices of grid points to delete.
    """
    # bug fix: np.array(scalar) yields a 0-d array, and "i not in idx" raises
    # TypeError on 0-d arrays; atleast_1d handles int/float/list/ndarray alike
    idx = np.atleast_1d(np.asarray(idx))

    # delete grid points
    self.coords = np.delete(self.coords, idx, axis=0)
    self.coords_norm = np.delete(self.coords_norm, idx, axis=0)

    # remove unique IDs of deleted grid points (self.n_grid still holds the
    # old size here; the coords setter/attribute is updated above)
    self.coords_id = [self.coords_id[i] for i in range(self.n_grid) if i not in idx]

    # delete gradient grid points
    if self.coords_gradient is not None:
        self.coords_gradient = np.delete(self.coords_gradient, idx, axis=0)
        self.coords_gradient_norm = np.delete(self.coords_gradient_norm, idx, axis=0)
[docs]
def extend_random_grid(self, n_grid_new=None, coords=None, coords_norm=None, classifier=None, domain=None,
                       gradient=False, results=None, type=None):
    """
    Add sample points according to input pdfs to grid (old points are kept). Define either the new total number of
    grid points with "n_grid_new" or add grid-points manually by providing "coords" and "coords_norm".

    Parameters
    ----------
    n_grid_new : float
        Total number of grid points in extended random grid (old points are kept)
        (n_grid_add = n_grid_new - n_grid_old)
    coords : ndarray of float [n_grid_add x dim]
        Grid points to add (model space)
    coords_norm : ndarray of float [n_grid_add x dim]
        Grid points to add (normalized space)
    classifier : Classifier object, optional, default: None
        Classifier
    domain : int, optional, default: None
        Adds grid points only in specified domain (needs Classifier object including a predict() method)
    gradient : bool, optional, default: False
        Add corresponding gradient grid points
    results : np.ndarray of float [n_grid x n_qoi]
        Results computed so far before adding additional grid points
    type : str, optional, default: None
        Type of adding new grid points
        - "GP": Gaussian process regression (points are added where the uncertainty of sampling is very high).
          Does only work for Random, LHS, and GP grids.
        - None: grid points are added according to the grid type.

    Raises
    ------
    AssertionError
        If manually provided coordinates do not lie in the specified domain.
    ValueError
        If neither n_grid_new nor coords/coords_norm is given.
    """
    # increase seed if needed to avoid creation of same grid points
    if "seed" in self.options.keys() and self.options["seed"] is not None:
        self.options["seed"] += 1
        self.seed = self.options["seed"]

    # GP grids are always extended via Gaussian-process based sampling
    if isinstance(self, GP):
        type = "GP"

    if n_grid_new is not None:
        # Number of new grid points
        n_grid_add = int(n_grid_new - self.n_grid)

        if n_grid_add > 0:
            # Generate new grid points
            if classifier is None:
                if isinstance(self, Random) or isinstance(self, GP):
                    if type is None:
                        new_grid = Random(parameters_random=self.parameters_random,
                                          n_grid=n_grid_add,
                                          options=self.options)

                        # append points to existing grid
                        self.coords = np.vstack([self.coords, new_grid.coords])
                        self.coords_norm = np.vstack([self.coords_norm, new_grid.coords_norm])

                    elif type == "GP":
                        if "n_pool" not in list(self.options.keys()):
                            self.options["n_pool"] = 1000

                        # If results are given, determine optimal hyperparameters for Gaussian Process Regression
                        if results is not None:
                            self.options["lengthscale"], self.options["variance"] = \
                                get_parameters_gaussian_process(Xtrain=self.coords_norm,
                                                                ytrain=results[:, 0])

                        if (self.n_grid + 1) == n_grid_new:
                            tqdm.write(f"Adding GP grid point #{self.n_grid + 1}")
                        else:
                            tqdm.write(f"Adding GP grid points #{self.n_grid + 1} ... #{n_grid_new}")

                        # points are added one at a time: each appended point changes
                        # self.coords_norm, which the next uncertainty estimate is based on
                        for _ in tqdm(range(n_grid_add)):
                            # Determine new grid points where uncertainty of output is highest
                            new_grid = self.get_coords_gaussian_process(n_grid_add=1,
                                                                        lengthscale=self.options["lengthscale"],
                                                                        variance=self.options["variance"],
                                                                        n_pool=self.options["n_pool"])

                            # append points to existing grid
                            self.coords = np.vstack([self.coords, new_grid.coords])
                            self.coords_norm = np.vstack([self.coords_norm, new_grid.coords_norm])

                        tqdm._instances.clear()

                elif isinstance(self, LHS):
                    grid_pre = copy.deepcopy(self)

                    if type is None:
                        new_grid = LHS(parameters_random=self.parameters_random,
                                       n_grid=n_grid_add,
                                       grid_pre=grid_pre,
                                       options=self.options)

                        # append points to existing grid
                        self.coords = np.vstack([self.coords, new_grid.coords])
                        self.coords_norm = np.vstack([self.coords_norm, new_grid.coords_norm])

                    elif type == "GP":
                        # same GP-based extension as in the Random/GP branch above
                        if "n_pool" not in list(self.options.keys()):
                            self.options["n_pool"] = 1000

                        # If results are given, determine optimal hyperparameters for Gaussian Process Regression
                        if results is not None:
                            self.options["lengthscale"], self.options["variance"] = \
                                get_parameters_gaussian_process(Xtrain=self.coords_norm,
                                                                ytrain=results[:, 0])

                        if (self.n_grid + 1) == n_grid_new:
                            tqdm.write(f"Adding GP grid point #{self.n_grid + 1}")
                        else:
                            tqdm.write(f"Adding GP grid points #{self.n_grid + 1} ... #{n_grid_new}")

                        for _ in tqdm(range(n_grid_add)):
                            # Determine new grid points where uncertainty of output is highest
                            new_grid = self.get_coords_gaussian_process(n_grid_add=1,
                                                                        lengthscale=self.options["lengthscale"],
                                                                        variance=self.options["variance"],
                                                                        n_pool=self.options["n_pool"])

                            # append points to existing grid
                            self.coords = np.vstack([self.coords, new_grid.coords])
                            self.coords_norm = np.vstack([self.coords_norm, new_grid.coords_norm])

                        tqdm._instances.clear()

                elif isinstance(self, L1) or isinstance(self, L1_LHS) or isinstance(self, LHS_L1) \
                        or isinstance(self, CO) or isinstance(self, FIM):
                    # optimal-design grids re-determine the complete point set instead of
                    # appending; the previous grid is handed over as grid_pre
                    grid_pre = copy.deepcopy(self)

                    new_grid = self.__class__(parameters_random=self.parameters_random,
                                              n_grid=n_grid_new,
                                              grid_pre=grid_pre,
                                              gpc=self.gpc,
                                              options=self.options)

                    self.coords = new_grid.coords
                    self.coords_norm = new_grid.coords_norm
            else:
                coords = np.zeros((n_grid_add, len(self.parameters_random)))
                coords_norm = np.zeros((n_grid_add, len(self.parameters_random)))

                # add grid points one by one because we are looking for samples in the right domain
                for i in range(n_grid_add):
                    resample = True

                    while resample:
                        if isinstance(self, Random):
                            new_grid = Random(parameters_random=self.parameters_random,
                                              n_grid=1,
                                              options=self.options)

                            # test if grid point lies in right domain
                            if classifier.predict(new_grid.coords_norm)[0] == domain:
                                coords[i, :] = new_grid.coords
                                coords_norm[i, :] = new_grid.coords_norm
                                resample = False

                        elif isinstance(self, LHS):
                            # check if gridpoint exceeds sampling reservoir
                            # if self.n_grid > max(10000, self.n_grid_lhs * 10)
                            # coords.np.append(pygpc.LSH( seed=seed+1))

                            # grow the LHS reservoir by one candidate point
                            coords_norm_test = self.lhs_extend(self.coords_norm_reservoir, 1)

                            # test if next grid point lies in right domain
                            if classifier.predict(coords_norm_test[len(coords_norm_test) - 1]) == domain:
                                self.coords_norm_reservoir = coords_norm_test
                                coords[i, :] = self.coords_reservoir[self.n_grid]
                                coords_norm[i, :] = self.coords_norm_reservoir[self.n_grid]
                                self.n_grid += 1
                                resample = False

                # append points to existing grid
                self.coords = np.vstack([self.coords, coords])
                self.coords_norm = np.vstack([self.coords_norm, coords_norm])

    elif coords is not None or coords_norm is not None:
        # append manually provided points to existing grid; the missing
        # representation (model vs normalized space) is derived from the given one
        if coords_norm is None and coords is not None:
            coords_norm = self.get_normalized_coordinates(coords=coords)

        if coords is None and coords_norm is not None:
            coords = self.get_denormalized_coordinates(coords_norm=coords_norm)

        # Number of new grid points
        n_grid_add = coords.shape[0]

        # check if specified points are lying in right domain
        if classifier is not None:
            if not (classifier.predict(coords_norm) == domain).all():
                raise AssertionError("Specified coordinates are not lying in right domain!")

        self.coords = np.vstack([self.coords, coords])
        self.coords_norm = np.vstack([self.coords_norm, coords_norm])

    else:
        raise ValueError("Specify either n_grid_new or coords or coords_norm")

    # Generate and append unique IDs of new grid points
    self.coords_id = self.coords_id + [uuid.uuid4() for _ in range(n_grid_add)]

    self.n_grid = self.coords.shape[0]

    if gradient:
        self.create_gradient_grid()
[docs]
def lhs_extend(self, array, n_extend):
    """
    Add sample points to already existing LHS samples.

    Parameters
    ----------
    array : ndarray of float [m x n]
        Existing LHS samples with m sample points per n dimensions
        (values are expected in the unit hypercube)
    n_extend : int
        Number of new rows of samples needed

    Returns
    -------
    coords_norm : ndarray of float [(m + n_extend) x n]
        Existing LHS samples with the new samples appended after the original rows
    """
    dim = np.shape(array)[1]
    n_old = np.shape(array)[0]
    n_new = n_old + n_extend
    n_new_loop = n_new

    # seeded for reproducibility of the extension
    np.random.seed(seed=self.seed)

    searching = True
    while searching:
        # candidate array; -1 marks "stratum not filled"
        a_new = np.zeros([n_new_loop, dim]) - 1
        u = np.random.rand(n_new_loop, dim)

        for d in range(dim):
            k = 0
            for j in range(n_new_loop - 1):
                # fill stratum j only if it is not already occupied by an existing
                # sample and the following existing sample lies beyond the stratum
                if not (float(j / n_new_loop) < float(np.sort(array[:, d])[min(j, len(array) - 1)]) < float((j + 1) / n_new_loop)) \
                        and (float((j + 1) / n_new_loop) <= float(np.sort(array[:, d])[min((j + 2), len(array) - 1)])):
                    k = k + 1
                    # bug fix: the original compared with "is" (object identity) instead
                    # of "==", which only happened to work via CPython small-int caching
                    if k == np.shape(a_new)[0] + 1:
                        k = 1
                    a_new[k - 1, d] = float((j + u[j, d]) / n_new_loop)

        # keep only candidate rows that were assigned in every dimension
        a_new = a_new[(a_new != -1).all(axis=1), :]

        if n_extend > np.min((a_new.shape[0], n_new)):
            # not enough free strata found -> refine the subdivision and retry
            n_new_loop = n_new_loop * 2
        else:
            searching = False

    # decouple the dimensions by shuffling each column independently
    for d in range(dim):
        np.random.shuffle(a_new[:, d])

    # pick n_extend of the candidates and append them after the original rows
    a_extend = a_new[np.random.choice(a_new.shape[0], size=n_extend, replace=False), :]
    coords_ = np.insert(array, n_old, a_extend, axis=0)

    return coords_
[docs]
def get_coords_gaussian_process(self, n_grid_add, lengthscale=0.2, variance=1., n_pool=10000):
    """
    Determine coordinates at highest variance determined by Gaussian Process Regression.

    Parameters
    ----------
    n_grid_add : int
        Number of grid points to add
    lengthscale : float, optional, default: 0.2
        Lengthscale parameter of the squared exponential kernel
    variance : float, optional, default: 1.
        Output variance of the squared exponential kernel
    n_pool : int, optional, default: 10000
        Poolsize of random sampling points the best are selected from.

    Returns
    -------
    grid_new : RandomGrid object
        RandomGrid object which contains the new grid points in grid_new.coords and grid_new.coords_norm
    """
    # pool size: at least twice the current number of grid points
    n_test = np.max((n_pool, 2 * self.n_grid))

    # create test grid
    grid_test = Random(parameters_random=self.parameters_random, n_grid=n_test, options={"seed": self.seed})

    # kernels
    K = squared_exponential_kernel(x=self.coords_norm, y=self.coords_norm,
                                   lengthscale=lengthscale, variance=variance)   # n_train x n_train
    Ks = squared_exponential_kernel(x=self.coords_norm, y=grid_test.coords_norm,
                                    lengthscale=lengthscale, variance=variance)  # n_train x n_test
    Kss = squared_exponential_kernel(x=grid_test.coords_norm, y=grid_test.coords_norm,
                                     lengthscale=lengthscale, variance=variance) # n_test x n_test

    try:
        # cholesky decomposition
        L = np.linalg.cholesky(K)
        # compute v
        v = np.linalg.solve(L, Ks)
    except np.linalg.LinAlgError:
        print(
            "Warning: Cholesky decomposition of K* matrix did not converge ... using Moore-Penrose pseudo inverse.")
        v = np.linalg.pinv(K) @ Ks

    # compute the posterior covariance of the test points
    covariance = Kss - (v.T @ v)

    # we get the standard deviation from the covariance matrix
    std = np.sqrt(np.diag(covariance))

    if np.isnan(std).all():
        print("Warning: GP failed, adding random grid points instead.")
        # take random samples if GP failed
        coords = grid_test.coords[:n_grid_add, :]
        coords_norm = grid_test.coords_norm[:n_grid_add, :]
    else:
        # weight std with the joint probability density of the test points
        # bug fix: the weight array has to match the pool size n_test; the original
        # used n_pool, which breaks by broadcasting when 2 * n_grid > n_pool
        joint_pdf = np.ones(n_test)

        for i_p, p in enumerate(self.parameters_random):
            _, tmp = self.parameters_random[p].pdf_norm(x=grid_test.coords_norm[:, i_p])
            joint_pdf *= tmp

        std_weighted = std * joint_pdf

        # select the points with the largest weighted uncertainty (NaNs dropped)
        idx_sorted = np.flip(np.argsort(std_weighted))
        idx_sorted = idx_sorted[~np.isnan(std_weighted[idx_sorted])]
        coords = grid_test.coords[idx_sorted[:n_grid_add], :]
        coords_norm = grid_test.coords_norm[idx_sorted[:n_grid_add], :]

    grid_new = Random(coords=coords, coords_norm=coords_norm, parameters_random=self.parameters_random)

    return grid_new
[docs]
class Random(RandomGrid):
    """
    Random grid object.

    Draws independent random samples from the input pdfs of the random parameters.

    Parameters
    ----------
    parameters_random : OrderedDict of RandomParameter instances
        OrderedDict containing the RandomParameter instances the grids are generated for
    n_grid : int or float
        Number of random samples in grid
    options : dict, optional, default=None
        RandomGrid options depending on the grid type (e.g. "seed" to replicate random grids)
    coords : ndarray of float [n_grid_add x dim]
        Grid points to add (model space)
    coords_norm : ndarray of float [n_grid_add x dim]
        Grid points to add (normalized space)
    coords_gradient : ndarray of float [n_grid x dim x dim]
        Denormalized coordinates xi
    coords_gradient_norm : ndarray of float [n_grid x dim x dim]
        Normalized coordinates xi
    coords_id : list of UUID objects (version 4) [n_grid]
        Unique IDs of grid points
    coords_gradient_id : list of UUID objects (version 4) [n_grid]
        Unique IDs of gradient grid points
    grid_pre : Grid object, optional, default=None
        Existing grid whose points are kept; only the remaining points are sampled

    Examples
    --------
    >>> import pygpc
    >>> grid = pygpc.Random(parameters_random=parameters_random, n_grid=100)
    """

    def __init__(self, parameters_random, n_grid=None, options=None, coords=None, coords_norm=None,
                 coords_gradient=None, coords_gradient_norm=None, coords_id=None, coords_gradient_id=None,
                 grid_pre=None):
        """
        Constructor; Initializes RandomGrid instance; Generates grid or copies provided content
        """
        if n_grid is not None:
            n_grid = int(n_grid)

        super(Random, self).__init__(parameters_random,
                                     n_grid=n_grid,
                                     options=options,
                                     coords=coords,
                                     coords_norm=coords_norm,
                                     coords_gradient=coords_gradient,
                                     coords_gradient_norm=coords_gradient_norm,
                                     coords_id=coords_id,
                                     coords_gradient_id=coords_gradient_id,
                                     grid_pre=grid_pre)

        grid_present = coords is not None or coords_norm is not None

        if not grid_present:
            # Generate random samples for each random input variable [n_grid x dim]
            self.coords_norm = np.zeros([self.n_grid, self.dim])

            if self.grid_pre is not None:
                # keep already existing points, only sample the remainder
                self.coords_norm[:self.grid_pre.n_grid, :] = self.grid_pre.coords_norm
                n_grid_add = n_grid - self.grid_pre.n_grid
                n_grid_start = self.grid_pre.n_grid
            else:
                n_grid_add = n_grid
                n_grid_start = 0

            # in case of seeding, the random grid is constructed element wise
            # (same grid-points when n_grid differs)
            if self.seed:
                for i_grid in range(n_grid_start, n_grid):
                    for i_p, p in enumerate(self.parameters_random):
                        if self.parameters_random[p].pdf_type == "beta":
                            # beta samples live on [0, 1] -> map to normalized [-1, 1]
                            self.coords_norm[i_grid, i_p] = np.random.beta(self.parameters_random[p].pdf_shape[0],
                                                                           self.parameters_random[p].pdf_shape[1],
                                                                           1) * 2.0 - 1

                        if self.parameters_random[p].pdf_type in ["norm", "normal"]:
                            # rejection sampling: redraw until inside the truncation
                            # interval x_perc_norm
                            resample = True
                            while resample:
                                self.coords_norm[i_grid, i_p] = np.random.normal(loc=0,
                                                                                 scale=1,
                                                                                 size=1)
                                resample = self.coords_norm[i_grid, i_p] < self.parameters_random[p].x_perc_norm[0] or \
                                    self.coords_norm[i_grid, i_p] > self.parameters_random[p].x_perc_norm[1]

                        if self.parameters_random[p].pdf_type in ["gamma"]:
                            # rejection sampling: redraw until below the cutoff x_perc_norm
                            resample = True
                            while resample:
                                self.coords_norm[i_grid, i_p] = np.random.gamma(
                                    shape=self.parameters_random[p].pdf_shape[0],
                                    scale=1.0,
                                    size=1)
                                resample = self.coords_norm[i_grid, i_p] > self.parameters_random[p].x_perc_norm
            else:
                for i_p, p in enumerate(self.parameters_random):
                    if self.parameters_random[p].pdf_type == "beta":
                        self.coords_norm[n_grid_start:, i_p] = (np.random.beta(self.parameters_random[p].pdf_shape[0],
                                                                               self.parameters_random[p].pdf_shape[1],
                                                                               [n_grid_add, 1]) * 2.0 - 1)[:, 0]

                    if self.parameters_random[p].pdf_type in ["norm", "normal"]:
                        # vectorized rejection sampling: redraw all outliers at once
                        # until none are left
                        resample = True
                        if self.grid_pre is not None:
                            outlier_mask = np.hstack((np.zeros(self.grid_pre.n_grid, dtype=bool),
                                                      np.ones(n_grid_add, dtype=bool)))
                        else:
                            outlier_mask = np.ones(self.n_grid, dtype=bool)
                        while resample:
                            self.coords_norm[outlier_mask, i_p] = (np.random.normal(
                                loc=0,
                                scale=1,
                                size=[np.sum(outlier_mask), 1]))[:, 0]
                            # NOTE(review): the mask is recomputed over the whole column,
                            # so pre-existing grid_pre points outside the interval would
                            # be redrawn as well — confirm this is intended
                            outlier_mask = np.logical_or(
                                self.coords_norm[:, i_p] < self.parameters_random[p].x_perc_norm[0],
                                self.coords_norm[:, i_p] > self.parameters_random[p].x_perc_norm[1])
                            resample = outlier_mask.any()

                    if self.parameters_random[p].pdf_type in ["gamma"]:
                        resample = True
                        # NOTE(review): unlike the normal branch, the initial mask covers
                        # the whole column including grid_pre points — confirm
                        outlier_mask = np.ones(self.n_grid, dtype=bool)
                        while resample:
                            self.coords_norm[outlier_mask, i_p] = (np.random.gamma(
                                shape=self.parameters_random[p].pdf_shape[0],
                                scale=1.0,
                                size=[np.sum(outlier_mask), 1]))[:, 0]
                            outlier_mask = np.array(self.coords_norm[:, i_p] > self.parameters_random[p].x_perc_norm)
                            resample = outlier_mask.any()

            # Denormalize grid to original parameter space
            self.coords = self.get_denormalized_coordinates(self.coords_norm)

            # Generate unique IDs of grid points
            self.coords_id = [uuid.uuid4() for _ in range(self.n_grid)]
        else:
            if self.coords is None:
                # Denormalize grid to original parameter space
                self.coords = self.get_denormalized_coordinates(self.coords_norm)

            if self.coords_norm is None:
                # bug fix: the original assigned the normalized coordinates to
                # self.coords, leaving self.coords_norm unset
                self.coords_norm = self.get_normalized_coordinates(self.coords)

            if self.coords_gradient is None and self.coords_gradient_norm is not None:
                # Denormalize gradient grid to original parameter space
                self.coords_gradient = self.get_denormalized_coordinates(self.coords_gradient_norm)

            if self.coords_gradient_norm is None and self.coords_gradient is not None:
                # Normalize gradient grid
                self.coords_gradient_norm = self.get_normalized_coordinates(self.coords_gradient)
[docs]
class GP(RandomGrid):
    """
    Gaussian Process grid object.

    Grid points are added sequentially at the locations of highest posterior variance
    of a Gaussian Process regression (see Grid.get_coords_gaussian_process).

    Parameters
    ----------
    parameters_random : OrderedDict of RandomParameter instances
        OrderedDict containing the RandomParameter instances the grids are generated for
    n_grid : int or float
        Number of random samples in grid
    options : dict, optional, default=None
        Grid options; recognized keys: "variance" (default 1.0), "lengthscale"
        (default 0.2), "n_pool" (default 10000), "seed"
    coords : ndarray of float [n_grid_add x dim]
        Grid points to add (model space)
    coords_norm : ndarray of float [n_grid_add x dim]
        Grid points to add (normalized space)
    coords_gradient : ndarray of float [n_grid x dim x dim]
        Denormalized coordinates xi
    coords_gradient_norm : ndarray of float [n_grid x dim x dim]
        Normalized coordinates xi
    coords_id : list of UUID objects (version 4) [n_grid]
        Unique IDs of grid points
    coords_gradient_id : list of UUID objects (version 4) [n_grid]
        Unique IDs of gradient grid points
    grid_pre : Grid object, optional, default=None
        Existing grid whose points are kept; only the remaining points are added

    Examples
    --------
    >>> import pygpc
    >>> grid = pygpc.GP(parameters_random=parameters_random, n_grid=100)
    """

    def __init__(self, parameters_random, n_grid=None, options=None, coords=None, coords_norm=None,
                 coords_gradient=None, coords_gradient_norm=None, coords_id=None, coords_gradient_id=None,
                 grid_pre=None):
        """
        Constructor; Initializes RandomGrid instance; Generates grid or copies provided content
        """
        if n_grid is not None:
            n_grid = int(n_grid)

        super(GP, self).__init__(parameters_random,
                                 n_grid=n_grid,
                                 options=options,
                                 coords=coords,
                                 coords_norm=coords_norm,
                                 coords_gradient=coords_gradient,
                                 coords_gradient_norm=coords_gradient_norm,
                                 coords_id=coords_id,
                                 coords_gradient_id=coords_gradient_id,
                                 grid_pre=grid_pre)

        grid_present = coords is not None or coords_norm is not None

        # fill in default GP hyperparameters
        # bug fix: the original assigned keys into self.options while it was still
        # None, which raises TypeError; create the dict first
        if self.options is None:
            self.options = dict()
        if "variance" not in self.options.keys():
            self.options["variance"] = 1.
        if "lengthscale" not in self.options.keys():
            self.options["lengthscale"] = 0.2
        if "n_pool" not in self.options.keys():
            self.options["n_pool"] = 10000

        if not grid_present:
            if self.grid_pre is not None:
                # continue from the already existing grid points
                # NOTE(review): self.coords is not derived from coords_norm here before
                # extend_random_grid is called — confirm the parent constructor sets it
                self.coords_norm = self.grid_pre.coords_norm
            else:
                # start from a single point in the center of the normalized space
                self.coords_norm = np.zeros((1, self.dim))
                self.coords = self.get_denormalized_coordinates(self.coords_norm)

            # add the remaining points one by one where the GP uncertainty is highest
            self.extend_random_grid(n_grid_new=n_grid, coords_norm=self.coords_norm, coords=self.coords, type="GP")

            # Denormalize grid to original parameter space
            self.coords = self.get_denormalized_coordinates(self.coords_norm)

            # Generate unique IDs of grid points
            self.coords_id = [uuid.uuid4() for _ in range(self.n_grid)]
        else:
            if self.coords is None:
                # Denormalize grid to original parameter space
                self.coords = self.get_denormalized_coordinates(self.coords_norm)

            if self.coords_norm is None:
                # bug fix: the original assigned the normalized coordinates to
                # self.coords instead of self.coords_norm
                self.coords_norm = self.get_normalized_coordinates(self.coords)

            if self.coords_gradient is None and self.coords_gradient_norm is not None:
                # Denormalize gradient grid to original parameter space
                self.coords_gradient = self.get_denormalized_coordinates(self.coords_gradient_norm)

            if self.coords_gradient_norm is None and self.coords_gradient is not None:
                # Normalize gradient grid
                self.coords_gradient_norm = self.get_normalized_coordinates(self.coords_gradient)
[docs]
class LHS(RandomGrid):
"""
LHS grid object
Parameters
----------
parameters_random : OrderedDict of RandomParameter instances
OrderedDict containing the RandomParameter instances the grids are generated for
n_grid: int
Number of random samples to generate
options: dict, optional, default=None
Grid options:
- criterion :
- 'corr' : optimizes design points in their spearman correlation coefficients
- 'maximin' or 'm' : optimizes design points in their maximum minimal distance using the Phi-P criterion
- 'ese' : uses an enhanced evolutionary algorithm to optimize the Phi-P criterion
- seed : Seeding point to replicate random grids
coords : ndarray of float [n_grid_add x dim]
Grid points to add (model space)
coords_norm : ndarray of float [n_grid_add x dim]
Grid points to add (normalized space)
coords_gradient : ndarray of float [n_grid x dim x dim]
Denormalized coordinates xi
coords_gradient_norm : ndarray of float [n_grid x dim x dim]
Normalized coordinates xi
coords_id : list of UUID objects (version 4) [n_grid]
Unique IDs of grid points
coords_gradient_id : list of UUID objects (version 4) [n_grid]
Unique IDs of grid points
Examples
--------
>>> import pygpc
>>> grid = pygpc.LHS(parameters_random=parameters_random, n_grid=100, options={"seed": None, "criterion": "ese"})
Attributes
----------
parameters_random : OrderedDict of RandomParameter instances
OrderedDict containing the RandomParameter instances the grids are generated for
n_grid : int or float
Number of random samples in grid
seed : float, optional, default=None
Seeding point to replicate random grid
options : dict, optional, default=None
- 'corr' : optimizes design points in their spearman correlation coefficients
- 'maximin' or 'm' : optimizes design points in their maximum minimal distance using the Phi-P criterion
- 'ese' : uses an enhanced evolutionary algorithm to optimize the Phi-P criterion
coords : ndarray of float [n_grid_add x dim]
Grid points to add (model space)
coords_norm : ndarray of float [n_grid_add x dim]
Grid points to add (normalized space)
coords_gradient : ndarray of float [n_grid x dim x dim]
Denormalized coordinates xi
coords_gradient_norm : ndarray of float [n_grid x dim x dim]
Normalized coordinates xi
coords_id : list of UUID objects (version 4) [n_grid]
Unique IDs of grid points
coords_gradient_id : list of UUID objects (version 4) [n_grid]
Unique IDs of grid points
"""
def __init__(self, parameters_random, n_grid=None, options=None, coords=None, coords_norm=None,
             coords_gradient=None, coords_gradient_norm=None, coords_id=None, coords_gradient_id=None,
             grid_pre=None):
    """
    Constructor; Initializes RandomGrid instance; Generates grid or copies provided content
    """
    self.coords_norm_lhs = None
    self.perc_mask = None
    self.coords_reservoir = None
    self.coords_norm_reservoir = None
    self.grid_pre = grid_pre
    self.options = options
    self.criterion = None
    self.method = None
    self.coords_norm_reservoir_perced = None

    if type(self.options) is dict:
        if "criterion" in self.options.keys():
            self.criterion = options["criterion"]
        else:
            self.criterion = ["ese"]

        if "method" in self.options.keys():
            self.method = options["method"]
        else:
            # bug fix: the original assigned self.criterion = ["standard"] here,
            # silently overwriting the criterion chosen above instead of
            # setting the default sampling method
            self.method = "standard"

    if type(self.criterion) is not list:
        self.criterion = [self.criterion]

    # consistency fix: pass grid_pre through to the parent constructor like
    # Random and GP do
    super(LHS, self).__init__(parameters_random,
                              n_grid=n_grid,
                              options=options,
                              coords=coords,
                              coords_norm=coords_norm,
                              coords_gradient=coords_gradient,
                              coords_gradient_norm=coords_gradient_norm,
                              coords_id=coords_id,
                              coords_gradient_id=coords_gradient_id,
                              grid_pre=grid_pre)

    self.shift_outer = False

    if coords is not None and coords_norm is not None:
        grid_present = True
    else:
        grid_present = False

    if not grid_present:
        if self.n_grid > 0:
            self.sample_init(self.n_grid)
[docs]
def sample_init(self, n_grid):
    """
    Initialize all parameters for Latin Hypercube Sampling and create a new design
    if at least one sampling point is needed. Sets self.coords, self.coords_norm
    and self.coords_id in place.

    Parameters
    ----------
    n_grid : int
        The number of needed sampling points
    """
    if n_grid <= 0:
        # nothing requested -> nothing to do
        return

    if n_grid == 1:
        # a single point: fall back to plain random sampling
        single = Random(parameters_random=self.parameters_random,
                        n_grid=self.n_grid,
                        options=self.options)
        self.coords_norm = single.coords_norm
    else:
        # reservoir also holds the points of a previously existing grid
        reservoir_size = self.n_grid if self.grid_pre is None else self.n_grid + self.grid_pre.n_grid
        self.perc_mask = np.zeros((reservoir_size, self.dim)).astype(bool)
        self.coords_norm_reservoir = np.zeros([reservoir_size, self.dim])

        # generate LHS coordinates
        self.get_lhs_grid()

    # Denormalize grid to original parameter space
    self.coords = self.get_denormalized_coordinates(self.coords_norm)

    # Generate unique IDs of grid points
    self.coords_id = [uuid.uuid4() for _ in range(self.n_grid)]
[docs]
def CL2(self, array):
    """
    Calculate the centered L2 discrepancy of the design.

    The discrepancy is a measure of the difference between the empirical cumulative
    distribution function of an experimental design and the uniform cumulative
    distribution function [1].

    Parameters
    ----------
    array : ndarray of float [n x dim]
        Array with n rows of samples and dim columns of variables/dimensions
        (samples are expected to lie in the unit hypercube [0, 1]^dim)

    Returns
    -------
    cl2d_crit : float
        Criterion for centered L2 discrepancy (squared discrepancy CD^2)

    Notes
    -----
    .. [1] Hickernell, F. (1998). A generalized discrepancy and quadrature error bound.
       Mathematics of computation, 67(221), 299-322.
    """
    n_s, n_dim = np.shape(array)

    # center the design around the midpoint of the unit hypercube
    x = array - 0.5

    # single sum: prod_k(1 + 1/2*|x_ik| - 1/2*x_ik^2)
    sum_1 = np.prod(1.0 + 0.5 * np.abs(x) - 0.5 * x ** 2, axis=1).sum()

    # double sum: prod_k(1 + 1/2*|x_ik| + 1/2*|x_jk| - 1/2*|x_ik - x_jk|)
    # bug fix: the original product had sign errors (subtracted the |x_jk| term)
    sum_2 = 0.0
    for i in range(n_s):
        sum_2 += np.prod(1.0 + 0.5 * np.abs(x[i, :]) + 0.5 * np.abs(x) - 0.5 * np.abs(x[i, :] - x), axis=1).sum()

    # bug fix: the constant term is (13/12)^dim, not (13/12)^2
    cl2d_crit = (13.0 / 12.0) ** n_dim - (2.0 / n_s) * sum_1 + sum_2 / n_s ** 2

    # centered L2 discrepancy criteria
    return cl2d_crit
[docs]
def log_R(self, array):
    """
    Determines the Log(R) Entropy Criterion [1].

    Parameters
    ----------
    array : ndarray of float [m x n]
        Design array with m samples of n variables/dimensions

    Returns
    -------
    log_R : float
        Log(R) Entropy Criterion

    Notes
    -----
    .. [1] Koehler, J.R., Owen, A.B., 1996. Computer experiments. in: Ghosh, S., Rao, C.R. (Eds.),
       Handbook of Statistics. Elsevier Science, New York, pp.261-308
    """
    # NOTE(review): np.corrcoef(array.T) correlates the *columns* of array,
    # so R is [n x n] (n = number of dimensions); the loops below also run over
    # np.shape(array)[1] but index array rows with i/j — confirm this is the
    # intended behavior (it assumes m >= n)
    # R will be [m x m]
    R = np.corrcoef(array.T)
    for i in range(0, np.shape(array)[1]):
        for j in range(0, np.shape(array)[1]):
            # NOTE(review): R[i, :] is read after earlier entries of the same row
            # were already overwritten in this loop (in-place update) — verify
            R[i, j] = np.exp((R[i, :] * np.abs(array[i, :] - array[j, :]) ** 2).sum())
    # NOTE(review): entropy criteria commonly use log|det(R)|; this uses the
    # norm of R instead — confirm intended
    log_R = np.log(np.linalg.norm(R))
    return (log_R)
[docs]
def PhiP(self, x, p=10):
    """
    Calculates the Phi-p criterion of the design x with power p [1].

    Parameters
    ----------
    x : ndarray of float [m x n]
        The design to calculate Phi-p for
    p : int, optional, default: 10
        The power used for the calculation of PhiP

    Returns
    -------
    phip : float
        Phi-p criterion

    Raises
    ------
    ValueError
        If self.method is neither "standard", None, nor "periodic".

    Notes
    -----
    .. [1] Morris, M. D., & Mitchell, T. J. (1995). Exploratory designs for computational experiments.
       Journal of statistical planning and inference, 43(3), 381-402.
    """
    m, n = x.shape
    dist = np.zeros((m * (m - 1)) // 2, dtype=np.double)
    k = 0

    # bug fix: the original condition `self.method == "standard" or None` is falsy
    # for method=None (== binds before `or`), leaving phip unbound
    if self.method in ("standard", None):
        for i in range(0, m - 1):
            for j in range(i + 1, m):
                dist[k] = np.linalg.norm(x[i] - x[j])
                k = k + 1
        phip = ((dist ** (-p)).sum()) ** (1.0 / p)
    elif self.method == "periodic":
        for i in range(0, m - 1):
            for j in range(i + 1, m):
                d = np.abs(x[i] - x[j])
                # wrap-around distance on the unit torus
                # bug fix: the original passed the second array to np.min's *axis*
                # argument; element-wise minimum requires np.minimum
                dist[k] = np.sqrt(np.sum(np.square(np.minimum(d, 1 - d))))
                k = k + 1
        phip = ((dist ** (-p)).sum()) ** (1.0 / p)
    else:
        raise ValueError(f"Unknown PhiP method: {self.method}")

    return phip
[docs]
def PhiP_exchange(self, P, k, Phi, p, fixed_index):
"""
Performes a row exchange and return the altered design.
Parameters
----------
P : ndarray of float [m x n]
The design to perform the exchange on
k : int
modulus of the iteration divided by the dimension to pick a row of the design repeating through the
dimensions of the design
Phi: float
the PhiP criterion of the current best Design
p: int
The power used for the calculation of PhiP
fixed_index: list
an empty list to check if variables are assigned a value
Returns
-------
phip : float
Phi-p criterion
"""
# Choose two (different) random rows to perform the exchange
er = P.shape
i1 = np.random.randint(P.shape[0])
while i1 in fixed_index:
i1 = np.random.randint(P.shape[0])
i2 = np.random.randint(P.shape[0])
while i2 == i1 or i2 in fixed_index:
i2 = np.random.randint(P.shape[0])
P_= np.delete(P, [i1, i2], axis=0)
dist1 = scipy.spatial.distance.cdist([P[i1, :]], P_)
dist2 = scipy.spatial.distance.cdist([P[i2, :]], P_)
d1 = np.sqrt(dist1 ** 2 + (P[i2, k] - P_[:, k]) ** 2 - (P[i1, k] - P_[:, k]) ** 2)
d2 = np.sqrt(dist2 ** 2 - (P[i2, k] - P_[:, k]) ** 2 + (P[i1, k] - P_[:, k]) ** 2)
res = (Phi ** p + (d1 ** (-p) - dist1 ** (-p) + d2 ** (-p) - dist2 ** (-p)).sum()) ** (1.0 / p)
P[i1, k], P[i2, k] = P[i2, k], P[i1, k]
return res
[docs]
def get_lhs_grid(self):
    """
    Create samples in an m*n matrix using Latin Hypercube Sampling [1].

    The sample points are first generated in the unit hypercube (icdf space)
    according to the selected optimization criterion and afterwards transformed
    to the probability space of the random parameters. If a previous grid is
    present, duplicate points are removed before the first n_grid points are
    selected.

    Notes
    -----
    .. [1] McKay, M. D., Beckman, R. J., & Conover, W. J. (2000). A comparison of three methods for selecting
       values of input variables in the analysis of output from a computer code. Technometrics, 42(1), 55-61.
    """
    # create sample points in icdf space using the specified criterion
    if self.criterion[0] == 'corr':
        self.coords_norm_lhs = self.lhs_corr()
    # bugfix: the shorthand 'm' has to be compared against the first criterion
    # entry as well ("self.criterion == 'm'" compared the whole list to a string
    # and was therefore always False)
    elif self.criterion[0] == 'maximin' or self.criterion[0] == 'm':
        self.coords_norm_lhs = self.lhs_maximin()
    elif self.criterion[0] == 'ese':
        self.coords_norm_lhs = self.lhs_ese()
    else:
        self.coords_norm_lhs = self.lhs_initial()

    # transform sample points from icdf (unit hypercube) to pdf space
    for i_p, p in enumerate(self.parameters_random):
        self.coords_norm_reservoir[:, i_p] = self.parameters_random[p].icdf(self.coords_norm_lhs[:, i_p])

    # drop points that already exist in the previous grid
    if self.grid_pre is not None:
        self.coords_norm_reservoir = get_different_rows_from_matrices(self.grid_pre.coords_norm,
                                                                      self.coords_norm_reservoir)

    self.coords_norm = self.coords_norm_reservoir[0:self.n_grid, :]
[docs]
def lhs_initial(self):
    """
    Construct an initial Latin Hypercube design.

    Every column of the returned design contains exactly one sample per stratum
    of the unit interval; the columns are shuffled independently of each other.
    If a previous grid exists, it is mapped back to the unit hypercube and
    extended instead.

    Returns
    -------
    design : ndarray of float [n, n_dim]
        LHS grid points in the unit hypercube (0, 1)
    """
    if self.grid_pre is not None:
        # transform normalized coordinates of the existing grid back to LHS space (0, 1)
        lhs_coords_pre = np.zeros(self.grid_pre.coords_norm.shape)

        for i_dim, param in enumerate(self.parameters_random):
            lhs_coords_pre[:, i_dim] = self.parameters_random[param].cdf_norm(self.grid_pre.coords_norm[:, i_dim])

        return self.lhs_extend(lhs_coords_pre, self.n_grid)

    n, d = self.n_grid, self.dim
    strata = np.zeros([n, d])

    # uniform jitter that varies within each of the n strata
    jitter = np.random.rand(n, d)

    for row in range(n):
        for col in range(d):
            if self.shift_outer:
                if row == 0:
                    # push the first point towards the outer edge of its stratum
                    strata[row, col] = row + 1
                    jitter[row, col] = 1 - 1 / 4 * jitter[row, col]
                elif (row + 1) == n:
                    # push the last point towards the outer edge of its stratum
                    strata[row, col] = row + 1
                    jitter[row, col] = 1 / 4 * jitter[row, col]
                elif row <= n / 2:
                    # shift the lower half slightly towards the lower boundary
                    strata[row, col] = row + 1 - ((row * 3 / 4) / ((n - 2) * n))
                else:
                    # shift the upper half slightly towards the upper boundary
                    strata[row, col] = row + 1 + ((row * 3 / 4) / ((n - 2) * n))
            else:
                strata[row, col] = row + 1

    # scale every stratum index into (0, 1) and decouple the columns
    for col in range(d):
        strata[:, col] = (strata[:, col] - jitter[:, col]) / n
        np.random.shuffle(strata[:, col])

    return strata
[docs]
def lhs_corr(self):
    """
    Create a correlation optimized LHS grid.

    Generates 100 random Latin Hypercube designs and returns the one whose
    largest absolute pairwise Spearman rank correlation between columns is
    smallest.

    Returns
    -------
    design : ndarray of float [n, n_dim]
        LHS grid points
    """
    mincorr = np.inf
    design = None

    # Minimize the maximal absolute correlation between the columns
    for _ in range(100):
        # Generate a random LHS
        candidate = self.lhs_initial()
        corr = np.asarray(scipy.stats.spearmanr(candidate)[0])

        # spearmanr returns a scalar for two columns and a correlation matrix
        # otherwise; in the matrix case the diagonal (auto-correlation of 1)
        # has to be excluded, otherwise the criterion is always 1 and no
        # optimization takes place (bugfix)
        if corr.ndim == 0:
            max_abs_corr = np.abs(float(corr))
        else:
            off_diagonal = ~np.eye(corr.shape[0], dtype=bool)
            max_abs_corr = np.max(np.abs(corr[off_diagonal]))

        if max_abs_corr < mincorr:
            mincorr = max_abs_corr
            design = candidate.copy()

    return design
[docs]
def lhs_maximin(self):
    """
    Create an optimized LHS grid with maximal minimal distance between points.

    Generates 100 random Latin Hypercube designs and returns the one with the
    smallest PhiP criterion (which corresponds to the largest minimal
    point-to-point distance).

    Returns
    -------
    design : ndarray of float [n, n_dim]
        LHS grid points
    """
    design = None
    # bugfix: start from +inf so the first candidate is always kept; the former
    # finite threshold max(1000, n_grid*100) could leave 'design' unassigned
    # (NameError) when every candidate exceeded it
    phi_best = np.inf

    # Maximize the minimum distance between points (i.e. minimize PhiP)
    for _ in range(100):
        candidate = self.lhs_initial()
        phi = self.PhiP(candidate)

        if phi < phi_best:
            phi_best = phi
            design = candidate.copy()

    return design
[docs]
def lhs_ese(self):
    """
    Create optimized LHS grid using an enhanced stochastic evolutionary (ESE) algorithm for the PhiP
    Maximin criterion [1].
    Returns
    -------
    design : ndarray of float [n, n_dim]
        With ESE Algorithm for minimal Phi-P optimized grid points
    Notes
    -----
    .. [1] Jin, R., Chen, W., & Sudjianto, A. (2005). An efficient algorithm for constructing optimal
       design of computer experiments. Journal of statistical planning and inference, 134(1), 268-287.
    """
    # Parameters of the ESE algorithm:
    # J  - number of candidate row exchanges evaluated per inner iteration
    # p  - power of the PhiP criterion
    # tol - improvement tolerance controlling the temperature adaption
    t0 = None
    P0 = self.lhs_initial()
    J = 25
    tol = 1e-3
    p = 10
    outer_loop = min(int(1.5 * self.dim), 30)
    inner_loop = min(20 * self.dim, 100)

    # rows belonging to an already existing grid must not be exchanged
    if self.coords_norm_reservoir_perced is not None:
        fixed_index = [*range(self.coords_norm_reservoir_perced.shape[0])]
    elif self.grid_pre is not None:
        fixed_index = [*range(self.grid_pre.coords_norm.shape[0])]
    else:
        fixed_index = []

    # initial "temperature" of the stochastic acceptance threshold
    if t0 is None:
        t0 = 0.005 * self.PhiP(P0, p=p)

    T = t0
    P_ = P0[:]  # copy of initial design
                # NOTE(review): slicing an ndarray returns a VIEW, not a copy —
                # P0, P_ and P_best share memory here; presumably harmless since
                # the exchanges below operate on .copy()'s, but verify before refactoring
    P_best = P_[:]
    Phi = self.PhiP(P_best, p=p)
    Phi_best = Phi

    # Outer loop (adapts the temperature T)
    for z in range(outer_loop):
        Phi_oldbest = Phi_best
        n_acpt = 0  # number of accepted exchanges in this outer iteration
        n_imp = 0   # number of improvements of the best design

        # Inner loop (cycles through the dimensions of the design)
        for i in range(inner_loop):
            modulo = (i + 1) % self.dim
            l_P = list()
            l_Phi = list()

            # Build J different designs with a single exchanged row
            # See PhiP_exchange
            for j in range(J):
                l_P.append(P_.copy())
                l_Phi.append(self.PhiP_exchange(l_P[j], k=modulo, Phi=Phi, p=p, fixed_index=fixed_index))

            l_Phi = np.asarray(l_Phi)
            k = np.argmin(l_Phi)
            Phi_try = l_Phi[k]

            # Threshold of acceptance (simulated-annealing-like: worse designs
            # may be accepted with a probability controlled by T)
            if Phi_try - Phi <= T * np.random.rand(1)[0]:
                Phi = Phi_try
                n_acpt = n_acpt + 1
                P_ = l_P[k]

                # Best design retained
                if Phi < Phi_best:
                    P_best = P_
                    Phi_best = Phi
                    n_imp = n_imp + 1

        p_accpt = float(n_acpt) / inner_loop  # probability of acceptance
        p_imp = float(n_imp) / inner_loop  # probability of improvement

        # Temperature adaption following Jin et al. (2005): cool down while
        # improving, heat up when the search stagnates
        if Phi_best - Phi_oldbest < tol:
            # flag_imp = 1
            if p_accpt >= 0.1 and p_imp < p_accpt:
                T = 0.8 * T
            elif p_accpt >= 0.1 and p_imp == p_accpt:
                pass
            else:
                T = T / 0.8
        else:
            # flag_imp = 0
            if p_accpt <= 0.1:
                T = T / 0.7
            else:
                T = 0.9 * T

    return P_best
[docs]
class CO(RandomGrid):
    """
    Coherence Optimal grid object
    Parameters
    ----------
    parameters_random : OrderedDict of RandomParameter instances
        OrderedDict containing the RandomParameter instances the grids are generated for
    n_grid: int
        Number of random samples to generate
    options: dict, optional, default=None
        Grid options:
        - 'seed' : Seeding point
    coords : ndarray of float [n_grid_add x dim]
        Grid points to add (model space)
    coords_norm : ndarray of float [n_grid_add x dim]
        Grid points to add (normalized space)
    coords_gradient : ndarray of float [n_grid x dim x dim]
        Denormalized coordinates xi
    coords_gradient_norm : ndarray of float [n_grid x dim x dim]
        Normalized coordinates xi
    coords_id : list of UUID objects (version 4) [n_grid]
        Unique IDs of grid points
    coords_gradient_id : list of UUID objects (version 4) [n_grid]
        Unique IDs of grid points
    Examples
    --------
    >>> import pygpc
    >>> grid = pygpc.CO(parameters_random=parameters_random, n_grid=100, options={"seed": None})
    Attributes
    ----------
    parameters_random : OrderedDict of RandomParameter instances
        OrderedDict containing the RandomParameter instances the grids are generated for
    n_grid : int or float
        Number of random samples in grid
    seed : float, optional, default=None
        Seeding point to replicate random grid
    options: dict, optional, default=None
        Grid options:
    coords : ndarray of float [n_grid_add x dim]
        Grid points to add (model space)
    coords_norm : ndarray of float [n_grid_add x dim]
        Grid points to add (normalized space)
    coords_gradient : ndarray of float [n_grid x dim x dim]
        Denormalized coordinates xi
    coords_gradient_norm : ndarray of float [n_grid x dim x dim]
        Normalized coordinates xi
    coords_id : list of UUID objects (version 4) [n_grid]
        Unique IDs of grid points
    coords_gradient_id : list of UUID objects (version 4) [n_grid]
        Unique IDs of grid points
    """

    def __init__(self, parameters_random, gpc, n_grid=None, options=None, coords=None, coords_norm=None,
                 coords_gradient=None, coords_gradient_norm=None, coords_id=None, coords_gradient_id=None,
                 grid_pre=None):
        """
        Constructor; Initializes CO instance; Generates grid or copies provided content
        """
        # fill in default options
        if options is None:
            options = dict()

        if "seed" not in options.keys():
            options["seed"] = None

        # the warmup phase uses at least 200 (or 2*n_grid) samples
        # NOTE(review): assumes n_grid is an int here (n_grid=None would raise) — verify callers
        if "n_warmup" not in options.keys():
            options["n_warmup"] = max(200, n_grid*2)
        else:
            options["n_warmup"] = max(options["n_warmup"], n_grid * 2)

        # size of the sample pool the Metropolis-Hastings sampler draws from
        if "n_pool" not in options.keys():
            if 1000 > 2 * n_grid:
                options["n_pool"] = 1000
            else:
                options["n_pool"] = 2*n_grid

        self.gpc = gpc
        self.grid_pre = grid_pre
        self.coords_pool = []
        self.gpc_matrix_pool = []
        self.b2_pool = []
        self.g_pool = []
        self.f_pool = []
        self.n_pool = options["n_pool"]
        self.all_norm = []

        super(CO, self).__init__(parameters_random,
                                 n_grid=n_grid,
                                 options=options,
                                 coords=coords,
                                 coords_norm=coords_norm,
                                 coords_gradient=coords_gradient,
                                 coords_gradient_norm=coords_gradient_norm,
                                 coords_id=coords_id,
                                 coords_gradient_id=coords_gradient_id,
                                 grid_pre=grid_pre)

        if n_grid == 0:
            pass
        else:
            # volume of the d-ball of radius r = sqrt(2)*sqrt(2*order_max+1)
            # the all-normal proposal samples are drawn from (see create_pool)
            self.ball_volume = self.calc_ball_volume(dim=self.dim, radius=np.sqrt(2) * np.sqrt(2*self.gpc.order_max+1))

            pdf_type = [self.parameters_random[rv].pdf_type for rv in self.parameters_random]
            self.all_norm = np.array([p == "norm" for p in pdf_type]).all()
            any_norm = np.array([True for p in pdf_type if p == "norm"]).any() and not self.all_norm

            # mixed beta/normal distributions are not supported
            if any_norm:
                raise AssertionError("Mixed distributions of beta and normal for CO grids not implemented..."
                                     "All variables have to be either normal or beta distributed!")

            # create proposal distributed random variables
            self.parameters_random_proposal = dict()

            for rv in self.parameters_random:
                # check the order > dimension criteria
                if gpc.order_max > self.dim:
                    # uniform distributed random variables -> Chebyshev distribution
                    if self.parameters_random[rv].pdf_type == "beta" and \
                            (self.parameters_random[rv].pdf_shape == [1, 1]).all():
                        self.parameters_random_proposal[rv] = Beta(pdf_shape=[0.5, 0.5],
                                                                   pdf_limits=[-1, 1])

                    # normal distributed random variables -> sample uniformly from the d-dimensional ball of radius r
                    # here a standard normal distribution is created, the uniform sampling from the ball is considered
                    # in the method "create_pool"
                    elif self.parameters_random[rv].pdf_type == "norm":
                        self.parameters_random_proposal[rv] = Norm(pdf_shape=[0, 1],
                                                                   p_perc=self.parameters_random[rv].p_perc)
                    else:
                        # bugfix: the exception was instantiated but never raised
                        raise NotImplementedError("Coherence optimal sampling is only possible for uniform and normal "
                                                  "distributed random variables")
                else:
                    self.parameters_random_proposal[rv] = self.parameters_random[rv]

            # define number of warmup-samples
            self.n_warmup = options["n_warmup"]

            # draw sample pool for actual sampling (the warmup samples are taken from the same pool)
            self.create_pool(n_samples=self.n_pool + self.n_warmup)

            # get coherence optimal samples
            self.coords_norm = self.get_coherence_optimal_samples(n_grid=self.n_grid, n_warmup=self.n_warmup)

            # Denormalize grid to original parameter space
            self.coords = self.get_denormalized_coordinates(self.coords_norm)
[docs]
def create_pool(self, n_samples):
    """
    Creates a pool of samples together with the corresponding gPC matrix.

    Samples are drawn from the proposal distribution; in the all-normal case
    they are mapped onto a uniform distribution inside the d-dimensional ball
    of radius sqrt(2)*sqrt(2*order_max+1) (Hampton et al. 2015, pp. 369).
    Also precomputes the squared row norms of the gPC matrix and the joint pdf
    values of the proposal and target distribution for every pool sample.

    Parameters
    ----------
    n_samples : int
        Number of samples
    """
    self.n_pool = n_samples
    self.coords_pool = np.zeros((n_samples, self.dim))

    for i_rv, rv in enumerate(self.parameters_random_proposal):
        self.coords_pool[:, i_rv] = self.parameters_random_proposal[rv].sample(n_samples=int(self.n_pool))

    if self.all_norm:
        # sample from d-dimensional ball of radius r (Hampton et al. 2015, pp. 369):
        # project every sample onto the unit sphere and scale it by r * u**(1/dim)
        # with an independent u ~ U(0, 1) PER SAMPLE
        # (bugfix: previously a single random radius factor np.random.rand(1) was
        # shared by all samples, which does not yield a uniform distribution in the ball)
        r = np.sqrt(2)*np.sqrt(2*self.gpc.order_max+1)
        self.coords_pool = self.coords_pool / (np.linalg.norm(self.coords_pool, axis=1))[:, np.newaxis] * \
                           r * np.random.rand(n_samples)[:, np.newaxis] ** (1/self.dim)

    self.gpc_matrix_pool = self.gpc.create_gpc_matrix(b=self.gpc.basis.b, x=self.coords_pool)
    self.b2_pool = np.linalg.norm(self.gpc_matrix_pool, axis=1)**2
    self.g_pool = self.joint_pdf(x=self.coords_pool, parameters_random=self.parameters_random_proposal)
    self.f_pool = self.joint_pdf(x=self.coords_pool, parameters_random=self.parameters_random)
[docs]
@staticmethod
def calc_ball_volume(dim, radius):
"""
Volume of n-dimensional ball.
Parameters
----------
dim : int
Number of random variables
radius : float
Radius
Returns
-------
vol : float
Volume of n-dimensional ball
"""
vol = radius**dim * np.pi**(dim/2) / gamma(dim/2 + 1)
return vol
[docs]
def joint_pdf(self, x, parameters_random):
    """
    Joint probability density function of random variables.

    In the all-normal case the proposal samples are distributed uniformly in
    the d-ball (see create_pool), hence the density is the constant
    1/ball_volume. Otherwise the joint pdf is the product of the marginal pdfs.

    Parameters
    ----------
    x : ndarray of float [n_samples x n_dim]
        Samples of random variables
    parameters_random : dict of RandomVariable instances
        Random variables of proposal distribution

    Returns
    -------
    f : ndarray of float [n_samples]
        Joint probability density of every sample
    """
    f = np.ones(x.shape[0])

    # bugfix: the previous check built an array from a filtered list
    # comprehension ([True for p in ... if cond]) which can only contain True
    # entries, so .all() was always True and the uniform-ball density was used
    # even for non-normal variables
    if all(parameters_random[p].pdf_type == "norm" for p in parameters_random):
        f *= 1/self.ball_volume
    else:
        for i_rv, rv in enumerate(parameters_random):
            f *= parameters_random[rv].pdf_norm(x=x[:, i_rv])[1]

    return f
[docs]
def acceptance_rate(self, idx1, idx2):
    """
    Calculate the acceptance rate of the Metropolis-Hastings algorithm.

    Parameters
    ----------
    idx1 : int
        Index of sampling points of previous sample
    idx2 : int
        Index of sampling points of current sample

    Returns
    -------
    rho : float
        Acceptance rate (capped at 1)
    """
    # ratio of (target pdf * coherence weight) over proposal pdf of the
    # candidate sample relative to the previous sample
    numerator = self.g_pool[idx1] * self.f_pool[idx2] * self.b2_pool[idx2]
    denominator = self.g_pool[idx2] * self.f_pool[idx1] * self.b2_pool[idx1]
    return np.min((1., numerator / denominator))
[docs]
def warmup(self, n_warmup):
    """
    Warmup phase of the Metropolis-Hastings sampler.

    Draws n_warmup coherence optimal samples from the pool. Note: this helper
    is currently unused by __init__ (the warmup is handled implicitly in
    get_coherence_optimal_samples via a negative start index).

    Parameters
    ----------
    n_warmup : int
        Number of warmup samples

    Returns
    -------
    coords_norm_opt : ndarray of float [n_warmup x n_dim]
        Optimal coordinates determined during warmup phase
    """
    # bugfix: get_coherence_optimal_samples requires the n_warmup argument
    # (calling it without raised a TypeError); no additional burn-in is
    # performed here, so it is set to 0
    coords_norm_opt = self.get_coherence_optimal_samples(n_grid=n_warmup, n_warmup=0)
    return coords_norm_opt
[docs]
def get_coherence_optimal_samples(self, n_grid, n_warmup):
    """
    Determine coherence optimal samples with Monte Carlo Markov Chain - Metropolis Hastings algorithm
    Parameters
    ----------
    n_grid : int
        Number of grid points
    n_warmup: int
        Number of warmup samples (drawn and discarded before grid points are stored)
    Returns
    -------
    coords_norm : ndarray of float [n_grid x n_dim]
        Coherence optimal samples
    """
    coords_norm_opt = np.zeros((n_grid, self.dim))
    # seed row 0 with the first pool sample (overwritten once a sample is accepted there)
    coords_norm_opt[0, :] = self.coords_pool[0, :][np.newaxis, :]

    # keep the points of an existing grid; only the remaining rows are filled
    if self.grid_pre is not None:
        coords_norm_opt[:self.grid_pre.n_grid, :] = self.grid_pre.coords_norm
        i_grid_start = self.grid_pre.n_grid
    else:
        i_grid_start = 0

    # init grid_index at -n_warmup, then the number of warmup-samples are automatically drawn,
    # before index 0 is reached
    i_grid = -(n_warmup - i_grid_start)
    idx1 = 0  # index of the previously accepted sample in the pool
    idx2 = 1  # index of the current candidate sample in the pool

    # upper percentile bounds of the normal variables (accepted samples have
    # to lie inside them in the all-normal case)
    if self.all_norm:
        x_perc_norm = np.zeros(len(self.parameters_random))
        for i_rv, rv in enumerate(self.parameters_random):
            x_perc_norm[i_rv] = self.parameters_random[rv].x_perc_norm[1]

    while i_grid < n_grid:
        # create a new pool if it is empty
        if idx2 >= self.n_pool:
            self.create_pool(2*n_grid)
            idx1 = 0
            idx2 = 1

        # determine acceptance rate
        rho = self.acceptance_rate(idx1=idx1, idx2=idx2)

        # add point if acceptance rate is high
        if rho > np.random.rand(1):
            if self.all_norm:
                # only accept samples lying within the percentile bounds
                if (np.abs(self.coords_pool[idx2, :]) < x_perc_norm).all():
                    # negative indices belong to the warmup phase / pre-existing
                    # grid points and are not stored
                    if i_grid > (i_grid_start-1):
                        coords_norm_opt[i_grid, :] = self.coords_pool[idx2, :]
                    i_grid += 1
                    idx1 = idx2
            else:
                if i_grid > (i_grid_start-1):
                    coords_norm_opt[i_grid, :] = self.coords_pool[idx2, :]
                i_grid += 1
                idx1 = idx2
        idx2 += 1

    return coords_norm_opt
[docs]
class L1(RandomGrid):
    """
    L1 optimized grid object
    Parameters
    ----------
    parameters_random : OrderedDict of RandomParameter instances
        OrderedDict containing the RandomParameter instances the grids are generated for
    n_grid: int
        Number of random samples to generate
    options: dict, optional, default=None
        Grid options:
        - method: "greedy", "iteration"
        - criterion: ["mc"], ["tmc", "cc"], ["D"], ["D-coh"]
        - weights: [1], [0.5, 0.5], [1]
        - n_pool: size of samples in pool to choose greedy results from
        - n_iter: number of iterations
        - seed: random seed
    coords : ndarray of float [n_grid_add x dim]
        Grid points to add (model space)
    coords_norm : ndarray of float [n_grid_add x dim]
        Grid points to add (normalized space)
    coords_gradient : ndarray of float [n_grid x dim x dim]
        Denormalized coordinates xi
    coords_gradient_norm : ndarray of float [n_grid x dim x dim]
        Normalized coordinates xi
    coords_id : list of UUID objects (version 4) [n_grid]
        Unique IDs of grid points
    coords_gradient_id : list of UUID objects (version 4) [n_grid]
        Unique IDs of grid points
    gpc : GPC object instance
        GPC object
    grid_pre : Grid object instance, optional, default: None
        Existent grid, which will be extended.
    Examples
    --------
    >>> import pygpc
    >>> grid = pygpc.L1(parameters_random=parameters_random,
    >>>                 n_grid=100,
    >>>                 options={"method": "greedy",
    >>>                          "criterion": ["mc"],
    >>>                          "weights": [1],
    >>>                          "n_pool": 1000,
    >>>                          "seed": None})
    Attributes
    ----------
    parameters_random : OrderedDict of RandomParameter instances
        OrderedDict containing the RandomParameter instances the grids are generated for
    n_grid : int or float
        Number of random samples in grid
    seed : float, optional, default=None
        Seeding point to replicate random grid
    options: dict, optional, default=None
        Grid options:
        - method: "greedy", "iteration"
        - criterion: ["mc"], ["tmc", "cc"], ["D"], ["D-coh"]
        - weights: [1], [0.5, 0.5], [1]
        - n_pool: size of samples in pool to choose greedy results from
        - n_iter: number of iterations
        - seed: random seed
    coords : ndarray of float [n_grid_add x dim]
        Grid points to add (model space)
    coords_norm : ndarray of float [n_grid_add x dim]
        Grid points to add (normalized space)
    coords_gradient : ndarray of float [n_grid x dim x dim]
        Denormalized coordinates xi
    coords_gradient_norm : ndarray of float [n_grid x dim x dim]
        Normalized coordinates xi
    coords_id : list of UUID objects (version 4) [n_grid]
        Unique IDs of grid points
    coords_gradient_id : list of UUID objects (version 4) [n_grid]
        Unique IDs of grid points
    gpc : GPC Object instance
        GPC object
    grid_pre : Grid object instance, optional, default: None
        Existent grid, which will be extended.
    """

    def __init__(self, parameters_random, n_grid=None, options=None, coords=None, coords_norm=None,
                 coords_gradient=None, coords_gradient_norm=None, coords_id=None, coords_gradient_id=None, gpc=None,
                 grid_pre=None):
        """
        Constructor; Initializes Grid instance; Generates grid or copies provided content
        """
        # fill in default grid options
        if options is None:
            options = dict()

        if type(options) is dict:
            if "method" not in options.keys():
                options["method"] = "greedy"

            if "n_pool" not in options.keys():
                options["n_pool"] = 10000

            if "n_iter" not in options.keys():
                options["n_iter"] = 1000

            if "seed" not in options.keys():
                options["seed"] = None

            if "criterion" not in options.keys():
                options["criterion"] = ["mc"]

            # equal weighting of all optimality criteria by default
            if "weights" not in options.keys() or options["weights"] is None:
                options["weights"] = (np.ones(len(options["criterion"])) / len(options["criterion"])).tolist()

        self.n_pool = options["n_pool"]
        self.n_iter = options["n_iter"]
        self.gpc = gpc
        self.seed = options["seed"]
        self.method = options["method"]
        self.criterion = options["criterion"]
        self.coords_norm_perced = None
        self.perc_mask = None

        # the optimality criterion is always handled as a list internally
        if type(self.criterion) is not list:
            self.criterion = [self.criterion]

        super(L1, self).__init__(parameters_random,
                                 n_grid=n_grid,
                                 options=options,
                                 coords=coords,
                                 coords_norm=coords_norm,
                                 coords_gradient=coords_gradient,
                                 coords_gradient_norm=coords_gradient_norm,
                                 coords_id=coords_id,
                                 coords_gradient_id=coords_gradient_id,
                                 grid_pre=grid_pre)

        self.weights = options["weights"]

        # determine the L1 optimal grid points (only if no coordinates were provided)
        if self.method == "greedy" and self.n_grid > 0 and self.coords is None:
            self.coords_norm = self.get_optimal_mu_greedy()
        elif (self.method == 'iteration' or self.method == 'iter') and self.n_grid > 0 and self.coords is None:
            self.coords_norm = self.get_optimal_mu_iteration()

        if self.n_grid > 0:
            # Denormalize grid to original parameter space
            self.coords = self.get_denormalized_coordinates(self.coords_norm)

            # Generate unique IDs of grid points
            self.coords_id = [uuid.uuid4() for _ in range(self.n_grid)]
[docs]
def get_optimal_mu_greedy(self):
    """
    This function computes a set of grid points with minimal mutual coherence using a greedy approach.

    Starting from a random (or pre-existing) grid point, the candidate from the
    sample pool that optimizes the weighted combination of the selected
    optimality criteria is added one by one until n_grid points are reached.

    Returns
    -------
    coords_norm : ndarray of float [n_grid x dim]
        Normalized sample coordinates in range [-1, 1]
    """
    # NOTE(review): np.min((1, cpu_count())) always evaluates to 1, so the
    # worker pool below runs single-process — confirm whether
    # multiprocessing.cpu_count() alone (as used in FIM) was intended
    n_cpu = np.min((1, multiprocessing.cpu_count()))

    # create pool (Standard random grid for D-optimal grids and CO else)
    if "D" in self.criterion:
        random_pool = Random(parameters_random=self.parameters_random,
                             n_grid=self.n_pool,
                             options=self.options)
    else:
        random_pool = CO(parameters_random=self.parameters_random,
                         n_grid=self.n_pool,
                         gpc=self.gpc,
                         options=self.options)

    # full gpc matrix is needed for w_matrix, maybe build a function that creates weighted pools
    # based of the coordinates
    # self.gpc.get_weight_matrix()
    # w_matrix = self.gpc.w
    index_list = []

    # project grid in case of projection approach
    if self.gpc.p_matrix is not None:
        # weight argument in create gpc matrix
        random_pool_trans = project_grid(grid=random_pool, p_matrix=self.gpc.p_matrix, mode="reduce")
        psy_pool = self.gpc.create_gpc_matrix(b=self.gpc.basis.b, x=random_pool_trans.coords_norm, gradient=False,
                                              weighted=True)
    else:
        psy_pool = self.gpc.create_gpc_matrix(b=self.gpc.basis.b, x=random_pool.coords_norm, gradient=False,
                                              weighted=True)

    m = int(self.n_grid)
    m_p = int(np.shape(psy_pool)[0])

    # set up multiprocessing
    pool = multiprocessing.Pool(n_cpu)

    # set starting point for iteration
    if self.grid_pre is None or self.grid_pre.n_grid == 0:
        # get random row of psy to start
        idx = np.random.randint(m_p)
        index_list.append(idx)
        index_list_remaining = [k for k in range(self.n_pool) if k not in index_list]
        psy_opt = np.zeros((1, psy_pool.shape[1]))
        psy_opt[0, :] = psy_pool[idx, :]
        i_start = 1
    else:
        # project grid in case of projection approach
        if self.gpc.p_matrix is not None:
            grid_pre_trans = project_grid(grid=self.grid_pre, p_matrix=self.gpc.p_matrix, mode="reduce")
            psy_opt = self.gpc.create_gpc_matrix(b=self.gpc.basis.b, x=grid_pre_trans.coords_norm, gradient=False,
                                                 weighted=True)
        else:
            psy_opt = self.gpc.create_gpc_matrix(b=self.gpc.basis.b, x=self.grid_pre.coords_norm, gradient=False,
                                                 weighted=True)
        index_list = []
        index_list_remaining = [k for k in range(self.n_pool) if k not in index_list]
        i_start = self.grid_pre.n_grid

    # loop over grid points
    for i in range(i_start, m):
        crit = np.ones((self.n_pool, len(self.criterion))) * 1e6

        # evaluate the optimality criteria of all remaining candidates (in parallel)
        workhorse_partial = partial(workhorse_greedy, psy_opt=psy_opt, psy_pool=psy_pool, criterion=self.criterion)
        idx_list_chunks = compute_chunks(index_list_remaining, n_cpu)
        crit_tmp = pool.map(workhorse_partial, idx_list_chunks)

        if "D" not in self.criterion and "D-coh" not in self.criterion:
            crit_tmp = np.concatenate(crit_tmp)
        else:
            # D-optimal criteria return (sign, -log(det)); normalize the log
            # determinants before exponentiation to avoid overflow
            sign = []
            neg_logdet = []
            for res in crit_tmp:
                sign.append(res[0])
                neg_logdet.append(res[1])
            sign = np.concatenate(sign)
            neg_logdet = np.concatenate(neg_logdet)
            neg_logdet_norm = neg_logdet / np.nan_to_num(np.max(np.abs(neg_logdet)))
            crit_tmp = sign * np.nan_to_num(np.exp(neg_logdet_norm))

        crit[index_list_remaining, :] = crit_tmp

        # set 1e6 dummy values to max values
        if "D" not in self.criterion and "D-coh" not in self.criterion:
            for k in range(crit.shape[1]):
                crit[crit[:, k] == 1e6, k] = np.max(crit[crit[:, k] != 1e6, k])

        # normalize optimality criteria to [0, 1]
        crit = np.nan_to_num(crit)
        crit = (crit - np.nanmin(crit, axis=0)) / np.nan_to_num((np.nanmax(crit, axis=0) - np.nanmin(crit, axis=0)))

        # apply weights
        crit = np.sum(crit**2 * np.array(self.weights), axis=1)

        # find best index
        try:
            index_list.append(np.nanargmin(crit))

        # in very rare cases there the optimal grid point can not be determined (all nan), in this case the first
        # grid point of the remaining indices is chosen
        except ValueError:
            index_list.append(index_list_remaining[0])

        # add row with best minimal coherence and cross correlation properties to the matrix
        psy_opt = np.vstack((psy_opt, psy_pool[index_list[-1], :]))

        # create list of remaining indices
        index_list_remaining = [k for k in range(self.n_pool) if k not in index_list]

    coords_norm = random_pool.coords_norm[index_list, :]

    pool.close()
    pool.join()

    # prepend the points of the existing grid
    if self.grid_pre is not None:
        coords_norm = np.vstack((self.grid_pre.coords_norm, coords_norm))

    return coords_norm
[docs]
def get_optimal_mu_iteration(self):
    """
    This function computes a set of grid points with minimal mutual coherence using an iterative approach.

    n_iter complete candidate grids are generated and evaluated in parallel;
    the grid optimizing the weighted combination of the selected optimality
    criteria is returned.

    Returns
    -------
    coords_norm : ndarray of float [n_grid x dim]
        Normalized sample coordinates in range [-1, 1]
    """
    # NOTE(review): np.min((1, cpu_count())) always evaluates to 1, so the
    # worker pool below runs single-process — confirm intended expression
    n_cpu = np.min((1, multiprocessing.cpu_count()))
    coords_norm_list = []
    crit = np.ones((self.n_iter, len(self.criterion))) * 1e6

    # set up multiprocessing
    pool = multiprocessing.Pool(n_cpu)

    # every worker generates and scores complete candidate grids
    workhorse_partial = partial(workhorse_iteration,
                                gpc=self.gpc,
                                n_grid=self.n_grid,
                                criterion=self.criterion,
                                grid_pre=self.grid_pre,
                                options={"seed": self.seed})
    idx_list_chunks = compute_chunks([k for k in range(self.n_iter)], n_cpu)
    res = pool.map(workhorse_partial, idx_list_chunks)

    # collect criteria and candidate grids of all chunks
    for j in range(len(res)):
        if j == 0:
            if "D" not in self.criterion and "D-coh" not in self.criterion:
                crit = res[j][0]
                coords_norm_list = res[j][1]
            else:
                # D-optimal criteria return (sign, -log(det)); normalize the
                # log determinants before exponentiation to avoid overflow
                sign = res[j][0]
                neg_logdet = res[j][1]
                neg_logdet_norm = neg_logdet / np.max(np.abs(neg_logdet))
                crit = sign * np.exp(neg_logdet_norm)
                coords_norm_list = res[j][2]
        else:
            if "D" not in self.criterion and "D-coh" not in self.criterion:
                crit = np.vstack((crit, res[j][0]))
                coords_norm_list = coords_norm_list + res[j][1]
            else:
                sign = res[j][0]
                neg_logdet = res[j][1]
                neg_logdet_norm = neg_logdet / np.max(np.abs(neg_logdet))
                crit = np.vstack((crit, sign * np.exp(neg_logdet_norm)))
                coords_norm_list = coords_norm_list + res[j][2]

    # normalize optimality criteria to [0, 1]
    crit = (crit - np.min(crit, axis=0)) / np.nan_to_num((np.max(crit, axis=0) - np.min(crit, axis=0)))

    # apply weights
    crit = np.sum(crit**2 * np.array(self.weights), axis=1)

    # pick the candidate grid with the best weighted criterion
    coords_norm = coords_norm_list[np.argmin(crit)]

    pool.close()  # NOTE(review): no pool.join() here (unlike the greedy variant) — confirm

    return coords_norm
[docs]
class FIM(RandomGrid):
    """
    FIM D-optimal grid object
    Parameters
    ----------
    parameters_random : OrderedDict of RandomParameter instances
        OrderedDict containing the RandomParameter instances the grids are generated for
    n_grid: int
        Number of random samples to generate
    seed: float
        Seeding point to replicate random grids
    options: dict, optional, default=None
        Grid options:
        - 'n_pool' : number of random samples to determine next optimal grid point
        - 'seed' : random seed
    coords : ndarray of float [n_grid_add x dim]
        Grid points to add (model space)
    coords_norm : ndarray of float [n_grid_add x dim]
        Grid points to add (normalized space)
    coords_gradient : ndarray of float [n_grid x dim x dim]
        Denormalized coordinates xi
    coords_gradient_norm : ndarray of float [n_grid x dim x dim]
        Normalized coordinates xi
    coords_id : list of UUID objects (version 4) [n_grid]
        Unique IDs of grid points
    coords_gradient_id : list of UUID objects (version 4) [n_grid]
        Unique IDs of grid points
    Examples
    --------
    >>> import pygpc
    >>> grid = pygpc.FIM(parameters_random=parameters_random,
    >>>                  n_grid=100,
    >>>                  options={"n_pool": 1000,
    >>>                           "seed": None})
    Attributes
    ----------
    parameters_random : OrderedDict of RandomParameter instances
        OrderedDict containing the RandomParameter instances the grids are generated for
    n_grid : int or float
        Number of random samples in grid
    seed : float, optional, default=None
        Seeding point to replicate random grid
    options: dict, optional, default=None
        Grid options:
        - n_pool: size of samples in pool to choose greedy results from
        - seed: random seed
    coords : ndarray of float [n_grid_add x dim]
        Grid points to add (model space)
    coords_norm : ndarray of float [n_grid_add x dim]
        Grid points to add (normalized space)
    coords_gradient : ndarray of float [n_grid x dim x dim]
        Denormalized coordinates xi
    coords_gradient_norm : ndarray of float [n_grid x dim x dim]
        Normalized coordinates xi
    coords_id : list of UUID objects (version 4) [n_grid]
        Unique IDs of grid points
    coords_gradient_id : list of UUID objects (version 4) [n_grid]
        Unique IDs of grid points
    """

    def __init__(self, parameters_random, n_grid=None, options=None, coords=None, coords_norm=None,
                 coords_gradient=None, coords_gradient_norm=None, coords_id=None, coords_gradient_id=None, gpc=None,
                 grid_pre=None):
        """
        Constructor; Initializes Grid instance; Generates grid or copies provided content
        """
        # fill in default grid options
        if options is None:
            options = dict()

        if type(options) is dict:
            if "n_pool" not in options.keys():
                options["n_pool"] = 1000

            if "seed" not in options.keys():
                options["seed"] = None

        self.n_pool = options["n_pool"]
        # deep copy: the gpc object (grid and gpc matrix) is modified while the
        # D-optimal grid is constructed
        self.gpc = copy.deepcopy(gpc)

        super(FIM, self).__init__(parameters_random,
                                  n_grid=n_grid,
                                  options=options,
                                  coords=coords,
                                  coords_norm=coords_norm,
                                  coords_gradient=coords_gradient,
                                  coords_gradient_norm=coords_gradient_norm,
                                  coords_id=coords_id,
                                  coords_gradient_id=coords_gradient_id,
                                  grid_pre=grid_pre)

        if coords_norm is not None:
            # grid points were provided -> wrap them in a Random grid and build the gpc matrix
            self.gpc.grid = Random(parameters_random=parameters_random,
                                   coords_norm=coords_norm,
                                   coords=coords,
                                   options=self.options)
            n_grid_add = self.gpc.grid.n_grid

            if self.gpc.p_matrix is not None:
                # project the coordinates into the reduced space first
                self.gpc.gpc_matrix = self.gpc.create_gpc_matrix(b=self.gpc.basis.b,
                                                                 x=np.matmul(coords_norm,
                                                                             self.gpc.p_matrix.transpose() /
                                                                             self.gpc.p_matrix_norm[np.newaxis, :]),
                                                                 gradient=False)
            else:
                self.gpc.gpc_matrix = self.gpc.create_gpc_matrix(b=self.gpc.basis.b,
                                                                 x=coords_norm,
                                                                 gradient=False)
        elif self.grid_pre is not None:
            # extend an already existing grid
            self.gpc.grid = self.grid_pre
            n_grid_add = self.n_grid - self.grid_pre.n_grid

            if n_grid_add < 0:
                # bugfix: the closing parenthesis was missing in the error message
                raise RuntimeError(f"Number of grid points to add has to be >= 0 (it is {n_grid_add})")

            if self.gpc.p_matrix is not None:
                self.gpc.gpc_matrix = self.gpc.create_gpc_matrix(b=self.gpc.basis.b,
                                                                 x=np.matmul(self.grid_pre.coords_norm,
                                                                             self.gpc.p_matrix.transpose() /
                                                                             self.gpc.p_matrix_norm[np.newaxis, :]),
                                                                 gradient=False)
            else:
                self.gpc.gpc_matrix = self.gpc.create_gpc_matrix(b=self.gpc.basis.b,
                                                                 x=self.grid_pre.coords_norm,
                                                                 gradient=False)
        else:
            n_grid_add = self.n_grid

        # add FIM optimal grid points (eventually to existing grid)
        self.coords_norm = self.add_fim_optiomal_grid_points(parameters_random=parameters_random,
                                                             n_grid_add=n_grid_add)

        # Denormalize grid to original parameter space
        self.coords = self.get_denormalized_coordinates(self.coords_norm)

        # Generate unique IDs of grid points
        self.coords_id = [uuid.uuid4() for _ in range(self.n_grid)]
[docs]
def add_fim_optiomal_grid_points(self, parameters_random, n_grid_add):
    """
    This function adds grid points (one by one) to the set of points by maximizing the Fisher information matrix
    in a D-optimal sense.

    (The method name typo "optiomal" is kept for backwards compatibility with existing callers.)

    Parameters
    ----------
    parameters_random : OrderedDict of RandomParameter [dim]
        Random parameters (in case of projection, provide the original random parameters)
    n_grid_add : int
        Number of grid points to add

    Returns
    -------
    coords_norm : ndarray of float [n_grid x dim]
        Normalized sample coordinates in range [-1, 1]
    """
    n_cpu = multiprocessing.cpu_count()
    pool = multiprocessing.Pool(n_cpu)

    # candidate pool of random grid points
    grid_pool = Random(parameters_random=parameters_random,
                       n_grid=self.n_pool,
                       options={"seed": self.seed})

    # gpc matrix of the candidate pool (candidates are projected first in case
    # of the projection approach)
    if self.gpc.p_matrix is not None:
        # bugfix: grid_pool is a local variable (it was accessed as
        # self.grid_pool before, raising an AttributeError)
        gpc_matrix_pool = self.gpc.create_gpc_matrix(b=self.gpc.basis.b,
                                                     x=np.matmul(grid_pool.coords_norm,
                                                                 self.gpc.p_matrix.transpose() /
                                                                 self.gpc.p_matrix_norm[np.newaxis, :]),
                                                     gradient=False)
    else:
        gpc_matrix_pool = self.gpc.create_gpc_matrix(b=self.gpc.basis.b,
                                                     x=grid_pool.coords_norm,
                                                     gradient=False)

    # FIM of the already existing grid points (if any)
    if self.gpc.gpc_matrix is not None:
        fim_matrix = self.calc_fim_matrix()
    else:
        fim_matrix = None

    index_list = []

    for i in range(n_grid_add):
        det = np.zeros((self.n_pool))

        if self.seed is not None:
            self.seed += 1
            self.options["seed"] += 1

        # select random starting point (the very first grid point cannot be optimized)
        if self.gpc.gpc_matrix is None:
            coords_opt = grid_pool.coords_norm[0, :][np.newaxis, ]
            self.gpc.grid = Random(parameters_random=parameters_random,
                                   options={"seed": self.seed},
                                   coords_norm=coords_opt)
            self.gpc.gpc_matrix = self.gpc.create_gpc_matrix(b=self.gpc.basis.b,
                                                             x=self.gpc.grid.coords_norm[-1, :][np.newaxis, ],
                                                             gradient=False)
            index_list.append(0)
        else:
            # evaluate the determinant of the updated FIM for every remaining candidate (in parallel)
            index_list_remaining = [k for k in range(self.n_pool) if k not in index_list]
            index_list_chunks = compute_chunks(index_list_remaining, n_cpu)
            n_basis_limit = np.min((self.gpc.grid.n_grid, self.gpc.basis.n_basis))
            workhorse_partial = partial(workhorse_get_det_updated_fim_matrix,
                                        gpc_matrix_pool=gpc_matrix_pool,
                                        fim_matrix=fim_matrix,
                                        n_basis_limit=n_basis_limit)
            res = pool.map(workhorse_partial, index_list_chunks)

            sign = []
            logdet = []
            for r in res:
                sign.append(r[0])
                logdet.append(r[1])
            sign = np.concatenate(sign)
            logdet = np.concatenate(logdet)

            # normalize the log determinants before exponentiation to avoid overflow
            logdet_norm = logdet / np.max(np.abs(logdet))
            det[index_list_remaining] = (sign * np.exp(logdet_norm)).flatten()

            index_list.append(np.nanargmax(det))
            coords_opt = grid_pool.coords_norm[index_list[-1], :]

            # add optimal grid point
            self.gpc.grid.coords_norm = np.vstack((self.gpc.grid.coords_norm, coords_opt))

            # update gpc matrix
            self.gpc.gpc_matrix = np.vstack((self.gpc.gpc_matrix,
                                             self.gpc.create_gpc_matrix(b=self.gpc.basis.b,
                                                                        x=self.gpc.grid.coords_norm[-1, :][np.newaxis, ],
                                                                        gradient=False)))

        # update FIM matrix
        # NOTE(review): n_basis_limit = min(n_grid, n_basis) can never equal
        # n_basis + 1, so the incremental update branch below is currently
        # unreachable and the FIM is always recomputed from scratch (correct,
        # but slower) — confirm the intended condition before changing it
        n_basis_limit = np.min((self.gpc.grid.n_grid, self.gpc.basis.n_basis))
        if n_basis_limit == (self.gpc.basis.n_basis+1):
            fim_matrix = self.update_fim_matrix(fim_matrix=fim_matrix,
                                                gpc_matrix_new_rows=self.gpc.gpc_matrix[-1, :][np.newaxis, ])
        else:
            fim_matrix = self.calc_fim_matrix(n_basis_limit=n_basis_limit)

    pool.close()
    pool.join()  # consistency with get_optimal_mu_greedy: wait for workers to terminate

    return self.gpc.grid.coords_norm
[docs]
def calc_fim_matrix(self, n_basis_limit=None):
    """
    Calculates the Fisher information matrix based on the present grid.

    The FIM is the Gram matrix of the gpc matrix restricted to the first
    n_basis_limit basis functions: psi^T psi.

    Parameters
    ----------
    n_basis_limit : int, optional, default: None
        Number of leading basis functions (columns of the gpc matrix) the FIM is
        calculated for. If None, all columns are used.

    Returns
    -------
    fim_matrix : ndarray of float [n_basis_limit x n_basis_limit]
        Fisher information matrix
    """
    if n_basis_limit is None:
        n_basis_limit = self.gpc.gpc_matrix.shape[1]

    # the row-wise sum of outer products equals psi^T psi; computing it with a
    # single matmul instead of a Python loop is equivalent and much faster
    psi = self.gpc.gpc_matrix[:, :n_basis_limit]
    fim_matrix = np.zeros((n_basis_limit, n_basis_limit))
    fim_matrix += np.matmul(psi.T, psi)

    return fim_matrix
[docs]
@staticmethod
def update_fim_matrix(fim_matrix, gpc_matrix_new_rows):
"""
Updates Fisher-Information matrix based on the present grid.
Parameters
----------
fim_matrix : ndarray of float [n_basis x n_basis]
Fisher information matrix
gpc_matrix_new_rows : ndarray of float [n_new_rows x n_basis]
New rows of gpc matrix to add to FIM matrix
Returns
-------
fim_matrix : ndarray of float [n_basis x n_basis]
Updated Fisher information matrix
"""
if fim_matrix is None:
fim_matrix = np.zeros((gpc_matrix_new_rows.shape[1], gpc_matrix_new_rows.shape[1]))
for row in gpc_matrix_new_rows:
fim_matrix += np.outer(row, row)
return fim_matrix
[docs]
def get_det_updated_fim_matrix(self, fim_matrix, coords_norm):
    """
    Determines the determinant the Fisher information matrix would have if the
    candidate grid point were added.

    Parameters
    ----------
    fim_matrix : ndarray of float [n_basis x n_basis]
        Fisher information matrix (NOT modified by this call).
    coords_norm : ndarray of float [1 x dim]
        Candidate grid point (normalized coordinates).

    Returns
    -------
    det : float
        Determinant of the hypothetically updated Fisher information matrix.
    """
    new_row = self.gpc.create_gpc_matrix(b=self.gpc.basis.b, x=coords_norm, gradient=False)

    # Fix: test the candidate on a fresh matrix instead of mutating the caller's
    # fim_matrix in place with "+=". Repeated candidate evaluations against the
    # same FIM would otherwise accumulate spurious updates (the parallel
    # workhorse_get_det_updated_fim_matrix already works non-destructively).
    fim_matrix_test = fim_matrix + np.outer(new_row, new_row)

    return np.linalg.det(fim_matrix_test)
[docs]
class L1_LHS(RandomGrid):
    """
    L1-LHS optimized grid object.

    A fraction options["weights"][0] of the n_grid points is generated by L1
    optimization and the rest by a Latin Hypercube ("ese") design; the L1 part
    is generated first.

    Parameters
    ----------
    parameters_random : OrderedDict of RandomParameter instances
        OrderedDict containing the RandomParameter instances the grids are generated for
    n_grid : int
        Number of random samples to generate
    options : dict, optional, default=None
        Grid options:
        - method: "greedy", "iteration" (L1 part)
        - criterion: ["mc"], ["tmc", "cc"] (L1 part)
        - weights_L1: weighting of the L1 optimality criteria, e.g. [1], [0.5, 0.5]
        - weights: split between the L1 and the LHS part, e.g. [0.25, 0.75]
        - n_pool: size of sample pool to choose greedy results from
        - n_iter: number of iterations
        - seed: random seed
    coords : ndarray of float [n_grid_add x dim]
        Grid points to add (model space); if provided, no grid is generated
    coords_norm : ndarray of float [n_grid_add x dim]
        Grid points to add (normalized space); if provided, no grid is generated
    coords_gradient : ndarray of float [n_grid x dim x dim]
        Denormalized coordinates xi
    coords_gradient_norm : ndarray of float [n_grid x dim x dim]
        Normalized coordinates xi
    coords_id : list of UUID objects (version 4) [n_grid]
        Unique IDs of grid points
    coords_gradient_id : list of UUID objects (version 4) [n_grid]
        Unique IDs of grid points
    gpc : GPC object, optional, default: None
        GPC object the L1-optimal part is constructed for
    grid_pre : Grid object, optional, default: None
        Existing grid which is going to be extended (NOTE: modified in place)

    Examples
    --------
    >>> import pygpc
    >>> grid = pygpc.L1_LHS(parameters_random=parameters_random,
    >>>                     n_grid=100,
    >>>                     gpc=gpc,
    >>>                     options={"method": "greedy",
    >>>                              "criterion": ["mc"],
    >>>                              "weights_L1": [1],
    >>>                              "weights": [0.25, 0.75],
    >>>                              "n_pool": 1000,
    >>>                              "seed": None})

    Attributes
    ----------
    parameters_random : OrderedDict of RandomParameter instances
        OrderedDict containing the RandomParameter instances the grids are generated for
    n_grid : int or float
        Number of random samples in grid
    seed : float, optional, default=None
        Seeding point to replicate random grid
    options : dict, optional, default=None
        Grid options (see Parameters)
    grid_L1 : L1 grid object or None
        L1-optimized sub-grid (None if n_grid_L1 == 0 or coords were provided)
    grid_LHS : LHS grid object or None
        LHS sub-grid (None if n_grid_LHS == 0 or coords were provided)
    coords : ndarray of float [n_grid_add x dim]
        Grid points to add (model space), if coords are provided, no grid is generated
    coords_norm : ndarray of float [n_grid_add x dim]
        Grid points to add (normalized space), if coords are provided, no grid is generated
    coords_gradient : ndarray of float [n_grid x dim x dim]
        Denormalized coordinates xi
    coords_gradient_norm : ndarray of float [n_grid x dim x dim]
        Normalized coordinates xi
    coords_id : list of UUID objects (version 4) [n_grid]
        Unique IDs of grid points
    coords_gradient_id : list of UUID objects (version 4) [n_grid]
        Unique IDs of grid points
    """

    def __init__(self, parameters_random, n_grid=None, options=None, coords=None, coords_norm=None,
                 coords_gradient=None, coords_gradient_norm=None, coords_id=None, coords_gradient_id=None, gpc=None,
                 grid_pre=None):
        """
        Constructor; Initializes Grid instance; Generates grid or copies provided content
        """
        # fill in defaults for every option key the caller did not provide
        if options is None:
            options = dict()

        if type(options) is dict:
            if "weights" not in options.keys():
                options["weights"] = [0.5, 0.5]

            if "method" not in options.keys():
                options["method"] = "iteration"

            if "n_pool" not in options.keys():
                options["n_pool"] = 10000

            if "n_iter" not in options.keys():
                options["n_iter"] = 1000

            if "seed" not in options.keys():
                options["seed"] = None

            if "criterion" not in options.keys():
                options["criterion"] = ["mc"]

            if "weights_L1" not in options.keys() or options["weights_L1"] is None:
                # default: weight all L1 optimality criteria equally
                options["weights_L1"] = (np.ones(len(options["criterion"])) / len(options["criterion"])).tolist()

        self.n_pool = options["n_pool"]
        self.n_iter = options["n_iter"]
        self.gpc = gpc
        self.seed = options["seed"]
        self.method = options["method"]
        self.criterion = options["criterion"]
        self.weights_L1 = options["weights_L1"]
        self.grid_L1 = None
        self.grid_LHS = None
        self.grid_pre = grid_pre

        # allow a single criterion to be passed as a plain string
        if type(self.criterion) is not list:
            self.criterion = [self.criterion]

        super(L1_LHS, self).__init__(parameters_random,
                                     n_grid=n_grid,
                                     options=options,
                                     coords=coords,
                                     coords_norm=coords_norm,
                                     coords_gradient=coords_gradient,
                                     coords_gradient_norm=coords_gradient_norm,
                                     coords_id=coords_id,
                                     coords_gradient_id=coords_gradient_id)

        self.weights = options["weights"]

        # split the total number of grid points between the L1 part and the LHS part
        # according to options["weights"]; skipped when explicit coords were provided
        if coords_norm is None:
            self.n_grid_L1 = int(np.round(self.n_grid * self.weights[0]))
            self.n_grid_LHS = self.n_grid - self.n_grid_L1
        else:
            self.n_grid_L1 = None
            self.n_grid_LHS = None

        # create L1 grid
        if coords_norm is None and self.n_grid_L1 > 0:
            self.grid_L1 = L1(parameters_random=parameters_random,
                              n_grid=self.n_grid_L1,
                              gpc=gpc,
                              grid_pre=grid_pre,
                              options={"method": self.method,
                                       "criterion": self.criterion,
                                       "weights": self.weights_L1,
                                       "n_pool": self.n_pool,
                                       "n_iter": self.n_iter,
                                       "seed": self.seed})

            # NOTE(review): a caller-supplied grid_pre is extended IN PLACE with the
            # new L1 points so the subsequent LHS step sees the combined grid
            if self.grid_pre is not None:
                self.grid_pre.coords_norm = np.vstack((self.grid_pre.coords_norm, self.grid_L1.coords_norm))
                self.grid_pre.coords = np.vstack((self.grid_pre.coords, self.grid_L1.coords))
                self.grid_pre.n_grid = self.grid_pre.coords_norm.shape[0]
            else:
                self.grid_pre = self.grid_L1

        # create LHS (ese) grid
        if coords_norm is None and self.n_grid_LHS > 0:
            self.grid_LHS = LHS(parameters_random=parameters_random,
                                n_grid=self.n_grid_LHS,
                                grid_pre=self.grid_pre,
                                options={"criterion": ["ese"],
                                         "seed": self.seed})

        # collect the coordinates of both sub-grids (L1 points first, then LHS points)
        if self.grid_L1 is None and self.grid_LHS is not None:
            self.coords_norm = self.grid_LHS.coords_norm
        elif self.n_grid_L1 is not None and self.grid_LHS is None:
            self.coords_norm = self.grid_L1.coords_norm
        elif self.n_grid_L1 is not None and self.grid_LHS is not None:
            self.coords_norm = np.vstack((self.grid_L1.coords_norm, self.grid_LHS.coords_norm))

        # Denormalize grid to original parameter space
        self.coords = self.get_denormalized_coordinates(self.coords_norm)

        # Generate unique IDs of grid points
        self.coords_id = [uuid.uuid4() for _ in range(self.n_grid)]
[docs]
class LHS_L1(RandomGrid):
    """
    LHS-L1 optimized grid object.

    A fraction options["weights"][0] of the n_grid points is generated by a Latin
    Hypercube ("ese") design and the rest by L1 optimization; the LHS part is
    generated first (mirror image of L1_LHS).

    Parameters
    ----------
    parameters_random : OrderedDict of RandomParameter instances
        OrderedDict containing the RandomParameter instances the grids are generated for
    gpc : GPC object
        GPC object the L1-optimal part is constructed for
    n_grid : int
        Number of random samples to generate
    options : dict, optional, default=None
        Grid options:
        - method: "greedy", "iteration" (L1 part)
        - criterion: ["mc"], ["tmc", "cc"] (L1 part)
        - weights_L1: weighting of the L1 optimality criteria, e.g. [1], [0.5, 0.5]
        - weights: split between the LHS and the L1 part, e.g. [0.25, 0.75]
        - n_pool: size of sample pool to choose greedy results from
        - n_iter: number of iterations
        - seed: random seed
    coords : ndarray of float [n_grid_add x dim]
        Grid points to add (model space); if provided, no grid is generated
    coords_norm : ndarray of float [n_grid_add x dim]
        Grid points to add (normalized space); if provided, no grid is generated
    coords_gradient : ndarray of float [n_grid x dim x dim]
        Denormalized coordinates xi
    coords_gradient_norm : ndarray of float [n_grid x dim x dim]
        Normalized coordinates xi
    coords_id : list of UUID objects (version 4) [n_grid]
        Unique IDs of grid points
    coords_gradient_id : list of UUID objects (version 4) [n_grid]
        Unique IDs of grid points
    grid_pre : Grid object, optional, default: None
        Existing grid which is going to be extended (NOTE: modified in place)

    Examples
    --------
    >>> import pygpc
    >>> grid = pygpc.LHS_L1(parameters_random=parameters_random,
    >>>                     n_grid=100,
    >>>                     gpc=gpc,
    >>>                     options={"method": "greedy",
    >>>                              "criterion": ["mc"],
    >>>                              "weights_L1": [1],
    >>>                              "weights": [0.25, 0.75],
    >>>                              "n_pool": 1000,
    >>>                              "seed": None})

    Attributes
    ----------
    parameters_random : OrderedDict of RandomParameter instances
        OrderedDict containing the RandomParameter instances the grids are generated for
    n_grid : int or float
        Number of random samples in grid
    seed : float, optional, default=None
        Seeding point to replicate random grid
    options : dict, optional, default=None
        Grid options (see Parameters)
    grid_LHS : LHS grid object or None
        LHS sub-grid (None if n_grid_LHS == 0 or coords were provided)
    grid_L1 : L1 grid object or None
        L1-optimized sub-grid (None if n_grid_L1 == 0 or coords were provided)
    coords : ndarray of float [n_grid_add x dim]
        Grid points to add (model space)
    coords_norm : ndarray of float [n_grid_add x dim]
        Grid points to add (normalized space)
    coords_gradient : ndarray of float [n_grid x dim x dim]
        Denormalized coordinates xi
    coords_gradient_norm : ndarray of float [n_grid x dim x dim]
        Normalized coordinates xi
    coords_id : list of UUID objects (version 4) [n_grid]
        Unique IDs of grid points
    coords_gradient_id : list of UUID objects (version 4) [n_grid]
        Unique IDs of grid points
    """

    def __init__(self, parameters_random, gpc, n_grid=None, options=None, coords=None, coords_norm=None,
                 coords_gradient=None, coords_gradient_norm=None, coords_id=None, coords_gradient_id=None,
                 grid_pre=None):
        """
        Constructor; Initializes Grid instance; Generates grid or copies provided content
        """
        # fill in defaults for every option key the caller did not provide
        if options is None:
            options = dict()

        if type(options) is dict:
            if "weights" not in options.keys():
                options["weights"] = [0.5, 0.5]

            if "method" not in options.keys():
                options["method"] = "iteration"

            if "n_pool" not in options.keys():
                options["n_pool"] = 10000

            if "n_iter" not in options.keys():
                options["n_iter"] = 1000

            if "seed" not in options.keys():
                options["seed"] = None

            if "criterion" not in options.keys():
                options["criterion"] = ["mc"]

            if "weights_L1" not in options.keys() or options["weights_L1"] is None:
                # default: weight all L1 optimality criteria equally
                options["weights_L1"] = (np.ones(len(options["criterion"])) / len(options["criterion"])).tolist()

        self.n_pool = options["n_pool"]
        self.n_iter = options["n_iter"]
        self.gpc = gpc
        self.seed = options["seed"]
        self.method = options["method"]
        self.criterion = options["criterion"]
        self.weights_L1 = options["weights_L1"]
        self.grid_L1 = None
        self.grid_LHS = None
        self.grid_pre = grid_pre

        # allow a single criterion to be passed as a plain string
        if type(self.criterion) is not list:
            self.criterion = [self.criterion]

        super(LHS_L1, self).__init__(parameters_random,
                                     n_grid=n_grid,
                                     options=options,
                                     coords=coords,
                                     coords_norm=coords_norm,
                                     coords_gradient=coords_gradient,
                                     coords_gradient_norm=coords_gradient_norm,
                                     coords_id=coords_id,
                                     coords_gradient_id=coords_gradient_id)

        self.weights = options["weights"]

        # split the total number of grid points between the LHS part and the L1 part
        # according to options["weights"]; skipped when explicit coords were provided
        if coords_norm is None:
            self.n_grid_LHS = int(np.round(self.n_grid * self.weights[0]))
            self.n_grid_L1 = self.n_grid - self.n_grid_LHS
        else:
            self.n_grid_LHS = None
            self.n_grid_L1 = None

        # create LHS (ese) grid
        if coords_norm is None and self.n_grid_LHS > 0:
            self.grid_LHS = LHS(parameters_random=parameters_random,
                                n_grid=self.n_grid_LHS,
                                grid_pre=grid_pre,
                                options={"criterion": ["ese"],
                                         "seed": self.seed})

            # NOTE(review): a caller-supplied grid_pre is extended IN PLACE with the
            # new LHS points so the subsequent L1 step sees the combined grid
            if self.grid_pre is not None:
                self.grid_pre.coords_norm = np.vstack((self.grid_pre.coords_norm, self.grid_LHS.coords_norm))
                self.grid_pre.coords = np.vstack((self.grid_pre.coords, self.grid_LHS.coords))
                self.grid_pre.n_grid = self.grid_pre.coords_norm.shape[0]
            else:
                self.grid_pre = self.grid_LHS

        # create L1 grid
        if coords_norm is None and self.n_grid_L1 > 0:
            self.grid_L1 = L1(parameters_random=parameters_random,
                              n_grid=self.n_grid_L1,
                              grid_pre=self.grid_pre,
                              gpc=gpc,
                              options={"method": self.method,
                                       "criterion": self.criterion,
                                       "weights": self.weights_L1,
                                       "n_pool": self.n_pool,
                                       "n_iter": self.n_iter,
                                       "seed": self.seed})

        # collect the coordinates of both sub-grids (LHS points first, then L1 points)
        if self.grid_L1 is None and self.grid_LHS is not None:
            self.coords_norm = self.grid_LHS.coords_norm
        elif self.n_grid_L1 is not None and self.grid_LHS is None:
            self.coords_norm = self.grid_L1.coords_norm
        elif self.n_grid_L1 is not None and self.grid_LHS is not None:
            self.coords_norm = np.vstack((self.grid_LHS.coords_norm, self.grid_L1.coords_norm))

        # Denormalize grid to original parameter space
        self.coords = self.get_denormalized_coordinates(self.coords_norm)

        # Generate unique IDs of grid points
        self.coords_id = [uuid.uuid4() for _ in range(self.n_grid)]
[docs]
def project_grid(grid, p_matrix, mode="reduce"):
    """
    Transform a grid between the original and the reduced parameter space.

    Parameters
    ----------
    grid : Grid object
        Grid object to transform (left unmodified; a deep copy is returned).
    p_matrix : ndarray of float [n_reduced, n_original]
        Projection matrix.
    mode : str
        Direction of transformation ("reduce", "expand").

    Returns
    -------
    grid_trans : Grid object
        Transformed grid object.

    Raises
    ------
    ValueError
        If mode is neither "reduce" nor "expand".
    """
    # row-wise L1 norms of the projection matrix, used to rescale the
    # normalized coordinates so they stay inside the normalized domain
    row_norm = np.sum(np.abs(p_matrix), axis=1)

    if mode == "reduce":
        projection = p_matrix.transpose()
        projection_scaled = projection / row_norm[np.newaxis, :]
    elif mode == "expand":
        projection = p_matrix
        projection_scaled = projection / row_norm[:, np.newaxis]
    else:
        raise ValueError("Specified mode not implemented... ('reduce', 'expand')")

    grid_trans = copy.deepcopy(grid)

    # transform variables of original grid to reduced parameter space
    grid_trans.coords = np.matmul(grid.coords, projection)
    grid_trans.coords_norm = np.matmul(grid.coords_norm, projection_scaled)

    return grid_trans
[docs]
def workhorse_greedy(idx_list, psy_opt, psy_pool, criterion):
    """
    Workhorse for coherence calculation (greedy algorithm).

    Rates each candidate row of the pool matrix by the requested optimality
    criteria when it is appended to the current gpc matrix.

    Parameters
    ----------
    idx_list : list of int [n_idx]
        Indices of rows of pool matrix the coherence is calculated for
    psy_opt : ndarray of float [n_grid_current, n_basis]
        GPC matrix of previous iteration
    psy_pool : ndarray of float [n_pool, n_basis]
        GPC matrix of pool
    criterion : list of str
        Optimality criteria ("mc", "tmc", "cc", "D", "D-coh")

    Returns
    -------
    crit : ndarray of float [n_idx, n_criterion]
        Optimality measures (only when neither "D" nor "D-coh" is requested)
    sign, logdet : ndarray of float [n_idx, 1] each
        Sign and negated log-determinant of the candidate Gram matrices
        (only when "D" or "D-coh" is requested)
    """
    n_idx = len(idx_list)
    use_det = "D" in criterion or "D-coh" in criterion

    # unset criteria keep a large sentinel value
    crit = np.full((n_idx, len(criterion)), 1e6)

    # Gram matrix of the current gpc matrix; candidates are rank-1 updates of it
    gram_opt = psy_opt.T @ psy_opt

    if use_det:
        sign = np.zeros((n_idx, 1))
        logdet = np.zeros((n_idx, 1))

    for j, idx in enumerate(idx_list):
        candidate = psy_pool[idx, :]
        psy_test = np.vstack((psy_opt, candidate))
        gram_test = gram_opt + np.outer(candidate, candidate)

        if "mc" in criterion:
            crit[j, criterion.index("mc")] = mutual_coherence(psy_test)

        if "tmc" in criterion:
            crit[j, criterion.index("tmc")] = t_averaged_mutual_coherence(gram_test)

        if "cc" in criterion:
            crit[j, criterion.index("cc")] = average_cross_correlation_gram(gram_test)

        if use_det:
            # for n_grid < n_basis only consider the first n_grid basis functions
            n_basis_det = min(psy_test.shape)
            # determinant of inverse of Gram is the inverse of the determinant,
            # hence the sign flip of the log-determinant
            s, ld = np.linalg.slogdet(gram_test[:n_basis_det, :n_basis_det])
            sign[j], logdet[j] = s, -ld

    if use_det:
        return sign, logdet
    return crit
[docs]
def workhorse_iteration(idx_list, gpc, n_grid, criterion, grid_pre=None, options=None):
    """
    Workhorse for coherence calculation (iterative algorithm).

    Generates len(idx_list) random (or coherence-optimal) grid realizations and
    rates each one by the requested optimality criteria.

    Parameters
    ----------
    idx_list : list of int [n_idx]
        Indices of iterations (only its length is used — one grid realization
        is generated and rated per entry)
    gpc : GPC object
        GPC object (its backend is temporarily switched to "cpu" and restored)
    n_grid : int
        Number of grid points
    criterion : list of str
        Optimality criteria ("mc", "tmc", "cc", "D", "D-coh")
    grid_pre : Grid object, optional, default: None
        Grid object, which is going to be extended.
    options : dict, optional, default: None
        Dictionary containing the grid options

    Returns
    -------
    crit : ndarray of float [n_idx, n_criterion]
        Optimality measures (returned when neither "D" nor "D-coh" is requested)
    sign, neg_logdet : ndarray of float [n_idx, 1] each
        Sign and negated log-determinant per realization
        (returned when "D" or "D-coh" is requested)
    coords_norm_list : list [n_idx] of ndarray [n_grid x dim]
        Normalized grid coordinates of grid realizations (always returned last)
    """
    coords_norm_list = []
    # unset criteria keep a large sentinel value
    crit = np.ones((len(idx_list), len(criterion))) * 1e6

    # run the candidate evaluation on the cpu backend; restored at the end
    backend_backup = gpc.backend
    gpc.backend = "cpu"

    if "D" in criterion or "D-coh" in criterion:
        sign = np.zeros((len(idx_list), 1))
        neg_logdet = np.zeros((len(idx_list), 1))

    # gpc matrix of the already fixed grid points (prepended to every candidate)
    if grid_pre is not None and grid_pre.n_grid > 0:
        psy_pool_pre = gpc.create_gpc_matrix(b=gpc.basis.b, x=grid_pre.coords_norm, gradient=False)
    else:
        psy_pool_pre = None

    for i in range(len(idx_list)):
        # draw a fresh grid realization; sample in the original parameter space
        # when a projection matrix is present, and use a coherence-optimal (CO)
        # design when "D-coh" is requested
        if gpc.p_matrix is not None:
            if "D-coh" in criterion:
                test_grid = CO(parameters_random=gpc.problem_original.parameters_random,
                               n_grid=n_grid,
                               grid_pre=grid_pre,
                               gpc=gpc,
                               options=options)
            else:
                test_grid = Random(parameters_random=gpc.problem_original.parameters_random,
                                   n_grid=n_grid,
                                   grid_pre=grid_pre,
                                   options={"seed": options["seed"]})
        else:
            if "D-coh" in criterion:
                test_grid = CO(parameters_random=gpc.problem.parameters_random,
                               n_grid=n_grid,
                               grid_pre=grid_pre,
                               gpc=gpc,
                               options=options)
            else:
                test_grid = Random(parameters_random=gpc.problem.parameters_random,
                                   n_grid=n_grid,
                                   grid_pre=grid_pre,
                                   options={"seed": options["seed"]})

        coords_norm = test_grid.coords_norm

        # save current coords norm
        coords_norm_list.append(coords_norm)

        # get the normalized gpc matrix
        if gpc.p_matrix is not None:
            # project the sampled coordinates into the reduced space before
            # evaluating the basis functions
            psy_pool = gpc.create_gpc_matrix(b=gpc.basis.b,
                                             x=np.matmul(coords_norm, gpc.p_matrix.transpose() /
                                                         gpc.p_matrix_norm[np.newaxis, :]),
                                             gradient=False)
        else:
            psy_pool = gpc.create_gpc_matrix(b=gpc.basis.b, x=coords_norm, gradient=False)

        if psy_pool_pre is not None:
            psy_pool = np.vstack((psy_pool_pre, psy_pool))

        # column-wise normalization by the maximum absolute entry
        psy_pool_norm = psy_pool / np.abs(psy_pool).max(axis=0)

        # test current matrix
        if "mc" in criterion:
            crit[i, criterion.index("mc")] = mutual_coherence(psy_pool_norm)

        if "tmc" in criterion:
            crit[i, criterion.index("tmc")] = t_averaged_mutual_coherence(np.matmul(psy_pool_norm.T, psy_pool_norm))

        if "cc" in criterion:
            crit[i, criterion.index("cc")] = average_cross_correlation_gram(np.matmul(psy_pool_norm.T, psy_pool_norm))

        if "D" in criterion or "D-coh" in criterion:
            # for n_grid < n_basis only consider the first n_grid basis functions because of determinant
            n_basis_det = np.min((n_grid, gpc.basis.n_basis))

            # determinant of inverse of Gram is the inverse of the determinant
            sign[i], neg_logdet[i] = np.linalg.slogdet(np.matmul(psy_pool_norm[:, :n_basis_det].T, psy_pool_norm[:, :n_basis_det]))
            neg_logdet[i] = -neg_logdet[i]

    gpc.backend = backend_backup

    if "D" not in criterion and "D-coh" not in criterion:
        return crit, coords_norm_list
    else:
        return sign, neg_logdet, coords_norm_list
[docs]
def workhorse_get_det_updated_fim_matrix(index_list, gpc_matrix_pool, fim_matrix, n_basis_limit):
    """
    Workhorse to determine the determinant of the Fisher information matrix
    for a set of candidate grid points.

    For each candidate index the FIM is rank-1 updated (non-destructively) with
    the corresponding pool row and its sign/log-determinant is computed.

    Parameters
    ----------
    index_list : list of int
        Indices of coordinates to test
    gpc_matrix_pool : ndarray of float [n_grid_pool x n_basis]
        Gpc matrix of large pool
    fim_matrix : ndarray of float [n_basis_limit x n_basis_limit]
        Fisher information matrix (left unmodified)
    n_basis_limit : int
        Number of leading basis functions (pool columns) to include

    Returns
    -------
    sign : ndarray of float [len(index_list)]
        Signs of the determinants of the updated FIMs
    logdet : ndarray of float [len(index_list)]
        Natural logs of the absolute determinants of the updated FIMs
    """
    n_candidates = len(index_list)
    sign = np.zeros(n_candidates)
    logdet = np.zeros(n_candidates)

    for pos, idx in enumerate(index_list):
        candidate = gpc_matrix_pool[idx, :n_basis_limit]
        # non-destructive rank-1 update of the FIM with the candidate row
        sign[pos], logdet[pos] = np.linalg.slogdet(fim_matrix + np.outer(candidate, candidate))

    return sign, logdet
[docs]
def compute_neg_loglik(parameters, Xtrain, ytrain):
    """
    Computes the negative log likelihood of the hyperparameters of the Gaussian Process Regression.

    Parameters
    ----------
    parameters : np.ndarray of float [2]
        Hyperparameters (lengthscale, variance)
    Xtrain : np.ndarray of float [N_train x dim]
        Coordinates of the training data
    ytrain : np.ndarray of float [N_train]
        Function values at the training data points

    Returns
    -------
    log_likelihood : float
        Negative log likelihood (a large penalty value is returned when the
        kernel matrix is not positive definite for these hyperparameters)
    """
    lengthscale, variance = parameters
    K = squared_exponential_kernel(Xtrain, Xtrain, lengthscale, variance)  # n_train x n_train

    try:
        L = np.linalg.cholesky(K)
    except np.linalg.LinAlgError:
        # Fix: the previous code returned 0 here. Since this function is
        # *minimized*, 0 can be lower than valid objective values and would
        # attract the optimizer towards non-positive-definite regions.
        # Return a large finite penalty instead (finite so L-BFGS-B's
        # line search stays well-behaved).
        return 1e10

    # solve K alpha = y via the Cholesky factor: alpha = L^-T (L^-1 y)
    alpha = np.linalg.solve(L.T, np.linalg.solve(L, ytrain))

    # standard GP log marginal likelihood:
    # -1/2 y^T K^-1 y - 1/2 log|K| - n/2 log(2 pi), with log|K| = 2*sum(log(diag(L)))
    log_likelihood = - 0.5 * ytrain.T @ alpha - np.log(np.diag(L)).sum() - len(ytrain) / 2 * np.log(2 * np.pi)

    return - log_likelihood.squeeze()
[docs]
def get_parameters_gaussian_process(Xtrain, ytrain):
    """
    Determine optimal hyperparameters for Gaussian Process Regression (lengthscale, variance), without noise.

    The negative log likelihood is minimized with L-BFGS-B, starting from
    (lengthscale=0.2, variance=1) with both parameters bounded to [1e-3, 1e2].

    Parameters
    ----------
    Xtrain : np.ndarray of float [N_train x dim]
        Coordinates of the training data
    ytrain : np.ndarray of float [N_train]
        Function values at the training data points

    Returns
    -------
    lengthscale : float
        Optimized lengthscale parameter
    variance : float
        Optimized output variance
    """
    # starting point and box constraints for (lengthscale, variance)
    initial_guess = np.array([.2, 1])
    hyperparameter_bounds = ((1e-3, 1e2), (1e-3, 1e2))

    result = minimize(compute_neg_loglik,
                      initial_guess,
                      (Xtrain, ytrain),
                      method='l-bfgs-b',
                      bounds=hyperparameter_bounds)

    lengthscale, variance = result.x

    return lengthscale, variance