import sys
import math
import copy
import random
import warnings
import itertools
import numpy as np
import scipy.stats
import scipy.special
import scipy.spatial
import matplotlib.pyplot as plt
from scipy.spatial.distance import cdist
from .Visualization import plot_beta_pdf_fit
from mpl_toolkits.mplot3d.art3d import Poly3DCollection
[docs]
def squared_exponential_kernel(x, y, lengthscale, variance):
"""
Computes the squared exponential kernel for Gaussian Processes.
Parameters
----------
x : np.ndarray of float [N x dim]
Input observation locations
y : np.ndarray of float [M x dim]
Output observation locations
lengthscale : float
Lengthscale parameter
variance : float
Output variance
Returns
-------
k : np.ndarray of float [M x X]
Kernel function values (covariance function or covariance matrix)
"""
sqdist = cdist(x, y, 'sqeuclidean')
k = variance * np.exp(-0.5 * sqdist * (1/lengthscale**2))
return k
[docs]
def is_instance(obj):
"""
Tests if obj is a class instance of any type.
Parameters
----------
obj : any
Input object
Returns
-------
out : bool
Flag if obj is class instance or not
"""
try:
_ = obj.__dict__
return True
except AttributeError:
return False
[docs]
def display_fancy_bar(text, i, n_i, more_text=None):
"""
Display a simple progress bar. Call in each iteration and start with i=1.
Parameters
----------
text: str
Text to display in front of actual iteration
i: str or int
Actual iteration
n_i: int
Total number of iterations
more_text: str, optional, default=None
Text that is displayed on an extra line above the bar.
Examples
--------
fancy_bar('Run',7,10):
Run 07 from 10 [================================ ] 70%
fancy_bar(Run,9,10,'Some more text'):
Some more text
Run 09 from 10 [======================================= ] 90%
"""
if not isinstance(i, str):
i = str(i)
assert isinstance(text, str)
assert isinstance(n_i, int)
if not text.endswith(' '):
text += ' '
# if i == '1':
# sys.stdout.write('')
sys.stdout.write('\r')
fill_width = len(str(n_i))
# terminal codes, working on windows as well?
cursor_two_up = '\x1b[2A'
erase_line = '\x1b[2K'
if more_text:
print(cursor_two_up + erase_line)
print(more_text)
sys.stdout.write(text + i.zfill(fill_width) + " from " + str(n_i))
# this prints [50-spaces], i% * =
# sys.stdout.write(" [%-40s] %d%%" % (
# '=' * int((float(i) + 0) / n_i * 100 / 2.5), float(i) / n_i * 100.))
sys.stdout.write(" [{}{}] {:.1f}%".format('=' * int(int(i) / n_i * 40),
" " * (40 - int(int(i) / n_i * 40)),
float(int(i) / n_i * 100)))
sys.stdout.flush()
# if int(i) == n_i:
# print("")
print("")
[docs]
def get_cartesian_product(array_list):
"""
Generate a cartesian product of input arrays (all combinations).
cartesian_product = get_cartesian_product(array_list)
Parameters
----------
array_list : list of 1D ndarray of float
Arrays to compute the cartesian product with
Returns
-------
cartesian_product : ndarray of float
Array containing the cartesian products (all combinations of input vectors)
(M, len(arrays))
Examples
--------
>>> import pygpc
>>> out = pygpc.get_cartesian_product(([1, 2, 3], [4, 5], [6, 7]))
>>> out
"""
cartesian_product = [element for element in itertools.product(*array_list)]
return np.array(cartesian_product)
[docs]
def get_rotation_matrix(theta):
"""
Generate rotation matrix from euler angles.
rotation_matrix = get_rotation_matrix(theta)
Parameters
----------
theta : list of float [3]
Euler angles
Returns
-------
rotation_matrix : ndarray of float [3, 3]
Rotation matrix computed from euler angles
"""
r_x = np.array([[1, 0, 0],
[0, math.cos(theta[0]), -math.sin(theta[0])],
[0, math.sin(theta[0]), math.cos(theta[0])]
])
r_y = np.array([[math.cos(theta[1]), 0, math.sin(theta[1])],
[0, 1, 0],
[-math.sin(theta[1]), 0, math.cos(theta[1])]
])
r_z = np.array([[math.cos(theta[2]), -math.sin(theta[2]), 0],
[math.sin(theta[2]), math.cos(theta[2]), 0],
[0, 0, 1]
])
rotation_matrix = np.dot(r_z, np.dot(r_y, r_x))
return rotation_matrix
[docs]
def get_different_rows_from_matrices(a, b):
"""
Compares rows from matrix a with rows from matrix b. It is assumed that b contains rows from a.
The function returns the rows of b, which are not included in a.
Parameters
----------
a : ndarray of float [m1 x n]
First matrix (usually the smaller one), where rows are part of b
b : ndarray of float [m2 x n]
Second matrix (usually the larger one), containing rows of a
Returns
-------
b_diff : ndarray of float [m3 x n]
Rows from b differing from a
"""
idx_in = []
for _a in a:
for i_b, _b in enumerate(b):
if (np.isclose(_a, _b, atol=1e-6)).all():
idx_in.append(i_b)
idx = np.arange(b.shape[0])
idx_not_in = [i for i in idx if i not in idx_in]
return b[idx_not_in, :]
[docs]
def get_list_multi_delete(input_list, index):
"""
Delete multiple entries from list.
input_list = get_list_multi_delete(input_list, index)
Parameters
----------
input_list : list
Simple list
index : list of integer
List of indices to delete
Returns
-------
input_list : list
Input list without entries specified in index
"""
indices = sorted(index, reverse=True)
for list_index in indices:
del input_list[list_index]
return input_list
[docs]
def get_array_unique_rows(array):
"""
Compute unique rows of 2D array and delete rows that are redundant.
unique = get_array_unique_rows(array)
Parameters
----------
array: ndarray of float
Matrix with k redundant rows
Returns
-------
unique: ndarray of float
Matrix without k redundant rows
"""
_, index = np.unique(array, axis=0, return_index=True)
index = np.sort(index)
return array[index]
[docs]
def nrmsd(array, array_ref, error_norm="relative", x_axis=False):
"""
Determine the normalized root mean square deviation between input data and reference data.
normalized_rms = get_normalized_rms(array, array_ref)
Parameters
----------
array: np.ndarray
input data [ (x), y0, y1, y2 ... ]
array_ref: np.ndarray
reference data [ (x_ref), y0_ref, y1_ref, y2_ref ... ]
if array_ref is 1D, all sizes have to match
error_norm: str, optional, default="relative"
Decide if error is determined "relative" or "absolute"
x_axis: boolean, optional, default=False
If True, the first column of array and array_ref is interpreted as the x-axis, where the data points are
evaluated. If False, the data points are assumed to be at the same location.
Returns
-------
normalized_rms: ndarray of float [array.shape[1]]
Normalized root mean square deviation between the columns of array and array_ref
"""
n_points = array.shape[0]
if x_axis:
# handle different array lengths
if len(array_ref.shape) == 1:
array_ref = array_ref[:, None]
if len(array.shape) == 1:
array = array[:, None]
# determine number of input arrays
if array_ref.shape[1] == 2:
n_data = array.shape[1]-1
else:
n_data = array.shape[1]
# interpolate array on array_ref data if necessary
if array_ref.shape[1] == 1:
data = array
data_ref = array_ref
else:
# crop reference if it is longer than the axis of the data
data_ref = array_ref[(array_ref[:, 0] >= min(array[:, 0])) & (array_ref[:, 0] <= max(array[:, 0])), 1]
array_ref = array_ref[(array_ref[:, 0] >= min(array[:, 0])) & (array_ref[:, 0] <= max(array[:, 0])), 0]
data = np.zeros([len(array_ref), n_data])
for i_data in range(n_data):
data[:, i_data] = np.interp(array_ref, array[:, 0], array[:, i_data+1])
else:
data_ref = array_ref
data = array
# determine "absolute" or "relative" error
if error_norm == "relative":
# max_min_idx = np.isclose(np.max(data_ref, axis=0), np.min(data_ref, axis=0))
delta = np.max(data_ref, axis=0) - np.min(data_ref, axis=0)
# if max_min_idx.any():
# delta[max_min_idx] = max(data_ref[max_min_idx])
else:
delta = 1
# filter nan
row_idx_data, col_idx_data = np.where(np.isnan(data))
row_idx_data_ref, col_idx_data_ref = np.where(np.isnan(data_ref))
row_idx = np.hstack((row_idx_data, row_idx_data_ref))
col_idx = np.hstack((col_idx_data, col_idx_data_ref))
if row_idx_data.size > 0:
warnings.warn("nan in input dataset found at rows={} cols={} (ignored)".format(row_idx, col_idx))
if row_idx_data_ref.size > 0:
warnings.warn("nan in reference dataset found at rows={} cols={} (ignored)".format(row_idx_data_ref,
col_idx_data_ref))
if row_idx.size > 0:
data = np.delete(data, np.unique(row_idx), axis=0)
data_ref = np.delete(data_ref, np.unique(row_idx), axis=0)
# determine normalized rms deviation and return
normalized_rms = np.sqrt(1.0/n_points * np.sum((data - data_ref)**2, axis=0)) / delta
return normalized_rms
[docs]
def get_beta_pdf_fit(data, beta_tolerance=0, uni_interval=0, fn_plot=None):
"""
Fit data to a beta distribution in the interval [a, b].
beta_parameters, moments, p_value, uni_parameters = get_beta_pdf_fit(data, beta_tolerance=0, uni_interval=0)
Parameters
----------
data: ndarray of float
Data to fit beta distribution on
beta_tolerance: float or list of float, optional, default=0
Specifies bounds of beta distribution. If float, it calculates the tolerance
from observed data, e.g. 0.2 (+-20% tolerance on observed max and min value).
If list, it takes [min, max] as bounds [a, b].
uni_interval: float or list of float, optional, default=0
uniform distribution interval. If float, the bounds are defined as a fraction of the beta distribution
interval (e.g. 0.95 (95%)). If list, it takes [min, max] as bounds [a, b].
fn_plot: str
Filename of plot so save (.pdf and .png)
Returns
-------
beta_parameters: list of float [4]
Two shape parameters and lower and upper limit [p, q, a, b]
moments: list of float [4]
Mean and std of raw data and fitted beta distribution [data_mean, data_std, beta_mean, beta_std]
p_value: float
p-value of the Kolmogorov Smirnov test
uni_parameters: list of float [2]
Lower and upper limits of uniform distribution [a, b]
"""
data_mean = np.mean(data)
data_std = np.std(data)
# fit beta distribution to data
if type(beta_tolerance) is list:
a_beta = beta_tolerance[0]
b_beta = beta_tolerance[1]
p_beta, q_beta, a_beta, ab_beta = scipy.stats.beta.fit(data, floc=a_beta, fscale=b_beta - a_beta)
elif beta_tolerance > 0:
# use user beta_tolerance of to set limits of distribution
data_range = data.max()-data.min()
a_beta = data.min()-beta_tolerance*data_range
b_beta = data.max()+beta_tolerance*data_range
p_beta, q_beta, a_beta, ab_beta = scipy.stats.beta.fit(data, floc=a_beta, fscale=b_beta-a_beta)
else:
# let scipy.stats.beta.fit determine the limits
p_beta, q_beta, a_beta, ab_beta = scipy.stats.beta.fit(data)
b_beta = a_beta + ab_beta
beta_mean, beta_var = scipy.stats.beta.stats(p_beta, q_beta, loc=a_beta, scale=(b_beta-a_beta), moments='mv')
beta_std = np.sqrt(beta_var)
moments = np.array([data_mean, data_std, beta_mean, beta_std])
# perform Kolmogorov Smirnov test
_, p_value = scipy.stats.kstest(data, "beta", [p_beta, q_beta, a_beta, ab_beta])
beta_parameters = np.array([p_beta, q_beta, a_beta, b_beta])
# determine limits of uniform distribution [a_uni, b_uni] covering the
# interval uni_interval of the beta distribution
if type(uni_interval) is list:
a_uni = uni_interval[0]
b_uni = uni_interval[1]
uni_parameters = np.array([a_uni, b_uni])
elif uni_interval > 0:
a_uni = scipy.stats.beta.ppf((1 - uni_interval) / 2, p_beta, q_beta, loc=a_beta, scale=b_beta - a_beta)
b_uni = scipy.stats.beta.ppf((1 + uni_interval) / 2, p_beta, q_beta, loc=a_beta, scale=b_beta - a_beta)
uni_parameters = np.array([a_uni, b_uni])
else:
a_uni = None
b_uni = None
uni_parameters = None
if fn_plot is not None:
plot_beta_pdf_fit(data=data,
a_beta=a_beta, b_beta=b_beta, p_beta=p_beta, q_beta=q_beta,
a_uni=a_uni, b_uni=b_uni,
interactive=True, fn_plot=fn_plot, xlabel="$x$", ylabel="$p(x)$")
return beta_parameters, moments, p_value, uni_parameters
[docs]
def mutual_coherence(array):
"""
Calculate the mutual coherence of a matrix A. It can also be referred as the cosine of the smallest angle
between two columns.
mutual_coherence = mutual_coherence(array)
Parameters
----------
array: ndarray of float
Input matrix
Returns
-------
mutual_coherence: float
Mutual coherence
"""
array = array / np.linalg.norm(array, axis=0)[np.newaxis, :]
t = np.dot(array.conj().T, array)
np.fill_diagonal(t, 0.0)
mu = np.max(t)
return mu
[docs]
def RIP(A, x):
"""
Calculate the restricted isometric property constant delta of a matrix A for a given sparse vector x.
delta = RIP(A, x)
Parameters
----------
A: ndarray of float
Input matrix
x: ndarray of float
applied vector
Returns
-------
delta: float
restricted isometric property constant
"""
norm_Ax = np.linalg.norm(np.dot(A, x))
norm_x = np.linalg.norm(x)
delta = (norm_Ax**2 / norm_x**2) - 1
return delta
[docs]
def wrap_function(fn, x, args):
"""
Function wrapper to call anonymous function with variable number of arguments (tuple).
wrap_function(fn, x, args)
Parameters
----------
fn: function
anonymous function to call
x: tuple
parameters of function
args: tuple
arguments of function
Returns
-------
function_wrapper: function
wrapped function
"""
def function_wrapper(*wrapper_args):
return fn(*(wrapper_args + x + args))
return function_wrapper
[docs]
def get_num_coeffs(order, dim):
"""
Calculate the number of gPC coefficients by the maximum order and the number of random variables.
num_coeffs = (order+dim)! / (order! * dim!)
num_coeffs = get_num_coeffs(order , dim)
Parameters
----------
order: int
Maximum order of expansion
dim: int
Number of random variables
Returns
-------
num_coeffs: int
Number of gPC coefficients and polynomials
"""
return scipy.special.factorial(order + dim) / (scipy.special.factorial(order) * scipy.special.factorial(dim))
[docs]
def get_num_coeffs_sparse(order_dim_max, order_glob_max, order_inter_max, dim, order_inter_current=None,
order_glob_max_norm=1):
"""
Calculate the number of gPC coefficients for a specific maximum order in each dimension "order_dim_max",
global maximum order "order_glob_max" and the interaction order "order_inter_max".
num_coeffs_sparse = get_num_coeffs_sparse(order_dim_max, order_glob_max, order_inter_max, dim)
Parameters
----------
order_dim_max: ndarray of int or list of int [dim]
Maximum order in each dimension
order_glob_max: int
Maximum global order of interacting polynomials
order_inter_max: int
Interaction order
dim: int
Number of random variables
order_inter_current : int
order_glob_max_norm: float
Norm, which defines how the orders are accumulated to derive the total order (default: 1-norm).
Values smaller than one restrict higher orders and shrink the basis.
Returns
-------
num_coeffs_sparse: int
Number of gPC coefficients and polynomials
"""
if order_inter_current is None:
order_inter_current = order_inter_max
if type(order_dim_max) is list:
order_dim_max = np.array(order_dim_max)
if order_dim_max.size == 1:
order_dim_max = order_dim_max * np.ones(dim)
# generate multi-index list up to maximum order
if dim == 1:
poly_idx = np.array([np.linspace(0, order_dim_max[0], int(order_dim_max[0] + 1))]).astype(int).transpose()
else:
poly_idx = get_multi_indices(order=order_dim_max,
order_max=order_glob_max,
interaction_order=order_inter_max,
order_max_norm=order_glob_max_norm,
interaction_order_current=order_inter_current)
for i_dim in range(dim):
# add multi-indexes to list when not yet included
if order_dim_max[i_dim] > order_glob_max:
poly_add_dim = np.linspace(order_glob_max + 1,
order_dim_max[i_dim],
order_dim_max[i_dim] - (order_glob_max + 1) + 1)
poly_add_all = np.zeros([poly_add_dim.shape[0], dim])
poly_add_all[:, i_dim] = poly_add_dim
poly_idx = np.vstack([poly_idx, poly_add_all.astype(int)])
# delete multi-indexes from list when they exceed individual max order of parameter
elif order_dim_max[i_dim] < order_glob_max:
poly_idx = poly_idx[poly_idx[:, i_dim] <= order_dim_max[i_dim], :]
# Consider interaction order (filter out multi-indices exceeding it)
poly_idx = poly_idx[np.sum(poly_idx > 0, axis=1) <= order_inter_max, :]
return poly_idx.shape[0]
[docs]
def get_pdf_beta(x, p, q, a, b):
"""
Calculate the probability density function of the beta distribution in the interval [a, b].
pdf = (gamma(p)*gamma(q)/gamma(p+q).*(b-a)**(p+q-1))**(-1) *
(x-a)**(p-1) * (b-x)**(q-1);
pdf = get_pdf_beta(x, p, q, a, b)
Parameters
----------
x: ndarray of float
Values of random variable
a: float
lower boundary
b: float
upper boundary
p: float
First shape parameter defining the distribution
q: float
Second shape parameter defining the distribution
Returns
-------
pdf: ndarray of float
Probability density
"""
return (scipy.special.gamma(p) * scipy.special.gamma(q) / scipy.special.gamma(p + q)
* (b - a) ** (p + q - 1)) ** (-1) * (x - a) ** (p - 1) * (b - x) ** (q - 1)
[docs]
def get_all_combinations(array, number_elements):
"""
Compute all k-tuples (e_1, e_2, ..., e_k) of combinations of the set of elements of the input array where
e_n+1 > e_n.
combinations = get_all_combinations(array, number_elements)
Parameters
----------
array: np.ndarray
Array to perform the combinatorial problem with
number_elements: int
Number of elements in tuple
Returns
-------
combinations: np.ndarray
Array of combination vectors
"""
combinations = itertools.combinations(array, number_elements)
return np.array([c for c in combinations])
[docs]
def get_multi_indices(order, order_max, interaction_order, order_max_norm=1., interaction_order_current=None):
"""
Computes all multi-indices with a maximum overall order of max_order considering a certain maximum order norm.
multi_indices = get_multi_indices(length, max_order)
Parameters
----------
order : list of int [dim]
Maximum individual expansion order
Generates individual polynomials also if maximum expansion order in order_max is exceeded
order_max : int
Maximum global expansion order.
The maximum expansion order considers the sum of the orders of combined polynomials together with the
chosen norm "order_max_norm". Typically this norm is 1 such that the maximum order is the sum of all
monomial orders.
order_max_norm : float
Norm for which the maximum global expansion order is defined [0, 1]. Values < 1 decrease the total number
of polynomials in the expansion such that interaction terms are penalized more.
sum(a_i^q)^1/q <= p, where p is order_max and q is order_max_norm (for more details see eq (11) in [1]).
interaction_order : int
Number of random variables, which can interact with each other
interaction_order_current : int, optional, default: interaction_order
Number of random variables currently interacting with respect to the highest order.
(interaction_order_current <= interaction_order)
The parameters for lower orders are all interacting with interaction_order.
Returns
-------
multi_indices: ndarray [n_basis x dim]
Multi-indices for a maximum order gPC assuming a certain order norm.
"""
dim = len(order)
order_max = int(order_max)
order = [int(o) for o in order]
if interaction_order_current is None or interaction_order_current > interaction_order:
interaction_order_current = interaction_order
else:
interaction_order_current = interaction_order_current
multi_indices = []
for i_order_max in range(order_max + 1):
s = get_all_combinations(np.arange(dim + i_order_max - 1) + 1, dim - 1)
m = s.shape[0]
s1 = np.zeros([m, 1])
s2 = (dim + i_order_max) + s1
v = np.diff(np.hstack([s1, s, s2]))
v = v - 1
if i_order_max == 0:
multi_indices = v
else:
multi_indices = np.vstack([multi_indices, v])
# remove polynomials exceeding order_max considering max_order_norm
if order_max_norm != 1:
multi_indices = multi_indices[np.linalg.norm(multi_indices, ord=order_max_norm, axis=1) <=
(order_max + 1e-6), :]
# add or delete monomials specified in order
for i_dim in range(dim):
# add multi-indexes to list when not yet included
if order[i_dim] > order_max:
multi_indices_add_dim = np.linspace(order_max + 1,
order[i_dim],
order[i_dim] - (order_max + 1) + 1)
multi_indices_add_all = np.zeros([multi_indices_add_dim.shape[0], dim])
multi_indices_add_all[:, i_dim] = multi_indices_add_dim
multi_indices = np.vstack([multi_indices, multi_indices_add_all.astype(int)])
# delete multi-indexes from list when they exceed individual max order of parameter
elif order[i_dim] < order_max:
multi_indices = multi_indices[multi_indices[:, i_dim] <= order[i_dim], :]
# Consider interaction order (filter out multi-indices exceeding it)
if interaction_order < dim:
multi_indices = multi_indices[np.sum(multi_indices > 0, axis=1) <= interaction_order, :]
# if interaction_order_current is smaller than interaction_order, delete those basis functions of highest order
if interaction_order_current < interaction_order:
mask_order_max = np.sum(multi_indices, axis=1) == order_max
mask_interaction_order = np.sum(multi_indices > 0, axis=1) > interaction_order_current
mask = np.logical_not(np.logical_and(mask_order_max, mask_interaction_order))
multi_indices = multi_indices[mask]
return multi_indices.astype(int)
[docs]
def sample_sphere(n_points, r):
"""
Creates n_points evenly spread in a sphere of radius r.
Parameters
----------
n_points: int
Number of points to be spread, must be odd
r: float
Radius of sphere
Returns
-------
points: ndarray of float [N x 3]
Evenly spread points in a unit sphere
"""
assert n_points % 2 == 1, "The number of points must be odd"
points = []
# The golden ratio
phi = (1 + math.sqrt(5)) / 2.
n = int((n_points - 1) / 2)
for i in range(-n, n + 1):
lat = math.asin(2 * i / n_points)
lon = 2 * math.pi * i / phi
x = r * math.cos(lat) * math.cos(lon)
y = r * math.cos(lat) * math.sin(lon)
z = r * math.sin(lat)
points.append((x, y, z))
points = np.array(points, dtype=float)
return points
[docs]
def mat2ten(mat, incr):
"""
Transforms gPC gradient matrix or gradient grid points from matrix to tensor form
Parameters
----------
mat : ndarray of float [n_grid*incr, m]
Matrix to transform
incr : int
Increment after every row, a new tensor slice is created
Returns
-------
ten : ndarray of float [n_grid, m, incr]
Tensor
Notes
-----
Builds chunks after every "incr" row and writes it in a new slice [i, :, :]
"""
ten = np.zeros((int(mat.shape[0]/incr), mat.shape[1], incr))
idx = np.arange(0, mat.shape[0], incr)
for i in range(incr):
ten[:, :, i] = mat[idx + i, :]
return ten
[docs]
def ten2mat(ten):
"""
Transforms gPC gradient tensor or gradient grid points from tensor to matrix form
Parameters
----------
ten : ndarray of float [n_grid, m, incr]
Tensor to transform
Returns
-------
mat : ndarray of float [n_grid*incr, m]
Matrix
Notes
-----
Stacks slices [i, :, :] vertically
"""
mat = np.vstack([ten[i, :, :].transpose() for i in range(ten.shape[0])])
return mat
[docs]
def list2dict(l):
"""
Transform list of dicts with same keys to dict of list
Parameters
----------
l : list of dict
List containing dictionaries with same keys
Returns
-------
d : dict of lists
Dictionary containing the entries in a list
"""
n = len(l)
keys = l[0].keys()
d = dict()
for key in keys:
d[key] = [0 for _ in range(n)]
for i in range(n):
d[key][i] = l[i][key]
return d
[docs]
def get_gradient_idx_domain(domains, d, gradient_idx):
"""
Determine local gradient_idx in domain "d" from global gradient_idx
Parameters
----------
domains : ndarray of float [n_grid_global]
Array containing the domain IDs
d : int
Domain ID for which the gradient index has to be computed for
gradient_idx : ndarray of int [n_grid_global]
Indices of grid points (global) where the gradient in gradient_results is provided
Returns
-------
gradient_idx_local : ndarray of int [len(domains[domains==d])]
Indices of grid points (local) where the gradient in gradient_results is provided
"""
arr = np.arange(len(domains))
gradient_idx_local = np.array([i
for i, c in enumerate(arr[domains == d]) # local
for cr in arr[gradient_idx] # global
if (c == cr).all()])
return gradient_idx_local
[docs]
def determine_projection_matrix(gradient_results, lambda_eps=0.95):
"""
Determines projection matrix [P].
.. math:: \\eta = [\\mathbf{P}] \\xi
Parameters
----------
gradient_results : ndarray of float [n_grid x n_out x dim]
Gradient of model function in grid points
lambda_eps : float, optional, default: 0.95
Bound of principal components in %. All eigenvectors are included until lambda_eps of total sum of all
eigenvalues is included in the system.
Returns
-------
p_matrix : ndarray of float [dim_reduced x dim]
Reduced projection matrix for QOI.
p_matrix_complete : ndarray of float [dim_reduced x dim]
Complete projection matrix for QOI.
"""
# Determine projection matrices by SVD of gradients
u, s, v = np.linalg.svd(gradient_results)
# determine dominant eigenvalues up to lambda_eps * s_sum
s_mask = [False]*len(s)
s_sum_part = 0
s_sum = np.sum(s)
i_s = 0
while s_sum_part <= lambda_eps*s_sum:
s_sum_part += s[i_s]
s_mask[i_s] = True
i_s += 1
s_filt = s[s_mask]
v_filt = v[np.append(s_mask, [False]*(v.shape[0]-u.shape[1])) > 0, :]
p_matrix = v_filt
p_matrix_complete = v
return p_matrix, p_matrix_complete
[docs]
def get_indices_of_k_smallest(arr, k):
"""
Find indices of k smallest elements in ndarray
Parameters
----------
arr : ndarray of float
Array
k : int
Number of smallest values to extract
Returns
-------
idx : tuple of ndarray [k]
Indices of k smallest elements in array
"""
# index = np.argpartition(arr.ravel(), k)
# idx = tuple(np.array(np.unravel_index(index, arr.shape))[:, range(min(k, 0), max(k, 0))])
#
# return idx
idx = np.argpartition(arr.ravel(), arr.size - k)[:k]
return tuple(np.unravel_index(idx, arr.shape))
[docs]
def get_coords_discontinuity(classifier, x_min, x_max, n_coords_disc=10, border_sampling="structured"):
"""
Determine n_coords_disc grid points close to discontinuity
Parameters
----------
classifier : Classifier object
Classifier object to predict classes from coordinates (needs to contain a classifier.predict() method)
x_min : ndarray of float [n_dim]
Minimal values in parameter space to search discontinuity
x_max : ndarray of float [n_dim]
Maximal values in parameter space to search discontinuity
n_coords_disc : int, optional, default: 10
Number of grid points to determine close to discontinuity
border_sampling : str, optional, default: "structured"
Sampling method to determine location of discontinuity
Returns
-------
coords_disc : ndarray of float [n_coords_disc]
"""
# if border_sampling == "random":
# coords_border_det = grid_learn_cluster.coords
# domains = model_kmeans.labels_
dim = len(x_min)
# create tensored mesh to find discontinuity
if border_sampling == "structured":
n_samples = int(1E4**(1./dim))
eval_str = "np.array(np.meshgrid("
for i in range(dim):
eval_str += "np.linspace(x_min[{}], x_max[{}], {})".format(i, i, n_samples)
if i < dim-1:
eval_str += ", "
eval_str += ")).T.reshape(-1, {})".format(dim)
coords_border_det = eval(eval_str)
domains = classifier.predict(coords_border_det)
else:
raise NotImplementedError("Please use valid border sampling method (""structured"")")
# determine mask of not equaling domain points
# dom_mat = np.tile(domains[:, np.newaxis], (1, domains.shape[0]))
dom_mat = np.broadcast_to(domains, (len(domains), len(domains))).T
mask = dom_mat != dom_mat.transpose()
mask = np.tril(mask) * False + np.triu(mask)
# determine distances between grid points in different domains
distance_matrix = np.ones(mask.shape) * np.nan
for i_c, c in enumerate(coords_border_det):
distance_matrix[i_c, mask[i_c, :]] = np.linalg.norm(coords_border_det[mask[i_c, :], :] - c, axis=1)
np.fill_diagonal(distance_matrix, np.nan)
# find n_smallest distances and determine midpoints between those points
n_smallest = 1000
idx = get_indices_of_k_smallest(distance_matrix, n_smallest)
coords_border = (coords_border_det[idx[0], :] + coords_border_det[idx[1], :]) / 2
# resample n_coords_disc equally distributed points from n_smallest points on discontinuity
n_reps = 1000 # number of repetitions
idx = np.zeros((n_coords_disc, n_reps))
distance_mean = np.zeros(n_reps)
# select n_coords_disc points randomly and determine average minimal distance between points
for i in range(n_reps):
# sample n_resample points from coords_border
idx[:, i] = random.sample(list(range(coords_border.shape[0])), n_coords_disc)
# determine all to all distances
distance_matrix_resample = np.ones((n_coords_disc, n_coords_disc)) * np.nan
for i_c, c in enumerate(coords_border[idx[:, i].astype(int), :]):
distance_matrix_resample[i_c, :] = np.linalg.norm(coords_border[idx[:, i].astype(int), :] - c, axis=1)
np.fill_diagonal(distance_matrix_resample, 1000)
distance_mean[i] = np.mean(np.min(distance_matrix_resample, axis=1))
coords_disc = coords_border[idx[:, np.argmax(distance_mean)].astype(int), :]
return coords_disc
[docs]
def increment_basis(order_current, interaction_order_current, interaction_order_max, incr):
"""
Increments basis
Parameters
----------
order_current: int
Maximum global expansion order.
The maximum expansion order considers the sum of the orders of combined polynomials together with the
chosen norm "order_max_norm". Typically this norm is 1 such that the maximum order is the sum of all
monomial orders.
interaction_order_current : int
Current number of random variables, which can interact with each other
All polynomials are ignored, which have an interaction order greater than specified
interaction_order_max : int
Maximum number of random variables, which can interact with each other
All polynomials are ignored, which have an interaction order greater than specified
incr : int
Number of sub-iteration increments
Returns
-------
order : int
Updated order
interaction_order : int
Updated interaction order
"""
while incr > 0:
if interaction_order_current + 1 <= min(order_current, interaction_order_max):
interaction_order_current += 1
else:
order_current += 1
interaction_order_current = 1
incr -= 1
return order_current, interaction_order_current
[docs]
def compute_chunks(seq, num):
"""
Splits up a sequence _seq_ into _num_ chunks of similar size.
If len(seq) < num, (num-len(seq)) empty chunks are returned so that len(out) == num
Parameters
----------
seq : list of something [N_ele]
List containing data or indices, which is divided into chunks
num : int
Number of chunks to generate
Returns
-------
out : list of num sublists
num sub-lists of seq with each of a similar number of elements (or empty).
"""
assert len(seq) > 0
assert num > 0
# assert isinstance(seq, list), f"{type(seq)} can't be chunked. Provide list."
avg = len(seq) / float(num)
n_empty = 0 # if len(seg) < num, how many empty lists to append to return?
if avg < 1:
avg = 1
n_empty = num - len(seq)
out = []
last = 0.0
while last < len(seq):
# if only one element would be left in the last run, add it to the current
if (int(last + avg) + 1) == len(seq):
last_append_idx = int(last + avg) + 1
else:
last_append_idx = int(last + avg)
out.append(seq[int(last):last_append_idx])
if (int(last + avg) + 1) == len(seq):
last += avg + 1
else:
last += avg
# append empty lists if len(seq) < num
out += [[]] * n_empty
return out
[docs]
def t_averaged_mutual_coherence(array, t=0.2):
"""
Computes the t-averaged mutual coherence.
Parameters
----------
array : ndarray of float [m x n]
Matrix
t : float
Threshold
Returns
-------
res : float
t-averaged mutual coherence
"""
array = np.abs(array)
mask = array > t
return np.sum(array[mask]) / np.sum(mask)
[docs]
def average_cross_correlation_gram(array):
"""
Computes the average cross correlation of the gram matrix.
Parameters
----------
array : ndarray of float [m x n]
Gram matrix
Returns
-------
res : float
cross correlation
"""
k = array.shape[1]
n = k * (k - 1)
return (1 / n) * (np.linalg.norm(np.identity(k) - array) ** 2)
[docs]
def PhiP(x, p=10):
"""
Calculates the Phi-p criterion of the design x with power p [1].
Parameters
----------
x : ndarray of float [n x m]
The design to calculate Phi-p for
p : int, optional, default: 10
The power used for the calculation of PhiP
Returns
-------
phip : float
Phi-p criterion
Notes
-----
.. [1] Morris, M. D., & Mitchell, T. J. (1995). Exploratory designs for computational experiments.
Journal of statistical planning and inference, 43(3), 381-402.
"""
phip = ((scipy.spatial.distance.pdist(x) ** (-p)).sum()) ** (1.0 / p)
return phip
[docs]
def poly_expand(current_set, to_expand, order_max, interaction_order):
"""
Algorithm by Gerstner and Griebel to expand polynomial basis [1] according to two criteria:
(1) The basis function may not be completely enclosed by already existing basis functions. In this case the added
basis would be already included in any case.
(2) The basis function is not a candidate if adding any basis would have no predecessors. The new basis must have
predecessors in all decreasing direction and may not "float".
Parameters
----------
current_set : ndarray of int [n_basis, dim]
Array of multi-indices of basis functions
to_expand : ndarray of int [dim]
Selected basis function (with highest gPC coefficient), which will be expanded in all possible direction
order_max : int
Maximal accumulated order (sum over all parameters)
interaction_order : int
Allowed interaction order between variables (<= dim)
Returns
-------
expand : ndarray of int [n_basis, dim]
Array of multi-indices, which will be added to the set of basis functions
Notes
-----
.. [1] T Gerstner and M Griebel. Dimension adaptive tensor product quadrature. Computing, 71:65–87, 2003.
"""
if type(current_set) is not list:
current_set = current_set.tolist()
expand = []
for e in range(len(to_expand)):
forward = copy.deepcopy(to_expand)
forward[e] += 1
has_predecessors = True
for e2 in range(len(to_expand)):
if forward[e2] > 0:
predecessor = forward.copy()
predecessor[e2] -= 1
has_predecessors *= list(predecessor) in current_set
if has_predecessors and (np.sum(np.abs(forward)) <= order_max) and (np.sum(forward > 0) <= interaction_order):
expand += [tuple(forward)]
return np.array(expand)
[docs]
def get_non_enclosed_multi_indices(multi_indices, interaction_order):
"""
Extract possible multi-indices from a given set which are potential candidates for anisotropic basis extension.
Two criteria must be met:
(1) The basis function may not be completely enclosed by already existing basis functions. In this case the added
basis would be already included in any case.
(2) The basis function is not a candidate if adding any basis would have no predecessors. The new basis must have
predecessors in all decreasing direction and may not "float".
Parameters
----------
multi_indices : ndarray of float [n_basis, dim]
Array of multi-indices of basis functions
interaction_order : int
Allowed interaction order between variables (<= dim)
Returns
-------
multi_indices_non_enclosed : ndarray of float [n_basis_candidates, dim]
Array of possible multi-indices of basis functions
poly_indices_non_enclosed : list of int
Indices of selected basis functions in global "multi-indices" array
"""
multi_indices_non_enclosed = []
if type(multi_indices) is not list:
multi_indices = multi_indices.tolist()
dim = len(multi_indices[0])
for m in np.array(multi_indices):
for i_dim in range(dim):
m_test = copy.deepcopy(np.array(m))
m_test[i_dim] = m_test[i_dim] + 1
has_predecessors = True
if np.sum(m_test > 0) <= interaction_order:
if list(m_test) not in multi_indices:
for e2 in range(dim):
if m_test[e2] > 0:
predecessor = copy.deepcopy(m_test)
predecessor[e2] = predecessor[e2] - 1
has_predecessors *= list(predecessor) in multi_indices
if has_predecessors:
multi_indices_non_enclosed.append(m)
multi_indices_non_enclosed = np.unique(multi_indices_non_enclosed, axis=0)
poly_indices_non_enclosed = [np.where((multi_indices == m).all(axis=1))[0][0] for m in multi_indices_non_enclosed]
return multi_indices_non_enclosed, poly_indices_non_enclosed
[docs]
def plot_basis_by_multiindex(a):
"""
Parameters
----------
a
Returns
-------
"""
fig = plt.figure(figsize=(4, 4))
ax = fig.add_subplot(111, projection='3d')
for i in range(len(a)):
e = 0.3
x = a[i][0] + e
y = a[i][1] + e
z = a[i][2] + e
vertices = [[(x + e, y + e, z + e), (x + e, y + e, z - e), (x + e, y - e, z - e), (x + e, y - e, z + e)],
[(x - e, y + e, z + e), (x - e, y + e, z - e), (x - e, y - e, z - e), (x - e, y - e, z + e)],
[(x + e, y + e, z + e), (x + e, y + e, z - e), (x - e, y + e, z - e), (x - e, y + e, z + e)],
[(x + e, y - e, z + e), (x + e, y - e, z - e), (x - e, y - e, z - e), (x - e, y - e, z + e)],
[(x + e, y + e, z + e), (x + e, y - e, z + e), (x - e, y - e, z + e), (x - e, y + e, z + e)],
[(x + e, y + e, z - e), (x + e, y - e, z - e), (x - e, y - e, z - e), (x - e, y + e, z - e)]]
poly = Poly3DCollection(vertices, alpha=0.5)
ax.add_collection3d(poly)
for j in range(6):
line_xs = np.hstack((np.array(vertices[j]).T[0], np.array(vertices[j]).T[0][0]))
line_ys = np.hstack((np.array(vertices[j]).T[1], np.array(vertices[j]).T[1][0]))
line_zs = np.hstack((np.array(vertices[j]).T[2], np.array(vertices[j]).T[2][0]))
plt.plot(line_xs, line_ys, line_zs, color='k')
ax.set_xlim(0, 3)
ax.set_ylim(0, 3)
ax.set_zlim(0, 3)
plt.show()