import subprocess
import time
import copy
import numpy as np
import os
import re
import multiprocessing
import multiprocessing.pool
from functools import partial
# import dispy
from collections import OrderedDict
from pygpc import Worker
from .io import iprint
from .RandomParameter import *
def Computation(n_cpu, matlab_model=False):
"""
Helper function to initialize the appropriate Computation sub-class.
n_cpu = 0 : use this if the model is capable of evaluating several parameterizations in parallel
n_cpu = 1 : the model is called in serial for every parameterization
n_cpu > 1 : a multiprocessing.Pool is opened and n_cpu parameterizations are evaluated in parallel
Parameters
----------
n_cpu : int
Number of CPU cores to use (parallel model evaluations)
matlab_model : boolean, optional, default: False
Use a Matlab model
Returns
-------
obj : object instance of Computation class
Object instance of Computation class
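Examples
--------
Illustrative sketch only; ``MyModel``, ``problem``, ``coords`` and ``coords_norm`` are
placeholders for a user-defined model (derived from AbstractModel), the corresponding gPC
Problem instance and arrays of parameter samples:
>>> com = Computation(n_cpu=4)                       # pool with up to 4 worker processes
>>> res = com.run(model=MyModel(), problem=problem,  # MyModel()/problem: hypothetical user objects
...               coords=coords, coords_norm=coords_norm)
>>> com.close()                                      # shut down the worker pool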
"""
if n_cpu == 0:
return ComputationFuncPar(n_cpu, matlab_model=matlab_model)
else:
return ComputationPoolMap(n_cpu, matlab_model=matlab_model)
class ComputationPoolMap:
"""
Computation sub-class to run the model using a processing pool for parallelization
Parameters
----------
n_cpu : int
Number of CPU cores to use (parallel model evaluations)
matlab_model : boolean, optional, default: False
Use a Matlab model
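Examples
--------
Sketch of the intended lifecycle (normally obtained via the Computation() helper; ``model``,
``problem`` and ``coords`` are assumed to be defined by the user):
>>> com = ComputationPoolMap(n_cpu=4)
>>> res = com.run(model=model, problem=problem, coords=coords)
>>> com.close()  # terminate the worker processes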
"""
def __init__(self, n_cpu, matlab_model=False):
"""
Constructor; Initializes ComputationPoolMap class
"""
# Setting up parallelization (set up process pool)
n_cpu_available = multiprocessing.cpu_count()
self.n_cpu = min(n_cpu, n_cpu_available)
self.i_grid = 0
# Use a process queue to assign persistent, unique IDs to the processes in the pool
self.process_manager = multiprocessing.Manager()
self.process_queue = self.process_manager.Queue()
self.process_pool = multiprocessing.Pool(self.n_cpu, Worker.init, (self.process_queue,))
# Global counter used by all threads to keep track of the progress
self.global_task_counter = self.process_manager.Value('i', 0)
for i in range(0, self.n_cpu):
self.process_queue.put(i)
# Necessary to synchronize read/write access to serialized results
self.global_lock = self.process_manager.RLock()
self.matlab_engine = None
# start matlab engine
if matlab_model:
import matlab.engine
iprint("Starting Matlab engine ...", tab=0, verbose=False)
self.matlab_engine = matlab.engine.start_matlab()
def run(self, model, problem, coords, coords_norm=None, i_iter=None, i_subiter=None, fn_results=None,
print_func_time=False, increment_grid=True, verbose=False):
"""
Runs model evaluations for parameter combinations specified in coords array
Parameters
----------
model: Model object
Model object instance of model to investigate (derived from AbstractModel class, implemented by user)
problem: Problem class instance
GPC Problem under investigation, includes the parameters of the model (constant and random)
coords: ndarray of float [n_sims, n_dim]
Set of n_sims parameter combinations to run the model with (only the random parameters!).
coords_norm: ndarray of float [n_sims, n_dim]
Set of n_sims parameter combinations to run the model with (normalized coordinates [-1, 1]).
i_iter: int
Index of main-iteration
i_subiter: int
Index of sub-iteration
fn_results : string, optional, default=None
If provided, model evaluations are saved in fn_results.hdf5 file and gpc object in fn_results.pkl file
print_func_time : bool
Print time of single function evaluation
increment_grid : bool
Increment grid counter (not done in case of gradient calculation)
verbose : bool, optional, default: False
Print progress
Returns
-------
res: ndarray of float [n_sims x n_out]
n_sims simulation results of the n_out output quantities of the model under investigation.
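Examples
--------
Minimal sketch, assuming ``com``, ``model`` and ``problem`` were set up beforehand and the
problem has two random parameters:
>>> coords = np.array([[0.1, 0.2],
...                    [0.3, 0.4]])  # two parameter combinations (n_sims=2, n_dim=2)
>>> res = com.run(model=model, problem=problem, coords=coords)  # res has shape [n_sims, n_out]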
"""
if i_iter is None:
i_iter = "N/A"
if i_subiter is None:
i_subiter = "N/A"
# read new grid points and convert to list for multiprocessing
grid_new = coords.tolist()
n_grid_new = len(grid_new)
# create worker objects that will evaluate the function
worker_objs = []
self.global_task_counter.value = 0 # since we re-use the global counter, we need to reset it first
seq_num = 0
# Replace the RandomParameter entries of the Problem with concrete instances determined by
# the PyGPC framework: the parameter dictionary keeps the same keys, but instead of a
# RandomParameter each entry now holds a sample drawn from the defined PDF (the grid point).
# deepcopy model and delete attributes
model_ = copy.deepcopy(model)
model_.__clean__()
for j, random_var_instances in enumerate(grid_new):
if coords_norm is None:
c_norm = None
else:
c_norm = coords_norm[j, :][np.newaxis, :]
# setup context (let the process know which iteration, interaction order etc.)
context = {
'global_task_counter': self.global_task_counter,
'lock': self.global_lock,
'seq_number': seq_num,
'i_grid': self.i_grid,
'max_grid': n_grid_new,
'i_iter': i_iter,
'i_subiter': i_subiter,
'fn_results': fn_results,
'coords': np.array(random_var_instances)[np.newaxis, :],
'coords_norm': c_norm,
'print_func_time': print_func_time,
'verbose': verbose,
}
# deepcopy parameters
parameters = OrderedDict()
for key in problem.parameters:
parameters[key] = problem.parameters[key]
# replace RandomParameters with grid points
for i in range(0, len(random_var_instances)):
if not isinstance(random_var_instances[i], np.ndarray):
random_var_instances[i] = np.array([random_var_instances[i]])
parameters[list(problem.parameters_random.keys())[i]] = random_var_instances[i]
# append new worker which will evaluate the model with the particular parameters of this grid point
worker_objs.append(model_.__copy__().set_parameters(p=parameters, context=context))
if increment_grid:
self.i_grid += 1
seq_num += 1
# start model evaluations
if self.n_cpu == 1:
res_new_list = []
for i in range(len(worker_objs)):
res_new_list.append(Worker.run(obj=worker_objs[i], matlab_engine=self.matlab_engine))
else:
# The map function deals with chunking the data; the Matlab engine (usually None here) is
# forwarded as a keyword argument via functools.partial, since Pool.map only accepts the
# function and a single iterable (a live Matlab engine may not be picklable).
res_new_list = self.process_pool.map(partial(Worker.run, matlab_engine=self.matlab_engine), worker_objs)
# Initialize the result array with the correct size and set the elements according to their order
# (the first element in 'res' might not necessarily be the result of the first Process/i_grid)
res = [None] * n_grid_new
for result in res_new_list:
res[result[0]] = result[1]
res = np.vstack(res)
return res
def close(self):
""" Closes the pool """
self.process_pool.close()
self.process_pool.join()
class ComputationFuncPar:
"""
Computation sub-class to run the model using the model's internal parallelization
Parameters
----------
n_cpu : int
Number of CPU cores to use (parallel model evaluations)
matlab_model : boolean, optional, default: False
Use a Matlab model
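Examples
--------
Sketch only (``model``, ``problem`` and ``coords`` are hypothetical user objects); passing
n_cpu = 0 to the Computation() helper selects this class, and the model is then called once
with all parameter combinations so it can parallelize internally:
>>> com = Computation(n_cpu=0)  # returns a ComputationFuncPar instance
>>> res = com.run(model=model, problem=problem, coords=coords)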
"""
def __init__(self, n_cpu, matlab_model=False):
"""
Constructor; Initializes ComputationFuncPar class
"""
# Determine the number of available CPU cores (no pool is created; the model parallelizes internally)
n_cpu_available = multiprocessing.cpu_count()
self.n_cpu = min(n_cpu, n_cpu_available)
self.i_grid = 0
# Global counter used by all threads to keep track of the progress
self.global_task_counter = 0
self.matlab_engine = None
# start matlab engine
if matlab_model:
import matlab.engine
iprint("Starting Matlab engine ...", tab=0, verbose=True)
self.matlab_engine = matlab.engine.start_matlab()
def run(self, model, problem, coords, coords_norm=None, i_iter=None, i_subiter=None, fn_results=None,
print_func_time=False, increment_grid=True, verbose=False):
"""
Runs model evaluations for parameter combinations specified in coords array
Parameters
----------
model: Model object
Model object instance of model to investigate (derived from AbstractModel class, implemented by user)
problem: Problem class instance
GPC Problem under investigation, includes the parameters of the model (constant and random)
coords: ndarray of float [n_sims, n_dim]
Set of n_sims parameter combinations to run the model with (only the random parameters!).
coords_norm: ndarray of float [n_sims, n_dim]
Set of n_sims parameter combinations to run the model with (normalized coordinates [-1, 1]).
i_iter: int
Index of main-iteration
i_subiter: int
Index of sub-iteration
fn_results : string, optional, default=None
If provided, model evaluations are saved in fn_results.hdf5 file and gpc object in fn_results.pkl file
print_func_time : bool
Print time of single function evaluation
increment_grid : bool
Increment grid counter (not done in case of gradient calculation)
verbose : bool, optional, default: False
Print progress
Returns
-------
res: ndarray of float [n_sims x n_out]
n_sims simulation results of the n_out output quantities of the model under investigation.
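Examples
--------
Sketch, assuming the user model evaluates all rows of ``coords`` in a single call:
>>> res = com.run(model=model, problem=problem, coords=coords)  # one vectorized model call
>>> # res is an ndarray of shape [n_sims, n_out]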
"""
if i_iter is None:
i_iter = "N/A"
if i_subiter is None:
i_subiter = "N/A"
n_grid = coords.shape[0]
# i_grid is now an index range [min_idx, max_idx]
if increment_grid:
self.i_grid = [np.max(self.i_grid), np.max(self.i_grid) + n_grid]
# Replace the RandomParameter entries of the Problem with concrete instances determined by
# the PyGPC framework: the parameter dictionary keeps the same keys, but instead of a
# RandomParameter each entry now holds the samples drawn from the defined PDF (the grid points).
if coords_norm is None:
c_norm = None
else:
c_norm = coords_norm
# setup context (let the process know which iteration, interaction order etc.)
context = {
'global_task_counter': self.global_task_counter,
'lock': None,
'seq_number': None,
'i_grid': self.i_grid,
'max_grid': n_grid,
'i_iter': i_iter,
'i_subiter': i_subiter,
'fn_results': fn_results,
'coords': coords,
'coords_norm': c_norm,
'print_func_time': print_func_time,
'verbose': verbose
}
parameters = OrderedDict()
i_random_parameter = 0
for key in problem.parameters:
if isinstance(problem.parameters[key], RandomParameter):
# replace RandomParameters with grid points
parameters[key] = coords[:, i_random_parameter]
i_random_parameter += 1
else:
# copy constant parameters n_grid times
if type(problem.parameters[key]) == str:
parameters[key] = [problem.parameters[key] for _ in range(n_grid)]
elif type(problem.parameters[key]) == float or problem.parameters[key].size == 1:
parameters[key] = problem.parameters[key] * np.ones(n_grid)
else:
if str(type(problem.parameters[key])) == "<class 'matlab.engine.matlabengine.MatlabEngine'>":
parameters[key] = problem.parameters[key]
else:
parameters[key] = np.tile(problem.parameters[key], (n_grid, 1))
# generate a single worker which evaluates the model for all grid points in coords at once
worker_obj = model.set_parameters(p=parameters, context=context)
# start model evaluations
res = Worker.run(obj=worker_obj, matlab_engine=self.matlab_engine)
res = np.array(res[1])
return res
def close(self):
""" Closes the pool """
pass
# def compute_cluster(algorithms, nodes, start_scheduler=True):
# """
# Computes Algorithm instances on compute cluster composed of nodes. The first node is also the dispy-scheduler.
# Afterwards, the dispy-nodes are started on every node. On every node, screen sessions are started with the names
# "scheduler" and "node", where the scheduler and the nodes are residing, respectively.
# They can be accessed by "screen -rD scheduler" or "screen -rD node" when connected via ssh to the machines.
#
# Parameters
# ----------
# algorithms : list of Algorithm instances
# Algorithm instances initialized with different gPC problems and/or models
# nodes : str or list of str
# Node names
# start_scheduler : bool
# Starts a scheduler on the first machine in the nodes list or not. Set this to False if a scheduler is already
# running somewhere on the cluster.
# """
#
# def _algorithm_run(f):
# f.run
#
# dispy.MsgTimeout = 90
#
# for n in nodes:
# # screen/dispy output will be sent to devnull, to keep the terminal window clean
# with open(os.devnull, 'w') as f:
#
# # get PIDs for old scheduler and node screens and kill them
# regexp_pid = "\t(\d*)." # after \tab, get digits until '.'
#
# for name in ["scheduler", "node"]:
# # get screen -list output for correct screen, which also has the pid
# stdout, stderr = subprocess.Popen(['ssh', n, 'screen -list | grep {}'.format(name)],
# stdout=subprocess.PIPE,
# stderr=subprocess.PIPE).communicate()
# subprocess.Popen(['ssh', n, 'screen', "-wipe"]).communicate()
# try:
# pid = re.search(regexp_pid, stdout).group(0)[:-1] # remove last char (.)
# subprocess.Popen(['ssh', n, 'kill', pid]).communicate()
#
# except AttributeError:
# # no 'scheduler' or 'node' screen session found on host
# pass
#
# # start scheduler on first node
# if start_scheduler:
# print("Starting dispy scheduler on " + n)
#
# # subprocess.Popen("ssh -tt " + n + " screen -R scheduler -d -m python "
# # + os.path.join(dispy.__path__[0], "dispyscheduler.py &"), shell=True)
#
# # ssh -tt: pseudo terminal allocation
# #
# # screen
# # -R scheduler: Reconnect or create session with name scheduler
# # -d detach (is it needed?)
# # -m "ignore $STY variable, do create a new screen session" ??
# #
# # subprocess
# # -shell: False. If True, opens new shell and does not return
# # If true, do not use [] argument passing style.
# # -stdout: devnull. Pipe leads to flooded terminal.
# #
# # "export", "TERM=screen", "&&",
# #
# subprocess.Popen(["ssh", "-tt", n,
# "screen", "-dmS", "scheduler",
# "python " + os.path.join(dispy.__path__[0], "dispyscheduler.py")],
# shell=False, stdout=f)
# time.sleep(5)
#
# print("Starting dispy node on " + n)
# subprocess.Popen(["ssh", "-tt", n,
# "screen", "-dmS", "node",
# "python " + os.path.join(dispy.__path__[0], "dispynode.py --clean")],
# shell=False, stdout=f)
# time.sleep(5)
#
# cluster = dispy.SharedJobCluster(_algorithm_run, scheduler_node=nodes[0], reentrant=True, port=0)
#
# time.sleep(5)
#
# # build job list and start computations
# jobs = []
# for a in algorithms:
# job = cluster.submit(a)
# job.id = a
# jobs.append(job)
#
# # wait until cluster finished the computations
# cluster.wait()