Source code for pygpc.AbstractModel

import numpy as np
import os
import h5py
import copy
from abc import ABCMeta, abstractmethod
from .misc import display_fancy_bar


class AbstractModel(metaclass=ABCMeta):
    """
    Abstract base class for the SimulationWrapper.
    This base class provides basic functions for serialization/deserialization
    and for printing progress. It cannot be used directly; a derived class
    implementing the "simulate" method must be created.
    """

    def __init__(self, matlab_model=False):
        """
        Constructor; initializes the SimulationWrapper class.

        The model is initialized once. The parameters are set with the
        set_parameters method. Depending on the model, the user may call
        functions here to initialize the model, e.g. starting a Matlab engine.
        """
        self.matlab_model = matlab_model

    def __copy__(self):
        return copy.deepcopy(self)

    def __clean__(self):
        if self.__dict__:
            del self.__dict__
    def set_parameters(self, p, context=None):
        """
        Set the model parameters and the context of the simulations.

        Parameters
        ----------
        p : dict
            Dictionary containing the model parameters
        context : dict, optional
            Dictionary containing information about this worker's context:
            - lock : reference to the Lock object that all processes share for synchronization
            - max_grid : size of the current sub-grid that is processed
            - global_task_counter : reference to the Value object that is shared among all
              processes to keep track of the overall progress
            - seq_number : sequence number of the task this object represents; necessary to
              maintain the correct sequence of results
            - fn_results : location of the hdf5 file the results are serialized to
            - i_grid : current iteration in the sub-grid that is processed
            - i_iter : current main-iteration
            - i_subiter : current sub-iteration
            - coords : parameters of the particular simulation in the original parameter space
            - coords_norm : parameters of the particular simulation in the normalized parameter space
            - verbose : print progress
        """
        self.p = p

        if context is not None:
            for key in context.keys():
                setattr(self, key, context[key])

        # return copy.deepcopy(self)
        return self
    def read_previous_results(self, coords):
        """
        This function reads previous results from the hard disk (if present).
        When reading from the array containing the results, the current grid
        index (i_grid) is considered to maintain the order of the results when
        the SimulationModels are executed in parallel. If the function
        evaluates the results in parallel internally, i_grid is a range
        [i_grid_min, i_grid_max].

        Parameters
        ----------
        coords : ndarray of float [n_sims x dim]
            Grid coordinates the simulations are conducted with

        Returns
        -------
        None : if no serialized results could be found or they do not fit the grid
        list : data at coords
        """
        if self.fn_results:
            if self.lock:
                self.lock.acquire()
            try:
                if os.path.exists(self.fn_results + ".hdf5"):
                    # read results and coords
                    try:
                        with h5py.File(self.fn_results + ".hdf5", 'r') as f:
                            if type(self.i_grid) is list:
                                res = f['model_evaluations/results'][self.i_grid[0]:self.i_grid[1], :]
                                coords_read = f['grid/coords'][self.i_grid[0]:self.i_grid[1], :]
                            else:
                                res = f['model_evaluations/results'][self.i_grid, :]
                                coords_read = f['grid/coords'][self.i_grid, :]

                            if np.isclose(coords_read, coords).all():
                                return res  # .tolist()
                            else:
                                return None
                    except (KeyError, ValueError, IndexError):
                        return None
            finally:
                if self.lock:
                    self.lock.release()

        return None
    def write_results(self, data_dict):
        """
        This function writes the data to a file on the hard disk.
        When writing the data, the current grid index (i_grid) is considered.
        The data are written to the row corresponding to i_grid in order to
        maintain the order of the results when the SimulationModels are
        executed in parallel.

        Parameters
        ----------
        data_dict : dict of ndarray
            Dictionary containing the data to write to the .hdf5 file.
            The keys are the dataset names.
        """
        if self.fn_results:
            # full filename
            if self.lock:
                self.lock.acquire()
            try:
                # get new size of array
                if type(self.i_grid) is list:
                    require_size = np.max(self.i_grid)
                else:
                    require_size = self.i_grid + 1

                with h5py.File(self.fn_results + ".hdf5", 'a') as f:
                    for d in data_dict:
                        # # change list or single str to np.array
                        # if type(data_dict[d]) is list or type(data_dict[d]) is str:
                        #     data_dict[d] = np.array(data_dict[d]).flatten()

                        # # change single numbers to np.array
                        # if type(data_dict[d]) is float or type(data_dict[d]) is int \
                        #         or type(data_dict[d]) is np.float64 or type(data_dict[d]) is np.int:
                        #     data_dict[d] = np.array([[data_dict[d]]]).flatten()

                        # # always flatten data because it has to be saved for every grid point
                        # if data_dict[d].ndim > 1:
                        #     data_dict[d] = data_dict[d].flatten()

                        # # add axes such that it can be added to previous array
                        # if data_dict[d].ndim == 1:
                        #     data_dict[d] = data_dict[d][np.newaxis, :]

                        # check datatype
                        if type(data_dict[d][0][0]) is np.float64 or type(data_dict[d][0]) is float:
                            dtype = 'float64'
                        elif type(data_dict[d][0][0]) is np.int64:
                            dtype = 'int'
                        elif type(data_dict[d][0][0]) is np.string_ or type(data_dict[d][0][0]) is np.str_:
                            dtype = 'str'
                        else:
                            dtype = 'float64'

                        try:
                            ds = f[d]

                            # append; for strings, the whole array has to be rewritten
                            if dtype == "str":
                                # ds = f[d][:]
                                ds = f[d]
                                del f[d]
                                ds = np.vstack((ds, data_dict[d]))
                                f.create_dataset(d, data=ds.astype("|S"))
                            else:
                                # resize the array if necessary and write the data into it
                                if ds.shape[0] < require_size:
                                    ds.resize(require_size, axis=0)

                                if type(self.i_grid) is list:
                                    ds[self.i_grid[0]:self.i_grid[1], :] = data_dict[d]
                                else:
                                    ds[self.i_grid, :] = data_dict[d]

                        except (KeyError, ValueError, TypeError, IndexError):
                            # create the dataset
                            try:
                                del f[d]
                            except KeyError:
                                pass

                            if dtype == "str":
                                f.create_dataset(d, data=data_dict[d].astype("|S"))
                            else:
                                ds = f.create_dataset(d, (require_size, data_dict[d].shape[1]),
                                                      maxshape=(None, data_dict[d].shape[1]),
                                                      dtype=dtype)

                                if type(self.i_grid) is list:
                                    ds[self.i_grid[0]:self.i_grid[1], :] = data_dict[d]
                                else:
                                    ds[self.i_grid, :] = data_dict[d]
            finally:
                if self.lock:
                    self.lock.release()
    def increment_ctr(self):
        """
        This function increments the global counter by 1.
        """
        if self.lock:
            self.lock.acquire()
        try:
            if self.lock:
                self.global_task_counter.value += 1
            else:
                self.global_task_counter += 1
        finally:
            if self.lock:
                self.lock.release()
    def print_progress(self, func_time=None, read_from_file=False):
        """
        This function prints the progress according to the current context and
        the global task counter.
        """
        if self.lock:
            self.lock.acquire()
        try:
            if func_time:
                more_text = "Function evaluation took: " + repr(func_time) + "s"
            elif read_from_file:
                more_text = "Read data from " + self.fn_results + ".hdf5"
            else:
                more_text = None

            if self.lock:
                global_task_counter = self.global_task_counter.value
            else:
                global_task_counter = self.global_task_counter

            display_fancy_bar("It/Sub-it: {}/{} Performing simulation".format(self.i_iter,
                                                                              self.i_subiter),
                              global_task_counter,
                              self.max_grid,
                              more_text)
        finally:
            if self.lock:
                self.lock.release()
    def get_seq_number(self):
        return self.seq_number
    @abstractmethod
    def simulate(self, process_id=None, matlab_engine=None):
        """
        This abstract method must be implemented by the subclass.
        It should perform the simulation task depending on the input_values
        provided to the object on instantiation.

        Parameters
        ----------
        process_id : int
            A unique identifier; no two processes of the pool will run
            concurrently with the same identifier
        matlab_engine : Matlab engine object
            Matlab engine to run Matlab models
        """
        pass
    @abstractmethod
    def validate(self):
        """
        This abstract method must be implemented by the subclass.
        It should perform the validation task depending on the parameters
        defined in the problem. In cases where the model may not run correctly
        for some parameter combinations, this function changes the definition
        of the random parameters and the constants.
        """
        pass
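

# ---------------------------------------------------------------------------
# Illustrative sketch (not part of the pygpc source): a minimal model derived
# from AbstractModel. The parameter names "x1"/"x2" and the model equation
# y = x1**2 + sin(x2) are hypothetical placeholders for a real simulation.
# ---------------------------------------------------------------------------
class MyModel(AbstractModel):

    def simulate(self, process_id=None, matlab_engine=None):
        # evaluate the (hypothetical) model for all grid points in self.p
        x1 = np.asarray(self.p["x1"])
        x2 = np.asarray(self.p["x2"])
        y = x1 ** 2 + np.sin(x2)
        return y[:, np.newaxis]  # shape [n_sims x n_out]

    def validate(self):
        # nothing to restrict for this simple analytical model
        pass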
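

# ---------------------------------------------------------------------------
# Illustrative helper (not part of the pygpc source): inspect the hdf5 layout
# that read_previous_results() expects. The dataset paths
# "model_evaluations/results" and "grid/coords" are the ones read above;
# fn is the filename without the ".hdf5" extension.
# ---------------------------------------------------------------------------
def inspect_results(fn):
    with h5py.File(fn + ".hdf5", "r") as f:
        res = f["model_evaluations/results"][:]  # [n_sims x n_out]
        coords = f["grid/coords"][:]             # [n_sims x dim]
    print("results:", res.shape, "coords:", coords.shape)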
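

# ---------------------------------------------------------------------------
# Usage sketch (illustrative only): wiring a model up with a context dict as
# the pygpc workers do via set_parameters(). The keys follow the docstring
# above; all values are placeholders chosen for a single serial evaluation
# (lock=None, plain int counter, no hdf5 output).
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    coords = np.array([[0.1, 1.0]])

    context = {"lock": None,               # no multiprocessing Lock in this sketch
               "max_grid": 1,
               "global_task_counter": 0,   # plain int because lock is None
               "seq_number": 0,
               "fn_results": None,         # set a path here to enable hdf5 output
               "i_grid": 0,
               "i_iter": 1,
               "i_subiter": 0,
               "coords": coords,
               "coords_norm": coords,
               "verbose": True}

    model = MyModel()
    model.set_parameters(p={"x1": coords[:, 0], "x2": coords[:, 1]}, context=context)

    y = model.simulate()
    model.increment_ctr()
    model.print_progress()  # "It/Sub-it: 1/0 Performing simulation ..."
    print(y)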