Source code for pygpc.Worker
import time
import numpy as np
from .misc import list2dict
[docs]
def init(queue):
"""
This is a wrapper script to be called by the 'multiprocessing.map' function
to calculate the model functions in parallel.
This function will be called upon initialization of the process.
It sets a global variable denoting the ID of this process that can
be read by any function of this process
Parameters
----------
queue : multiprocessing.Queue
the queue object that manages the unique IDs of the process pool
"""
global process_id
process_id = queue.get()
[docs]
def run(obj, matlab_engine=None):
"""
This is the main worker function of the process.
Methods of the provided object will be called here.
Parameters
----------
obj : any callable object
The object that
a) handles the simulation work
b) reading previous results
c) writing the calculated result fields
d) printing global process
matlab_engine : Matlab engine object, optional, default: None
Matlab engine object to run Matlab functions
"""
global process_id
if 'process_id' not in globals():
process_id = 0
if process_id is None:
process_id = 0
res = obj.read_previous_results(obj.coords)
start_time = 0
end_time = 0
skip_sim = True
# skip if there was no data row for that i_grid or if it was prematurely inserted (= all zero)
if res is None or not np.any(res):
start_time = time.time()
out = obj.simulate(process_id, matlab_engine)
# dictionary containing the results, the coords and (optionally) the additional data
data_dict = dict()
data_dict["grid/coords"] = obj.coords
data_dict["grid/coords_norm"] = obj.coords_norm
n_sim = obj.coords.shape[0]
if type(out) is tuple:
# results (nparray)
res = out[0]
# additional data (dict)
if len(out) == 2:
# in case of function parallelization transform list of dict to dict containing the lists
if type(out[1]) is list:
additional_data = list2dict(out[1])
else:
additional_data = out[1]
for o in additional_data:
# make entries of additional data to list of list [n_grid][n_data[o]]
# make single entries to list
if type(additional_data[o]) is not list and type(additional_data[o]) is not np.ndarray:
additional_data[o] = [[additional_data[o]]]
if n_sim == 1:
if type(additional_data[o][0]) is not list:
additional_data[o] = [additional_data[o]]
else:
if type(additional_data[o][0]) is not list:
additional_data[o] = [[k] for k in additional_data[o]]
data_dict[o] = np.array(additional_data[o])
if n_sim == 1 and data_dict[o].shape[0] != 1:
data_dict[o] = data_dict[o].transpose()
else:
# results (nparray), no additional data
res = out
# make res to a 2D ndarray [n_sim x n_out]
if n_sim == 1 and res.ndim == 1:
res = res[np.newaxis, :]
elif n_sim == 1 and res.shape[0] > 1 and res.ndim == 2:
res = res.transpose()
# add results to data_dict
data_dict["model_evaluations/results"] = res
end_time = time.time()
obj.write_results(data_dict=data_dict)
skip_sim = False
obj.increment_ctr()
# determine function time
if obj.print_func_time:
func_time = end_time - start_time
else:
func_time = None
if obj.verbose:
obj.print_progress(func_time=func_time, read_from_file=skip_sim, )
return obj.get_seq_number(), res