import subprocess
import time
import copy
import numpy as np
import os
import re
import multiprocessing
import multiprocessing.pool
# import dispy
from collections import OrderedDict
from pygpc import Worker
from .io import iprint
from .RandomParameter import *

def Computation(n_cpu, matlab_model=False):
    """
    Helper function to initialize the Computation class.

    n_cpu = 0 : use this if the model is capable of evaluating several parameterizations in parallel
    n_cpu = 1 : the model is called in serial for every parameterization.
    n_cpu > 1 : A multiprocessing.Pool will be opened and n_cpu parameterizations are calculated in parallel

    Parameters
    ----------
    n_cpu : int
        Number of CPU cores to use (parallel model evaluations)
    matlab_model : boolean, optional, default: False
        Use a Matlab model

    Returns
    -------
    obj : object instance of Computation class
        ComputationFuncPar instance if n_cpu == 0, otherwise ComputationPoolMap instance
    """
    # n_cpu == 0 delegates parallelization to the model itself; anything else goes through the pool class
    computation_class = ComputationFuncPar if n_cpu == 0 else ComputationPoolMap
    return computation_class(n_cpu, matlab_model=matlab_model)
class ComputationPoolMap:
    """
    Computation sub-class to run the model using a processing pool for parallelization

    Parameters
    ----------
    n_cpu : int
        Number of CPU cores to use (parallel model evaluations); capped at multiprocessing.cpu_count()
    matlab_model : boolean, optional, default: False
        Use a Matlab model
    """

    def __init__(self, n_cpu, matlab_model=False):
        """
        Constructor; Initializes ComputationPoolMap class
        """
        # Setting up parallelization (setup process pool)
        n_cpu_available = multiprocessing.cpu_count()
        self.n_cpu = min(n_cpu, n_cpu_available)
        self.i_grid = 0

        # Use a process queue to assign persistent, unique IDs to the processes in the pool
        self.process_manager = multiprocessing.Manager()
        self.process_queue = self.process_manager.Queue()
        self.process_pool = multiprocessing.Pool(self.n_cpu, Worker.init, (self.process_queue,))

        # Global counter used by all processes to keep track of the progress
        self.global_task_counter = self.process_manager.Value('i', 0)

        # NOTE(review): uses the requested n_cpu, which may exceed the pool size self.n_cpu;
        # surplus IDs simply remain in the queue.
        for i in range(n_cpu):
            self.process_queue.put(i)

        # Necessary to synchronize read/write access to serialized results
        self.global_lock = self.process_manager.RLock()

        self.matlab_engine = None

        # start matlab engine
        if matlab_model:
            import matlab.engine
            iprint("Starting Matlab engine ...", tab=0, verbose=False)
            self.matlab_engine = matlab.engine.start_matlab()

    def run(self, model, problem, coords, coords_norm=None, i_iter=None, i_subiter=None, fn_results=None,
            print_func_time=False, increment_grid=True, verbose=False):
        """
        Runs model evaluations for parameter combinations specified in coords array

        Parameters
        ----------
        model: Model object
            Model object instance of model to investigate (derived from AbstractModel class, implemented by user)
        problem: Problem class instance
            GPC Problem under investigation, includes the parameters of the model (constant and random)
        coords: ndarray of float [n_sims, n_dim]
            Set of n_sims parameter combinations to run the model with (only the random parameters!).
        coords_norm: ndarray of float [n_sims, n_dim]
            Set of n_sims parameter combinations to run the model with (normalized coordinates [-1, 1]).
        i_iter: int
            Index of main-iteration
        i_subiter: int
            Index of sub-iteration
        fn_results : string, optional, default=None
            If provided, model evaluations are saved in fn_results.hdf5 file and gpc object in fn_results.pkl file
        print_func_time : bool
            Print time of single function evaluation
        increment_grid : bool
            Increment grid counter (not done in case of gradient calculation)
        verbose : bool, optional, default: False
            Print progress

        Returns
        -------
        res: ndarray of float [n_sims x n_out]
            n_sims simulation results of the n_out output quantities of the model under investigation.
        """
        if i_iter is None:
            i_iter = "N/A"

        if i_subiter is None:
            i_subiter = "N/A"

        # read new grid points and convert to list for multiprocessing
        grid_new = coords.tolist()
        n_grid_new = len(grid_new)

        # since we re-use the global counter, we need to reset it first
        self.global_task_counter.value = 0

        # Names of the random parameters; column i of coords belongs to random_keys[i].
        # Computed once instead of once per grid point and dimension.
        random_keys = list(problem.parameters_random.keys())

        # deepcopy model once and strip its attributes; every worker gets a __copy__ of this clean instance
        model_ = copy.deepcopy(model)
        model_.__clean__()

        # create worker objects that will evaluate the function
        worker_objs = []

        for seq_num, random_var_instances in enumerate(grid_new):
            if coords_norm is None:
                c_norm = None
            else:
                c_norm = coords_norm[seq_num, :][np.newaxis, :]

            # setup context (let the process know which iteration, interaction order etc.)
            context = {
                'global_task_counter': self.global_task_counter,
                'lock': self.global_lock,
                'seq_number': seq_num,
                'i_grid': self.i_grid,
                'max_grid': n_grid_new,
                'i_iter': i_iter,
                'i_subiter': i_subiter,
                'fn_results': fn_results,
                'coords': np.array(random_var_instances)[np.newaxis, :],
                'coords_norm': c_norm,
                'print_func_time': print_func_time,
                'verbose': verbose,
            }

            # shallow copy of the problem parameters (constant parameters are shared, not copied)
            parameters = OrderedDict((key, problem.parameters[key]) for key in problem.parameters)

            # replace the RandomParameters with this grid point's sample, wrapped as 1-element arrays
            for i, value in enumerate(random_var_instances):
                if not isinstance(value, np.ndarray):
                    value = np.array([value])
                parameters[random_keys[i]] = value

            # append new worker which will evaluate the model with particular parameters from grid
            worker_objs.append(model_.__copy__().set_parameters(p=parameters, context=context))

            if increment_grid:
                self.i_grid += 1

        # start model evaluations
        if self.n_cpu == 1:
            res_new_list = [Worker.run(obj=worker_obj, matlab_engine=self.matlab_engine)
                            for worker_obj in worker_objs]
        else:
            # starmap deals with chunking the data. The engine is passed explicitly per task because
            # Pool.map's third positional argument is chunksize, not an extra argument for the function.
            # NOTE(review): a live Matlab engine presumably cannot be pickled to pool processes.
            res_new_list = self.process_pool.starmap(
                Worker.run, [(worker_obj, self.matlab_engine) for worker_obj in worker_objs])

        # Initialize the result array with the correct size and set the elements according to their order
        # (the first element in 'res_new_list' might not necessarily be the result of the first Process/i_grid).
        # Each result is indexed as (seq_number, output) here.
        res = [None] * n_grid_new

        for result in res_new_list:
            res[result[0]] = result[1]

        return np.vstack(res)

    def close(self):
        """
        Closes the pool and shuts down the manager process serving the shared queue, lock and counter
        """
        self.process_pool.close()
        self.process_pool.join()
        # The Manager runs its own server process; without shutdown() it lingers until garbage collection/exit
        self.process_manager.shutdown()
class ComputationFuncPar:
    """
    Computation sub-class to run the model using the model's internal parallelization

    Parameters
    ----------
    n_cpu : int
        Number of CPU cores to use (parallel model evaluations)
    matlab_model : boolean, optional, default: False
        Use a Matlab model
    """

    def __init__(self, n_cpu, matlab_model=False):
        """
        Constructor; Initializes ComputationFuncPar class
        """
        n_cpu_available = multiprocessing.cpu_count()
        self.n_cpu = min(n_cpu, n_cpu_available)
        self.i_grid = 0

        # Progress counter handed to the model via the context; no pool is used, so a plain int suffices
        self.global_task_counter = 0

        self.matlab_engine = None

        # start matlab engine
        if matlab_model:
            import matlab.engine
            iprint("Starting Matlab engine ...", tab=0, verbose=True)
            self.matlab_engine = matlab.engine.start_matlab()

    def run(self, model, problem, coords, coords_norm=None, i_iter=None, i_subiter=None, fn_results=None,
            print_func_time=False, increment_grid=True, verbose=False):
        """
        Runs model evaluations for parameter combinations specified in coords array

        All grid points are passed to the model in one call; the model is responsible for parallelization.

        Parameters
        ----------
        model: Model object
            Model object instance of model to investigate (derived from AbstractModel class, implemented by user)
        problem: Problem class instance
            GPC Problem under investigation, includes the parameters of the model (constant and random)
        coords: ndarray of float [n_sims, n_dim]
            Set of n_sims parameter combinations to run the model with (only the random parameters!).
        coords_norm: ndarray of float [n_sims, n_dim]
            Set of n_sims parameter combinations to run the model with (normalized coordinates [-1, 1]).
        i_iter: int
            Index of main-iteration
        i_subiter: int
            Index of sub-iteration
        fn_results : string, optional, default=None
            If provided, model evaluations are saved in fn_results.hdf5 file and gpc object in fn_results.pkl file
        print_func_time : bool
            Print time of single function evaluation
        increment_grid : bool
            Increment grid counter (not done in case of gradient calculation)
        verbose : bool, optional, default: False
            Print progress

        Returns
        -------
        res: ndarray of float [n_sims x n_out]
            n_sims simulation results of the n_out output quantities of the model under investigation.
        """
        if i_iter is None:
            i_iter = "N/A"

        if i_subiter is None:
            i_subiter = "N/A"

        n_grid = coords.shape[0]

        # i_grid indices is now a range [min_idx, max_idx]
        if increment_grid:
            self.i_grid = [np.max(self.i_grid), np.max(self.i_grid) + n_grid]

        # setup context (let the model know which iteration, interaction order etc.)
        context = {
            'global_task_counter': self.global_task_counter,
            'lock': None,
            'seq_number': None,
            'i_grid': self.i_grid,
            'max_grid': n_grid,
            'i_iter': i_iter,
            'i_subiter': i_subiter,
            'fn_results': fn_results,
            'coords': coords,
            'coords_norm': coords_norm,
            'print_func_time': print_func_time,
            'verbose': verbose
        }

        parameters = OrderedDict()
        i_random_parameter = 0

        for key in problem.parameters:
            value = problem.parameters[key]

            if isinstance(value, RandomParameter):
                # replace RandomParameters with the matching column of grid points
                parameters[key] = coords[:, i_random_parameter]
                i_random_parameter += 1
            elif str(type(value)) == "<class 'matlab.engine.matlabengine.MatlabEngine'>":
                # Matlab engine handles are passed through unchanged; checked first so no
                # numpy size query is ever made on the engine object
                parameters[key] = value
            elif isinstance(value, str):
                # copy constant string parameters n_grid times
                parameters[key] = [value for _ in range(n_grid)]
            elif isinstance(value, float) or np.size(value) == 1:
                # scalar constants (float, int, bool, 1-element arrays) are broadcast to n_grid entries;
                # np.size also works for plain Python ints, which have no .size attribute
                parameters[key] = value * np.ones(n_grid)
            else:
                # array-valued constants are stacked once per grid point
                parameters[key] = np.tile(value, (n_grid, 1))

        # generate worker, which will evaluate the model (here only one for all grid points in coords)
        worker_objs = model.set_parameters(p=parameters, context=context)

        # start model evaluations; the model output is the second element of the returned result
        res = Worker.run(obj=worker_objs, matlab_engine=self.matlab_engine)

        return np.array(res[1])

    def close(self):
        """
        Closes the pool (no-op: this class opens no process pool; kept for interface compatibility)
        """
        pass
