import os
import copy
import h5py
import time
import shutil
import warnings
import numpy as np
from .Problem import *
from .SGPC import *
from .misc import determine_projection_matrix, poly_expand, get_non_enclosed_multi_indices
from .misc import get_num_coeffs_sparse
from .misc import ten2mat
from .misc import mat2ten
from .misc import get_gradient_idx_domain
from .misc import get_coords_discontinuity
from .misc import increment_basis
from .misc import get_coords_discontinuity
from .misc import get_num_coeffs_sparse
from .testfunctions import Dummy
from .Grid import *
from .MEGPC import *
from .Classifier import Classifier
from .Gradient import get_gradient
[docs]
class Algorithm(object):
"""
Class for GPC algorithms
Parameters
----------
problem : Problem object
Object instance of gPC problem to investigate
options : dict
Algorithm specific options (see sub-classes for more details)
grid : Grid object
Grid object
validation : ValidationSet object
ValidationSet object
"""
def __init__(self, problem, options, grid=None, validation=None):
"""
Constructor; Initializes GPC algorithm
"""
self.problem = problem
self.problem_reduced = []
self.validation = validation
self.options = options
self.grid = grid
self.grid_gradient = []
self.qoi_specific = None
# Generate results folder if it doesn't exist
if self.options["fn_results"] is not None:
if not os.path.exists(os.path.split(self.options["fn_results"])[0]):
os.makedirs(os.path.split(self.options["fn_results"])[0])
self.check_basic_options()
[docs]
def check_basic_options(self):
"""
Checks self.options dictionary and sets default
options["eps"] : float, optional, default=1e-3
Relative mean error of leave-one-out cross validation
options["error_norm"] : str, optional, default="relative"
Choose if error is determined "relative" or "absolute". Use "absolute" error when the
model generates outputs equal to zero.
options["error_type"] : str, optional, default="loocv"
Choose type of error to validate gpc approximation. Use "loocv" (Leave-one-Out cross validation)
to omit any additional calculations and "nrmsd" (normalized root mean square deviation) to compare
against a Problem.ValidationSet.
options["fn_results"] : string, optional, default=None
If provided, model evaluations are saved in fn_results.hdf5 file and gpc object in fn_results.pkl file
options["gradient_enhanced"] : boolean, optional, default: False
Use gradient information to determine the gPC coefficients.
options["gradient_calculation"] : str, optional, default="standard_forward"
Type of the calculation scheme to determine the gradient in the grid points
- "FD_fwd": Finite difference forward approximation of the gradient using n_grid x dim additional sampling
points stored in self.grid.coords_gradient and self.grid.coords_gradient_norm [n_grid x dim x dim].
- "FD_1st": Finite difference approximation of 1st order accuracy using only the available samples [1]
- "FD_2nd": Finite difference approximation of 2nd order accuracy using only the available samples [1]
- "FD_1st2nd": Finite difference approximation of 1st and (where possible) 2nd order accuracy
options["gradient_calculation_options"] : dict, optional, default: {"dx": 0.01, "distance_weight": -2}
Options for gradient calculation (details in get_gradient() function in Gradient.py)
options["backend"] : str, optional, default: "python"
Default computing backend, certain functions can be computed with Multicore-CPU or GPU acceleration
options["lambda_eps_gradient"] : float, optional, default: 0.95
Bound of principal components in %. All eigenvectors are included until lambda_eps of total sum of all
eigenvalues is included in the system.
options["matrix_ratio"]: float, optional, default=1.5
Ration between the number of model evaluations and the number of basis functions.
If "adaptive_sampling" is activated this factor is only used to
construct the initial grid depending on the initial number of basis functions determined by "order_start".
(>1 results in an overdetermined system)
options["matlab_model"] : boolean, optional, default: False
Use a Matlab model function
options["method"]: str
GPC method to apply ['Reg', 'Quad']
options["n_cpu"] : int, optional, default=1
Number of threads to use for parallel evaluation of the model function.
options["n_samples_validation"] : int, optional, default: 1e4
Number of validation points used to determine the NRMSD if chosen as "error_type". Does not create a
validation set if there is already one present in the Problem instance (problem.validation).
options["print_func_time"] : boolean, optional, default: False
Print function evaluation time for every single run
options["projection"] : boolean, optional, default: False
Use projection approach
options["solver"]: str
Solver to determine the gPC coefficients
- 'Moore-Penrose' ... Pseudoinverse of gPC matrix (SGPC.Reg, EGPC)
- 'OMP' ... Orthogonal Matching Pursuit, sparse recovery approach (SGPC.Reg, EGPC)
options["settings"]: dict
Solver settings
- 'Moore-Penrose' ... None
- 'OMP' ... {"n_coeffs_sparse": int} Number of gPC coefficients != 0
options["verbose"] : boolean, optional, default=True
Print output of iterations and sub-iterations (True/False)
options["backend"] : str
Backend for performance intensive computations
- "python" ... Use native python implementation
- "cpu" .. Use C Implementaion without multicore-support
options["plot_basis"] : bool
Plot basis functions and save as fn_results + _basis_iter#.png
options["grid_extension_method"] : str, optional, default: GPR
Method to extend random grids when adaptive_sampling is turned on:
- "GPR": Gaussian Process Regression (sample location is optimized according to posterior variance)
- "random": Samples are added randomly
"""
if "eps" not in self.options.keys():
self.options["eps"] = 1e-3
if "error_norm" not in self.options.keys():
self.options["error_norm"] = "relative"
if "error_type" not in self.options.keys():
self.options["error_type"] = "loocv"
if "fn_results" not in self.options.keys():
self.options["fn_results"] = None
if "gradient_enhanced" not in self.options.keys():
self.options["gradient_enhanced"] = False
if "gradient_calculation" not in self.options.keys():
self.options["gradient_calculation"] = "FD_fwd"
if "gradient_calculation_options" not in self.options.keys():
self.options["gradient_calculation_options"] = {"dx": 0.001, "distance_weight": -2}
if "dx" not in self.options["gradient_calculation_options"]:
self.options["gradient_calculation_options"]["dx"] = 0.001
if "distance_weight" not in self.options["gradient_calculation_options"]:
self.options["gradient_calculation_options"]["distance_weight"] = -2
if "backend" not in self.options.keys():
self.options["backend"] = "python"
if "lambda_eps_gradient" not in self.options.keys():
self.options["lambda_eps_gradient"] = 0.95
if "matrix_ratio" not in self.options.keys():
self.options["matrix_ratio"] = 2
if "matlab_model" not in self.options.keys():
self.options["matlab_model"] = False
if "method" in self.options.keys():
if self.options["method"] == "quad":
self.options["solver"] = 'NumInt'
self.options["settings"] = None
elif self.options["method"] == "reg" and not (self.options["solver"] == "Moore-Penrose" or
self.options["solver"] == "OMP" or
self.options["solver"] == "LarsLasso"):
raise AssertionError("Please specify 'Moore-Penrose' or 'OMP' as solver for 'reg' method")
if "n_cpu" in self.options.keys():
self.n_cpu = self.options["n_cpu"]
else:
self.options["n_cpu"] = 1
self.n_cpu = 1
if "n_samples_validation" not in self.options.keys():
self.options["n_samples_validation"] = 1e4
if "save_session_format" not in self.options.keys():
self.options["save_session_format"] = ".hdf5"
elif self.options["save_session_format"] not in [".hdf5", ".pkl"]:
self.options["save_session_format"] = ".hdf5"
elif self.options["save_session_format"] in [".hdf5"]:
self.options["save_session_format"] = ".hdf5"
elif self.options["save_session_format"] in [".pkl"]:
self.options["save_session_format"] = ".pkl"
if self.options["fn_results"] is not None:
self.options["fn_session"] = os.path.splitext(self.options["fn_results"])[0] + \
self.options["save_session_format"]
if self.options["save_session_format"] == ".hdf5":
self.options["fn_session_folder"] = "session"
else:
self.options["fn_session_folder"] = None
else:
self.options["fn_session"] = None
self.options["fn_session_folder"] = None
if "print_func_time" not in self.options.keys():
self.options["print_func_time"] = False
if "projection" not in self.options.keys():
self.options["projection"] = False
if "seed" not in self.options.keys():
self.options["seed"] = None
if self.options["solver"] == "Moore-Penrose":
self.options["settings"] = None
if self.options["solver"] == "OMP" and ("settings" not in self.options.keys() or not (
"n_coeffs_sparse" not in self.options["settings"].keys() or
"sparsity" not in self.options["settings"].keys())):
raise AssertionError("Please specify correct solver settings for OMP in 'settings'")
if self.options["solver"] == "LarsLasso":
if "settings" in self.options.keys():
if type(self.options["settings"]) is dict:
if "alpha" not in self.options["settings"].keys():
self.options["settings"]["alpha"] = 1e-5
else:
self.options["settings"] = {"alpha": 1e-5}
else:
self.options["settings"] = {"alpha": 1e-5}
if "verbose" not in self.options.keys():
self.options["verbose"] = True
if "grid" not in self.options.keys():
self.options["grid"] = Random
self.options["grid_options"] = None
if "backend" not in self.options.keys():
self.options["backend"] = "python"
if "n_grid" not in self.options.keys():
self.options["n_grid"] = None
if "adaptive_sampling" not in self.options.keys():
self.options["adaptive_sampling"] = False
if "plot_basis" not in self.options.keys():
self.options["plot_basis"] = False
if "grid_extension_method" not in self.options.keys():
self.options["grid_extension_method"] = "GPR"
[docs]
def check_results(self, results, grid, gradient_results=None, gradient_results_idx=None, com=None, resample=True):
"""
Check the validity of the results and resample if required.
Updates the gPC object, the containing grid, and the results array.
Parameters
----------
results : np.ndarray of float [n_samples x n_qoi]
Model output at sampling points.
grid : Grid object instance
Grid object instance the results are computed for.
gradient_results : ndarray of float [n_grid x n_out x dim], optional, default: None
Gradient of model function in grid points.
gradient_results_idx : ndarray of int [n_grid], optional, default: None
Indices of grid points where the gradient was evaluated.
com : Computation class instance, optional, default: None
Computation class instance to run the model if resample is True.
resample : bool, optional, default: True
Resample grid points and rerun model (requires Computational class instance to run model).
If False, the grid points and results are just deleted.
Returns
-------
results : np.ndarray of float [n_samples x n_qoi]
Updated (fixed) model output at sampling points.
gpc : SGPC or MEGPC object instance or list of SGPC or MEGPC object instances
GPC object(s) containing the basis functions and the updated grid.
gradient_results : ndarray of float [n_grid x n_out x dim]
Updated (fixed) gradients of model function in grid points not containing the points where
the gradients were NaN.
gradient_results_idx : ndarray of int [n_grid], optional, default: None
Updated (fixed) indices of grid points where the gradient was evaluated not containing the points where
the gradients were NaN.
grid : Grid object instance
Updated (fixed) grid object instance the results are computed for not containing the grid points where
the results were NaN.
"""
# get the indices of the sampling points where any of the QOIs were NaN
idx_nan = np.unique(np.where(np.isnan(results))[0])
idx_nan_gradient = np.array([])
if gradient_results is not None:
idx_nan_gradient_local = np.unique(np.where(np.isnan(gradient_results))[0])
if len(idx_nan_gradient_local) > 0:
idx_nan_gradient = gradient_results_idx[idx_nan_gradient_local]
idx_nan = np.unique(np.hstack((idx_nan, idx_nan_gradient)).astype(int))
if resample:
while len(idx_nan) > 0:
if self.options["verbose"]:
print(f"WARNING! Detected {len(idx_nan)} grid points with NaN results. Resampling ...")
# resample grid points
grid.resample(idx=idx_nan)
# determine results at resampled grid points
results_resample = com.run(model=self.problem.model,
problem=self.problem,
coords=grid.coords[idx_nan, :],
coords_norm=grid.coords_norm[idx_nan, :],
i_iter=None,
i_subiter=None,
fn_results=None,
print_func_time=self.options["print_func_time"],
verbose=self.options["verbose"])
# Determine gradient [n_grid x n_out x dim]
if gradient_results is not None:
if self.options["gradient_calculation"] == "FD_fwd":
# for forward gradient calculation only pass the resampled grid points
grid_gradient = copy.deepcopy(grid)
idx_not_nan = np.array([i for i in range(grid.n_grid) if i not in idx_nan])
grid_gradient.delete(idx=idx_not_nan)
else:
# for gradient calculation from adjacent grid points pass the complete grid
grid_gradient = grid
gradient_results_resample, gradient_results_idx_resample = get_gradient(
model=self.problem.model,
problem=self.problem,
grid=grid_gradient,
results=results_resample,
com=com,
method=self.options["gradient_calculation"],
gradient_results_present=None,
gradient_idx_skip=None,
i_iter=None,
i_subiter=None,
print_func_time=self.options["print_func_time"],
dx=self.options["gradient_calculation_options"]["dx"],
distance_weight=self.options["gradient_calculation_options"]["distance_weight"],
verbose=self.options["verbose"])
if self.options["gradient_calculation"] == "FD_fwd":
gradient_results[idx_nan, :, :] = gradient_results_resample
gradient_results_idx[idx_nan] = idx_nan[gradient_results_idx_resample]
else:
gradient_results = gradient_results_resample
gradient_results_idx = gradient_results_idx_resample
# replace NaN results with new results at resampled grid points
results[idx_nan, :] = results_resample
# get the indices of the sampling points where any of the QOIs where NaN
idx_nan = np.unique(np.where(np.isnan(results))[0])
else:
# remove grid points with NaN results
grid.delete(idx=idx_nan)
# remove NaN results
results = np.delete(results, idx_nan, axis=0)
if gradient_results is not None:
gradient_results = np.delete(gradient_results, idx_nan_gradient_local, axis=0)
gradient_results_idx = np.delete(gradient_results_idx, idx_nan_gradient_local, axis=0)
return results, gradient_results, gradient_results_idx, grid
[docs]
class Static_IO(Algorithm):
"""
Static gPC algorithm, which uses precomputed input output relationships to construct the gPC approximation
Parameters
----------
parameters: OrderedDict containing the RandomParameter class instances
Dictionary (ordered) containing the properties of the random parameters
options["order"]: list of int [dim]
Maximum individual expansion order [order_1, order_2, ..., order_dim].
Generates individual polynomials also if maximum expansion order in order_max is exceeded
options["order_max"]: int
Maximum global expansion order.
The maximum expansion order considers the sum of the orders of combined polynomials together with the
chosen norm "order_max_norm". Typically this norm is 1 such that the maximum order is the sum of all
monomial orders.
options["order_max_norm"]: float
Norm for which the maximum global expansion order is defined [0, 1]. Values < 1 decrease the total number
of polynomials in the expansion such that interaction terms are penalized more. This truncation scheme
is also referred to "hyperbolic polynomial chaos expansion" such that sum(a_i^q)^1/q <= p,
where p is order_max and q is order_max_norm (for more details see eq. (27) in [1]).
options["interaction_order"]: int
Number of random variables, which can interact with each other.
All polynomials are ignored, which have an interaction order greater than the specified
grid: Grid object instance
Grid object to use for static gPC (Random, SparseGrid, TensorGrid) containing the parameter values, where the
output relations were calculated
results: ndarray of float [N_grid x N_qoi]
Model output at each grid point for each QOI
validation: Validation Set class instance, optional
Validation set containing reference solutions at precomputed grid points
Examples
--------
>>> import pygpc
>>> # initialize static gPC algorithm using precomputed IO relationships
>>> algorithm = pygpc.Static_IO(parameters=parameters, options=options, grid=grid, results=results)
>>> # run algorithm
>>> gpc, coeffs = algorithm.run()
"""
def __init__(self, parameters, options, results, grid, validation=None):
"""
Constructor; Initializes static gPC algorithm
"""
# create dummy model
model = Dummy()
# create dummy problem
problem = Problem(model, parameters)
super(Static_IO, self).__init__(problem=problem, options=options, validation=validation, grid=grid)
self.res = results
if "order" not in self.options.keys():
raise AssertionError("Please specify 'order'=[order_1, order_2, ..., order_dim] in options dictionary")
if "order_max" not in self.options.keys():
raise AssertionError("Please specify 'order_max' in options dictionary")
if "order_max_norm" not in self.options.keys():
self.options["order_max_norm"] = 1.
if "interaction_order" not in self.options.keys():
self.options["interaction_order"] = self.problem.dim
#if "error_type" not in self.options.keys():
# self.options["error_type"] = "loocv"
#if self.options["error_type"] != "loocv":
# self.options["error_type"] = "loocv"
#warnings.warn("Changing error calculation type to loocv ...")
[docs]
def run(self):
"""
Runs static gPC algorithm using precomputed IO relationships to construct surrogate model.
Returns
-------
gpc : GPC object instance
GPC object containing all information i.e., Problem, Model, Grid, Basis, RandomParameter instances
coeffs: ndarray of float [n_basis x n_out]
GPC coefficients
"""
if self.options["fn_results"] is not None:
fn_results = os.path.splitext(self.options["fn_results"])[0]
if os.path.exists(fn_results + ".hdf5"):
os.remove(fn_results + ".hdf5")
else:
fn_results = None
# Initialize gPC object
gpc = Reg(problem=self.problem,
order=self.options["order"],
order_max=self.options["order_max"],
order_max_norm=self.options["order_max_norm"],
interaction_order=self.options["interaction_order"],
interaction_order_current=self.options["interaction_order"],
options=self.options,
validation=self.validation)
gpc.backend = self.options["backend"]
# determine number of basis functions
n_coeffs = get_num_coeffs_sparse(order_dim_max=self.options["order"],
order_glob_max=self.options["order_max"],
order_inter_max=self.options["interaction_order"],
dim=self.problem.dim)
print(f" > Determining {n_coeffs} gPC coeffs with {self.res.shape[0]} simulations!")
# Write grid in gpc object
gpc.grid = self.grid
# Initialize gpc matrix
gpc.init_gpc_matrix()
# Compute gpc coefficients
coeffs = gpc.solve(results=self.res,
solver=self.options["solver"],
settings=self.options["settings"],
verbose=self.options["verbose"])
# validate gpc approximation (determine nrmsd or loocv specified in options["error_type"])
eps = gpc.validate(coeffs=coeffs, results=self.res)
iprint("-> {} {} error = {}".format(self.options["error_norm"],
self.options["error_type"],
eps), tab=0, verbose=self.options["verbose"])
# save gpc object and gpc coeffs
if self.options["fn_results"] is not None:
with h5py.File(fn_results + ".hdf5", "a") as f:
f.create_dataset("misc/fn_session",
data=np.array([os.path.split(self.options["fn_session"])[1]]).astype("|S"))
f.create_dataset("misc/fn_session_folder",
data=np.array([self.options["fn_session_folder"]]).astype("|S"))
f.create_dataset("misc/error_type", data=self.options["error_type"])
f.create_dataset("error", data=eps, maxshape=None, dtype="float64")
f.create_dataset("grid/coords", maxshape=None, data=gpc.grid.coords, dtype="float64")
f.create_dataset("grid/coords_norm", maxshape=None, data=gpc.grid.coords_norm, dtype="float64")
if gpc.grid.coords_gradient is not None:
f.create_dataset("grid/coords_gradient", data=gpc.grid.coords_gradient,
maxshape=None, dtype="float64")
f.create_dataset("grid/coords_gradient_norm", data=gpc.grid.coords_gradient_norm,
maxshape=None, dtype="float64")
f.create_dataset("coeffs", data=coeffs,
maxshape=None, dtype="float64")
f.create_dataset("gpc_matrix", data=gpc.gpc_matrix,
maxshape=None, dtype="float64")
if gpc.gpc_matrix_gradient is not None:
f.create_dataset("gpc_matrix_gradient",
data=gpc.gpc_matrix_gradient, maxshape=None, dtype="float64")
f.create_dataset("model_evaluations/results", data=self.res, maxshape=None, dtype="float64")
if gpc.validation is not None:
f.create_dataset("validation/model_evaluations/results", data=gpc.validation.results,
maxshape=None, dtype="float64")
f.create_dataset("validation/grid/coords", data=gpc.validation.grid.coords,
maxshape=None, dtype="float64")
f.create_dataset("validation/grid/coords_norm", data=gpc.validation.grid.coords_norm,
maxshape=None, dtype="float64")
return gpc, coeffs, self.res
[docs]
class Static(Algorithm):
"""
Static gPC algorithm
Parameters
----------
problem : Problem object
Object instance of gPC problem to investigate
options["method"]: str
GPC method to apply ['Reg', 'Quad']
options["order"]: list of int [dim]
Maximum individual expansion order [order_1, order_2, ..., order_dim].
Generates individual polynomials also if maximum expansion order in order_max is exceeded
options["order_max"]: int
Maximum global expansion order.
The maximum expansion order considers the sum of the orders of combined polynomials together with the
chosen norm "order_max_norm". Typically this norm is 1 such that the maximum order is the sum of all
monomial orders.
options["order_max_norm"]: float
Norm for which the maximum global expansion order is defined [0, 1]. Values < 1 decrease the total number
of polynomials in the expansion such that interaction terms are penalized more. This truncation scheme
is also referred to "hyperbolic polynomial chaos expansion" such that sum(a_i^q)^1/q <= p,
where p is order_max and q is order_max_norm (for more details see eq. (27) in [1]).
options["interaction_order"]: int
Number of random variables, which can interact with each other.
All polynomials are ignored, which have an interaction order greater than the specified
grid: Grid object instance
Grid object to use for static gPC (Random, SparseGrid, TensorGrid)
validation: Validation Set class instance, optional
Validation set containing reference solutions at precomputed grid points
Notes
-----
.. [1] Blatman, G., & Sudret, B. (2011). Adaptive sparse polynomial chaos expansion based on least angle
regression. Journal of Computational Physics, 230(6), 2345-2367.
Examples
--------
>>> import pygpc
>>> # initialize static gPC algorithm
>>> algorithm = pygpc.Static(problem=problem, options=options, grid=grid)
>>> # run algorithm
>>> gpc, coeffs, results = algorithm.run()
"""
def __init__(self, problem, options, grid=None, validation=None, gpc=None):
"""
Constructor; Initializes static gPC algorithm
"""
super(Static, self).__init__(problem=problem, options=options, validation=validation, grid=grid)
self.qoi_specific = False
self.gpc = gpc
# check contents of settings dict and set defaults
if "method" not in self.options.keys():
raise AssertionError("Please specify 'method' with either 'reg' or 'quad' in options dictionary")
if "order" not in self.options.keys():
raise AssertionError("Please specify 'order'=[order_1, order_2, ..., order_dim] in options dictionary")
if "order_max" not in self.options.keys():
raise AssertionError("Please specify 'order_max' in options dictionary")
if "order_max_norm" not in self.options.keys():
self.options["order_max_norm"] = 1.
if "interaction_order" not in self.options.keys():
self.options["interaction_order"] = self.problem.dim
[docs]
def run(self):
"""
Runs static gPC algorithm to solve problem.
Returns
-------
gpc : GPC object instance
GPC object containing all information i.e., Problem, Model, Grid, Basis, RandomParameter instances
coeffs: ndarray of float [n_basis x n_out]
GPC coefficients
res : ndarray of float [n_grid x n_out]
Simulation results at n_grid points of the n_out output variables
"""
if self.options["fn_results"] is not None:
fn_results = os.path.splitext(self.options["fn_results"])[0]
if os.path.exists(fn_results + ".hdf5"):
os.remove(fn_results + ".hdf5")
else:
fn_results = None
grad_res_3D = None
gradient_idx = None
# Create gPC object
if self.options["method"] == "reg":
if self.gpc is None:
gpc = Reg(problem=self.problem,
order=self.options["order"],
order_max=self.options["order_max"],
order_max_norm=self.options["order_max_norm"],
interaction_order=self.options["interaction_order"],
interaction_order_current=self.options["interaction_order"],
options=self.options,
validation=self.validation)
else:
gpc = self.gpc
gpc.options["fn_results"] = self.options["fn_results"]
gpc.fn_results = self.options["fn_results"]
elif self.options["method"] == "quad":
gpc = Quad(problem=self.problem,
order=self.options["order"],
order_max=self.options["order_max"],
order_max_norm=self.options["order_max_norm"],
interaction_order=self.options["interaction_order"],
interaction_order_current=self.options["interaction_order"],
options=self.options,
validation=self.validation)
else:
raise AssertionError("Please specify correct gPC method ('reg' or 'quad')")
gpc.backend = self.options["backend"]
# determine number of basis functions
n_coeffs = get_num_coeffs_sparse(order_dim_max=self.options["order"],
order_glob_max=self.options["order_max"],
order_inter_max=self.options["interaction_order"],
dim=self.problem.dim)
if self.options["n_grid"] is not None:
n_grid = self.options["n_grid"]
else:
n_grid = self.options["matrix_ratio"] * n_coeffs
# Write grid in gpc object
if self.grid is not None:
if self.options["method"] == "reg":
print(f"Using user-predefined grid with n_grid={self.grid.n_grid}")
gpc.grid = self.options["grid"](parameters_random=self.problem.parameters_random,
coords=self.grid.coords,
coords_norm=self.grid.coords_norm,
coords_gradient=self.grid.coords_gradient,
coords_gradient_norm=self.grid.coords_gradient_norm,
options=self.options["grid_options"])
elif self.options["method"] == "quad":
gpc.grid = self.grid
elif self.options["grid"] == Random or self.options["grid"] == LHS or self.options["grid"] == GP:
gpc.grid = self.options["grid"](parameters_random=self.problem.parameters_random,
n_grid=n_grid,
options=self.options["grid_options"])
elif self.options["grid"] == L1 or self.options["grid"] == L1_LHS or self.options["grid"] == LHS_L1\
or self.options["grid"] == FIM or self.options["grid"] == CO:
gpc.grid = self.options["grid"](parameters_random=self.problem.parameters_random,
n_grid=n_grid,
options=self.options["grid_options"],
gpc=gpc)
else:
raise ValueError("Grid not provided and specified grid type not known!")
gpc.interaction_order_current = copy.deepcopy(self.options["interaction_order"])
# Initialize parallel Computation class
com = Computation(n_cpu=self.n_cpu, matlab_model=self.options["matlab_model"])
eps = self.options["eps"] + 1
eps_pre = eps + 1
i_grid = 0
res = np.array([])
# determine gpc approximation and determine error (increase grid size in case of adaptive sampling)
while eps > self.options["eps"]:
# Run simulations
iprint("Performing {} simulations!".format(gpc.grid.n_grid - i_grid),
tab=0, verbose=self.options["verbose"])
start_time = time.time()
res_new = com.run(model=self.problem.model,
problem=self.problem,
coords=gpc.grid.coords[i_grid:gpc.grid.n_grid, :],
coords_norm=gpc.grid.coords_norm[i_grid:gpc.grid.n_grid, :],
i_iter=gpc.order_max,
i_subiter=gpc.interaction_order,
fn_results=None,
print_func_time=self.options["print_func_time"],
verbose=self.options["verbose"])
if len(res) > 0:
res = np.vstack((res, res_new))
else:
res = res_new
i_grid = gpc.grid.n_grid
iprint('Total parallel function evaluation: ' + str(time.time() - start_time) + ' sec',
tab=0, verbose=self.options["verbose"])
# Determine gradient [n_grid x n_out x dim]
if self.options["gradient_enhanced"]:
start_time = time.time()
grad_res_3D, gradient_idx = get_gradient(model=self.problem.model,
problem=self.problem,
grid=gpc.grid,
results=res,
com=com,
method=self.options["gradient_calculation"],
gradient_results_present=None,
gradient_idx_skip=None,
i_iter=gpc.order_max,
i_subiter=gpc.interaction_order,
print_func_time=self.options["print_func_time"],
dx=self.options["gradient_calculation_options"]["dx"],
distance_weight=self.options["gradient_calculation_options"]["distance_weight"],
verbose=self.options["verbose"])
iprint('Gradient evaluation: ' + str(time.time() - start_time) + ' sec',
tab=0, verbose=self.options["verbose"])
# check validity of results and resample in case the model could not be evaluated at some sampling points
res, grad_res_3D, gradient_idx, gpc.grid = self.check_results(results=res,
gradient_results=grad_res_3D,
gradient_results_idx=gradient_idx,
grid=gpc.grid,
com=com)
# Initialize gpc matrix
gpc.init_gpc_matrix(gradient_idx=gradient_idx)
# Compute gpc coefficients
coeffs = gpc.solve(results=res,
gradient_results=grad_res_3D,
solver=self.options["solver"],
settings=self.options["settings"],
verbose=self.options["verbose"])
# create validation set if necessary
if self.options["error_type"] == "nrmsd" and gpc.validation is None:
gpc.create_validation_set(n_samples=self.options["n_samples_validation"],
n_cpu=self.options["n_cpu"])
# validate gpc approximation (determine nrmsd or loocv specified in options["error_type"])
eps = gpc.validate(coeffs=coeffs, results=res, gradient_results=grad_res_3D)
iprint("-> {} {} error = {}".format(self.options["error_norm"],
self.options["error_type"],
eps), tab=0, verbose=self.options["verbose"])
if not self.options["adaptive_sampling"]: # (0 < (eps_pre-eps)/eps < 0.01):
break
if eps > self.options["eps"]:
# extend grid by 5% of number of basis functions and restart loop
n_grid_new = gpc.grid.n_grid + 1 # int(np.ceil(gpc.grid.n_grid + 5e-2 * gpc.basis.n_basis))
iprint('Extending grid from {} to {} by {} sampling points using grid_extension_method {}'.format(
gpc.grid.n_grid, n_grid_new, n_grid_new - gpc.grid.n_grid, self.options["grid_extension_method"]),
tab=0, verbose=self.options["verbose"])
if self.options["grid_extension_method"] == "GPR":
gpc.grid.extend_random_grid(n_grid_new=n_grid_new, results=res, type="GP")
else:
gpc.grid.extend_random_grid(n_grid_new=n_grid_new)
# eps_pre = eps
# save gpc object and gpc coeffs
if self.options["fn_results"] is not None:
with h5py.File(fn_results + ".hdf5", "a") as f:
f.create_dataset("misc/fn_session",
data=np.array([os.path.split(self.options["fn_session"])[1]]).astype("|S"))
f.create_dataset("misc/fn_session_folder",
data=np.array([self.options["fn_session_folder"]]).astype("|S"))
f.create_dataset("misc/error_type", data=self.options["error_type"])
f.create_dataset("error", data=eps, maxshape=None, dtype="float64")
f.create_dataset("grid/coords", maxshape=None, data=gpc.grid.coords, dtype="float64")
f.create_dataset("grid/coords_norm", maxshape=None, data=gpc.grid.coords_norm, dtype="float64")
if gpc.grid.coords_gradient is not None:
f.create_dataset("grid/coords_gradient", data=gpc.grid.coords_gradient,
maxshape=None, dtype="float64")
f.create_dataset("grid/coords_gradient_norm", data=gpc.grid.coords_gradient_norm,
maxshape=None, dtype="float64")
f.create_dataset("coeffs", data=coeffs,
maxshape=None, dtype="float64")
f.create_dataset("gpc_matrix", data=gpc.gpc_matrix,
maxshape=None, dtype="float64")
if gpc.gpc_matrix_gradient is not None:
f.create_dataset("gpc_matrix_gradient",
data=gpc.gpc_matrix_gradient, maxshape=None, dtype="float64")
f.create_dataset("model_evaluations/results", data=res, maxshape=None, dtype="float64")
if grad_res_3D is not None:
f.create_dataset("model_evaluations/gradient_results", data=ten2mat(grad_res_3D),
maxshape=None, dtype="float64")
f.create_dataset("model_evaluations/gradient_results_idx", data=gpc.gradient_idx,
maxshape=None, dtype="int64")
if gpc.validation is not None:
f.create_dataset("validation/model_evaluations/results", data=gpc.validation.results,
maxshape=None, dtype="float64")
f.create_dataset("validation/grid/coords", data=gpc.validation.grid.coords,
maxshape=None, dtype="float64")
f.create_dataset("validation/grid/coords_norm", data=gpc.validation.grid.coords_norm,
maxshape=None, dtype="float64")
com.close()
return gpc, coeffs, res
[docs]
class MEStatic(Algorithm):
"""
Multi-Element Static gPC algorithm
Parameters
----------
problem : Problem object
Object instance of gPC problem to investigate
options["method"]: str
GPC method to apply ['Reg', 'Quad']
options["order"]: list of int [dim]
Maximum individual expansion order [order_1, order_2, ..., order_dim].
Generates individual polynomials also if maximum expansion order in order_max is exceeded
options["order_max"]: int
Maximum global expansion order.
The maximum expansion order considers the sum of the orders of combined polynomials together with the
chosen norm "order_max_norm". Typically this norm is 1 such that the maximum order is the sum of all
monomial orders.
options["order_max_norm"]: float
Norm for which the maximum global expansion order is defined [0, 1]. Values < 1 decrease the total number
of polynomials in the expansion such that interaction terms are penalized more. This truncation scheme
is also referred to "hyperbolic polynomial chaos expansion" such that sum(a_i^q)^1/q <= p,
where p is order_max and q is order_max_norm (for more details see eq. (27) in [1]).
options["interaction_order"]: int
Number of random variables, which can interact with each other.
All polynomials are ignored, which have an interaction order greater than the specified
options["qoi"] : int or str, optional, default: 0
Choose for which QOI the projection is determined for. The other QOIs use the same projection.
Alternatively, the projection can be determined for every QOI independently (qoi_index or "all").
options["classifier"] : str, optional, default: "learning"
Classification algorithm to subdivide parameter domain.
- "learning" ... ClassifierLearning algorithm based on Unsupervised and supervised learning
options["classifier_options"] : dict, optional, default: default settings
Options of classifier
grid: Grid object instance
Grid object to use for static gPC (Random, SparseGrid, TensorGrid)
Notes
-----
.. [1] Blatman, G., & Sudret, B. (2011). Adaptive sparse polynomial chaos expansion based on least angle
regression. Journal of Computational Physics, 230(6), 2345-2367.
Examples
--------
>>> import pygpc
>>> # initialize static gPC algorithm
>>> algorithm = pygpc.MEStatic(problem=problem, options=options, grid=grid)
>>> # run algorithm
>>> gpc, coeffs, results = algorithm.run()
"""
def __init__(self, problem, options, grid=None, validation=None):
"""
Constructor; Initializes multi-element static gPC algorithm
"""
super(MEStatic, self).__init__(problem=problem, options=options, validation=validation, grid=grid)
# check contents of settings dict and set defaults
if "method" not in self.options.keys():
raise AssertionError("Please specify 'method' with either 'reg' or 'quad' in options dictionary")
if "order" not in self.options.keys():
raise AssertionError("Please specify 'order'=[order_1, order_2, ..., order_dim] in options dictionary")
if "order_max" not in self.options.keys():
raise AssertionError("Please specify 'order_max' in options dictionary")
if "interaction_order" not in self.options.keys():
self.options["interaction_order"] = self.problem.dim
if "order_max_norm" not in self.options.keys():
self.options["order_max_norm"] = 1.
if "qoi" not in self.options.keys():
self.options["qoi"] = 0
if "classifier" not in self.options.keys():
self.options["classifier"] = "learning"
if "classifier_options" not in self.options.keys():
self.options["classifier_options"] = None
if self.options["qoi"] == "all":
self.qoi_specific = True
else:
self.qoi_specific = False
[docs]
def run(self):
"""
Runs Multi-Element Static gPC algorithm to solve problem.
Returns
-------
megpc : Multi-element GPC object instance
MEGPC object containing all information i.e., Problem, Model, Grid, Basis, RandomParameter instances
coeffs: list of ndarray of float [n_gpc][n_basis x n_out]
GPC coefficients
res : ndarray of float [n_grid x n_out]
Simulation results at n_grid points of the n_out output variables
"""
if self.options["fn_results"] is not None:
fn_results = os.path.splitext(self.options["fn_results"])[0]
if os.path.exists(fn_results + ".hdf5"):
os.remove(fn_results + ".hdf5")
else:
fn_results = None
grad_res_3D = None
grad_res_3D_all = None
gradient_idx = None
res_all_list = []
if self.options["n_grid"] is not None:
n_grid = self.options["n_grid"]
else:
n_grid = None
# Write grid in gpc object
if self.grid is not None:
print(f"Using user-predefined grid with n_grid={n_grid}")
grid = self.options["grid"](parameters_random=self.problem.parameters_random,
coords=self.grid.coords,
coords_norm=self.grid.coords_norm,
coords_gradient=self.grid.coords_gradient,
coords_gradient_norm=self.grid.coords_gradient_norm,
options=self.options["grid_options"])
elif n_grid is None:
raise ValueError("If grid is not provided during initialization please provide options['n_grid']")
elif self.options["grid"] == Random or self.options["grid"] == LHS or self.options["grid"] == GP:
print(f"Creating initial grid ({self.options['grid'].__name__}) with n_grid={int(n_grid)}")
grid = self.options["grid"](parameters_random=self.problem.parameters_random,
n_grid=n_grid,
options=self.options["grid_options"])
elif self.options["grid"] == L1 or self.options["grid"] == L1_LHS or self.options["grid"] == LHS_L1 \
or self.options["grid"] == FIM:
raise NotImplementedError("Grid type not possible for MEStatic algorithm."
"Please use either 'Random' or 'LHS'.")
else:
raise ValueError("Grid not provided and specified grid type not known!")
# Initialize parallel Computation class
com = Computation(n_cpu=self.n_cpu, matlab_model=self.options["matlab_model"])
megpc = []
coeffs = []
eps = self.options["eps"] + 1
i_grid = 0
i_qoi = 0
if self.options["qoi"] is not None and self.options["qoi"] != "all":
q_idx = self.options["qoi"]
qoi_idx = [q_idx]
else:
qoi_idx = np.arange(1)
q_idx = qoi_idx[0]
n_qoi = len(qoi_idx)
while i_qoi < n_qoi:
q_idx = qoi_idx[i_qoi]
print_str = "Determining gPC approximation for QOI #{}:".format(q_idx)
iprint(print_str, tab=0, verbose=self.options["verbose"])
iprint("=" * len(print_str), tab=0, verbose=self.options["verbose"])
megpc.append(0)
coeffs.append(0)
eps_pre = eps + 1
# Create MEGPC object
megpc[i_qoi] = MEGPC(problem=self.problem,
options=self.options,
validation=self.validation)
res_all = np.array([])
# determine gpc approximation and determine error (increase grid size in case of adaptive sampling)
while eps > self.options["eps"]:
if i_grid < grid.n_grid:
# run simulations
iprint("Performing {} simulations!".format(grid.n_grid - i_grid),
tab=0, verbose=self.options["verbose"])
start_time = time.time()
res_new = com.run(model=self.problem.model,
problem=self.problem,
coords=grid.coords[i_grid:grid.n_grid, :],
coords_norm=grid.coords_norm[i_grid:grid.n_grid, :],
i_iter=self.options["order_max"],
i_subiter=self.options["interaction_order"],
fn_results=None,
print_func_time=self.options["print_func_time"],
verbose=self.options["verbose"])
if len(res_all) > 0:
res_all = np.vstack(res_all, res_new)
else:
res_all = res_new
if i_qoi == 0 and i_grid == 0:
if self.options["qoi"] == "all":
qoi_idx = np.arange(res_all.shape[1])
n_qoi = len(qoi_idx)
i_grid = grid.n_grid
iprint('Total function evaluation: ' + str(time.time() - start_time) + ' sec',
tab=0, verbose=self.options["verbose"])
# Determine gradient [n_grid x n_out x dim]
if self.options["gradient_enhanced"]:
start_time = time.time()
grad_res_3D_all, gradient_idx = get_gradient(model=self.problem.model,
problem=self.problem,
grid=grid,
results=res_all,
com=com,
method=self.options["gradient_calculation"],
gradient_results_present=grad_res_3D_all,
gradient_idx_skip=gradient_idx,
i_iter=self.options["order_max"],
i_subiter=self.options["interaction_order"],
print_func_time=self.options["print_func_time"],
dx=self.options["gradient_calculation_options"]["dx"],
distance_weight=
self.options["gradient_calculation_options"][
"distance_weight"],
verbose=self.options["verbose"])
iprint('Gradient evaluation: ' + str(time.time() - start_time) + ' sec',
tab=0, verbose=self.options["verbose"])
# check validity of results and resample in case the model could not be evaluated at some sampling points
res_all, grad_res_3D_all, gradient_idx, grid = self.check_results(results=res_all,
gradient_results=grad_res_3D_all,
gradient_results_idx=gradient_idx,
grid=grid,
com=com)
# crop results to considered qoi
if self.options["qoi"] != "all":
res = copy.deepcopy(res_all)
grad_res_3D = copy.deepcopy(grad_res_3D_all)
hdf5_subfolder = ""
output_idx_passed_validation = None
else:
res = res_all[:, q_idx][:, np.newaxis]
hdf5_subfolder = "/qoi_" + str(q_idx)
output_idx_passed_validation = q_idx
if grad_res_3D_all is not None:
grad_res_3D = grad_res_3D_all[:, q_idx, :][:, np.newaxis, :]
# Write grid in gpc object
megpc[i_qoi].grid = copy.deepcopy(grid)
# determine gpc domains
megpc[i_qoi].init_classifier(coords=megpc[i_qoi].grid.coords_norm,
results=res_all[:, q_idx][:, np.newaxis],
algorithm=self.options["classifier"],
options=self.options["classifier_options"])
# initialize sub-gPCs
for d in np.unique(megpc[i_qoi].domains):
megpc[i_qoi].add_sub_gpc(problem=megpc[i_qoi].problem,
order=[self.options["order"][0] for _ in range(megpc[i_qoi].problem.dim)],
order_max=self.options["order_max"],
order_max_norm=self.options["order_max_norm"],
interaction_order=self.options["interaction_order"],
interaction_order_current=self.options["interaction_order"],
options=self.options,
domain=d,
validation=None)
# assign grids to sub-gPCs (rotate sub-grids in case of projection)
megpc[i_qoi].assign_grids(gradient_idx=gradient_idx)
# Initialize gpc matrices
megpc[i_qoi].init_gpc_matrices()
# Compute gpc coefficients
coeffs[i_qoi] = megpc[i_qoi].solve(results=res,
gradient_results=grad_res_3D,
solver=self.options["solver"],
settings=self.options["settings"],
verbose=self.options["verbose"])
# create validation set if necessary
if self.options["error_type"] == "nrmsd" and megpc[0].validation is None:
megpc[0].create_validation_set(n_samples=self.options["n_samples_validation"],
n_cpu=self.options["n_cpu"])
elif self.options["error_type"] == "nrmsd" and megpc[0].validation is not None:
megpc[i_qoi].validation = copy.deepcopy(megpc[0].validation)
# validate gpc approximation (determine nrmsd or loocv specified in options["error_type"])
eps = megpc[i_qoi].validate(coeffs=coeffs[i_qoi], results=res, gradient_results=grad_res_3D)
iprint("-> {} {} error = {}".format(self.options["error_norm"],
self.options["error_type"],
eps), tab=0, verbose=self.options["verbose"])
# domain specific error
eps_domain = [0 for _ in range(len(np.unique(megpc[i_qoi].domains)))]
for i_gpc, d in enumerate(np.unique(megpc[i_qoi].domains)):
eps_domain[d] = megpc[i_qoi].validate(coeffs=coeffs[i_qoi],
results=res,
domain=d,
output_idx=output_idx_passed_validation)
if not self.options["adaptive_sampling"] or (0 < (eps_pre-eps)/eps < 0.01):
break
if eps > self.options["eps"]:
# extend grid by 10% of number of grid points
n_grid_new = int(np.ceil(1.1*grid.n_grid))
iprint("Extending grid from {} to {} by {} sampling points".format(
grid.n_grid, n_grid_new, n_grid_new - grid.n_grid),
tab=0, verbose=self.options["verbose"])
grid.extend_random_grid(n_grid_new=n_grid_new)
eps_pre = eps
# save data
if self.options["fn_results"] is not None:
with h5py.File(fn_results + ".hdf5", "a") as f:
try:
fn_session = f["misc/fn_session"]
except KeyError:
f.create_dataset("misc/fn_session",
data=np.array([os.path.split(self.options["fn_session"])[1]]).astype("|S"))
f.create_dataset("misc/fn_session_folder",
data=np.array([self.options["fn_session_folder"]]).astype("|S"))
for i_gpc in range(megpc[i_qoi].n_gpc):
f.create_dataset("error" + hdf5_subfolder + "/dom_" + str(i_gpc),
data=eps_domain[i_gpc],
maxshape=None, dtype="float64")
f.create_dataset("coeffs" + hdf5_subfolder + "/dom_" + str(i_gpc),
data=coeffs[i_qoi][i_gpc],
maxshape=None, dtype="float64")
f.create_dataset("domains" + hdf5_subfolder,
data=megpc[i_qoi].domains,
maxshape=None, dtype="int64")
for i_gpc in range(megpc[i_qoi].n_gpc):
f.create_dataset("gpc_matrix" + hdf5_subfolder + "/dom_" + str(i_gpc),
data=megpc[i_qoi].gpc[i_gpc].gpc_matrix,
maxshape=None, dtype="float64")
if megpc[i_qoi].gpc[0].gpc_matrix_gradient is not None:
if self.options["gradient_enhanced"]:
f.create_dataset("gpc_matrix_gradient" + hdf5_subfolder + "/dom_" + str(i_gpc),
data=megpc[i_qoi].gpc[i_gpc].gpc_matrix_gradient,
maxshape=None, dtype="float64")
i_qoi += 1
if self.options["fn_results"] is not None:
with h5py.File(fn_results + ".hdf5", "a") as f:
try:
del f["grid/coords"]
del f["grid/coords_norm"]
del f["grid/coords_gradient"]
del f["grid/coords_gradient_norm"]
except KeyError:
pass
f.create_dataset("grid/coords", data=grid.coords,
maxshape=None, dtype="float64")
f.create_dataset("grid/coords_norm", data=grid.coords_norm,
maxshape=None, dtype="float64")
if grid.coords_gradient is not None:
f.create_dataset("grid/coords_gradient",
data=grid.coords_gradient,
maxshape=None, dtype="float64")
f.create_dataset("grid/coords_gradient_norm",
data=grid.coords_gradient_norm,
maxshape=None, dtype="float64")
f.create_dataset("model_evaluations/results", data=res,
maxshape=None, dtype="float64")
if grad_res_3D is not None:
f.create_dataset("model_evaluations/gradient_results", data=ten2mat(grad_res_3D),
maxshape=None, dtype="float64")
f.create_dataset("model_evaluations/gradient_results_idx", data=megpc[-1].gradient_idx,
maxshape=None, dtype="int64")
f.create_dataset("misc/error_type", data=self.options["error_type"])
if megpc[0].validation is not None:
f.create_dataset("validation/model_evaluations/results", data=megpc[0].validation.results,
maxshape=None, dtype="float64")
f.create_dataset("validation/grid/coords", data=megpc[0].validation.grid.coords,
maxshape=None, dtype="float64")
f.create_dataset("validation/grid/coords_norm", data=megpc[0].validation.grid.coords_norm,
maxshape=None, dtype="float64")
com.close()
return megpc, coeffs, res
[docs]
class MEStatic_IO(Algorithm):
"""
Multi-Element Static gPC algorithm using precomputed IO data
Parameters
----------
parameters: OrderedDict containing the RandomParameter class instances
Dictionary (ordered) containing the properties of the random parameters
options["order"]: list of int [dim]
Maximum individual expansion order [order_1, order_2, ..., order_dim].
Generates individual polynomials also if maximum expansion order in order_max is exceeded
options["order_max"]: int
Maximum global expansion order.
The maximum expansion order considers the sum of the orders of combined polynomials together with the
chosen norm "order_max_norm". Typically this norm is 1 such that the maximum order is the sum of all
monomial orders.
options["order_max_norm"]: float
Norm for which the maximum global expansion order is defined [0, 1]. Values < 1 decrease the total number
of polynomials in the expansion such that interaction terms are penalized more. This truncation scheme
is also referred to "hyperbolic polynomial chaos expansion" such that sum(a_i^q)^1/q <= p,
where p is order_max and q is order_max_norm (for more details see eq. (27) in [1]).
options["interaction_order"]: int
Number of random variables, which can interact with each other.
All polynomials are ignored, which have an interaction order greater than the specified
options["qoi"] : int or str, optional, default: 0
Choose for which QOI the projection is determined for. The other QOIs use the same projection.
Alternatively, the projection can be determined for every QOI independently (qoi_index or "all").
options["classifier"] : str, optional, default: "learning"
Classification algorithm to subdivide parameter domain.
- "learning" ... ClassifierLearning algorithm based on Unsupervised and supervised learning
options["classifier_options"] : dict, optional, default: default settings
Options of classifier
grid: Grid object instance
Grid object to use for static gPC (Random, SparseGrid, TensorGrid) containing the parameter values, where the
output relations were calculated
results: ndarray of float [N_grid x N_qoi]
Model output at each grid point for each QOI
Notes
-----
.. [1] Blatman, G., & Sudret, B. (2011). Adaptive sparse polynomial chaos expansion based on least angle
regression. Journal of Computational Physics, 230(6), 2345-2367.
Examples
--------
>>> import pygpc
>>> # initialize static gPC algorithm
>>> algorithm = pygpc.MEStatic_IO(parameters=parameters, options=options, results=results, grid=grid)
>>> # run algorithm
>>> gpc, coeffs, results = algorithm.run()
"""
def __init__(self, parameters, options, results, grid, validation=None):
"""
Constructor; Initializes multi-element static gPC algorithm
"""
# create dummy model
model = Dummy()
# create dummy problem
problem = Problem(model, parameters)
super(MEStatic_IO, self).__init__(problem=problem, options=options, validation=validation, grid=grid)
self.res = results
# check contents of settings dict and set defaults
if "order" not in self.options.keys():
raise AssertionError("Please specify 'order'=[order_1, order_2, ..., order_dim] in options dictionary")
if "order_max" not in self.options.keys():
raise AssertionError("Please specify 'order_max' in options dictionary")
if "interaction_order" not in self.options.keys():
self.options["interaction_order"] = self.problem.dim
if "order_max_norm" not in self.options.keys():
self.options["order_max_norm"] = 1.
if "qoi" not in self.options.keys():
self.options["qoi"] = 0
if "classifier" not in self.options.keys():
self.options["classifier"] = "learning"
if "classifier_options" not in self.options.keys():
self.options["classifier_options"] = None
if self.options["qoi"] == "all":
self.qoi_specific = True
else:
self.qoi_specific = False
# if self.options["error_type"] != "loocv":
# self.options["error_type"] = "loocv"
# warnings.warn("Changing error calculation type to loocv ...")
[docs]
def run(self):
"""
Runs Multi-Element Static gPC algorithm using precomputed IO data to construct gPC approximation.
Returns
-------
megpc : Multi-element GPC object instance
MEGPC object containing all information i.e., Problem, Model, Grid, Basis, RandomParameter instances
coeffs: list of ndarray of float [n_gpc][n_basis x n_out]
GPC coefficients
res : ndarray of float [n_grid x n_out]
Simulation results at n_grid points of the n_out output variables
"""
if self.options["fn_results"] is not None:
fn_results = os.path.splitext(self.options["fn_results"])[0]
if os.path.exists(fn_results + ".hdf5"):
os.remove(fn_results + ".hdf5")
else:
fn_results = None
megpc = []
coeffs = []
i_grid = 0
i_qoi = 0
if self.options["qoi"] is not None and self.options["qoi"] != "all":
q_idx = self.options["qoi"]
qoi_idx = [q_idx]
else:
qoi_idx = np.arange(1)
q_idx = qoi_idx[0]
n_qoi = len(qoi_idx)
while i_qoi < n_qoi:
q_idx = qoi_idx[i_qoi]
print_str = "Determining gPC approximation for QOI #{}:".format(q_idx)
iprint(print_str, tab=0, verbose=self.options["verbose"])
iprint("=" * len(print_str), tab=0, verbose=self.options["verbose"])
megpc.append(0)
coeffs.append(0)
# Create MEGPC object
megpc[i_qoi] = MEGPC(problem=self.problem,
options=self.options,
validation=self.validation)
if i_qoi == 0 and i_grid == 0:
if self.options["qoi"] == "all":
qoi_idx = np.arange(self.res.shape[1])
n_qoi = len(qoi_idx)
# crop results to considered qoi
if self.options["qoi"] != "all":
res = copy.deepcopy(self.res)
hdf5_subfolder = ""
output_idx_passed_validation = None
else:
res = self.res[:, q_idx][:, np.newaxis]
hdf5_subfolder = "/qoi_" + str(q_idx)
output_idx_passed_validation = q_idx
# Write grid in gpc object
megpc[i_qoi].grid = copy.deepcopy(self.grid)
# determine gpc domains
megpc[i_qoi].init_classifier(coords=megpc[i_qoi].grid.coords_norm,
results=self.res[:, q_idx][:, np.newaxis],
algorithm=self.options["classifier"],
options=self.options["classifier_options"])
# initialize sub-gPCs
for d in np.unique(megpc[i_qoi].domains):
megpc[i_qoi].add_sub_gpc(problem=megpc[i_qoi].problem,
order=[self.options["order"][0] for _ in range(megpc[i_qoi].problem.dim)],
order_max=self.options["order_max"],
order_max_norm=self.options["order_max_norm"],
interaction_order=self.options["interaction_order"],
interaction_order_current=self.options["interaction_order"],
options=self.options,
domain=d,
validation=None)
# assign grids to sub-gPCs (rotate sub-grids in case of projection)
megpc[i_qoi].assign_grids()
# Initialize gpc matrices
megpc[i_qoi].init_gpc_matrices()
# Compute gpc coefficients
coeffs[i_qoi] = megpc[i_qoi].solve(results=res,
solver=self.options["solver"],
settings=self.options["settings"],
verbose=self.options["verbose"])
# validate gpc approximation (determine nrmsd or loocv specified in options["error_type"])
eps = megpc[i_qoi].validate(coeffs=coeffs[i_qoi], results=res)
iprint("-> {} {} error = {}".format(self.options["error_norm"],
self.options["error_type"],
eps), tab=0, verbose=self.options["verbose"])
# domain specific error
eps_domain = [0 for _ in range(len(np.unique(megpc[i_qoi].domains)))]
for i_gpc, d in enumerate(np.unique(megpc[i_qoi].domains)):
eps_domain[d] = megpc[i_qoi].validate(coeffs=coeffs[i_qoi],
results=res,
domain=d,
output_idx=output_idx_passed_validation)
# save data
if self.options["fn_results"] is not None:
with h5py.File(fn_results + ".hdf5", "a") as f:
try:
fn_session = f["misc/fn_session"]
except KeyError:
f.create_dataset("misc/fn_session",
data=np.array([os.path.split(self.options["fn_session"])[1]]).astype("|S"))
f.create_dataset("misc/fn_session_folder",
data=np.array([self.options["fn_session_folder"]]).astype("|S"))
for i_gpc in range(megpc[i_qoi].n_gpc):
f.create_dataset("error" + hdf5_subfolder + "/dom_" + str(i_gpc),
data=eps_domain[i_gpc],
maxshape=None, dtype="float64")
f.create_dataset("coeffs" + hdf5_subfolder + "/dom_" + str(i_gpc),
data=coeffs[i_qoi][i_gpc],
maxshape=None, dtype="float64")
f.create_dataset("domains" + hdf5_subfolder,
data=megpc[i_qoi].domains,
maxshape=None, dtype="int64")
for i_gpc in range(megpc[i_qoi].n_gpc):
f.create_dataset("gpc_matrix" + hdf5_subfolder + "/dom_" + str(i_gpc),
data=megpc[i_qoi].gpc[i_gpc].gpc_matrix,
maxshape=None, dtype="float64")
if megpc[i_qoi].gpc[0].gpc_matrix_gradient is not None:
if self.options["gradient_enhanced"]:
f.create_dataset("gpc_matrix_gradient" + hdf5_subfolder + "/dom_" + str(i_gpc),
data=megpc[i_qoi].gpc[i_gpc].gpc_matrix_gradient,
maxshape=None, dtype="float64")
i_qoi += 1
if self.options["fn_results"] is not None:
with h5py.File(fn_results + ".hdf5", "a") as f:
try:
del f["grid/coords"]
del f["grid/coords_norm"]
del f["grid/coords_gradient"]
del f["grid/coords_gradient_norm"]
except KeyError:
pass
f.create_dataset("grid/coords", data=self.grid.coords,
maxshape=None, dtype="float64")
f.create_dataset("grid/coords_norm", data=self.grid.coords_norm,
maxshape=None, dtype="float64")
if self.grid.coords_gradient is not None:
f.create_dataset("grid/coords_gradient",
data=self.grid.coords_gradient,
maxshape=None, dtype="float64")
f.create_dataset("grid/coords_gradient_norm",
data=self.grid.coords_gradient_norm,
maxshape=None, dtype="float64")
f.create_dataset("model_evaluations/results", data=self.res,
maxshape=None, dtype="float64")
f.create_dataset("misc/error_type", data=self.options["error_type"])
if megpc[0].validation is not None:
f.create_dataset("validation/model_evaluations/results", data=megpc[0].validation.results,
maxshape=None, dtype="float64")
f.create_dataset("validation/grid/coords", data=megpc[0].validation.grid.coords,
maxshape=None, dtype="float64")
f.create_dataset("validation/grid/coords_norm", data=megpc[0].validation.grid.coords_norm,
maxshape=None, dtype="float64")
return megpc, coeffs, self.res
[docs]
class StaticProjection(Algorithm):
"""
Static gPC algorithm using Basis Projection approach
Parameters
----------
problem : Problem object
Object instance of gPC problem to investigate
options["method"]: str
GPC method to apply ['Reg', 'Quad']
options["order"]: int
Expansion order, each projected variable \\eta is expanded to.
Generates individual polynomials also if maximum expansion order in order_max is exceeded
options["order_max"]: int
Maximum global expansion order.
The maximum expansion order considers the sum of the orders of combined polynomials together with the
chosen norm "order_max_norm". Typically this norm is 1 such that the maximum order is the sum of all
monomial orders.
options["order_max_norm"]: float
Norm for which the maximum global expansion order is defined [0, 1]. Values < 1 decrease the total number
of polynomials in the expansion such that interaction terms are penalized more. This truncation scheme
is also referred to "hyperbolic polynomial chaos expansion" such that sum(a_i^q)^1/q <= p,
where p is order_max and q is order_max_norm (for more details see eq. (27) in [1]).
options["interaction_order"]: int
Number of random variables, which can interact with each other.
All polynomials are ignored, which have an interaction order greater than the specified
options["qoi"] : int or str, optional, default: 0
Choose for which QOI the projection is determined for. The other QOIs use the same projection.
Alternatively, the projection can be determined for every QOI independently (qoi_index or "all").
options["n_grid"] : float, optional, default: 10
Number of initial grid points to determine gradient and projection matrix
Notes
-----
.. [1] Blatman, G., & Sudret, B. (2011). Adaptive sparse polynomial chaos expansion based on least angle
regression. Journal of Computational Physics, 230(6), 2345-2367.
Examples
--------
>>> import pygpc
>>> # initialize static gPC algorithm
>>> algorithm = pygpc.StaticProjection(problem=problem, options=options)
>>> # run algorithm
>>> gpc, coeffs, results = algorithm.run
"""
def __init__(self, problem, options, validation=None, grid=None):
"""
Constructor; Initializes static gPC algorithm
"""
super(StaticProjection, self).__init__(problem=problem, options=options, validation=validation, grid=grid)
# check contents of settings dict and set defaults
if "method" not in self.options.keys():
raise AssertionError("Please specify 'method' with either 'reg' or 'quad' in options dictionary")
if "order" not in self.options.keys():
raise AssertionError("Please specify 'order'=[order_1, order_2, ..., order_dim] in options dictionary")
if "order_max" not in self.options.keys():
raise AssertionError("Please specify 'order_max' in options dictionary")
if "interaction_order" not in self.options.keys():
self.options["interaction_order"] = self.problem.dim
if "order_max_norm" not in self.options.keys():
self.options["order_max_norm"] = 1.
if "n_grid" not in self.options.keys():
self.options["n_grid"] = 10
if "qoi" not in self.options.keys():
self.options["qoi"] = 0
if self.options["qoi"] == "all":
self.qoi_specific = True
else:
self.qoi_specific = False
[docs]
def run(self):
"""
Runs static gPC algorithm using Projection to solve problem.
Returns
-------
gpc : GPC object instance
GPC object containing all information i.e., Problem, Model, Grid, Basis, RandomParameter instances
coeffs: list of ndarray of float [n_qoi][n_basis x n_out]
GPC coefficients for different qoi
res : ndarray of float [n_grid x n_out]
Simulation results at n_grid points of the n_out output variables
"""
if self.options["fn_results"] is not None:
fn_results = os.path.splitext(self.options["fn_results"])[0]
if os.path.exists(fn_results + ".hdf5"):
os.remove(fn_results + ".hdf5")
else:
fn_results = None
grad_res_3D = None
grad_res_3D_all = None
gradient_idx = None
res_all_list = []
n_grid = self.options["n_grid"]
# make initial grid to determine gradients and projection matrix. By default, it is an LHS (ese) grid
if self.grid is not None:
print(f"Using user-predefined grid with n_grid={self.grid.n_grid}")
grid_original = self.options["grid"](parameters_random=self.problem.parameters_random,
coords=self.grid.coords,
coords_norm=self.grid.coords_norm,
coords_gradient=self.grid.coords_gradient,
coords_gradient_norm=self.grid.coords_gradient_norm,
options=self.options["grid_options"])
elif self.options["grid"] == Random or self.options["grid"] == GP:
print(f"Creating initial grid ({self.options['grid'].__name__}) with n_grid={int(n_grid)}")
grid_original = self.options["grid"](parameters_random=self.problem.parameters_random,
n_grid=n_grid,
options=self.options["grid_options"])
else:
print(f"Creating initial grid ({self.options['grid'].__name__}) with n_grid={int(n_grid)}")
grid_original = LHS(parameters_random=self.problem.parameters_random,
n_grid=n_grid,
options={"criterion": "ese",
"seed": self.options["seed"]})
# Initialize parallel Computation class
com = Computation(n_cpu=self.n_cpu, matlab_model=self.options["matlab_model"])
# Set up reduced gPC
self.problem_reduced = []
gpc = []
coeffs = []
eps = self.options["eps"] + 1
i_grid = 0
i_qoi = 0
if self.options["qoi"] is not None and self.options["qoi"] != "all":
q_idx = self.options["qoi"]
qoi_idx = [q_idx]
else:
qoi_idx = np.arange(1)
q_idx = qoi_idx[0]
n_qoi = len(qoi_idx)
while i_qoi < n_qoi:
q_idx = qoi_idx[i_qoi]
print_str = "Determining gPC approximation for QOI #{}:".format(q_idx)
iprint(print_str, tab=0, verbose=self.options["verbose"])
iprint("=" * len(print_str), tab=0, verbose=self.options["verbose"])
self.problem_reduced.append(0)
gpc.append(0)
coeffs.append(0)
eps_pre = eps + 1
# determine gpc approximation and determine error (increase grid size in case of adaptive sampling)
while eps > self.options["eps"]:
# run simulations
if i_grid < grid_original.n_grid:
iprint("Performing {} simulations!".format(grid_original.n_grid - i_grid),
tab=0, verbose=self.options["verbose"])
start_time = time.time()
res_all_list.append(com.run(model=self.problem.model,
problem=self.problem,
coords=grid_original.coords[i_grid:grid_original.n_grid, :],
coords_norm=grid_original.coords_norm[i_grid:grid_original.n_grid, :],
i_iter=self.options["order_max"],
i_subiter=self.options["interaction_order"],
fn_results=None,
print_func_time=self.options["print_func_time"],
verbose=self.options["verbose"]))
res_all = np.vstack(res_all_list)
if i_qoi == 0 and i_grid == 0:
if self.options["qoi"] == "all":
qoi_idx = np.arange(res_all.shape[1])
n_qoi = len(qoi_idx)
i_grid = grid_original.n_grid
iprint('Total function evaluation: ' + str(time.time() - start_time) + ' sec',
tab=0, verbose=self.options["verbose"])
# Determine gradient [n_grid x n_out x dim]
start_time = time.time()
grad_res_3D_all, gradient_idx = get_gradient(model=self.problem.model,
problem=self.problem,
grid=grid_original,
results=res_all,
com=com,
method="FD_fwd",
gradient_results_present=grad_res_3D_all,
gradient_idx_skip=gradient_idx,
i_iter=self.options["order_max"],
i_subiter=self.options["interaction_order"],
print_func_time=self.options["print_func_time"],
dx=self.options["gradient_calculation_options"]["dx"],
distance_weight=
self.options["gradient_calculation_options"][
"distance_weight"],
verbose=self.options["verbose"])
iprint('Gradient evaluation: ' + str(time.time() - start_time) + ' sec',
tab=0, verbose=self.options["verbose"])
# check validity of results and resample in case the model could not be evaluated at some sampling points
res_all, grad_res_3D_all, gradient_idx, grid_original = self.check_results(results=res_all,
gradient_results=grad_res_3D_all,
gradient_results_idx=gradient_idx,
grid=grid_original,
com=com)
# crop results to considered qoi
if self.options["qoi"] != "all":
res = copy.deepcopy(res_all)
grad_res_3D = copy.deepcopy(grad_res_3D_all)
hdf5_subfolder = ""
else:
res = res_all[:, q_idx][:, np.newaxis]
grad_res_3D = grad_res_3D_all[:, q_idx, :][:, np.newaxis, :]
hdf5_subfolder = "/qoi_" + str(q_idx)
# Determine projection matrix
p_matrix, p_matrix_complete = determine_projection_matrix(gradient_results=grad_res_3D_all[:, q_idx, :],
lambda_eps=self.options["lambda_eps_gradient"])
p_matrix_norm = np.sum(np.abs(p_matrix), axis=1)
dim_reduced = p_matrix.shape[0]
parameters_reduced = OrderedDict()
for i in range(dim_reduced):
parameters_reduced["n{}".format(i)] = Beta(pdf_shape=[1., 1.], pdf_limits=[-1., 1.])
self.problem_reduced[i_qoi] = Problem(model=self.problem.model, parameters=parameters_reduced)
# Create reduced gPC object
gpc[i_qoi] = Reg(problem=self.problem_reduced[i_qoi],
order=[self.options["order"][0] for _ in range(dim_reduced)],
order_max=self.options["order_max"],
order_max_norm=self.options["order_max_norm"],
interaction_order=self.options["interaction_order"],
interaction_order_current=self.options["interaction_order"],
options=self.options,
validation=self.validation)
# save original problem in gpc object
gpc[i_qoi].problem_original = self.problem
# save projection matrix in gPC object
gpc[i_qoi].p_matrix = copy.deepcopy(p_matrix)
gpc[i_qoi].p_matrix_norm = copy.deepcopy(p_matrix_norm)
# re-initialize grid in case of [L1, L1_LHS, LHS_L1, FIM] because initial grid was Random or LHS (ese)
if self.options["grid"] in [L1, L1_LHS, LHS_L1, FIM]:
grid_original = self.options["grid"](parameters_random=self.problem.parameters_random,
coords=grid_original.coords,
coords_norm=grid_original.coords_norm,
coords_gradient=grid_original.coords_gradient,
coords_gradient_norm=grid_original.coords_gradient_norm,
options=self.options["grid_options"],
gpc=gpc[i_qoi])
# copy grid to gPC object and initialize transformed grid
gpc[i_qoi].grid_original = copy.deepcopy(grid_original)
gpc[i_qoi].grid = project_grid(grid=grid_original, p_matrix=p_matrix, mode="reduce")
gpc[i_qoi].options = copy.deepcopy(self.options)
# Initialize gpc matrix
gpc[i_qoi].init_gpc_matrix(gradient_idx=gradient_idx)
# Someone might not use the gradient to determine the gpc coeffs
if self.options["gradient_enhanced"]:
grad_res_3D_passed = grad_res_3D
else:
grad_res_3D_passed = None
# Compute gpc coefficients
coeffs[i_qoi] = gpc[i_qoi].solve(results=res,
gradient_results=grad_res_3D_passed,
solver=self.options["solver"],
settings=self.options["settings"],
verbose=self.options["verbose"])
# validate gpc approximation (determine nrmsd or loocv specified in options["error_type"])
if self.options["error_type"] == "nrmsd" and gpc[0].validation is None:
gpc[0].create_validation_set(n_samples=self.options["n_samples_validation"],
n_cpu=self.options["n_cpu"])
elif self.options["error_type"] == "nrmsd" and gpc[0].validation is not None:
gpc[i_qoi].validation = copy.deepcopy(gpc[0].validation)
eps = gpc[i_qoi].validate(coeffs=coeffs[i_qoi], results=res, gradient_results=grad_res_3D_passed)
iprint("-> {} {} error = {}".format(self.options["error_norm"],
self.options["error_type"],
eps), tab=0, verbose=self.options["verbose"])
if not self.options["adaptive_sampling"] or (0 < (eps_pre-eps)/eps < 0.01):
break
if eps > self.options["eps"]:
# extend grid by 5% of number of basis functions and restart loop
n_grid_new = int(np.ceil(grid_original.n_grid + 5e-2 * gpc[i_qoi].basis.n_basis))
iprint("Extending grid from {} to {} by {} sampling points".format(
grid_original.n_grid, n_grid_new, n_grid_new - grid_original.n_grid),
tab=0, verbose=self.options["verbose"])
grid_original.extend_random_grid(n_grid_new=n_grid_new)
eps_pre = eps
# save gpc objects and gpc coeffs
if self.options["fn_results"] is not None:
with h5py.File(fn_results + ".hdf5", "a") as f:
try:
fn_session = f["misc/fn_session"]
except KeyError:
f.create_dataset("misc/fn_session",
data=np.array([os.path.split(self.options["fn_session"])[1]]).astype("|S"))
f.create_dataset("misc/fn_session_folder",
data=np.array([self.options["fn_session_folder"]]).astype("|S"))
f.create_dataset("error" + hdf5_subfolder,
data=eps,
maxshape=None, dtype="float64")
f.create_dataset("coeffs" + hdf5_subfolder,
data=coeffs[i_qoi],
maxshape=None, dtype="float64")
f.create_dataset("gpc_matrix" + hdf5_subfolder,
data=gpc[i_qoi].gpc_matrix,
maxshape=None, dtype="float64")
if gpc[i_qoi].gpc_matrix_gradient is not None:
f.create_dataset("gpc_matrix_gradient" + hdf5_subfolder,
data=gpc[i_qoi].gpc_matrix_gradient,
maxshape=None, dtype="float64")
f.create_dataset("p_matrix" + hdf5_subfolder,
data=gpc[i_qoi].p_matrix,
maxshape=None, dtype="float64")
i_qoi += 1
if self.options["fn_results"] is not None:
with h5py.File(fn_results + ".hdf5", "a") as f:
f.create_dataset("grid/coords", data=grid_original.coords,
maxshape=None, dtype="float64")
f.create_dataset("grid/coords_norm", data=grid_original.coords_norm,
maxshape=None, dtype="float64")
if grid_original.coords_gradient is not None:
f.create_dataset("grid/coords_gradient",
data=grid_original.coords_gradient,
maxshape=None, dtype="float64")
f.create_dataset("grid/coords_gradient_norm",
data=grid_original.coords_gradient_norm,
maxshape=None, dtype="float64")
f.create_dataset("model_evaluations/results", data=res,
maxshape=None, dtype="float64")
if grad_res_3D is not None:
f.create_dataset("model_evaluations/gradient_results", data=ten2mat(grad_res_3D),
maxshape=None, dtype="float64")
f.create_dataset("model_evaluations/gradient_results_idx", data=gpc[-1].gradient_idx,
maxshape=None, dtype="int64")
f.create_dataset("misc/error_type", data=self.options["error_type"])
if gpc[0].validation is not None:
f.create_dataset("validation/model_evaluations/results", data=gpc[0].validation.results,
maxshape=None, dtype="float64")
f.create_dataset("validation/grid/coords", data=gpc[0].validation.grid.coords,
maxshape=None, dtype="float64")
f.create_dataset("validation/grid/coords_norm", data=gpc[0].validation.grid.coords_norm,
maxshape=None, dtype="float64")
com.close()
return gpc, coeffs, res_all
[docs]
class MEStaticProjection(Algorithm):
"""
Static gPC algorithm using Basis Projection approach
Parameters
----------
problem : Problem object
Object instance of gPC problem to investigate
options["order"]: int
Expansion order, each projected variable \\eta is expanded to.
Generates individual polynomials also if maximum expansion order in order_max is exceeded
options["order_max"]: int
Maximum global expansion order.
The maximum expansion order considers the sum of the orders of combined polynomials together with the
chosen norm "order_max_norm". Typically this norm is 1 such that the maximum order is the sum of all
monomial orders.
options["interaction_order"]: int
Number of random variables, which can interact with each other.
All polynomials are ignored, which have an interaction order greater than the specified
options["qoi"] : int or str, optional, default: 0
Choose for which QOI the projection is determined for. The other QOIs use the same projection.
Alternatively, the projection can be determined for every QOI independently (qoi_index or "all").
options["n_grid_gradient"] : float, optional, default: 10
Number of initial grid points to determine gradient and projection matrix
options["classifier"] : str, optional, default: "learning"
Classification algorithm to subdivide parameter domain.
- "learning" ... ClassifierLearning algorithm based on Unsupervised and supervised learning
options["classifier_options"] : dict, optional, default: default settings
Options of classifier
Notes
-----
.. [1] Blatman, G., & Sudret, B. (2011). Adaptive sparse polynomial chaos expansion based on least angle
regression. Journal of Computational Physics, 230(6), 2345-2367.
Examples
--------
>>> import pygpc
>>> # initialize static gPC algorithm
>>> algorithm = pygpc.MEStaticProjection(problem=problem, options=options)
>>> # run algorithm
>>> gpc, coeffs, results = algorithm.run
"""
def __init__(self, problem, options, validation=None, grid=None):
"""
Constructor; Initializes static gPC algorithm
"""
super(MEStaticProjection, self).__init__(problem=problem, options=options, validation=validation, grid=grid)
# check contents of settings dict and set defaults
if "method" not in self.options.keys():
raise AssertionError("Please specify 'method' with either 'reg' or 'quad' in options dictionary")
if "order" not in self.options.keys():
raise AssertionError("Please specify 'order'=[order_1, order_2, ..., order_dim] in options dictionary")
if "order_max" not in self.options.keys():
raise AssertionError("Please specify 'order_max' in options dictionary")
if "interaction_order" not in self.options.keys():
self.options["interaction_order"] = self.problem.dim
if "order_max_norm" not in self.options.keys():
self.options["order_max_norm"] = 1.
if "n_grid_gradient" not in self.options.keys():
self.options["n_grid_gradient"] = 10
if "qoi" not in self.options.keys():
self.options["qoi"] = 0
if "classifier" not in self.options.keys():
self.options["classifier"] = "learning"
if "classifier_options" not in self.options.keys():
self.options["classifier_options"] = None
if self.options["qoi"] == "all":
self.qoi_specific = True
else:
self.qoi_specific = False
[docs]
def run(self):
"""
Runs static multi-element gPC algorithm with projection.
Returns
-------
megpc : MEGPC object instance
MEGPC object containing all information i.e., Problem, Model, Grid, Basis, RandomParameter instances
and sub-gPCs
coeffs: list of list of ndarray of float [n_qoi][n_gpc][n_basis x n_out]
GPC coefficients of different qoi and sub-gPCs
res : ndarray of float [n_grid x n_out]
Simulation results at n_grid points of the n_out output variables
"""
if self.options["fn_results"] is not None:
fn_results = os.path.splitext(self.options["fn_results"])[0]
if os.path.exists(fn_results + ".hdf5"):
os.remove(fn_results + ".hdf5")
else:
fn_results = None
grad_res_3D = None
grad_res_3D_all = None
gradient_idx = None
res_all_list = []
# make initial random grid to determine gradients and projection matrix
if self.grid is not None:
print(f"Using user-predefined grid with n_grid={self.grid.n_grid}")
grid = self.options["grid"](parameters_random=self.problem.parameters_random,
coords=self.grid.coords,
coords_norm=self.grid.coords_norm,
coords_gradient=self.grid.coords_gradient,
coords_gradient_norm=self.grid.coords_gradient_norm,
options=self.options["grid_options"])
elif self.options["grid"] == Random or self.options["grid"] == LHS or self.options["grid"] == GP:
print(f"Creating initial grid ({self.options['grid'].__name__}) with n_grid={int(self.options['n_grid'])}")
grid = self.options["grid"](parameters_random=self.problem.parameters_random,
n_grid=self.options["n_grid"],
options=self.options["grid_options"])
elif self.options["grid"] == L1 or self.options["grid"] == L1_LHS or self.options["grid"] == LHS_L1 \
or self.options["grid"] == FIM:
raise NotImplementedError("Grid type not possible for MEStaticProjection algorithm."
"Please use either 'Random' or 'LHS'.")
# Initialize parallel Computation class
com = Computation(n_cpu=self.n_cpu, matlab_model=self.options["matlab_model"])
megpc = []
coeffs = []
eps = self.options["eps"] + 1
i_grid = 0
i_qoi = 0
if self.options["qoi"] is not None and self.options["qoi"] != "all":
q_idx = self.options["qoi"]
qoi_idx = [q_idx]
else:
qoi_idx = np.arange(1)
q_idx = qoi_idx[0]
n_qoi = len(qoi_idx)
res_all = np.array([])
while i_qoi < n_qoi:
q_idx = qoi_idx[i_qoi]
print_str = "Determining gPC approximation for QOI #{}:".format(q_idx)
iprint(print_str, tab=0, verbose=self.options["verbose"])
iprint("=" * len(print_str), tab=0, verbose=self.options["verbose"])
megpc.append(0)
coeffs.append(0)
# Create MEGPC object
megpc[i_qoi] = MEGPC(problem=self.problem,
options=self.options,
validation=self.validation)
eps = self.options["eps"] + 1
# determine gpc approximation and determine error (increase grid size in case of adaptive sampling)
while eps > self.options["eps"]:
if i_grid < grid.n_grid:
# run simulations
iprint("Performing {} simulations!".format(grid.n_grid - i_grid),
tab=0, verbose=self.options["verbose"])
start_time = time.time()
res_new = com.run(model=self.problem.model,
problem=self.problem,
coords=grid.coords[i_grid:grid.n_grid, :],
coords_norm=grid.coords_norm[i_grid:grid.n_grid, :],
i_iter=self.options["order_max"],
i_subiter=self.options["interaction_order"],
fn_results=None,
print_func_time=self.options["print_func_time"],
verbose=self.options["verbose"])
if len(res_all) > 0:
res_all = np.vstack(res_all, res_new)
else:
res_all = res_new
if i_qoi == 0 and i_grid == 0:
if self.options["qoi"] == "all":
qoi_idx = np.arange(res_all.shape[1])
n_qoi = len(qoi_idx)
i_grid = grid.n_grid
iprint('Total function evaluation: ' + str(time.time() - start_time) + ' sec',
tab=0, verbose=self.options["verbose"])
# Determine gradient [n_grid x n_out x dim]
start_time = time.time()
grad_res_3D_all, gradient_idx = get_gradient(model=self.problem.model,
problem=self.problem,
grid=grid,
results=res_all,
com=com,
method="FD_fwd",
gradient_results_present=grad_res_3D_all,
gradient_idx_skip=gradient_idx,
i_iter=self.options["order_max"],
i_subiter=self.options["interaction_order"],
print_func_time=self.options["print_func_time"],
dx=self.options["gradient_calculation_options"]["dx"],
distance_weight=
self.options["gradient_calculation_options"][
"distance_weight"],
verbose=self.options["verbose"])
iprint('Gradient evaluation: ' + str(time.time() - start_time) + ' sec',
tab=0, verbose=self.options["verbose"])
# check validity of results and resample in case the model could not be evaluated at some sampling points
res_all, grad_res_3D_all, gradient_idx, grid = self.check_results(results=res_all,
gradient_results=grad_res_3D_all,
gradient_results_idx=gradient_idx,
grid=grid,
com=com)
# crop results to considered qoi
if self.options["qoi"] != "all":
res = copy.deepcopy(res_all)
grad_res_3D = copy.deepcopy(grad_res_3D_all)
hdf5_subfolder = ""
output_idx_passed_validation = None
else:
res = res_all[:, q_idx][:, np.newaxis]
grad_res_3D = grad_res_3D_all[:, q_idx, :][:, np.newaxis, :]
hdf5_subfolder = "/qoi_" + str(q_idx)
output_idx_passed_validation = q_idx
megpc[i_qoi].grid = copy.deepcopy(grid)
# determine gpc domains
megpc[i_qoi].init_classifier(coords=megpc[i_qoi].grid.coords_norm,
results=res_all[:, q_idx][:, np.newaxis],
algorithm=self.options["classifier"],
options=self.options["classifier_options"])
problem_reduced = [0 for _ in range(megpc[i_qoi].n_gpc)]
p_matrix = [0 for _ in range(megpc[i_qoi].n_gpc)]
p_matrix_norm = [0 for _ in range(megpc[i_qoi].n_gpc)]
dim_reduced = [0 for _ in range(megpc[i_qoi].n_gpc)]
parameters_reduced = [OrderedDict() for _ in range(megpc[i_qoi].n_gpc)]
megpc[i_qoi].gpc = [0 for _ in range(megpc[i_qoi].n_gpc)]
# Determine projection matrices for sub gPCs
for d in np.unique(megpc[i_qoi].domains):
p_matrix[d], _ = determine_projection_matrix(
gradient_results=grad_res_3D_all[megpc[i_qoi].domains[gradient_idx] == d, q_idx, :],
lambda_eps=self.options["lambda_eps_gradient"])
p_matrix_norm[d] = np.sum(np.abs(p_matrix[d]), axis=1)
dim_reduced[d] = p_matrix[d].shape[0]
for i in range(dim_reduced[d]):
parameters_reduced[d]["n{}".format(i)] = Beta(pdf_shape=[1., 1.], pdf_limits=[-1., 1.])
problem_reduced[d] = Problem(model=self.problem.model, parameters=parameters_reduced[d])
# Set up reduced gPC for this domain
megpc[i_qoi].add_sub_gpc(problem=problem_reduced[d],
order=[self.options["order"][0] for _ in range(dim_reduced[d])],
order_max=self.options["order_max"],
order_max_norm=self.options["order_max_norm"],
interaction_order=self.options["interaction_order"],
interaction_order_current=self.options["interaction_order"],
options=self.options,
domain=d,
validation=None)
# save original problem in gpc object
megpc[i_qoi].gpc[d].problem_original = self.problem
# save projection matrix in gPC object
megpc[i_qoi].gpc[d].p_matrix = copy.deepcopy(p_matrix[d])
megpc[i_qoi].gpc[d].p_matrix_norm = copy.deepcopy(p_matrix_norm[d])
# copy options to MEGPC object
megpc[i_qoi].options = copy.deepcopy(self.options)
# assign grids to sub-gPCs (rotate sub-grids in case of projection)
megpc[i_qoi].assign_grids(gradient_idx=gradient_idx)
# Initialize gpc matrices
megpc[i_qoi].init_gpc_matrices()
# Someone might not use the gradient to determine the gpc coeffs
if megpc[i_qoi].gradient:
grad_res_3D_passed = grad_res_3D
else:
grad_res_3D_passed = None
# Compute gpc coefficients
coeffs[i_qoi] = megpc[i_qoi].solve(results=res,
gradient_results=grad_res_3D_passed,
solver=self.options["solver"],
settings=self.options["settings"],
verbose=self.options["verbose"])
# validate gpc approximation (determine nrmsd or loocv specified in options["error_type"])
if self.options["error_type"] == "nrmsd" and megpc[0].validation is None:
megpc[0].create_validation_set(n_samples=self.options["n_samples_validation"],
n_cpu=self.options["n_cpu"])
elif self.options["error_type"] == "nrmsd" and megpc[0].validation is not None:
megpc[i_qoi].validation = copy.deepcopy(megpc[0].validation)
eps = megpc[i_qoi].validate(coeffs=coeffs[i_qoi], results=res, gradient_results=grad_res_3D_passed)
iprint("-> {} {} error = {}".format(self.options["error_norm"],
self.options["error_type"],
eps), tab=0, verbose=self.options["verbose"])
# domain specific error
eps_domain = [0 for _ in range(len(np.unique(megpc[i_qoi].domains)))]
for i_gpc, d in enumerate(np.unique(megpc[i_qoi].domains)):
eps_domain[d] = megpc[i_qoi].validate(coeffs=coeffs[i_qoi],
results=res,
domain=d,
output_idx=output_idx_passed_validation)
if not self.options["adaptive_sampling"] or (0 < (eps_pre-eps)/eps < 0.01):
break
if eps > self.options["eps"]:
# extend grid by 10% of number of grid points
n_grid_new = int(np.ceil(1.1*grid.n_grid))
iprint("Extending grid from {} to {} by {} sampling points".format(
grid.n_grid, n_grid_new, n_grid_new - grid.n_grid),
tab=0, verbose=self.options["verbose"])
grid.extend_random_grid(n_grid_new=n_grid_new)
eps_pre = eps
# save data
if self.options["fn_results"] is not None:
with h5py.File(fn_results + ".hdf5", "a") as f:
try:
fn_session = f["misc/fn_session"]
except KeyError:
f.create_dataset("misc/fn_session",
data=np.array([os.path.split(self.options["fn_session"])[1]]).astype("|S"))
f.create_dataset("misc/fn_session_folder",
data=np.array([self.options["fn_session_folder"]]).astype("|S"))
for i_gpc in range(megpc[i_qoi].n_gpc):
f.create_dataset("error" + hdf5_subfolder + "/dom_" + str(i_gpc),
data=eps_domain[i_gpc],
maxshape=None, dtype="float64")
f.create_dataset("coeffs" + hdf5_subfolder + "/dom_" + str(i_gpc),
data=coeffs[i_qoi][i_gpc],
maxshape=None, dtype="float64")
f.create_dataset("domains" + hdf5_subfolder,
data=megpc[i_qoi].domains,
maxshape=None, dtype="int64")
for i_gpc in range(megpc[i_qoi].n_gpc):
f.create_dataset("gpc_matrix" + hdf5_subfolder + "/dom_" + str(i_gpc),
data=megpc[i_qoi].gpc[i_gpc].gpc_matrix,
maxshape=None, dtype="float64")
if megpc[i_qoi].gpc[0].gpc_matrix_gradient is not None:
if self.options["gradient_enhanced"]:
f.create_dataset("gpc_matrix_gradient" + hdf5_subfolder + "/dom_" + str(i_gpc),
data=megpc[i_qoi].gpc[i_gpc].gpc_matrix_gradient,
maxshape=None, dtype="float64")
for i_gpc in range(megpc[i_qoi].n_gpc):
f.create_dataset("p_matrix" + hdf5_subfolder + "/dom_" + str(i_gpc),
data=megpc[i_qoi].gpc[i_gpc].p_matrix,
maxshape=None, dtype="float64")
i_qoi += 1
if self.options["fn_results"] is not None:
with h5py.File(fn_results + ".hdf5", "a") as f:
try:
del f["grid/coords"]
del f["grid/coords_norm"]
del f["grid/coords_gradient"]
del f["grid/coords_gradient_norm"]
except KeyError:
pass
f.create_dataset("grid/coords", data=grid.coords,
maxshape=None, dtype="float64")
f.create_dataset("grid/coords_norm", data=grid.coords_norm,
maxshape=None, dtype="float64")
if grid.coords_gradient is not None:
f.create_dataset("grid/coords_gradient",
data=grid.coords_gradient,
maxshape=None, dtype="float64")
f.create_dataset("grid/coords_gradient_norm",
data=grid.coords_gradient_norm,
maxshape=None, dtype="float64")
f.create_dataset("model_evaluations/results", data=res,
maxshape=None, dtype="float64")
if grad_res_3D is not None:
f.create_dataset("model_evaluations/gradient_results", data=ten2mat(grad_res_3D),
maxshape=None, dtype="float64")
f.create_dataset("model_evaluations/gradient_results_idx", data=megpc[-1].gradient_idx,
maxshape=None, dtype="int64")
f.create_dataset("misc/error_type", data=self.options["error_type"])
if megpc[0].validation is not None:
f.create_dataset("validation/model_evaluations/results", data=megpc[0].validation.results,
maxshape=None, dtype="float64")
f.create_dataset("validation/grid/coords", data=megpc[0].validation.grid.coords,
maxshape=None, dtype="float64")
f.create_dataset("validation/grid/coords_norm", data=megpc[0].validation.grid.coords_norm,
maxshape=None, dtype="float64")
com.close()
return megpc, coeffs, res
[docs]
class RegAdaptive(Algorithm):
"""
Adaptive regression approach based on leave one out cross validation error estimation
Parameters
----------
problem: Problem class instance
GPC problem under investigation
options["order_start"] : int, optional, default=0
Initial gPC expansion order (maximum order)
options["order_end"] : int, optional, default=10
Maximum Gpc expansion order to expand to (algorithm will terminate afterwards)
options["interaction_order"]: int, optional, default=dim
Define maximum interaction order of parameters (default: all interactions)
options["order_max_norm"]: float
Norm for which the maximum global expansion order is defined [0, 1]. Values < 1 decrease the total number
of polynomials in the expansion such that interaction terms are penalized more. This truncation scheme
is also referred to "hyperbolic polynomial chaos expansion" such that sum(a_i^q)^1/q <= p,
where p is order_max and q is order_max_norm (for more details see eq. (27) in [1]).
options["adaptive_sampling"] : boolean, optional, default: True
Adds samples adaptively to the expansion until the error is converged and continues by
adding new basis functions.
Examples
--------
>>> import pygpc
>>> # initialize adaptive gPC algorithm
>>> algorithm = pygpc.RegAdaptive(problem=problem, options=options)
>>> # run algorithm
>>> gpc, coeffs, results = algorithm.run()
"""
def __init__(self, problem, options, validation=None, grid=None):
"""
Constructor; Initializes RegAdaptive algorithm
"""
super(RegAdaptive, self).__init__(problem=problem, options=options, validation=validation, grid=grid)
self.qoi_specific = False
# check contents of settings dict and set defaults
if "order_start" not in self.options.keys():
self.options["order_start"] = 0
if "order_end" not in self.options.keys():
self.options["order_end"] = 10
if "interaction_order" not in self.options.keys():
self.options["interaction_order"] = problem.dim
if "order_max_norm" not in self.options.keys():
self.options["order_max_norm"] = 1.
if "adaptive_sampling" not in self.options.keys():
self.options["adaptive_sampling"] = True
if "basis_increment_strategy" not in self.options.keys():
self.options["basis_increment_strategy"] = "isotropic"
[docs]
def run(self):
"""
Runs adaptive gPC algorithm to solve problem.
Returns
-------
gpc : GPC object instance
GPC object containing all information i.e., Problem, Model, Grid, Basis, RandomParameter instances
coeffs: ndarray of float [n_basis x n_out]
GPC coefficients
res : ndarray of float [n_grid x n_out]
Simulation results at n_grid points of the n_out output variables
"""
if self.options["fn_results"] is not None:
fn_results = os.path.splitext(self.options["fn_results"])[0]
if os.path.exists(fn_results + ".hdf5"):
os.remove(fn_results + ".hdf5")
else:
fn_results = None
# initialize iterators
eps = self.options["eps"] + 1.0
i_grid = 0
order = self.options["order_start"]
first_iter = True
grad_res_3D = None
gradient_idx = None
gradient_idx_FD_fwd = None
grad_res_3D_FD_fwd = None
basis_order = np.array([self.options["order_start"],
min(self.options["interaction_order"], self.options["order_start"])])
# Initialize parallel Computation class
com = Computation(n_cpu=self.n_cpu, matlab_model=self.options["matlab_model"])
# Initialize Reg gPC object
print("Initializing gPC object...")
gpc = Reg(problem=self.problem,
order=self.options["order_start"] * np.ones(self.problem.dim),
order_max=self.options["order_start"],
order_max_norm=self.options["order_max_norm"],
interaction_order=self.options["interaction_order"],
interaction_order_current=self.options["interaction_order"],
options=self.options,
validation=self.validation)
extended_basis = True
# Add a validation set if nrmsd is chosen and no validation set is yet present
if self.options["error_type"] == "nrmsd" and not isinstance(self.validation, ValidationSet):
gpc.create_validation_set(n_samples=self.options["n_samples_validation"],
n_cpu=self.options["n_cpu"])
# Initialize Grid object
if self.grid is not None:
print(f"Using user-predefined grid with n_grid={self.grid.n_grid}")
gpc.grid = self.options["grid"](parameters_random=self.problem.parameters_random,
coords=self.grid.coords,
coords_norm=self.grid.coords_norm,
coords_gradient=self.grid.coords_gradient,
coords_gradient_norm=self.grid.coords_gradient_norm,
options=self.options["grid_options"])
else:
n_grid_init = np.ceil(self.options["matrix_ratio"] * gpc.basis.n_basis)
print(f"Creating initial grid ({self.options['grid'].__name__}) with n_grid={int(n_grid_init)}")
if self.options["grid"] in [L1, L1_LHS, LHS_L1, FIM, CO]:
if "n_pool" in self.options["grid_options"]:
if self.options["grid_options"]["n_pool"] < int(n_grid_init):
warnings.warn('self.options["grid_options"]["n_pool"] < n_grid_init ... setting n_pool to 2*n_grid_init')
self.options["grid_options"]["n_pool"] = 2*int(n_grid_init)
gpc.grid = self.options["grid"](parameters_random=self.problem.parameters_random,
n_grid=int(n_grid_init),
options=self.options["grid_options"],
gpc=gpc)
else:
gpc.grid = self.options["grid"](parameters_random=self.problem.parameters_random,
n_grid=n_grid_init,
options=self.options["grid_options"])
gpc.solver = self.options["solver"]
gpc.settings = self.options["settings"]
gpc.options = copy.deepcopy(self.options)
# Initialize gpc matrix
print("Initializing gPC matrix...")
gpc.init_gpc_matrix(gradient_idx=gradient_idx)
gpc.n_grid.pop(0)
gpc.n_basis.pop(0)
if gpc.options["gradient_enhanced"]:
gpc.grid.create_gradient_grid()
# Main iterations (order)
i_iter = 0
while eps > self.options["eps"]:
if first_iter:
basis_increment = 0
else:
basis_increment = 1
if self.options["basis_increment_strategy"] == "anisotropic":
if not first_iter:
if np.max(np.sum(gpc.basis.multi_indices, axis=1)) >= self.options["order_end"]:
break
# determine potential polynomials which can be extended
# (not enclosed by other already existing polynomials)
active_non_enclosed_set, poly_indices_non_enclosed = get_non_enclosed_multi_indices(
multi_indices=gpc.basis.multi_indices,
interaction_order=self.options["interaction_order"])
# get index of highest non enclosed coefficient
coeff_max_idx_non_enclosed = np.argmax(np.linalg.norm(coeffs[poly_indices_non_enclosed, :], axis=1))
# determine multi-indices to add
multi_indices_to_add = poly_expand(current_set=gpc.basis.multi_indices,
to_expand=active_non_enclosed_set[coeff_max_idx_non_enclosed],
order_max=self.options["order_end"],
interaction_order=self.options["interaction_order"])
# update basis
b_added = gpc.basis.add_basis_poly_by_order(multi_indices=multi_indices_to_add,
problem=gpc.problem)
if b_added is not None:
print_str = f"Added multi-indices to basis: \n {np.matrix(multi_indices_to_add)}"
iprint(print_str, tab=0, verbose=self.options["verbose"])
iprint("=" * 100, tab=0, verbose=self.options["verbose"])
extended_basis = True
else:
# increase basis isotropic
basis_order[0], basis_order[1] = increment_basis(order_current=basis_order[0],
interaction_order_current=basis_order[1],
interaction_order_max=self.options["interaction_order"],
incr=basis_increment)
if basis_order[0] > self.options["order_end"]:
break
# update basis
b_added = gpc.basis.set_basis_poly(order=basis_order[0] * np.ones(self.problem.dim),
order_max=basis_order[0],
order_max_norm=self.options["order_max_norm"],
interaction_order=self.options["interaction_order"],
interaction_order_current=basis_order[1],
problem=gpc.problem)
if b_added is not None:
print_str = "Order/Interaction order: {}/{}".format(basis_order[0], basis_order[1])
iprint(print_str, tab=0, verbose=self.options["verbose"])
iprint("=" * len(print_str), tab=0, verbose=self.options["verbose"])
extended_basis = True
# plot basis
if self.options["plot_basis"]:
gpc.basis.plot_basis(dims=np.arange(np.min((gpc.problem.dim, 3))),
fn_plot=self.options["fn_results"] + f"_basis_{i_iter}")
i_iter += 1
if self.options["adaptive_sampling"]:
iprint("Starting adaptive sampling:", tab=0, verbose=self.options["verbose"])
add_samples = True # if adaptive sampling is False, the while loop will be only executed once
delta_eps_target = 1e-1
delta_eps = delta_eps_target + 1
delta_samples = 5e-2
while add_samples and delta_eps > delta_eps_target and eps > self.options["eps"]:
if not self.options["adaptive_sampling"]:
add_samples = False
# new sample size
if extended_basis and self.options["adaptive_sampling"]:
# do not increase sample size immediately when basis was extended, try first with old samples
n_grid_new = gpc.grid.n_grid
elif self.options["adaptive_sampling"] and not first_iter:
# increase sample size stepwise (adaptive sampling)
n_grid_new = int(np.ceil(gpc.grid.n_grid + delta_samples * gpc.basis.n_basis))
else:
# increase sample size according to matrix ratio w.r.t. number of basis functions
n_grid_new = int(np.ceil(gpc.basis.n_basis * self.options["matrix_ratio"]))
# run model if grid points were added
if i_grid < n_grid_new or extended_basis:
# extend grid
if i_grid < n_grid_new:
iprint("Extending grid from {} to {} by {} sampling points".format(
gpc.grid.n_grid, n_grid_new, n_grid_new - gpc.grid.n_grid),
tab=0, verbose=self.options["verbose"])
if self.options["grid"] in [L1, L1_LHS, LHS_L1, FIM, CO]:
if "n_pool" in self.options["grid_options"]:
if self.options["grid_options"]["n_pool"] < int(n_grid_init):
warnings.warn(
'self.options["grid_options"]["n_pool"] < n_grid_new ... '
'setting n_pool to 2*n_grid_new')
self.options["grid_options"]["n_pool"] = 2 * int(n_grid_new)
gpc.grid.extend_random_grid(n_grid_new=n_grid_new)
# run simulations
iprint("Performing simulations " + str(i_grid + 1) + " to " + str(gpc.grid.coords.shape[0]),
tab=0, verbose=self.options["verbose"])
start_time = time.time()
res_new = com.run(model=gpc.problem.model,
problem=gpc.problem,
coords=gpc.grid.coords[int(i_grid):int(len(gpc.grid.coords))],
coords_norm=gpc.grid.coords_norm[int(i_grid):int(len(gpc.grid.coords))],
i_iter=basis_order[0],
i_subiter=basis_order[1],
fn_results=gpc.fn_results,
print_func_time=self.options["print_func_time"],
verbose=self.options["verbose"])
iprint('Total parallel function evaluation: ' + str(time.time() - start_time) + ' sec',
tab=0, verbose=self.options["verbose"])
# Append result to solution matrix (RHS)
if i_grid == 0:
res = res_new
else:
res = np.vstack([res, res_new])
if self.options["gradient_enhanced"]:
start_time = time.time()
grad_res_3D, gradient_idx = get_gradient(model=self.problem.model,
problem=self.problem,
grid=gpc.grid,
results=res,
com=com,
method=self.options["gradient_calculation"],
gradient_results_present=grad_res_3D_FD_fwd,
gradient_idx_skip=gradient_idx_FD_fwd,
i_iter=basis_order[0],
i_subiter=basis_order[1],
print_func_time=self.options["print_func_time"],
dx=self.options["gradient_calculation_options"]["dx"],
distance_weight=self.options["gradient_calculation_options"]["distance_weight"],
verbose=self.options["verbose"])
iprint('Gradient evaluation: ' + str(time.time() - start_time) + ' sec',
tab=0, verbose=self.options["verbose"])
# check validity of results and resample in case the model could not be evaluated at some sampling points
res, grad_res_3D, gradient_idx, gpc.grid = self.check_results(results=res,
gradient_results=grad_res_3D,
gradient_results_idx=gradient_idx,
grid=gpc.grid,
com=com)
if self.options["gradient_enhanced"] and self.options["gradient_calculation"] == "FD_fwd":
gradient_idx_FD_fwd = gradient_idx
grad_res_3D_FD_fwd = grad_res_3D
i_grid = gpc.grid.coords.shape[0]
# update gpc matrix
gpc.init_gpc_matrix(gradient_idx=gradient_idx)
# determine gpc coefficients
coeffs = gpc.solve(results=res,
gradient_results=grad_res_3D,
solver=gpc.solver,
settings=gpc.settings,
verbose=self.options["verbose"])
# validate gpc approximation (determine nrmsd or loocv specified in options["error_type"])
eps = gpc.validate(coeffs=coeffs,
results=res,
gradient_results=grad_res_3D)
if extended_basis:
eps_ref = copy.deepcopy(eps)
else:
delta_eps = np.abs((gpc.error[-1] - gpc.error[-2]) / eps_ref)
iprint("-> {} {} error = {}".format(self.options["error_norm"],
self.options["error_type"],
eps), tab=0, verbose=self.options["verbose"])
# extend basis further if error was decreased (except in very first iteration)
if not first_iter and extended_basis and gpc.error[-1] < gpc.error[-2]:
break
extended_basis = False
first_iter = False
# exit adaptive sampling loop if no adaptive sampling was chosen
if not self.options["adaptive_sampling"]:
break
# save gpc coeffs for this sub-iteration
if self.options["fn_results"] is not None:
with h5py.File(os.path.splitext(self.options["fn_results"])[0] + ".hdf5", "a") as f:
# overwrite coeffs
if "coeffs" in f.keys():
del f['coeffs']
f.create_dataset("coeffs", data=coeffs, maxshape=None, dtype="float64")
# Append gradient of results
if grad_res_3D is not None:
grad_res_2D = ten2mat(grad_res_3D)
try:
del f["model_evaluations/gradient_results"]
del f["model_evaluations/gradient_results_idx"]
except KeyError:
pass
f.create_dataset("model_evaluations/gradient_results",
(grad_res_2D.shape[0], grad_res_2D.shape[1]),
maxshape=(None, None),
dtype="float64",
data=grad_res_2D)
f.create_dataset("model_evaluations/gradient_results_idx", data=gpc.gradient_idx,
maxshape=None, dtype="int64")
try:
del f["gpc_matrix"]
except KeyError:
pass
f.create_dataset("gpc_matrix",
data=gpc.gpc_matrix,
maxshape=None, dtype="float64")
if gpc.gpc_matrix_gradient is not None:
try:
del f["gpc_matrix_gradient"]
except KeyError:
pass
f.create_dataset("gpc_matrix_gradient",
data=gpc.gpc_matrix_gradient,
maxshape=None, dtype="float64")
# determine gpc coefficients
coeffs = gpc.solve(results=res,
gradient_results=grad_res_3D,
solver=gpc.solver,
settings=gpc.settings,
verbose=self.options["verbose"])
# save gpc object and gpc coeffs
if self.options["fn_results"] is not None:
with h5py.File(os.path.splitext(self.options["fn_results"])[0] + ".hdf5", "a") as f:
if "coeffs" in f.keys():
del f['coeffs']
f.create_dataset("coeffs", data=coeffs, maxshape=None, dtype="float64")
try:
del f["gpc_matrix"]
except KeyError:
pass
f.create_dataset("gpc_matrix",
data=gpc.gpc_matrix,
maxshape=None, dtype="float64")
if gpc.gpc_matrix_gradient is not None:
try:
del f["gpc_matrix_gradient"]
except KeyError:
pass
f.create_dataset("gpc_matrix_gradient",
data=gpc.gpc_matrix_gradient,
maxshape=None, dtype="float64")
# misc
f.create_dataset("misc/fn_session",
data=np.array([os.path.split(self.options["fn_session"])[1]]).astype("|S"))
f.create_dataset("misc/fn_session_folder",
data=np.array([self.options["fn_session_folder"]]).astype("|S"))
f.create_dataset("misc/error_type", data=self.options["error_type"])
f.create_dataset("error", data=eps, maxshape=None, dtype="float64")
if gpc.validation is not None:
f.create_dataset("validation/model_evaluations/results", data=gpc.validation.results,
maxshape=None, dtype="float64")
f.create_dataset("validation/grid/coords", data=gpc.validation.grid.coords,
maxshape=None, dtype="float64")
f.create_dataset("validation/grid/coords_norm", data=gpc.validation.grid.coords_norm,
maxshape=None, dtype="float64")
if self.options["gradient_enhanced"]:
f.create_dataset("grid/coords_gradient", data=gpc.grid.coords_gradient,
maxshape=None, dtype="float64")
f.create_dataset("grid/coords_gradient_norm", data=gpc.grid.coords_gradient_norm,
maxshape=None, dtype="float64")
com.close()
return gpc, coeffs, res
[docs]
class MERegAdaptiveProjection(Algorithm):
"""
Adaptive regression approach based on leave one out cross validation error estimation
Parameters
----------
problem: Problem class instance
GPC problem under investigation
options["order_start"] : int, optional, default=0
Initial gPC expansion order (maximum order)
options["order_end"] : int, optional, default=10
Maximum Gpc expansion order to expand to (algorithm will terminate afterwards)
options["interaction_order"]: int, optional, default=dim
Define maximum interaction order of parameters (default: all interactions)
options["order_max_norm"]: float
Norm for which the maximum global expansion order is defined [0, 1]. Values < 1 decrease the total number
of polynomials in the expansion such that interaction terms are penalized more. This truncation scheme
is also referred to "hyperbolic polynomial chaos expansion" such that sum(a_i^q)^1/q <= p,
where p is order_max and q is order_max_norm (for more details see eq. (27) in [1]).
options["adaptive_sampling"] : boolean, optional, default: True
Adds samples adaptively to the expansion until the error is converged and continues by
adding new basis functions.
options["n_samples_discontinuity"] : int, optional, default: 10
Number of grid points close to discontinuity to refine its location
options["n_grid_init"] : int, optional, default: 10
Number of initial simulations to explore the parameter space
Examples
--------
>>> import pygpc
>>> # initialize adaptive gPC algorithm
>>> algorithm = pygpc.MERegAdaptiveProjection(problem=problem, options=options)
>>> # run algorithm
>>> gpc, coeffs, results = algorithm.run()
"""
def __init__(self, problem, options, validation=None, grid=None):
"""
Constructor; Initializes MERegAdaptiveProjection Algorithm
"""
super(MERegAdaptiveProjection, self).__init__(problem=problem, options=options, validation=validation, grid=grid)
# check contents of settings dict and set defaults
if "order_start" not in self.options.keys():
self.options["order_start"] = 0
if "order_end" not in self.options.keys():
self.options["order_end"] = 10
if "interaction_order" not in self.options.keys():
self.options["interaction_order"] = problem.dim
if "order_max_norm" not in self.options.keys():
self.options["order_max_norm"] = 1.
if "adaptive_sampling" not in self.options.keys():
self.options["adaptive_sampling"] = True
if "n_samples_discontinuity" not in self.options.keys():
self.options["n_samples_discontinuity"] = 10
if "n_grid_init" not in self.options.keys():
self.options["n_grid_init"] = 10
if self.options["qoi"] == "all":
self.qoi_specific = True
else:
self.qoi_specific = False
[docs]
def run(self):
"""
Runs Multi-Element adaptive gPC algorithm to solve problem (optional projection).
Returns
-------
megpc : Multi-element GPC object instance
MEGPC object containing all information i.e., Problem, Model, Grid, Basis, RandomParameter instances
coeffs: list of ndarray of float [n_gpc][n_basis x n_out]
GPC coefficients
res : ndarray of float [n_grid x n_out]
Simulation results at n_grid points of the n_out output variables
"""
if self.options["fn_results"] is not None:
fn_results = os.path.splitext(self.options["fn_results"])[0]
if os.path.exists(fn_results + ".hdf5"):
os.remove(fn_results + ".hdf5")
else:
fn_results = None
grid = self.options["grid"]
problem_original = copy.deepcopy(self.problem)
# initialize iterators
grad_res_3D = None
grad_res_3D_all = None
gradient_idx = None
gradient_idx_FD_fwd = None
basis_increment = 0
n_grid_init = self.options["n_grid_init"]
# make initial random grid to determine number of output variables and to estimate projection
if self.grid is not None:
print(f"Using user-predefined grid with n_grid={grid.n_grid}")
self.options["grid"](parameters_random=self.problem.parameters_random,
coords=grid.coords,
coords_norm=grid.coords_norm,
coords_gradient=grid.coords_gradient,
coords_gradient_norm=grid.coords_gradient_norm,
options=self.options["grid_options"])
elif self.options["grid"] == Random or self.options["grid"] == LHS or self.options["grid"] == GP:
print(f"Creating initial grid ({self.options['grid'].__name__}) with n_grid={int(n_grid_init)}")
grid = self.options["grid"](parameters_random=self.problem.parameters_random,
n_grid=n_grid_init,
options=self.options["grid_options"])
elif self.options["grid"] == L1 or self.options["grid"] == L1_LHS or self.options["grid"] == LHS_L1 \
or self.options["grid"] == FIM:
raise NotImplementedError("Grid type not possible for MERegAdaptiveProjection algorithm."
"Please use either 'Random', 'LHS' or 'GP'.")
# Initialize parallel Computation class
com = Computation(n_cpu=self.n_cpu, matlab_model=self.options["matlab_model"])
# Run initial simulations to determine initial projection matrix
iprint("Performing {} initial simulations!".format(grid.coords.shape[0]),
tab=0, verbose=self.options["verbose"])
start_time = time.time()
res_all = com.run(model=self.problem.model,
problem=self.problem,
coords=grid.coords,
coords_norm=grid.coords_norm,
i_iter=self.options["order_start"],
i_subiter=self.options["interaction_order"],
fn_results=self.options["fn_results"], # + "_temp"
print_func_time=self.options["print_func_time"],
verbose=self.options["verbose"])
i_grid = grid.n_grid
iprint('Total function evaluation: ' + str(time.time() - start_time) + ' sec',
tab=0, verbose=self.options["verbose"])
if self.options["qoi"] == "all":
qoi_idx = np.arange(res_all.shape[1])
n_qoi = len(qoi_idx)
error = [None for _ in range(n_qoi)]
else:
qoi_idx = [self.options["qoi"]]
n_qoi = 1
error = [0]
# Determine gradient for projection [n_grid x n_out x dim]
if self.options["gradient_enhanced"] or self.options["projection"]:
if self.options["projection"] or self.options["gradient_calculation"] == "FD_fwd":
method = "FD_fwd"
dx = 1e-3
distance_weight = None
else:
method = self.options["gradient_calculation"]
dx = self.options["gradient_calculation_options"]["dx"]
distance_weight = self.options["gradient_calculation_options"]["distance_weight"]
start_time = time.time()
grad_res_3D_all, gradient_idx = get_gradient(model=self.problem.model,
problem=self.problem,
grid=grid,
results=res_all,
com=com,
method=method,
gradient_results_present=None,
gradient_idx_skip=None,
i_iter=self.options["order_start"],
i_subiter=self.options["interaction_order"],
print_func_time=self.options["print_func_time"],
dx=dx,
distance_weight=distance_weight,
verbose=self.options["verbose"])
if method == "FD_fwd":
gradient_idx_FD_fwd = gradient_idx
grad_res_3D_all_FD_fwd = grad_res_3D_all
else:
gradient_idx_FD_fwd = None
grad_res_3D_all_FD_fwd = None
iprint('Gradient evaluation: ' + str(time.time() - start_time) + ' sec',
tab=0, verbose=self.options["verbose"])
# check validity of results and resample in case the model could not be evaluated at some sampling points
res_all, grad_res_3D_all, gradient_idx, grid = self.check_results(results=res_all,
gradient_results=grad_res_3D_all,
gradient_results_idx=gradient_idx,
grid=grid,
com=com)
megpc = [0 for _ in range(n_qoi)]
coeffs = [0 for _ in range(n_qoi)]
for i_qoi, q_idx in enumerate(qoi_idx):
print_str = "Determining gPC approximation for QOI #{}:".format(q_idx)
iprint(print_str, tab=0, verbose=self.options["verbose"])
iprint("=" * len(print_str), tab=0, verbose=self.options["verbose"])
first_iter = True
# crop results to considered qoi
if self.options["qoi"] != "all":
res = copy.deepcopy(res_all)
grad_res_3D = copy.deepcopy(grad_res_3D_all)
hdf5_subfolder = ""
output_idx_passed_validation = None
# the gPC is constructed for all QOI but only using info for projection etc of desired QOI
# validation is done for all qoi
else:
res = res_all[:, q_idx][:, np.newaxis]
hdf5_subfolder = "/qoi_" + str(q_idx)
output_idx_passed_validation = q_idx
if grad_res_3D_all is not None:
grad_res_3D = grad_res_3D_all[:, q_idx, :][:, np.newaxis, :]
# Create MEGPC object
megpc[i_qoi] = MEGPC(problem=self.problem,
options=self.options,
validation=self.validation)
# Write grid in gpc object
megpc[i_qoi].grid = copy.deepcopy(grid)
# determine gpc domains
iprint("Determining gPC domains ...", tab=0, verbose=self.options["verbose"])
megpc[i_qoi].init_classifier(coords=megpc[i_qoi].grid.coords_norm,
results=res_all[:, q_idx][:, np.newaxis],
algorithm=self.options["classifier"],
options=self.options["classifier_options"])
error[i_qoi] = [[] for _ in range(len(np.unique(megpc[i_qoi].classifier.domains)))]
p_matrix = [0 for _ in range(megpc[i_qoi].n_gpc)]
p_matrix_norm = [0 for _ in range(megpc[i_qoi].n_gpc)]
dim = [0 for _ in range(megpc[i_qoi].n_gpc)]
parameters= [OrderedDict() for _ in range(megpc[i_qoi].n_gpc)]
problem = [0 for _ in range(megpc[i_qoi].n_gpc)]
basis_order = OrderedDict()
n_grid_reinit = [0 for _ in range(megpc[i_qoi].n_gpc)]
# determine initial projection and initialize sub-gPCs
for d in np.unique(megpc[i_qoi].domains):
if self.options["projection"]:
p_matrix[d], _ = determine_projection_matrix(
gradient_results=grad_res_3D_all[megpc[i_qoi].domains[gradient_idx] == d, q_idx, :],
lambda_eps=self.options["lambda_eps_gradient"])
p_matrix_norm[d] = np.sum(np.abs(p_matrix[d]), axis=1)
dim[d] = p_matrix[d].shape[0]
for i in range(dim[d]):
parameters[d]["n{}".format(i)] = Beta(pdf_shape=[1., 1.], pdf_limits=[-1., 1.])
problem[d] = Problem(model=self.problem.model, parameters=parameters[d])
else:
p_matrix[d] = None
p_matrix_norm[d] = None
dim[d] = problem_original.dim
parameters[d] = problem_original.parameters_random
problem[d] = copy.deepcopy(problem_original)
# Set up reduced gPC for this domain
megpc[i_qoi].add_sub_gpc(problem=problem[d],
order=[self.options["order_start"] for _ in range(dim[d])],
order_max=self.options["order_start"],
order_max_norm=self.options["order_max_norm"],
interaction_order=self.options["interaction_order"],
interaction_order_current=self.options["interaction_order"],
options=self.options,
domain=d,
validation=None)
# save original problem in gpc object
megpc[i_qoi].gpc[d].problem_original = copy.deepcopy(problem_original)
# save projection matrix in gPC object
megpc[i_qoi].gpc[d].p_matrix = copy.deepcopy(p_matrix[d])
megpc[i_qoi].gpc[d].p_matrix_norm = copy.deepcopy(p_matrix_norm[d])
# initialize dict containing approximation orders of sub-gPCs [order, interaction_order_current]
basis_order["poly_dom_{}".format(d)] = np.array([self.options["order_start"],
self.options["interaction_order"]])
# initialize solver settings
megpc[i_qoi].gpc[d].solver = self.options["solver"]
megpc[i_qoi].gpc[d].settings = self.options["settings"]
# extend initial grid and perform additional simulations if necessary
if not self.options["adaptive_sampling"] or megpc[i_qoi].gpc[d].solver == "Moore-Penrose":
n_coeffs = get_num_coeffs_sparse(
order_dim_max=[self.options["order_start"] for _ in range(dim[d])],
order_glob_max=self.options["order_start"],
order_inter_max=self.options["interaction_order"],
order_inter_current=self.options["interaction_order"],
dim=dim[d])
n_grid_reinit[d] = n_coeffs * self.options["matrix_ratio"]
# Check if we have enough samples in this particular domain for the given order we start
if n_grid_reinit[d] > np.sum(megpc[i_qoi].domains == d):
# extend random grid
grid.extend_random_grid(n_grid_new=grid.n_grid - np.sum(megpc[i_qoi].domains == d) + n_grid_reinit[d],
domain=d)
megpc[i_qoi].grid = copy.deepcopy(grid)
if grid.n_grid > i_grid:
# Run some more initial simulations
iprint("Performing {} more initial simulations "
"to fulfil order constraint!".format(grid.n_grid - i_grid),
tab=0, verbose=self.options["verbose"])
start_time = time.time()
res_new = com.run(model=self.problem.model,
problem=self.problem,
coords=grid.coords[i_grid:, ],
coords_norm=grid.coords_norm[i_grid:, ],
i_iter=None,
i_subiter=None,
fn_results=self.options["fn_results"],
print_func_time=self.options["print_func_time"],
verbose=self.options["verbose"])
# add results to results array
res_all = np.vstack((res_all, res_new))
i_grid = grid.n_grid
iprint('Total function evaluation: ' + str(time.time() - start_time) + ' sec',
tab=0, verbose=self.options["verbose"])
# Determine gradient [n_grid x n_out x dim]
if self.options["gradient_enhanced"] or self.options["projection"]:
if self.options["projection"] or self.options["gradient_calculation"] == "FD_fwd":
method = "FD_fwd"
dx = 1e-3
distance_weight = None
else:
method = self.options["gradient_calculation"]
dx = self.options["gradient_calculation_options"]["dx"]
distance_weight = self.options["gradient_calculation_options"]["distance_weight"]
start_time = time.time()
grad_res_3D_all, gradient_idx = get_gradient(model=self.problem.model,
problem=self.problem,
grid=grid,
results=res_all,
com=com,
method=method,
gradient_results_present=grad_res_3D_all_FD_fwd,
gradient_idx_skip=gradient_idx_FD_fwd,
i_iter=None,
i_subiter=None,
print_func_time=self.options["print_func_time"],
dx=dx,
distance_weight=distance_weight,
verbose=self.options["verbose"])
if method == "FD_fwd":
gradient_idx_FD_fwd = gradient_idx
grad_res_3D_all_FD_fwd = grad_res_3D_all
else:
gradient_idx_FD_fwd = None
grad_res_3D_all_FD_fwd = None
iprint('Gradient evaluation: ' + str(time.time() - start_time) + ' sec',
tab=0, verbose=self.options["verbose"])
# check validity of results and resample in case the model could not be evaluated at some sampling points
res_all, grad_res_3D_all, gradient_idx, grid = self.check_results(results=res_all,
gradient_results=grad_res_3D_all,
gradient_results_idx=gradient_idx,
grid=grid,
com=com)
megpc[i_qoi].grid = copy.deepcopy(grid)
# update classifier
iprint("Updating classifier ...", tab=0, verbose=self.options["verbose"])
megpc[i_qoi].update_classifier(coords=megpc[i_qoi].grid.coords_norm,
results=res_all[:, q_idx][:, np.newaxis])
# create validation set if necessary
if self.options["error_type"] == "nrmsd" and megpc[0].validation is None:
iprint("Determining validation set of size {} "
"for NRMSD error calculation ...".format(int(self.options["n_samples_validation"])),
tab=0, verbose=self.options["verbose"])
megpc[0].create_validation_set(n_samples=self.options["n_samples_validation"],
n_cpu=self.options["n_cpu"],
gradient=self.options["gradient_enhanced"])
elif self.options["error_type"] == "nrmsd" and megpc[0].validation is not None:
megpc[i_qoi].validation = copy.deepcopy(megpc[0].validation)
extended_basis = True
# initialize domain specific error
eps = np.array([self.options["eps"] + 1.0 for _ in range(megpc[i_qoi].n_gpc)])
# Main iterations (order)
while (eps > self.options["eps"]).any():
stop_by_order = [(basis_order["poly_dom_{}".format(i)] == [self.options["order_end"],
self.options["interaction_order"]]).all() for
i in range(megpc[i_qoi].n_gpc)]
stop_by_error = eps < self.options["eps"]
# print("stop_by_order: {}".format(stop_by_order))
# print("stop_by_error: {}".format(stop_by_error))
# print("eps: {}".format(eps))
# TODO: ValueError: operands could not be broadcast together with shapes (2,) (3,)
if np.logical_or(stop_by_order, stop_by_error).all():
break
iprint("Refining domain boundary ...", tab=0, verbose=self.options["verbose"])
# determine grid points close to discontinuity
coords_norm_disc = get_coords_discontinuity(classifier=megpc[i_qoi].classifier,
x_min=[-1 for _ in range(megpc[i_qoi].problem.dim)],
x_max=[+1 for _ in range(megpc[i_qoi].problem.dim)],
n_coords_disc=self.options["n_samples_discontinuity"],
border_sampling="structured")
coords_disc = grid.get_denormalized_coordinates(coords_norm_disc)
# add grid points close to discontinuity to global grid
grid.extend_random_grid(coords=coords_disc,
coords_norm=coords_norm_disc,
gradient=self.options["gradient_enhanced"])
# run simulations close to discontinuity
iprint("Performing {} simulations to refine discontinuity location!".format(
self.options["n_samples_discontinuity"]), tab=0, verbose=self.options["verbose"])
start_time = time.time()
res_disc = com.run(model=self.problem.model,
problem=self.problem,
coords=coords_disc,
coords_norm=coords_norm_disc,
i_iter="Domain boundary",
i_subiter=None,
fn_results=self.options["fn_results"],
print_func_time=self.options["print_func_time"],
verbose=self.options["verbose"])
iprint('Total function evaluation: ' + str(time.time() - start_time) + ' sec',
tab=0, verbose=self.options["verbose"])
# add results to results array
res_all = np.vstack((res_all, res_disc))
# Determine gradient [n_grid x n_out x dim]
if self.options["gradient_enhanced"] or self.options["projection"]:
start_time = time.time()
grad_res_3D_all, gradient_idx = get_gradient(model=self.problem.model,
problem=self.problem,
grid=grid,
results=res_all,
com=com,
method=self.options["gradient_calculation"],
gradient_results_present=grad_res_3D_all_FD_fwd,
gradient_idx_skip=gradient_idx_FD_fwd,
i_iter="Domain boundary",
i_subiter=None,
print_func_time=self.options["print_func_time"],
dx=self.options["gradient_calculation_options"]["dx"],
distance_weight=self.options["gradient_calculation_options"]["distance_weight"],
verbose=self.options["verbose"])
if self.options["gradient_calculation"] == "FD_fwd":
gradient_idx_FD_fwd = gradient_idx
grad_res_3D_all_FD_fwd = grad_res_3D_all
iprint('Gradient evaluation: ' + str(time.time() - start_time) + ' sec',
tab=0, verbose=self.options["verbose"])
# check validity of results and resample in case the model could not be evaluated at some sampling points
res_all, grad_res_3D_all, gradient_idx, grid = self.check_results(results=res_all,
gradient_results=grad_res_3D_all,
gradient_results_idx=gradient_idx,
grid=grid,
com=com)
i_grid = grid.n_grid
# crop results to considered qoi
if self.options["qoi"] != "all":
res = copy.deepcopy(res_all)
grad_res_3D = copy.deepcopy(grad_res_3D_all)
else:
res = res_all[:, q_idx][:, np.newaxis]
if grad_res_3D_all is not None:
grad_res_3D = grad_res_3D_all[:, q_idx, :][:, np.newaxis, :]
# Write grid in gpc object
megpc[i_qoi].grid = copy.deepcopy(grid)
# update classifier
iprint("Updating classifier ...", tab=0, verbose=self.options["verbose"])
megpc[i_qoi].update_classifier(coords=megpc[i_qoi].grid.coords_norm,
results=res_all[:, q_idx][:, np.newaxis])
# update sub-gPCs if number of domains changed
if len(np.unique(megpc[i_qoi].domains)) != len(megpc[i_qoi].gpc):
iprint("New domains found! Updating number of sub-gPCs from {} to {} ".
format(len(megpc[i_qoi].gpc), len(np.unique(megpc[i_qoi].domains))),
tab=0, verbose=self.options["verbose"])
megpc[i_qoi].gpc = None
megpc[i_qoi].init_classifier(coords=megpc[i_qoi].grid.coords_norm,
results=res_all[:, q_idx][:, np.newaxis],
algorithm=self.options["classifier"],
options=self.options["classifier_options"])
basis_order["poly_dom_{}".format(d)][0] = self.options["order_start"]
basis_order["poly_dom_{}".format(d)][1] = self.options["interaction_order"]
# eps = np.hstack((eps, np.array(self.options["eps"] + 1)))
eps = np.array([self.options["eps"] + 1.0 for _ in range(len(np.unique(megpc[i_qoi].domains)))])
for i_gpc, d in enumerate(np.unique(megpc[i_qoi].domains)):
megpc[i_qoi].add_sub_gpc(problem=problem_original,
order=basis_order["poly_dom_{}".format(d)][0] * np.ones(
self.problem.dim),
order_max=self.options["order_start"],
order_max_norm=self.options["order_max_norm"],
interaction_order=self.options["interaction_order"],
interaction_order_current=basis_order["poly_dom_{}".format(d)][1],
options=self.options,
domain=d,
validation=None)
# save original problem in gpc object
megpc[i_qoi].gpc[d].problem_original = copy.deepcopy(problem_original)
# initialize domain specific interaction order and other settings
megpc[i_qoi].gpc[i_gpc].solver = self.options["solver"]
megpc[i_qoi].gpc[i_gpc].settings = self.options["settings"]
# update projection matrices
if self.options["projection"]:
p_matrix = [0 for _ in range(megpc[i_qoi].n_gpc)]
p_matrix_norm = [0 for _ in range(megpc[i_qoi].n_gpc)]
dim = [0 for _ in range(megpc[i_qoi].n_gpc)]
parameters = [OrderedDict() for _ in range(megpc[i_qoi].n_gpc)]
problem = [0 for _ in range(megpc[i_qoi].n_gpc)]
for d in np.unique(megpc[i_qoi].domains):
p_matrix[d], _ = determine_projection_matrix(
gradient_results=grad_res_3D_all[megpc[i_qoi].domains[gradient_idx] == d, q_idx, :],
lambda_eps=self.options["lambda_eps_gradient"])
p_matrix_norm[d] = np.sum(np.abs(p_matrix[d]), axis=1)
dim[d] = p_matrix[d].shape[0]
for i in range(dim[d]):
parameters[d]["n{}".format(i)] = Beta(pdf_shape=[1., 1.], pdf_limits=[-1., 1.])
problem[d] = Problem(model=self.problem.model, parameters=parameters[d])
# replace sub-gpc with the one containing the reduced problem
megpc[i_qoi].add_sub_gpc(problem=problem[d],
order=basis_order["poly_dom_{}".format(d)][0] * np.ones(dim[d]),
order_max=self.options["order_start"],
order_max_norm=self.options["order_max_norm"],
interaction_order=self.options["interaction_order"],
interaction_order_current=basis_order["poly_dom_{}".format(d)][1],
options=self.options,
domain=d,
validation=None)
# save original problem in gpc object
megpc[i_qoi].gpc[d].problem_original = copy.deepcopy(problem_original)
# save projection matrix in gPC object
megpc[i_qoi].gpc[d].p_matrix = copy.deepcopy(p_matrix[d])
megpc[i_qoi].gpc[d].p_matrix_norm = copy.deepcopy(p_matrix_norm[d])
# initialize domain specific interaction order and other settings
megpc[i_qoi].gpc[d].solver = self.options["solver"]
megpc[i_qoi].gpc[d].settings = self.options["settings"]
# update gpc approximation with new grid points close to discontinuity
# assign grids to sub-gPCs (rotate sub-grids in case of projection)
megpc[i_qoi].assign_grids(gradient_idx=gradient_idx)
# Initialize gpc matrices
megpc[i_qoi].init_gpc_matrices()
# Compute gpc coefficients
if self.options["gradient_enhanced"]:
grad_res_3D_passed = grad_res_3D
else:
grad_res_3D_passed = None
coeffs[i_qoi] = megpc[i_qoi].solve(results=res,
gradient_results=grad_res_3D_passed,
solver=self.options["solver"],
settings=self.options["settings"],
verbose=self.options["verbose"])
# domain specific error
for i_gpc, d in enumerate(np.unique(megpc[i_qoi].domains)):
eps[d] = megpc[i_qoi].validate(coeffs=coeffs[i_qoi],
results=res,
domain=d,
output_idx=output_idx_passed_validation)
error[i_qoi][d].append(eps[d])
iprint("-> Domain: {} {} {} "
"error = {}".format(d,
self.options["error_norm"],
self.options["error_type"],
eps[d]), tab=0, verbose=self.options["verbose"])
# loop over domains and increase order if necessary
for i_gpc, d in enumerate(np.unique(megpc[i_qoi].domains)):
skip = (basis_order["poly_dom_{}".format(d)] ==
[self.options["order_end"], self.options["interaction_order"]]).all()
if (eps[d] > self.options["eps"]) and not skip:
# increase basis by 1 interaction order
order_new = increment_basis(order_current=basis_order["poly_dom_{}".format(d)][0],
interaction_order_current=basis_order["poly_dom_{}".format(d)][1],
interaction_order_max=self.options["interaction_order"],
incr=basis_increment)
basis_order["poly_dom_{}".format(d)][0] = order_new[0]
basis_order["poly_dom_{}".format(d)][1] = order_new[1]
print_str = "Domain: {}, Order: #{}, Sub-iteration: #{}".format(
d,
basis_order["poly_dom_{}".format(d)][0],
basis_order["poly_dom_{}".format(d)][1])
iprint(print_str, tab=0, verbose=self.options["verbose"])
iprint("=" * len(print_str), tab=0, verbose=self.options["verbose"])
# update basis
b_added = megpc[i_qoi].gpc[d].basis.set_basis_poly(
order=basis_order["poly_dom_{}".format(d)][0] * np.ones(dim[d]),
order_max=basis_order["poly_dom_{}".format(d)][0],
order_max_norm=self.options["order_max_norm"],
interaction_order=self.options["interaction_order"],
interaction_order_current=basis_order["poly_dom_{}".format(d)][1],
problem=problem[d])
# continue algorithm if no basis function was added because of max norm constraint
if b_added is not None:
extended_basis = True
else:
extended_basis = False
# iprint("-> Domain: {} {} {} "
# "error = {}".format(d,
# self.options["error_norm"],
# self.options["error_type"],
# eps[d]), tab=0, verbose=self.options["verbose"])
# iprint("-> No basis functions to add in domain {} ... Continuing ... ".format(d),
# tab=0, verbose=self.options["verbose"])
continue
# update gpc matrix
gradient_idx_gpc = get_gradient_idx_domain(domains=megpc[i_qoi].domains,
d=d,
gradient_idx=megpc[i_qoi].gradient_idx)
megpc[i_qoi].gpc[d].init_gpc_matrix(gradient_idx=gradient_idx_gpc)
# determine gpc coefficients with new basis but old samples
if self.options["gradient_enhanced"]:
grad_res_3D_passed = grad_res_3D[megpc[i_qoi].domains[gradient_idx] == d, :, :]
else:
grad_res_3D_passed = None
coeffs[i_qoi][d] = megpc[i_qoi].gpc[d].solve(results=res[megpc[i_qoi].domains == d, ],
gradient_results=grad_res_3D_passed,
solver=megpc[i_qoi].gpc[d].solver,
settings=megpc[i_qoi].gpc[d].settings,
verbose=self.options["verbose"])
# Add samples
add_samples = True # if adaptive sampling is False, the loop will be only executed once
delta_eps_target = 1e-1
delta_eps = delta_eps_target + 1
delta_samples = 4*5e-2
if self.options["adaptive_sampling"]:
iprint("Starting adaptive sampling:", tab=0, verbose=self.options["verbose"])
# only increase samples if error increased and until error converges again
while add_samples and delta_eps > delta_eps_target and eps[d] > self.options["eps"]:
if not self.options["adaptive_sampling"]:
add_samples = False
# new sample size
if extended_basis and self.options["adaptive_sampling"]:
# do not increase sample size immediately when basis was extended
# try first with old samples
n_grid_new = megpc[i_qoi].gpc[d].grid.n_grid
elif self.options["adaptive_sampling"] and not first_iter:
# increase sample size stepwise (adaptive sampling)
n_grid_new = int(np.ceil(megpc[i_qoi].gpc[d].grid.n_grid +
delta_samples * megpc[i_qoi].gpc[d].basis.n_basis))
else:
# increase sample size according to matrix ratio w.r.t. number of basis functions
n_grid_new = int(
np.ceil(megpc[i_qoi].gpc[d].basis.n_basis * self.options["matrix_ratio"]))
# run model if grid points were added
if megpc[i_qoi].gpc[d].grid.n_grid < n_grid_new or extended_basis:
# extend grid
if megpc[i_qoi].gpc[d].grid.n_grid < n_grid_new:
iprint("Extending grid in domain {} from {} to {} by {} sampling points "
"(global grid: {})".format(d, megpc[i_qoi].gpc[d].grid.n_grid, n_grid_new,
n_grid_new - megpc[i_qoi].gpc[d].grid.n_grid,
megpc[i_qoi].grid.n_grid),
tab=0, verbose=self.options["verbose"])
# add grid points in this domain to global grid
grid.extend_random_grid(
n_grid_new=grid.n_grid - megpc[i_qoi].gpc[d].grid.n_grid + n_grid_new,
classifier=megpc[i_qoi].classifier,
domain=d,
gradient=self.options["gradient_enhanced"])
# run simulations
iprint("Performing simulations {} to {}".format(
i_grid + 1, grid.coords.shape[0]),
tab=0, verbose=self.options["verbose"])
start_time = time.time()
res_new = com.run(model=self.problem.model,
problem=self.problem,
coords=grid.coords[int(i_grid):, :],
coords_norm=grid.coords_norm[int(i_grid):, :],
i_iter=basis_order["poly_dom_{}".format(d)][0],
i_subiter=basis_order["poly_dom_{}".format(d)][1],
fn_results=self.options["fn_results"],
print_func_time=self.options["print_func_time"],
verbose=self.options["verbose"])
iprint('Total parallel function evaluation {} sec'.format(
str(time.time() - start_time)),
tab=0, verbose=self.options["verbose"])
# append to results array containing all qoi
res_all = np.vstack([res_all, res_new])
if self.options["gradient_enhanced"] or self.options["projection"]:
start_time = time.time()
grad_res_3D_all, gradient_idx = get_gradient(model=self.problem.model,
problem=self.problem,
grid=grid,
results=res_all,
com=com,
method=self.options["gradient_calculation"],
gradient_results_present=grad_res_3D_all_FD_fwd,
gradient_idx_skip=gradient_idx_FD_fwd,
i_iter=basis_order["poly_dom_{}".format(d)][0],
i_subiter=basis_order["poly_dom_{}".format(d)][1],
print_func_time=self.options["print_func_time"],
dx=self.options["gradient_calculation_options"]["dx"],
distance_weight=self.options["gradient_calculation_options"]["distance_weight"],
verbose=self.options["verbose"])
if self.options["gradient_calculation"] == "FD_fwd":
gradient_idx_FD_fwd = gradient_idx
grad_res_3D_all_FD_fwd = grad_res_3D_all
iprint('Gradient evaluation: ' + str(time.time() - start_time) + ' sec',
tab=0, verbose=self.options["verbose"])
# check validity of results and resample in case the model could not be evaluated at some sampling points
res_all, grad_res_3D_all, gradient_idx, grid = self.check_results(
results=res_all,
gradient_results=grad_res_3D_all,
gradient_results_idx=gradient_idx,
grid=grid,
com=com)
# crop results to considered qoi
if self.options["qoi"] != "all":
res = copy.deepcopy(res_all)
grad_res_3D = copy.deepcopy(grad_res_3D_all)
else:
res = res_all[:, q_idx][:, np.newaxis]
if grad_res_3D_all is not None:
grad_res_3D = grad_res_3D_all[:, q_idx, :][:, np.newaxis, :]
i_grid = grid.coords.shape[0]
# update classifier
iprint("Updating classifier ...", tab=0, verbose=self.options["verbose"])
megpc[i_qoi].update_classifier(coords=grid.coords_norm,
results=res_all[:, q_idx][:, np.newaxis])
# TODO: the number of sub-gpcs could change here :/
# update projection matrices
if self.options["projection"]:
for dd in np.unique(megpc[i_qoi].domains):
p_matrix[dd], _ = determine_projection_matrix(
gradient_results=grad_res_3D_all[megpc[i_qoi].domains[gradient_idx] ==
dd, q_idx, :],
lambda_eps=self.options["lambda_eps_gradient"])
p_matrix_norm[dd] = np.sum(np.abs(p_matrix[dd]), axis=1)
dim[dd] = p_matrix[dd].shape[0]
for i in range(dim[d]):
parameters[d]["n{}".format(i)] = Beta(pdf_shape=[1., 1.],
pdf_limits=[-1., 1.])
problem[d] = Problem(model=self.problem.model, parameters=parameters[d])
# replace sub-gpc with the one containing the reduced problem
megpc[i_qoi].add_sub_gpc(problem=problem[d],
order=basis_order["poly_dom_{}".format(d)][
0] * np.ones(dim[d]),
order_max=self.options["order_start"],
order_max_norm=self.options["order_max_norm"],
interaction_order=self.options[
"interaction_order"],
interaction_order_current=
basis_order["poly_dom_{}".format(d)][1],
options=self.options,
domain=d,
validation=None)
# save original problem in gpc object
megpc[i_qoi].gpc[d].problem_original = copy.deepcopy(problem_original)
# save projection matrix in gPC object
megpc[i_qoi].gpc[d].p_matrix = copy.deepcopy(p_matrix[d])
megpc[i_qoi].gpc[d].p_matrix_norm = copy.deepcopy(p_matrix_norm[d])
# initialize domain specific interaction order and other settings
megpc[i_qoi].gpc[d].solver = self.options["solver"]
megpc[i_qoi].gpc[d].settings = self.options["settings"]
# update and assign grids
megpc[i_qoi].grid = copy.deepcopy(grid)
# assign grids to sub-gPCs (rotate sub-grids in case of projection)
megpc[i_qoi].assign_grids(gradient_idx=gradient_idx)
# update gpc matrix
gradient_idx_gpc = get_gradient_idx_domain(domains=megpc[i_qoi].domains,
d=d,
gradient_idx=megpc[i_qoi].gradient_idx)
megpc[i_qoi].gpc[d].init_gpc_matrix(gradient_idx=gradient_idx_gpc)
# determine gpc coefficients
if self.options["gradient_enhanced"]:
grad_res_3D_passed = grad_res_3D[megpc[i_qoi].domains[gradient_idx] == d, :, :]
else:
grad_res_3D_passed = None
coeffs[i_qoi][d] = megpc[i_qoi].gpc[d].solve(
results=res[megpc[i_qoi].domains == d, ],
gradient_results=grad_res_3D_passed,
solver=megpc[i_qoi].gpc[d].solver,
settings=megpc[i_qoi].gpc[d].settings,
verbose=self.options["verbose"])
# validate gpc approximation
eps[d] = megpc[i_qoi].validate(coeffs=coeffs[i_qoi],
results=res,
domain=d,
output_idx=output_idx_passed_validation)
error[i_qoi][d].append(eps[d])
if extended_basis or first_iter:
eps_ref = copy.deepcopy(eps[d])
else:
delta_eps = np.abs((error[i_qoi][d][-1] -
error[i_qoi][d][-2]) / eps_ref)
first_iter = False
iprint("-> Domain: {} {} {} "
"error = {}".format(d,
self.options["error_norm"],
self.options["error_type"],
eps[d]), tab=0, verbose=self.options["verbose"])
# stop adaptive sampling and extend basis further if error
# was decreased (except in very first iteration)
if extended_basis and error[i_qoi][d][-1] < error[i_qoi][d][-2]:
break
extended_basis = False
# exit adaptive sampling loop if no adaptive sampling was chosen
if not self.options["adaptive_sampling"]:
break
# save gpc object and coeffs for this sub-iteration
if self.options["fn_results"] is not None:
with h5py.File(os.path.splitext(self.options["fn_results"])[0] + ".hdf5", "a") as f:
# overwrite coeffs
try:
del f["coeffs" + hdf5_subfolder + "/dom_" + str(d)]
except KeyError:
pass
f.create_dataset("coeffs" + hdf5_subfolder + "/dom_" + str(d),
data=coeffs[i_qoi][d], maxshape=None, dtype="float64")
# overwrite domains
try:
del f["domains" + hdf5_subfolder]
except KeyError:
pass
f.create_dataset("domains" + hdf5_subfolder,
data=megpc[i_qoi].domains, maxshape=None, dtype="int64")
# save gpc matrix
try:
del f["gpc_matrix" + hdf5_subfolder + "/dom_" + str(d)]
except KeyError:
pass
f.create_dataset("gpc_matrix" + hdf5_subfolder + "/dom_" + str(d),
data=megpc[i_qoi].gpc[d].gpc_matrix,
maxshape=None, dtype="float64")
if megpc[i_qoi].gpc[d].p_matrix is not None:
try:
del f["p_matrix" + hdf5_subfolder + "/dom_" + str(d)]
except KeyError:
pass
f.create_dataset("p_matrix" + hdf5_subfolder + "/dom_" + str(d),
data=megpc[i_qoi].gpc[d].p_matrix,
maxshape=None, dtype="float64")
# save gradient gpc matrix
if megpc[i_qoi].gpc[0].gpc_matrix_gradient is not None:
try:
del f["gpc_matrix_gradient" + hdf5_subfolder + "/dom_" + str(d)]
except KeyError:
pass
if self.options["gradient_enhanced"]:
f.create_dataset("gpc_matrix_gradient" + hdf5_subfolder + "/dom_" + str(d),
data=megpc[i_qoi].gpc[d].gpc_matrix_gradient,
maxshape=None, dtype="float64")
# save results
try:
del f["model_evaluations/results"]
except KeyError:
pass
f.create_dataset("model_evaluations/results",
(res_all.shape[0], res_all.shape[1]),
maxshape=(None, None),
dtype="float64",
data=res_all)
# save gradient of results
if grad_res_3D is not None:
try:
del f["model_evaluations/gradient_results"]
del f["model_evaluations/gradient_results_idx"]
except KeyError:
pass
grad_res_2D_all = ten2mat(grad_res_3D_all)
f.create_dataset("model_evaluations/gradient_results",
(grad_res_2D_all.shape[0], grad_res_2D_all.shape[1]),
maxshape=(None, None),
dtype="float64",
data=grad_res_2D_all)
f.create_dataset("model_evaluations/gradient_results_idx",
dtype="int64",
data=gradient_idx)
try:
del f["error" + hdf5_subfolder + "/dom_" + str(d)]
except KeyError:
pass
f.create_dataset("error" + hdf5_subfolder + "/dom_" + str(d),
data=eps[d],
maxshape=None, dtype="float64")
basis_increment = 1
megpc[i_qoi].update_classifier(coords=megpc[i_qoi].grid.coords_norm,
results=res_all[:, q_idx][:, np.newaxis])
megpc[i_qoi].assign_grids(gradient_idx=gradient_idx)
megpc[i_qoi].init_gpc_matrices()
# determine gpc coefficients
#
# grad_res_3D_passed = grad_res_3D_all[:, q_idx, :][:, np.newaxis, :]
# else:
# grad_res_3D_passed = None
# crop results to considered qoi
if self.options["qoi"] != "all":
res = copy.deepcopy(res_all)
grad_res_3D_passed = copy.deepcopy(grad_res_3D_all)
else:
res = res_all[:, q_idx][:, np.newaxis]
if grad_res_3D_all is not None:
grad_res_3D_passed = grad_res_3D_all[:, q_idx, :][:, np.newaxis, :]
if not self.options["gradient_enhanced"]:
grad_res_3D_passed = None
# determine gpc coefficients
coeffs[i_qoi] = megpc[i_qoi].solve(results=res,
gradient_results=grad_res_3D_passed,
solver=megpc[i_qoi].gpc[d].solver,
settings=megpc[i_qoi].gpc[d].settings,
verbose=self.options["verbose"])
megpc[i_qoi].error = error[i_qoi]
# save gpc object and gpc coeffs
if self.options["fn_results"] is not None:
with h5py.File(os.path.splitext(self.options["fn_results"])[0] + ".hdf5", "a") as f:
try:
fn_session = f["misc/fn_session"]
except KeyError:
f.create_dataset("misc/fn_session",
data=np.array([os.path.split(self.options["fn_session"])[1]]).astype("|S"))
f.create_dataset("misc/fn_session_folder",
data=np.array([self.options["fn_session_folder"]]).astype("|S"))
try:
del f["grid"]
except KeyError:
pass
f.create_dataset("grid/coords", data=grid.coords,
maxshape=None, dtype="float64")
f.create_dataset("grid/coords_norm", data=grid.coords_norm,
maxshape=None, dtype="float64")
if megpc[i_qoi].grid.coords_gradient is not None:
f.create_dataset("grid/coords_gradient",
data=grid.coords_gradient,
maxshape=None, dtype="float64")
f.create_dataset("grid/coords_gradient_norm",
data=grid.coords_gradient_norm,
maxshape=None, dtype="float64")
try:
del f["model_evaluations"]
except KeyError:
pass
f.create_dataset("model_evaluations/results", data=res_all,
maxshape=None, dtype="float64")
if grad_res_3D_all is not None:
f.create_dataset("model_evaluations/gradient_results", data=ten2mat(grad_res_3D_all),
maxshape=None, dtype="float64")
f.create_dataset("model_evaluations/gradient_results_idx", data=gradient_idx,
maxshape=None, dtype="int64")
try:
f.create_dataset("misc/error_type", data=self.options["error_type"])
except RuntimeError:
pass
if megpc[0].validation is not None:
try:
del f["validation"]
except KeyError:
f.create_dataset("validation/model_evaluations/results", data=megpc[0].validation.results,
maxshape=None, dtype="float64")
f.create_dataset("validation/grid/coords", data=megpc[0].validation.grid.coords,
maxshape=None, dtype="float64")
f.create_dataset("validation/grid/coords_norm", data=megpc[0].validation.grid.coords_norm,
maxshape=None, dtype="float64")
# save gpc matrix
for i_gpc, d in enumerate(np.unique(megpc[i_qoi].domains)):
try:
del f["gpc_matrix" + hdf5_subfolder + "/dom_" + str(d)]
except KeyError:
pass
f.create_dataset("gpc_matrix" + hdf5_subfolder + "/dom_" + str(d),
data=megpc[i_qoi].gpc[d].gpc_matrix,
maxshape=None, dtype="float64")
# save gradient gpc matrix
if megpc[i_qoi].gpc[0].gpc_matrix_gradient is not None:
try:
del f["gpc_matrix_gradient" + hdf5_subfolder + "/dom_" + str(d)]
except KeyError:
pass
if self.options["gradient_enhanced"]:
f.create_dataset("gpc_matrix_gradient" + hdf5_subfolder + "/dom_" + str(d),
data=megpc[i_qoi].gpc[d].gpc_matrix_gradient,
maxshape=None, dtype="float64")
try:
for i_gpc in range(megpc[i_qoi].n_gpc):
del f["coeffs" + hdf5_subfolder + "/dom_" + str(i_gpc)]
except KeyError:
pass
for i_gpc in range(megpc[i_qoi].n_gpc):
f.create_dataset("coeffs" + hdf5_subfolder + "/dom_" + str(i_gpc),
data=coeffs[i_qoi][i_gpc],
maxshape=None, dtype="float64")
com.close()
return megpc, coeffs, res_all
[docs]
class RegAdaptiveProjection(Algorithm):
"""
Adaptive regression approach using projection and leave one out cross validation error estimation
Parameters
----------
problem: Problem class instance
GPC problem under investigation
options["order_start"] : int, optional, default=0
Initial gPC expansion order (maximum order)
options["order_end"] : int, optional, default=10
Maximum Gpc expansion order to expand to (algorithm will terminate afterwards)
options["interaction_order"]: int, optional, default=dim
Define maximum interaction order of parameters (default: all interactions)
options["order_max_norm"]: float
Norm for which the maximum global expansion order is defined [0, 1]. Values < 1 decrease the total number
of polynomials in the expansion such that interaction terms are penalized more. This truncation scheme
is also referred to "hyperbolic polynomial chaos expansion" such that sum(a_i^q)^1/q <= p,
where p is order_max and q is order_max_norm (for more details see eq. (27) in [1]).
options["n_grid_gradient"] : float, optional, default: 10
Number of initial grid points to determine gradient and projection matrix. When the algorithm goes
into the main interations the number will be increased depending on the options "matrix_ratio"
and "adaptive_sampling".
options["qoi"] : int or str, optional, default: 0
Choose for which QOI the projection is determined for. The other QOIs use the same projection.
Alternatively, the projection can be determined for every QOI independently (qoi_index or "all").
options["adaptive_sampling"] : boolean, optional, default: True
Adds samples adaptively to the expansion until the error is converged and continues by
adding new basis functions.
Examples
--------
>>> import pygpc
>>> # initialize adaptive gPC algorithm
>>> algorithm = pygpc.RegAdaptiveProjection(problem=problem, options=options)
>>> # run algorithm
>>> gpc, coeffs, results = algorithm.run()
"""
def __init__(self, problem, options, validation=None, grid=None):
"""
Constructor; Initializes RegAdaptiveProjection algorithm
"""
super(RegAdaptiveProjection, self).__init__(problem=problem, options=options, validation=validation, grid=grid)
# check contents of settings dict and set defaults
if "order_start" not in self.options.keys():
self.options["order_start"] = 0
if "order_end" not in self.options.keys():
self.options["order_end"] = 10
if "interaction_order" not in self.options.keys():
self.options["interaction_order"] = problem.dim
if "order_max_norm" not in self.options.keys():
self.options["order_max_norm"] = 1.
if "n_grid_gradient" not in self.options.keys():
self.options["n_grid_gradient"] = 10
if "qoi" not in self.options.keys():
self.options["qoi"] = 0
if "adaptive_sampling" not in self.options.keys():
self.options["adaptive_sampling"] = True
if self.options["qoi"] == "all":
self.qoi_specific = True
else:
self.qoi_specific = False
[docs]
def run(self):
"""
Runs adaptive gPC algorithm using projection to solve problem.
Returns
-------
gpc : GPC object instance
GPC object containing all information i.e., Problem, Model, Grid, Basis, RandomParameter instances
coeffs: ndarray of float [n_basis x n_out]
GPC coefficients
res : ndarray of float [n_grid x n_out]
Simulation results at n_grid points of the n_out output variables
"""
if self.options["fn_results"] is not None:
fn_results = os.path.splitext(self.options["fn_results"])[0]
if os.path.exists(fn_results + ".hdf5"):
os.remove(fn_results + ".hdf5")
if os.path.exists(fn_results + "_temp.hdf5"):
os.remove(fn_results + "_temp.hdf5")
else:
fn_results = None
grad_res_3D = None
gradient_idx = None
# initialize iterators
eps = self.options["eps"] + 1.0
order = self.options["order_start"]
error = []
nrmsd = []
loocv = []
# make initial grid to determine gradients and projection matrix. By default, it is an LHS (ese) grid
if self.grid is not None:
print(f"Using user-predefined grid with n_grid={self.grid.n_grid}")
grid_original = self.options["grid"](parameters_random=self.problem.parameters_random,
coords=self.grid.coords,
coords_norm=self.grid.coords_norm,
coords_gradient=self.grid.coords_gradient,
coords_gradient_norm=self.grid.coords_gradient_norm,
options=self.options["grid_options"])
elif self.options["grid"] == Random or self.options["grid"] == GP:
print(f"Creating initial grid ({self.options['grid'].__init__}) with n_grid={int(self.options['n_grid_gradient'])}")
grid_original = self.options["grid"](parameters_random=self.problem.parameters_random,
n_grid=self.options["n_grid_gradient"],
options=self.options["grid_options"])
else:
print(f"Creating initial grid ({self.options['grid'].__init__}) with n_grid={int(self.options['n_grid_gradient'])}")
grid_original = LHS(parameters_random=self.problem.parameters_random,
n_grid=self.options["n_grid_gradient"],
options={"criterion": "ese",
"seed": self.options["grid_options"]["seed"]})
# Initialize parallel Computation class
com = Computation(n_cpu=self.n_cpu, matlab_model=self.options["matlab_model"])
# Run initial simulations to determine initial projection matrix
iprint("Performing {} simulations!".format(grid_original.coords.shape[0]),
tab=0, verbose=self.options["verbose"])
start_time = time.time()
res_all = com.run(model=self.problem.model,
problem=self.problem,
coords=grid_original.coords,
coords_norm=grid_original.coords_norm,
i_iter=self.options["order_start"],
i_subiter=self.options["interaction_order"],
fn_results=self.options["fn_results"],
print_func_time=self.options["print_func_time"],
verbose=self.options["verbose"])
i_grid = grid_original.n_grid
iprint('Total function evaluation: ' + str(time.time() - start_time) + ' sec',
tab=0, verbose=self.options["verbose"])
# Determine gradient for projection matrix (method: FD_fwd)
start_time = time.time()
grad_res_3D_all, gradient_idx = get_gradient(model=self.problem.model,
problem=self.problem,
grid=grid_original,
results=res_all,
com=com,
method="FD_fwd",
gradient_results_present=None,
gradient_idx_skip=None,
i_iter=self.options["order_start"],
i_subiter=self.options["interaction_order"],
print_func_time=self.options["print_func_time"],
dx=1e-3,
distance_weight=None,
verbose=self.options["verbose"])
gradient_idx_FD_fwd = gradient_idx
grad_res_3D_all_FD_fwd = grad_res_3D_all
iprint('Gradient evaluation: ' + str(time.time() - start_time) + ' sec',
tab=0, verbose=self.options["verbose"])
# check validity of results and resample in case the model could not be evaluated at some sampling points
res_all, grad_res_3D_all, gradient_idx, grid_original = self.check_results(
results=res_all,
gradient_results=grad_res_3D_all,
gradient_results_idx=gradient_idx,
grid=grid_original,
com=com)
# set qoi indices
if self.options["qoi"] == "all":
qoi_idx = np.arange(res_all.shape[1])
n_qoi = len(qoi_idx)
else:
qoi_idx = [self.options["qoi"]]
n_qoi = 1
# init variables
self.problem_reduced = [None for _ in range(n_qoi)]
gpc = [None for _ in range(n_qoi)]
coeffs = [None for _ in range(n_qoi)]
self.options["order_max"] = None
# loop over qoi (projection is qoi specific)
for i_qoi, q_idx in enumerate(qoi_idx):
basis_order = np.array([self.options["order_start"],
min(self.options["interaction_order"], self.options["order_start"])])
if self.options["qoi"] == "all":
qoi_idx_validate = q_idx
else:
qoi_idx_validate = np.arange(res_all.shape[1])
first_iter = True
# crop results to considered qoi
if self.options["qoi"] != "all":
res = copy.deepcopy(res_all)
grad_res_3D = copy.deepcopy(grad_res_3D_all)
hdf5_subfolder = ""
else:
res = res_all[:, q_idx][:, np.newaxis]
grad_res_3D = grad_res_3D_all[:, q_idx, :][:, np.newaxis, :]
hdf5_subfolder = "/qoi_" + str(q_idx)
# copy results of initial simulation
# shutil.copy2(os.path.splitext(self.options["fn_results"])[0] + "_temp.hdf5", fn_results + ".hdf5")
# Set up initial reduced problem
# Determine projection matrix
p_matrix, p_matrix_complete = determine_projection_matrix(gradient_results=grad_res_3D_all[:, q_idx, :],
lambda_eps=self.options["lambda_eps_gradient"])
p_matrix_norm = np.sum(np.abs(p_matrix), axis=1)
# Set up initial reduced problem
dim_reduced = p_matrix.shape[0]
parameters_reduced = OrderedDict()
for i in range(dim_reduced):
parameters_reduced["n{}".format(i)] = Beta(pdf_shape=[1., 1.], pdf_limits=[-1., 1.])
self.problem_reduced[i_qoi] = Problem(model=self.problem.model, parameters=parameters_reduced)
# Create initial reduced gPC object
gpc[i_qoi] = Reg(problem=self.problem_reduced[i_qoi],
order=[self.options["order_start"] for _ in range(dim_reduced)],
order_max=self.options["order_start"],
order_max_norm=self.options["order_max_norm"],
interaction_order=self.options["interaction_order"],
interaction_order_current=self.options["interaction_order"],
options=self.options,
validation=self.validation)
# save original problem in gpc object
gpc[i_qoi].problem_original = self.problem
extended_basis = False
# save projection matrix in gPC object
gpc[i_qoi].p_matrix = copy.deepcopy(p_matrix)
gpc[i_qoi].p_matrix_norm = copy.deepcopy(p_matrix_norm)
# copy global grid, passing it from qoi to qoi but in the first iteration, we have to initialize a new
# grid in case of L1, L1_LHS, LHS_L1 and FIM because they depend on the gpc object which can be different
# for every QOI due to different projections and termination criteria. We are passing the coordinates
# of the initial LHS (ese) grid to it
if self.options["grid"] in [L1, L1_LHS, LHS_L1, FIM]:
grid_original = self.options["grid"](parameters_random=self.problem.parameters_random,
coords=grid_original.coords,
coords_norm=grid_original.coords_norm,
coords_gradient=grid_original.coords_gradient,
coords_gradient_norm=grid_original.coords_gradient_norm,
options=self.options["grid_options"],
gpc=gpc[i_qoi])
# assign transformed grid
gpc[i_qoi].grid = project_grid(grid=grid_original, p_matrix=p_matrix, mode="reduce")
# Initialize gpc matrix
gpc[i_qoi].init_gpc_matrix(gradient_idx=gradient_idx)
gpc[i_qoi].n_grid.pop(0)
gpc[i_qoi].n_basis.pop(0)
gpc[i_qoi].solver = self.options["solver"]
gpc[i_qoi].settings = self.options["settings"]
# Main iterations (order)
while eps > self.options["eps"]:
if first_iter:
basis_increment = 0
else:
basis_increment = 1
# increase basis
basis_order[0], basis_order[1] = increment_basis(order_current=basis_order[0],
interaction_order_current=basis_order[1],
interaction_order_max=np.min([
self.options["interaction_order"],
self.problem_reduced[i_qoi].dim]),
incr=basis_increment)
if basis_order[0] > self.options["order_end"]:
break
# update basis
b_added = gpc[i_qoi].basis.set_basis_poly(order=basis_order[0] *
np.ones(self.problem_reduced[i_qoi].dim),
order_max=basis_order[0],
order_max_norm=self.options["order_max_norm"],
interaction_order=self.options["interaction_order"],
interaction_order_current=basis_order[1],
problem=self.problem_reduced[i_qoi])
print_str = "Order/Interaction order: {}/{}".format(basis_order[0], basis_order[1])
iprint(print_str, tab=0, verbose=self.options["verbose"])
iprint("=" * len(print_str), tab=0, verbose=self.options["verbose"])
if b_added is not None:
extended_basis = True
if self.options["adaptive_sampling"]:
iprint("Starting adaptive sampling:", tab=0, verbose=self.options["verbose"])
add_samples = True # if adaptive sampling is False, the while loop will be only executed once
delta_eps_target = 1e-1
delta_eps = delta_eps_target + 1
delta_samples = 5e-2
if gpc[i_qoi].error:
eps_ref = gpc[i_qoi].error[-1]
while add_samples and delta_eps > delta_eps_target and eps > self.options["eps"]:
if not self.options["adaptive_sampling"]:
add_samples = False
# new sample size
if extended_basis and self.options["adaptive_sampling"]:
# don't increase sample size immediately when basis was extended, try first with old samples
n_grid_new = gpc[i_qoi].grid.n_grid
elif self.options["adaptive_sampling"]:
# increase sample size stepwise (adaptive sampling)
n_grid_new = int(np.ceil(gpc[i_qoi].grid.n_grid + delta_samples * gpc[i_qoi].basis.n_basis))
else:
# increase sample size according to matrix ratio w.r.t. number of basis functions
n_grid_new = int(np.ceil(gpc[i_qoi].basis.n_basis * self.options["matrix_ratio"]))
# run model and update projection matrix if grid points were added
# (Skip simulations of first run because we already simulated it)
if i_grid < n_grid_new or extended_basis:
# extend grid
if i_grid < n_grid_new:
iprint("Extending grid from {} to {} by {} sampling points".format(
gpc[i_qoi].grid.n_grid, n_grid_new, n_grid_new - gpc[i_qoi].grid.n_grid),
tab=0, verbose=self.options["verbose"])
grid_original.gpc = gpc[i_qoi]
grid_original.extend_random_grid(n_grid_new=n_grid_new)
# run simulations
iprint("Performing simulations " + str(i_grid + 1) + " to " +
str(grid_original.coords.shape[0]),
tab=0, verbose=self.options["verbose"])
start_time = time.time()
res_new = com.run(model=self.problem.model,
problem=self.problem,
coords=grid_original.coords[i_grid:grid_original.coords.shape[0]],
coords_norm=grid_original.coords_norm[
i_grid:grid_original.coords.shape[0]],
i_iter=basis_order[0],
i_subiter=basis_order[1],
fn_results=gpc[i_qoi].fn_results,
print_func_time=self.options["print_func_time"],
verbose=self.options["verbose"])
res_all = np.vstack((res_all, res_new))
iprint('Total parallel function evaluation: ' + str(time.time() - start_time) + ' sec',
tab=0, verbose=self.options["verbose"])
i_grid = grid_original.coords.shape[0]
# Determine gradient and update projection matrix in case of gradient enhanced gPC
start_time = time.time()
grad_res_3D_all, gradient_idx = get_gradient(model=self.problem.model,
problem=self.problem,
grid=grid_original,
results=res_all,
com=com,
method=self.options["gradient_calculation"],
gradient_results_present=grad_res_3D_all_FD_fwd,
gradient_idx_skip=gradient_idx_FD_fwd,
i_iter=basis_order[0],
i_subiter=basis_order[1],
print_func_time=self.options["print_func_time"],
dx=self.options["gradient_calculation_options"]["dx"],
distance_weight=self.options["gradient_calculation_options"]["distance_weight"],
verbose=self.options["verbose"])
if self.options["gradient_calculation"] == "FD_fwd":
gradient_idx_FD_fwd = gradient_idx
grad_res_3D_all_FD_fwd = grad_res_3D_all
iprint('Gradient evaluation: ' + str(time.time() - start_time) + ' sec',
tab=0, verbose=self.options["verbose"])
# check validity of results and resample in case the model could not be evaluated at some sampling points
res_all, grad_res_3D_all, gradient_idx, grid_original = self.check_results(
results=res_all,
gradient_results=grad_res_3D_all,
gradient_results_idx=gradient_idx,
grid=grid_original,
com=com)
# Determine projection matrix
p_matrix, p_matrix_complete = determine_projection_matrix(gradient_results=grad_res_3D_all[:, q_idx, :],
lambda_eps=self.options["lambda_eps_gradient"])
p_matrix_norm = np.sum(np.abs(p_matrix), axis=1)
# save projection matrix in gPC object
gpc[i_qoi].p_matrix = copy.deepcopy(p_matrix)
gpc[i_qoi].p_matrix_norm = copy.deepcopy(p_matrix_norm)
# Set up reduced gPC
dim_reduced = p_matrix.shape[0]
iprint("Dimension of reduced problem: {}".format(dim_reduced),
tab=0, verbose=self.options["verbose"])
# Update gPC object if dimension has changed
if dim_reduced != gpc[i_qoi].problem.dim:
parameters_reduced = OrderedDict()
for i in range(dim_reduced):
parameters_reduced["n{}".format(i)] = Beta(pdf_shape=[1., 1.],
pdf_limits=[-1., 1.])
self.problem_reduced[i_qoi] = Problem(model=self.problem.model,
parameters=parameters_reduced)
# Create reduced gPC object of order - 1 and add rest of basisfunctions
# of this subiteration afterwards
gpc[i_qoi] = Reg(problem=self.problem_reduced[i_qoi],
order=basis_order[0] * np.ones(self.problem_reduced[i_qoi].dim),
order_max=basis_order[0],
order_max_norm=self.options["order_max_norm"],
interaction_order=self.options["interaction_order"],
interaction_order_current=basis_order[1],
options=self.options,
validation=self.validation)
# save original problem
gpc[i_qoi].problem_original = self.problem
# save projection matrix in gPC object
gpc[i_qoi].p_matrix = copy.deepcopy(p_matrix)
gpc[i_qoi].p_matrix_norm = copy.deepcopy(p_matrix_norm)
# Save settings and options in gpc object
gpc[i_qoi].solver = self.options["solver"]
gpc[i_qoi].settings = self.options["settings"]
gpc[i_qoi].options = copy.deepcopy(self.options)
gpc[i_qoi].error = error
gpc[i_qoi].relative_error_nrmsd = nrmsd
gpc[i_qoi].relative_error_loocv = loocv
# assign transformed grid
gpc[i_qoi].grid = project_grid(grid=grid_original, p_matrix=p_matrix, mode="reduce")
# in case of L1, L1-LHS, LHS-L1 or FIM grids copy new gpc object into it
if self.options["grid"] in [L1, L1_LHS, LHS_L1, FIM]:
gpc[i_qoi].grid.gpc = gpc[i_qoi]
# crop results to considered qoi
if self.options["qoi"] != "all":
res = copy.deepcopy(res_all)
grad_res_3D = copy.deepcopy(grad_res_3D_all)
else:
res = res_all[:, q_idx][:, np.newaxis]
grad_res_3D = grad_res_3D_all[:, q_idx, :][:, np.newaxis, :]
# Someone might not use the gradient to determine the gpc coeffs
if gpc[i_qoi].gradient:
grad_res_3D_passed = grad_res_3D
gpc[i_qoi].init_gpc_matrix(gradient_idx=gradient_idx)
else:
grad_res_3D_passed = None
gpc[i_qoi].init_gpc_matrix(gradient_idx=None)
# determine gpc coefficients
coeffs[i_qoi] = gpc[i_qoi].solve(results=res,
gradient_results=grad_res_3D_passed,
solver=gpc[i_qoi].solver,
settings=gpc[i_qoi].settings,
verbose=self.options["verbose"])
# Add a validation set if nrmsd is chosen and no validation set is yet present
if self.options["error_type"] == "nrmsd" and not isinstance(gpc[0].validation, ValidationSet):
gpc[0].create_validation_set(n_samples=self.options["n_samples_validation"],
n_cpu=self.options["n_cpu"])
elif self.options["error_type"] == "nrmsd" and isinstance(gpc[0].validation, ValidationSet):
gpc[i_qoi].validation = copy.deepcopy(gpc[0].validation)
# validate gpc approximation (determine nrmsd or loocv specified in options["error_type"])
eps = gpc[i_qoi].validate(coeffs=coeffs[i_qoi],
results=res,
gradient_results=grad_res_3D_passed,
qoi_idx=qoi_idx_validate)
# save error in case that dimension has changed and the gpc object had to be reinitialized
error = copy.deepcopy(gpc[i_qoi].error)
nrmsd = copy.deepcopy(gpc[i_qoi].relative_error_nrmsd)
loocv = copy.deepcopy(gpc[i_qoi].relative_error_loocv)
if extended_basis or first_iter:
eps_ref = copy.deepcopy(eps)
else:
delta_eps = np.abs((gpc[i_qoi].error[-1] - gpc[i_qoi].error[-2]) / eps_ref)
first_iter = False
iprint("-> {} {} error = {}".format(self.options["error_norm"],
self.options["error_type"],
eps), tab=0, verbose=self.options["verbose"])
# extend basis further if error was decreased (except in very first iteration)
if order != self.options["order_start"]:
if extended_basis and gpc[i_qoi].error[-1] < gpc[i_qoi].error[-2]:
break
extended_basis = False
# exit adaptive sampling loop if no adaptive sampling was chosen
if not self.options["adaptive_sampling"]:
break
# save gpc object and coeffs for this sub-iteration
if self.options["fn_results"] is not None:
with h5py.File(os.path.splitext(fn_results)[0] + ".hdf5", "a") as f:
# overwrite coeffs
try:
del f["coeffs" + hdf5_subfolder]
except KeyError:
pass
f.create_dataset("coeffs" + hdf5_subfolder,
data=coeffs[i_qoi], maxshape=None, dtype="float64")
# save projection matrix
try:
del f["p_matrix" + hdf5_subfolder]
except KeyError:
f.create_dataset("p_matrix" + hdf5_subfolder,
data=p_matrix, maxshape=None, dtype="float64")
# save gradient of results
if grad_res_3D is not None:
try:
del f["model_evaluations/gradient_results"]
del f["model_evaluations/gradient_results_idx"]
except KeyError:
pass
grad_res_2D_all = ten2mat(grad_res_3D_all)
f.create_dataset("model_evaluations/gradient_results",
(grad_res_2D_all.shape[0], grad_res_2D_all.shape[1]),
maxshape=(None, None),
dtype="float64",
data=grad_res_2D_all)
f.create_dataset("model_evaluations/gradient_results_idx",
dtype="int64",
data=gradient_idx)
try:
del f["gpc_matrix" + hdf5_subfolder]
except KeyError:
pass
f.create_dataset("gpc_matrix" + hdf5_subfolder,
data=gpc[i_qoi].gpc_matrix,
maxshape=None, dtype="float64")
if gpc[i_qoi].gpc_matrix_gradient is not None:
try:
del f["gpc_matrix_gradient" + hdf5_subfolder]
except KeyError:
pass
f.create_dataset("gpc_matrix_gradient" + hdf5_subfolder,
data=gpc[i_qoi].gpc_matrix_gradient,
maxshape=None, dtype="float64")
try:
del f["error" + hdf5_subfolder]
except KeyError:
pass
f.create_dataset("error" + hdf5_subfolder,
data=eps,
maxshape=None, dtype="float64")
# determine gpc coefficients
coeffs[i_qoi] = gpc[i_qoi].solve(results=res,
gradient_results=grad_res_3D_passed,
solver=gpc[i_qoi].solver,
settings=gpc[i_qoi].settings,
verbose=self.options["verbose"])
# save original grid
gpc[i_qoi].grid_original = copy.deepcopy(grid_original)
# save gpc object gpc coeffs and projection matrix
if self.options["fn_results"] is not None:
with h5py.File(fn_results + ".hdf5", "a") as f:
try:
fn_session = f["misc/fn_session"][:]
except KeyError:
f.create_dataset("misc/fn_session",
data=np.array([os.path.split(self.options["fn_session"])[1]]).astype("|S"))
f.create_dataset("misc/fn_session_folder",
data=np.array([self.options["fn_session_folder"]]).astype("|S"))
try:
del f["coeffs" + hdf5_subfolder]
except KeyError:
pass
f.create_dataset("coeffs" + hdf5_subfolder, data=coeffs[i_qoi], maxshape=None, dtype="float64")
try:
del f["p_matrix" + hdf5_subfolder]
except KeyError:
pass
f.create_dataset("p_matrix" + hdf5_subfolder, data=p_matrix, maxshape=None, dtype="float64")
f.create_dataset("misc/error_type", data=self.options["error_type"])
if self.options["gradient_enhanced"] or gpc[-1].grid.coords_gradient is not None:
f.create_dataset("grid/coords_gradient", data=gpc[-1].grid.coords_gradient,
maxshape=None, dtype="float64")
f.create_dataset("grid/coords_gradient_norm", data=gpc[-1].grid.coords_gradient_norm,
maxshape=None, dtype="float64")
# reset iterators
eps = self.options["eps"] + 1.0
order = self.options["order_start"]
error = []
nrmsd = []
loocv = []
if self.options["fn_results"] is not None:
with h5py.File(fn_results + ".hdf5", "a") as f:
if gpc[0].validation is not None:
f.create_dataset("validation/model_evaluations/results", data=gpc[0].validation.results,
maxshape=None, dtype="float64")
f.create_dataset("validation/grid/coords", data=gpc[0].validation.grid.coords,
maxshape=None, dtype="float64")
f.create_dataset("validation/grid/coords_norm", data=gpc[0].validation.grid.coords_norm,
maxshape=None, dtype="float64")
try:
f.create_dataset("misc/error_type", data=self.options["error_type"])
except (RuntimeError, ValueError):
del f["misc/error_type"]
f.create_dataset("misc/error_type", data=self.options["error_type"])
com.close()
return gpc, coeffs, res