import os
import re
import sys
import h5py
import uuid
import pickle
import inspect
import logging
import numpy as np
from .misc import is_instance
from collections import OrderedDict
from importlib import import_module
[docs]
def write_session(obj, fname, folder="session", overwrite=True):
"""
Saves a gpc session in pickle or hdf5 file formal depending on the
file extension in fname (.pkl or .hdf5)
Parameters
----------
obj : Session object
Session class instance containing the gPC information
fname : str
Path to output file (.pkl or .hdf5)
folder : str, optional, default: "session"
Path in .hdf5 file (for .hdf5 format only)
overwrite : bool, optional, default: True
Overwrite existing file
Returns
-------
<file>: .hdf5 or .pkl file
.hdf5 or .pkl file containing the gpc session
"""
file_format = os.path.splitext(fname)[1]
if file_format == ".pkl":
write_session_pkl(obj, fname, overwrite=overwrite)
elif file_format == ".hdf5":
write_session_hdf5(obj, fname, folder, overwrite=overwrite)
else:
raise IOError("Session can only be saved in .pkl or .hdf5 format.")
[docs]
def write_session_pkl(obj, fname, overwrite=True):
"""
Write Session object including information about the Basis, Problem and Model as pickle file.
Parameters
----------
obj: GPC or derived class
Class instance containing the gPC information
fname: str
Path to output file
overwrite : bool, optional, default: True
Overwrite existing file
Returns
-------
<file>: .pkl file
File containing the GPC object
"""
if not overwrite and os.path.exists(fname):
Warning("File already exists.")
else:
with open(fname, 'wb') as f:
pickle.dump(obj, f, -1)
[docs]
def write_session_hdf5(obj, fname, folder="session", overwrite=True):
"""
Write Session object including information about the Basis, Problem and Model as .hdf5 file.
Parameters
----------
obj : Session object
Session class instance containing the gPC information
fname : str
Path to output file
folder : str, optional, default: "session"
Path in .hdf5 file
overwrite : bool, optional, default: True
Overwrite existing file
Returns
-------
<file>: .hdf5 file
.hdf5 file containing the gpc session
"""
if overwrite and os.path.exists(fname):
os.remove(fname)
write_dict_to_hdf5(fn_hdf5=fname, data=obj.__dict__, folder=folder)
[docs]
def read_session(fname, folder=None):
"""
Reads a gpc session in pickle or hdf5 file formal depending on the
file extension in fname (.pkl or .hdf5)
Parameters
----------
fname : str
path to input file
folder : str, optional, default: None
Path in .hdf5 file
Returns
-------
obj : Session Object
Session object containing instances of Basis, Problem and Model etc.
"""
file_format = os.path.splitext(fname)[1]
if file_format == ".pkl":
obj = read_session_pkl(fname)
elif file_format == ".hdf5":
obj = read_session_hdf5(fname=fname, folder=folder)
else:
raise IOError("Session can only be read from .pkl or .hdf5 files.")
return obj
[docs]
def read_session_pkl(fname):
"""
Read Session object in pickle format.
Parameters
----------
fname: str
path to input file
Returns
-------
obj: Session Object
Session object containing instances of Basis, Problem and Model etc.
"""
with open(fname, 'rb') as f:
obj = pickle.load(f)
return obj
[docs]
def read_session_hdf5(fname, folder="session", verbose=False):
"""
Read gPC object including information about input pdfs, polynomials, grid etc.
object = read_gpc_obj(fname)
Parameters
----------
fname : str
path to input file
folder : str, optional, default: "session"
Path in .hdf5 file
verbose : bool, optional, default: False
Print output info
Returns
-------
obj: GPC Object
GPC object containing instances of Basis, Problem and Model.
"""
from .Problem import Problem
from .Session import Session
from .RandomParameter import RandomParameter
# model
try:
model = read_model_from_hdf5(fn_hdf5=fname, folder=folder + "/model", verbose=verbose)
except KeyError:
model = None
# parameters
parameters_unsorted = read_parameters_from_hdf5(fn_hdf5=fname, folder=folder + "/problem/parameters",
verbose=verbose)
problem_dict = read_group_from_hdf5(fn_hdf5=fname, folder=folder + "/problem", verbose=verbose)
parameters = OrderedDict()
parameters_random = OrderedDict()
for p in problem_dict["parameters_keys"]:
parameters[p] = parameters_unsorted[p]
if isinstance(parameters_unsorted[p], RandomParameter):
parameters_random[p] = parameters_unsorted[p]
# problem(model, parameters)
problem = Problem(model, parameters)
# options
options = read_group_from_hdf5(fn_hdf5=fname, folder=folder + "/algorithm/options")
# validation
try:
validation = read_validation_from_hdf5(fn_hdf5=fname, folder=folder + "/validation", verbose=verbose)
except KeyError:
validation = None
# grid
grid = read_grid_from_hdf5(fn_hdf5=fname, folder=folder + "/grid", verbose=verbose)
# algorithm
module = import_module(".Algorithm", package="pygpc")
algorithm_dict = read_group_from_hdf5(fn_hdf5=fname, folder=folder + "/algorithm", verbose=verbose)
alg = getattr(module, algorithm_dict["attrs"]["dtype"].split(".")[-1])
args = inspect.getfullargspec(alg).args[1:]
args_dict = dict()
args = [a for a in args if a != "gpc"]
for a in args:
args_dict[a] = locals()[a]
algorithm = alg(**args_dict)
# gpc
gpc_raw_list = read_group_from_hdf5(fn_hdf5=fname, folder=folder + "/gpc", verbose=verbose)
module = import_module(".Algorithm", package="pygpc")
gpc_list = [0 for _ in range(len(gpc_raw_list))]
for i_gpc, gpc_raw in enumerate(gpc_raw_list):
# read and initialize classifier if present
if "classifier" in gpc_raw.keys():
classifier = read_classifier_from_hdf5(fn_hdf5=fname,
folder=folder + "/gpc/{}/classifier".format(i_gpc),
verbose=verbose)
# read SGPC object if present (sub-gpc)
if "gpc" in gpc_raw.keys():
gpc = read_sgpc_from_hdf5(fn_hdf5=fname,
folder=folder + "/gpc/{}/gpc".format(i_gpc),
verbose=verbose)
# get gpc class (SGPC or MEGPC)
g = getattr(module, gpc_raw["attrs"]["dtype"].rsplit(".", 1)[1])
del gpc_raw["attrs"]
# SGPC
if "SGPC" in g.__module__:
gpc_list = read_sgpc_from_hdf5(fn_hdf5=fname,
folder=folder + "/gpc/{}".format(i_gpc),
verbose=verbose)
# MEGPC with sub-gpcs
else:
# get input parameters of gpc
args = inspect.getfullargspec(g).args[1:]
args_dict = dict()
for a in args:
args_dict[a] = locals()[a]
# initialize gpc
gpc_list[i_gpc] = g(**args_dict)
# loop over entries and save in self (if we have it in locals() we take this,
# e.g. gpc, grid, validation etc)
for key in gpc_raw:
if key in locals():
setattr(gpc_list[i_gpc], key, locals()[key])
else:
setattr(gpc_list[i_gpc], key, gpc_raw[key])
# session(algorithm)
session = Session(algorithm=algorithm)
# read session hdf5 content
session_dict = read_group_from_hdf5(fn_hdf5=fname, folder=folder, verbose=verbose)
for key in session_dict:
if key in locals():
setattr(session, key, locals()[key])
else:
setattr(session, key, session_dict[key])
# add path of .hdf5 script to python which generated the session (needed in case of relative imports)
sys.path.append(os.path.split(session_dict["fn_script"])[0])
# set gpc type in session
session.set_gpc(gpc_list)
return session
[docs]
def read_problem_from_hdf5(fn_hdf5, folder, verbose=False):
"""
Reads problem from hdf5 file
Parameters
----------
fn_hdf5 : str
Filename of .hdf5 file to write in
folder : str
Folder inside .hdf5 file where dict is saved
verbose : bool, optional, default: False
Print output info
Returns
-------
problem : Problem object
Problem
"""
from .Problem import Problem
# read content of problem
problem_dict = read_group_from_hdf5(fn_hdf5=fn_hdf5, folder=folder, verbose=verbose)
# model
model = read_model_from_hdf5(fn_hdf5=fn_hdf5, folder=folder + "/model", verbose=verbose)
# parameters
parameters_unsorted = read_parameters_from_hdf5(fn_hdf5=fn_hdf5, folder=folder + "/parameters", verbose=False)
# sort parameters
parameters = OrderedDict()
for p in problem_dict["parameters_keys"]:
parameters[p] = parameters_unsorted[p]
# initialize problem
problem = Problem(model, parameters)
return problem
[docs]
def read_classifier_from_hdf5(fn_hdf5, folder, verbose=False):
"""
Reads classifier from hdf5 file
Parameters
----------
fn_hdf5 : str
Filename of .hdf5 file to write in
folder : str
Folder inside .hdf5 file where dict is saved
verbose : bool, optional, default: False
Print output info
Returns
-------
classifier : classifier object
Classifier
"""
classifier_dict = read_group_from_hdf5(fn_hdf5=fn_hdf5, folder=folder, verbose=verbose)
module = import_module(".Classifier", package="pygpc")
c = getattr(module, classifier_dict["attrs"]["dtype"].rsplit(".", 1)[1])
# get input parameters of classifier
args = inspect.getfullargspec(c).args[1:]
args_dict = dict()
for a in args:
args_dict[a] = classifier_dict[a]
init_classifier = True
# for some reason the domains may be swapped in very rare cases so we do the init again
while init_classifier:
# initialize classifier
classifier = c(**args_dict)
# ensure that domains are not swapped
classifier.domains = classifier_dict["domains"]
classifier.update(coords=classifier_dict["coords"],
results=classifier_dict["results"])
if np.sum(classifier.predict(coords=classifier_dict["coords"]) == classifier_dict["domains"])/ \
len(classifier_dict["domains"]) > 0.95:
init_classifier = False
return classifier
[docs]
def read_basis_from_hdf5(fn_hdf5, folder, verbose=False):
"""
Reads Basis from hdf5 file
Parameters
----------
fn_hdf5 : str
Filename of .hdf5 file to write in
folder : str
Folder inside .hdf5 file where dict is saved
verbose : bool, optional, default: False
Print output info
Returns
-------
basis : Basis object
basis
"""
basis_dict = read_group_from_hdf5(fn_hdf5=fn_hdf5, folder=folder, verbose=verbose)
module_basis = import_module(".Basis", package="pygpc")
module_basis_function = import_module(".BasisFunction", package="pygpc")
b = getattr(module_basis, basis_dict["attrs"]["dtype"].rsplit(".", 1)[1])
# get arguments to initialize basis
args = inspect.getfullargspec(b).args[1:]
# collect arguments from hdf5 file content
args_dict = dict()
for a in args:
args_dict[a] = basis_dict[a]
# initialize basis
basis = b(**args_dict)
# write content in self
for key in basis_dict:
if key != "b":
setattr(basis, key, basis_dict[key])
b = [[0 for _ in range(basis_dict["dim"])] for _ in range(basis_dict["n_basis"])]
for i_basis, b_lst in enumerate(basis_dict["b"]):
for i_dim, b_ in enumerate(b_lst):
# get type of basis function
bf = getattr(module_basis_function, b_["attrs"]["dtype"].rsplit(".", 1)[1])
# read content of hdf5
bf_dict = read_group_from_hdf5(fn_hdf5=fn_hdf5,
folder=folder + "/b/{}/{}".format(i_basis, i_dim),
verbose=verbose)
# get arguments to initialize basis function
args = inspect.getfullargspec(bf).args[1:]
# collect arguments from hdf5 file content
args_dict = dict()
for a in args:
args_dict[a] = bf_dict[a]
# initialize basis function
b[i_basis][i_dim] = bf(**args_dict)
# extend basis
basis.extend_basis(b)
return basis
[docs]
def read_sgpc_from_hdf5(fn_hdf5, folder, verbose=False):
"""
Reads SGPC from hdf5 file
Parameters
----------
fn_hdf5 : str
Filename of .hdf5 file to write in
folder : str
Folder inside .hdf5 file where dict is saved
verbose : bool, optional, default: False
Print output info
Returns
-------
sgpc : SGPC object or list of SGPC objects
SGPC
"""
sgpc_raw_list = read_group_from_hdf5(fn_hdf5=fn_hdf5, folder=folder, verbose=verbose)
module = import_module(".SGPC", package="pygpc")
if type(sgpc_raw_list) is not list:
sgpc_raw_list = [sgpc_raw_list]
sub_gpc = False
else:
sub_gpc = True
sgpc_list = [0 for _ in range(len(sgpc_raw_list))]
for i_gpc, sgpc_raw in enumerate(sgpc_raw_list):
if sub_gpc:
hdf5_loc = "/{}/".format(i_gpc)
else:
hdf5_loc = "/"
# get gpc by type
g = getattr(module, sgpc_raw["attrs"]["dtype"].rsplit(".", 1)[1])
# get input parameters of classifier
args = inspect.getfullargspec(g).args[1:]
args_dict = dict()
for a in args:
if a == "problem":
args_dict[a] = read_problem_from_hdf5(fn_hdf5=fn_hdf5,
folder=folder + hdf5_loc + a,
verbose=verbose)
elif a == "validation":
args_dict[a] = read_validation_from_hdf5(fn_hdf5=fn_hdf5,
folder=folder + hdf5_loc + a,
verbose=verbose)
else:
args_dict[a] = sgpc_raw[a]
# initialize SGPC object
sgpc_list[i_gpc] = g(**args_dict)
# write objects in self
for key in sgpc_raw:
try:
dtype = sgpc_raw[key]["attrs"]["dtype"]
if "pygpc.Basis" in dtype:
basis = read_basis_from_hdf5(fn_hdf5=fn_hdf5,
folder=folder + hdf5_loc + key,
verbose=verbose)
setattr(sgpc_list[i_gpc], key, basis)
elif "pygpc.Grid" in dtype:
grid = read_grid_from_hdf5(fn_hdf5=fn_hdf5,
folder=folder + hdf5_loc + key,
verbose=verbose)
setattr(sgpc_list[i_gpc], key, grid)
elif "pygpc.Problem" in dtype:
problem = read_problem_from_hdf5(fn_hdf5=fn_hdf5,
folder=folder + hdf5_loc + key,
verbose=verbose)
setattr(sgpc_list[i_gpc], key, problem)
else:
setattr(sgpc_list[i_gpc], key, sgpc_raw[key])
except (KeyError, IndexError, TypeError):
setattr(sgpc_list[i_gpc], key, sgpc_raw[key])
return sgpc_list
[docs]
def read_model_from_hdf5(fn_hdf5, folder, verbose=False):
"""
Reads model from hdf5 file
Parameters
----------
fn_hdf5 : str
Filename of .hdf5 file to write in
folder : str
Folder inside .hdf5 file where dict is saved
verbose : bool, optional, default: False
Print output info
Returns
-------
model : Model object
Model
"""
model_dict = read_group_from_hdf5(fn_hdf5=fn_hdf5, folder=folder, verbose=verbose)
sys.path.append(os.path.split(model_dict["fname"])[0])
module = import_module(os.path.splitext(os.path.split(model_dict["fname"])[1])[0])
model = getattr(module, model_dict["attrs"]["dtype"].rsplit(".", 1)[1])()
return model
[docs]
def read_parameters_from_hdf5(fn_hdf5, folder, verbose=False):
"""
Reads parameters from hdf5 file
Parameters
----------
fn_hdf5 : str
Filename of .hdf5 file to write in
folder : str
Folder inside .hdf5 file where dict is saved
verbose : bool, optional, default: False
Print output info
Returns
-------
parameters : OrderedDict
OrdererDict containing the parameters (random and deterministic)
"""
parameters = OrderedDict()
parameters_dict = read_group_from_hdf5(fn_hdf5=fn_hdf5, folder=folder, verbose=verbose)
module = import_module(".RandomParameter", package="pygpc")
for p in parameters_dict:
if (type(parameters_dict[p]) is dict or type(parameters_dict[p]) is OrderedDict) and \
"RandomParameter" in parameters_dict[p]["attrs"]["dtype"]:
rp = getattr(module, parameters_dict[p]["attrs"]["dtype"].split(".")[-1])
args = inspect.getfullargspec(rp).args[1:]
args_dict = dict()
for a in args:
args_dict[a] = parameters_dict[p][a]
parameters[p] = rp(**args_dict)
else:
parameters[p] = parameters_dict[p]
return parameters
[docs]
def read_grid_from_hdf5(fn_hdf5, folder, verbose=False):
"""
Reads and initializes grid from hdf5 file
Parameters
----------
fn_hdf5 : str
Filename of .hdf5 file to write in
folder : str
Folder inside .hdf5 file where dict is saved
verbose : bool, optional, default: False
Print output info
Returns
-------
grid : Grid object
Grid
"""
grid_dict = read_group_from_hdf5(fn_hdf5=fn_hdf5, folder=folder, verbose=verbose)
module = import_module(".Grid", package="pygpc")
g = getattr(module, grid_dict["attrs"]["dtype"].split(".")[-1])
# get arguments of grid function
args = inspect.getfullargspec(g).args[1:]
# read content of hdf5 and relate to arguments
args_dict = dict()
for a in args:
if a in ["coords", "coords_norm", "coords_gradient", "coords_gradient_norm", "weights"]:
args_dict[a] = grid_dict["_" + a]
elif a == "parameters_random":
parameters_random = read_parameters_from_hdf5(fn_hdf5=fn_hdf5,
folder=folder + "/parameters_random",
verbose=verbose)
args_dict[a] = parameters_random
else:
args_dict[a] = grid_dict[a]
# regenerate unique grid IDs
args_dict["coords_id"] = [uuid.uuid4() for _ in range(grid_dict["n_grid"])]
if args_dict["coords_gradient"] is not None:
args_dict["coords_gradient_id"] = args_dict["coords_id"]
grid = g(**args_dict)
return grid
[docs]
def read_validation_from_hdf5(fn_hdf5, folder, verbose=False):
"""
Reads and initializes ValidatioSet from hdf5 file
Parameters
----------
fn_hdf5 : str
Filename of .hdf5 file to write in
folder : str
Folder inside .hdf5 file where dict is saved
verbose : bool, optional, default: False
Print output info
Returns
-------
validation : ValidationSet object
ValidationSet
"""
from .ValidationSet import ValidationSet
validation_dict = read_group_from_hdf5(fn_hdf5=fn_hdf5, folder=folder)
if validation_dict is None:
validation = None
else:
args_dict = dict()
args = inspect.getfullargspec(ValidationSet).args[1:]
for a in args:
if a == "grid":
args_dict[a] = read_grid_from_hdf5(fn_hdf5=fn_hdf5, folder=folder + "/grid", verbose=verbose)
else:
args_dict[a] = validation_dict[a]
validation = ValidationSet(**args_dict)
return validation
[docs]
def read_group_from_hdf5(fn_hdf5, folder, verbose=False):
"""
Read data from group (folder) in hdf5 file
Parameters
----------
fn_hdf5 : str
Filename of .hdf5 file to write in
folder : str
Folder inside .hdf5 file where dict is saved
verbose : bool, optional, default: False
Print output info
Returns
-------
data : dict or list or OrderedDict
Folder content
"""
f = h5py.File(fn_hdf5, "r")
attrs = dict()
for a in f[folder].attrs:
attrs[a] = f[folder].attrs.__getitem__(a)
data = dict()
if isinstance(f[folder], h5py.Group) and len(f[folder].keys()) > 0:
for key in f[folder].keys():
if folder != "/":
data["attrs"] = attrs
data[key] = read_array_from_hdf5(fn_hdf5=fn_hdf5,
arr_name=folder + "/" + key)
if folder != "/":
if data["attrs"]["dtype"] == "list":
data = [data[key] for key in data if key != "attrs"]
elif data["attrs"]["dtype"] == "dict":
del data["attrs"]
elif data["attrs"]["dtype"] == "collections.OrderedDict":
data_ordered = OrderedDict()
for key in data:
if key != "attrs":
data_ordered[key] = data[key]
data = data_ordered
else:
data = None
return data
[docs]
def read_array_from_hdf5(fn_hdf5, arr_name, verbose=False):
"""
Parameters
----------
fn_hdf5 : str
Filename of .hdf5 file to write in
folder : str
Folder inside .hdf5 file where dict is saved
verbose : bool, optional, default: False
Print output info
Returns
-------
data
attrs
"""
f = h5py.File(fn_hdf5, "r")
if isinstance(f[arr_name], h5py.Group):
data = read_group_from_hdf5(fn_hdf5=fn_hdf5, folder=arr_name)
else:
data = f[arr_name][()]
if type(data) == np.bytes_:
data = str(data.astype(str))
if type(data) == str and (data == "None" or data == "N/A"):
data = None
return data
[docs]
def write_dict_to_hdf5(fn_hdf5, data, folder, verbose=False):
"""
Takes dict and passes its keys to write_arr_to_hdf5()
fn_hdf5:folder/
|--key1
|--key2
|...
Parameters
----------
fn_hdf5 : str
Filename of .hdf5 file to write in
data : dict
Dictionary to save in .hdf5 file
folder : str
Folder inside .hdf5 file where dict is saved
verbose : bool, optional, default: False
Print output info
"""
max_recursion_depth = 12
# object (dict)
if is_instance(data) and not isinstance(data, OrderedDict):
t, dt = get_dtype(data)
# do not save uuids in hdf5
if dt == "uuid.UUID":
return
else:
# create group and set type and dtype attributes
with h5py.File(fn_hdf5, "a") as f:
f.create_group(str(folder))
f[str(folder)].attrs.__setitem__("type", t)
f[str(folder)].attrs.__setitem__("dtype", dt)
# write content
for key in data.__dict__:
if len(folder.split("/")) >= max_recursion_depth:
data.__dict__[key] = "None"
write_arr_to_hdf5(fn_hdf5=fn_hdf5,
arr_name=folder+"/"+key,
data=data.__dict__[key],
verbose=verbose)
# mappingproxy (can not be saved)
elif str(type(data)) == "<class 'mappingproxy'>":
data = "mappingproxy"
write_arr_to_hdf5(fn_hdf5=fn_hdf5,
arr_name="mappingproxy",
data=data,
verbose=verbose)
# list or tuple
elif type(data) is list or type(data) is tuple:
t, dt = get_dtype(data)
# create group and set type and dtype attributes
with h5py.File(fn_hdf5, "a") as f:
f.create_group(str(folder))
f[str(folder)].attrs.__setitem__("type", t)
f[str(folder)].attrs.__setitem__("dtype", dt)
for idx, lst in enumerate(data):
if len(folder.split("/")) >= max_recursion_depth:
lst = "None"
write_arr_to_hdf5(fn_hdf5=fn_hdf5,
arr_name=folder+"/"+str(idx),
data=lst,
verbose=verbose)
# dict or OrderedDict
else:
t, dt = get_dtype(data)
# create group and set type and dtype attributes
with h5py.File(fn_hdf5, "a") as f:
try:
f.create_group(str(folder))
f[str(folder)].attrs.__setitem__("type", t)
f[str(folder)].attrs.__setitem__("dtype", dt)
except ValueError:
pass
for key in list(data.keys()):
if len(folder.split("/")) >= max_recursion_depth:
data[key] = "None"
write_arr_to_hdf5(fn_hdf5=fn_hdf5,
arr_name=folder+"/"+str(key),
data=data[key],
verbose=verbose)
[docs]
def write_arr_to_hdf5(fn_hdf5, arr_name, data, overwrite_arr=True, verbose=False):
"""
Takes an array and adds it to an .hdf5 file
If data is list of dict, write_dict_to_hdf5() is called for each dict with adapted hdf5-folder name
Otherwise, data is casted to np.ndarray and dtype of unicode data casted to '|S'.
Parameters
----------
fn_hdf5 : str
Filename of .hdf5 file
arr_name : str
Complete path in .hdf5 file with array name
data : ndarray, list or dict
Data to write
overwrite_arr : bool, optional, default: True
Overwrite existing array
verbose : bool, optional, default: False
Print information
"""
max_recursion_depth = 12
# dict or OrderedDict
if isinstance(data, dict) or isinstance(data, OrderedDict):
if len(arr_name.split("/")) >= max_recursion_depth:
data = np.array("None")
else:
write_dict_to_hdf5(fn_hdf5=fn_hdf5,
data=data,
folder=arr_name,
verbose=verbose)
return
# list of dictionaries:
elif isinstance(data, list) and len(data) > 0 and (isinstance(data[0], dict) or is_instance(data[0])):
t, dt = get_dtype(data)
# do not save uuids in hdf5
if dt == "uuid.UUID":
return
else:
# create group and set type and dtype attributes
with h5py.File(fn_hdf5, "a") as f:
f.create_group(str(arr_name))
f[str(arr_name)].attrs.__setitem__("type", t)
f[str(arr_name)].attrs.__setitem__("dtype", dt)
for idx, lst in enumerate(data):
if len(arr_name.split("/")) >= max_recursion_depth:
lst = np.array("None")
write_dict_to_hdf5(fn_hdf5=fn_hdf5,
data=lst,
folder=arr_name+"/"+str(idx),
verbose=verbose)
return
# object
elif is_instance(data):
if len(arr_name.split("/")) >= max_recursion_depth:
data = np.array("None")
else:
t, dt = get_dtype(data)
# create group and set type and dtype attributes
with h5py.File(fn_hdf5, "a") as f:
f.create_group(str(arr_name))
f[str(arr_name)].attrs.__setitem__("type", t)
f[str(arr_name)].attrs.__setitem__("dtype", dt)
write_dict_to_hdf5(fn_hdf5=fn_hdf5,
data=data.__dict__,
folder=arr_name,
verbose=verbose)
return
# list or tuple
elif type(data) is list or type(data) is tuple:
if len(arr_name.split("/")) >= max_recursion_depth:
data = np.array(["None"])
t, dt = get_dtype(data)
# do not save uuids in hdf5
if dt == "uuid.UUID":
return
else:
# create group and set type and dtype attributes
with h5py.File(fn_hdf5, "a") as f:
f.create_group(str(arr_name))
f[str(arr_name)].attrs.__setitem__("type", t)
f[str(arr_name)].attrs.__setitem__("dtype", dt)
data_dict = dict()
for idx, lst in enumerate(data):
data_dict[idx] = lst
write_dict_to_hdf5(fn_hdf5=fn_hdf5,
data=data_dict,
folder=arr_name,
verbose=verbose)
return
elif not isinstance(data, np.ndarray):
if len(arr_name.split("/")) >= max_recursion_depth:
data = np.array("None")
else:
data = np.array(data)
# np.arrays of np.arrays
elif data.dtype == 'O' and len(data) > 1:
if len(arr_name.split("/")) >= max_recursion_depth:
return
else:
t, dt = get_dtype(data)
# create group and set type and dtype attributes
with h5py.File(fn_hdf5, "a") as f:
f.create_group(str(arr_name))
f[str(arr_name)].attrs.__setitem__("type", t)
f[str(arr_name)].attrs.__setitem__("dtype", dt)
data = data.tolist()
write_dict_to_hdf5(fn_hdf5=fn_hdf5,
data=data,
folder=arr_name,
verbose=verbose)
return
# do some type casting from numpy/pd -> h5py
# date column from experiment.csv is O
# plotsetting["view"] is O list of list of different length
# coil1 and coil2 columns names from experiment.csv is <U8
# coil_mean column name from experiment.csv is <U12
if data.dtype == 'O' or data.dtype.kind == 'U':
data = data.astype('|S')
if verbose:
print("Converting array " + arr_name + " to string")
t, dt = get_dtype(data)
with h5py.File(fn_hdf5, 'a') as f:
# create data_set
if overwrite_arr:
try:
del f[arr_name]
except KeyError:
pass
f.create_dataset(arr_name, data=data)
f[str(arr_name)].attrs.__setitem__("type", t)
f[str(arr_name)].attrs.__setitem__("dtype", dt)
return
[docs]
def get_dtype(obj):
"""
Get type and datatype of object
Parameters
----------
obj : Object
Input object (any)
Returns
-------
type : str
Type of object (e.g. 'class')
dtype : str
Datatype of object (e.g. 'numpy.ndarray')
"""
type_str = str(type(obj))
type_attr = re.match(pattern=r"\<(.*?)\ '", string=type_str).group(1)
dtype_attr = re.findall(pattern=r"'(.*?)'", string=type_str)[0]
return type_attr, dtype_attr
[docs]
def write_data_txt(data, fname):
"""
Write data (quantity of interest) in .txt file (e.g. coeffs, mean, std, ...).
write_data_txt(data, fname)
Parameters
----------
data: ndarray of float
Data to save
fname: str
Path to output file
Returns
-------
<file>: .txt file
File containing the data (tab delimited)
"""
np.savetxt(fname, data, fmt='%.10e', delimiter='\t', newline='\n', header='', footer='')
[docs]
def read_data_hdf5(fname, loc):
"""
Read data from .hdf5 file (e.g. coeffs, mean, std, ...).
load_data_hdf5(fname, loc)
Parameters
----------
fname: str
path to input file
loc: str
location (folder and name) in hdf5 file (e.g. data/phi)
Returns
-------
data: ndarray of float
Loaded data from .hdf5 file
"""
with h5py.File(fname, 'r') as f:
d = f[loc]
return d
[docs]
def write_data_hdf5(data, fname, loc):
"""
Write quantity of interest in .hdf5 file (e.g. coeffs, mean, std, ...).
write_data_hdf5(data, fname, loc)
Parameters
----------
data: np.ndarray
data to save
fname: str
path to output file
loc: str
location (folder and name) in hdf5 file (e.g. data/phi)
"""
with h5py.File(fname, 'a') as f:
f.create_dataset(loc, data=data)
[docs]
def write_sobol_idx_txt(sobol_idx, fname):
"""
Write sobol_idx list in file.
write_sobol_idx_txt(sobol_idx, filename)
Parameters
----------
sobol_idx: [N_sobol] list of np.ndarray
List of parameter label indices belonging to Sobol indices
fname: str
Path to output file
Returns
-------
<file>: .txt file
File containing the sobol index list.
"""
f = open(fname, 'w')
f.write('# Parameter index list of Sobol indices:\n')
for line in sobol_idx:
for entry in line:
if entry != line[0]:
f.write(', ')
f.write('{}'.format(entry))
if line != sobol_idx[-1]:
f.write('\n')
f.close()
[docs]
def read_sobol_idx_txt(fname):
"""
Read sobol_idx list from file.
read_sobol_idx_txt(fname)
Parameters
----------
fname: str
Path to input file
Returns
-------
sobol_idx: [N_sobol] list of np.array
List of parameter label indices belonging to Sobol indices
"""
f = open(fname, 'r')
line = f.readline().strip('\n')
sobol_idx = []
while line:
# ignore comments in text file
if line[0] == '#':
line = f.readline().strip('\n')
continue
else:
# read comma separated indices and convert to ndarray
sobol_idx.append(np.asarray([int(x) for x in line.split(',') if x]))
line = f.readline().strip('\n')
return sobol_idx
[docs]
def write_log_sobol(fname, random_vars, sobol_rel_order_mean, sobol_rel_1st_order_mean, sobol_extracted_idx_1st):
"""
Write average ratios of Sobol indices into logfile.
Parameters
----------
fname: str
Path of logfile
random_vars: list of str
Labels of random variables
sobol_rel_order_mean: np.ndarray
Average proportion of the Sobol indices of the different order to the total variance (1st, 2nd, etc..,).
(over all output quantities)
sobol_rel_1st_order_mean: np.ndarray
Average proportion of the random variables of the 1st order Sobol indices to the total variance.
(over all output quantities)
sobol_extracted_idx_1st: list of int [N_sobol_1st]
Indices of extracted 1st order Sobol indices corresponding to SGPC.random_vars.
Returns
-------
<File>: .txt file
Logfile containing information about the average ratios of 1st order Sobol indices w.r.t. the total variance
"""
# start log
log = open(os.path.splitext(fname)[0] + '.txt', 'w')
log.write("Sobol indices:\n")
log.write("==============\n")
log.write("\n")
# print order ratios
log.write("Ratio: order / total variance over all output quantities:\n")
log.write("---------------------------------------------------------\n")
for i in range(len(sobol_rel_order_mean)):
log.write("Order {}: {:.4f}\n".format(i + 1, sobol_rel_order_mean[i]))
log.write("\n")
# print 1st order ratios of parameters
log.write("Ratio: 1st order Sobol indices of parameters / total variance over all output quantities\n")
log.write("----------------------------------------------------------------------------------------\n")
# random_vars = []
max_len = max([len(random_vars[i]) for i in range(len(random_vars))])
for i in range(len(sobol_rel_1st_order_mean)):
log.write("{}{:s}: {:.4f}\n".format(
(max_len - len(random_vars[sobol_extracted_idx_1st[i]])) * ' ',
random_vars[sobol_extracted_idx_1st[i]],
sobol_rel_1st_order_mean[i]))
# random_vars.append(self.random_vars[sobol_extracted_idx_1st[i]])
log.close()
# # initialize logger
# file_logger = logging.getLogger('gPC')
# file_logger.setLevel(logging.DEBUG)
# file_logger_handler = logging.FileHandler('gPC.log')
# file_logger_formatter = logging.Formatter('%(asctime)s - %(name)s - %(message)s')
# file_logger_handler.setFormatter(file_logger_formatter)
# file_logger.addHandler(file_logger_handler)
console_logger = logging.getLogger('gPC_console_output')
console_logger.setLevel(logging.DEBUG)
console_logger_handler = logging.StreamHandler()
console_logger_formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
console_logger_handler.setFormatter(console_logger_formatter)
console_logger.addHandler(console_logger_handler)
# file_logger.disabled = False
console_logger.disabled = False
# def activate_terminal_output():
# console_logger.disabled = False
#
#
# def activate_logfile_output():
# file_logger.disabled = False
#
#
# def deactivate_terminal_output():
# console_logger.disabled = True
#
#
# def deactivate_logfile_output():
# file_logger.disabled = True
[docs]
def iprint(message, verbose=True, tab=None):
"""
Function that prints out a message over the python logging module
iprint(message, verbose=True)
Parameters
----------
message: string
String to print in standard output
verbose: bool, optional, default=True
Determines if string is printed out
tab: int
Number of tabs before message
"""
if verbose:
if tab:
message = '\t' * tab + message
# console_logger.info(message)
print(message)
[docs]
def wprint(message, verbose=True, tab=None):
"""
Function that prints out a warning message over the python logging module
wprint(message, verbose=True)
Parameters
----------
message: string
String to print in standard output
verbose: bool, optional, default=True
Determines if string is printed out
tab: int
Number of tabs before message
"""
if verbose:
if tab:
message = '\t' * tab + message
console_logger.warning(message)