from collections import deque
from math import isnan
from functools import partial
from os import path
from runana.read_numbers import ignored
from runana.run import cwd, get_subdirs
from runana.input_file_handling import read_input_files_f90nml, string_or_iterable, superset
def collecting_loop_recursive(dir_, read_func,
                              look_for_these=["hostname.txt"],
                              ignore_these=["ignore"]):
    """Collect information from runs.

    Recursively locate directories under `dir_` containing any of the
    files in `look_for_these`, but leave out directories that contain
    any files in `ignore_these`. In each of the located directories the
    function `read_func` is run with no arguments (the working directory
    is temporarily changed to the located directory).

    A directory containing one of `ignore_these` is skipped together with
    everything below it. A directory containing one of `look_for_these`
    is read and not searched any further.

    :param str dir_: Directory whose sub-directories are searched
    :param read_func: Callable taking no arguments
    :param look_for_these: File names marking a directory to read
    :param ignore_these: File names marking a directory to skip
    :returns: Generator of ``(index, value)`` pairs. `index` is the tuple
        of sub-directory names leading from `dir_` to the located
        directory, `value` is what `read_func` returned there
    """
    # NOTE: the list defaults are only ever iterated, never mutated.
    for subdir in get_subdirs(dir_) or []:
        asubdir = path.join(dir_, subdir)
        if any(path.exists(path.join(asubdir, fname))
               for fname in ignore_these):
            continue
        if any(path.exists(path.join(asubdir, fname))
               for fname in look_for_these):
            with cwd(asubdir):
                value = read_func()
            # Yield outside the `with` so the consumer does not run with
            # the working directory still switched.
            yield (subdir, ), value
        else:
            # Pass the marker file names on; otherwise nested levels
            # would silently fall back to the defaults.
            nested = collecting_loop_recursive(asubdir, read_func,
                                               look_for_these=look_for_these,
                                               ignore_these=ignore_these)
            for dirs, vals in nested:
                yield (subdir, ) + dirs, vals
class ParamDict(dict):
    """ Dictionary that holds dictionaries of parameters """

    def read(self, workdir, read_func=read_input_files_f90nml):
        """ Fill the dictionary from the run directories below `workdir`

        Every ``(index, result)`` pair produced by
        :func:`collecting_loop_recursive` is stored as ``self[index] = result``

        :param str workdir: Directory to search
        :param read_func: Callable taking no arguments, run inside each
            located directory
        """
        self.update(collecting_loop_recursive(workdir, read_func))

    def from_list_of_indices(self, workdir, indices,
                             read_func=read_input_files_f90nml):
        """ Fill the dictionary from an explicit list of run directories

        :param str workdir: Directory that the indices are relative to
        :param indices: Iterable of indices; each index is an iterable of
            path components that is joined onto `workdir`
        :param read_func: Callable taking no arguments, run inside each
            of the directories
        """
        for index in indices:
            run_dir = path.join(workdir, *index)
            with cwd(run_dir):
                self[index] = read_func()

    def diff(self):
        """ Call :func:`dictdiff` on `ParamDict` object """
        dictdiff(self)

    def unpack_list_values(self):
        """ Takes any parameter value in `ParamDict` object that is a list
        and unpacks it into individual slots, one per element, by calling
        :func:`spread_out_lists` on every contained dictionary

        Works in-place
        """
        for params in self.values():
            spread_out_lists(params)
def spread_out_lists(dict_):
    """ Replace every list-valued entry of `dict_` by one entry per element

    Works in-place. An entry ``key: [a, b]`` becomes ``key_1: a`` and
    ``key_2: b`` (numbering starts at 1). For tuple keys only the second
    component gets the suffix, i.e. ``(group, name)`` becomes
    ``(group, name_1)``; components beyond the second are dropped.
    An empty list is removed without any replacement.

    :param dict dict_: Dictionary to modify
    """
    expanded = {}
    list_keys = []
    for key, val in dict_.items():
        if not isinstance(val, list):
            continue
        list_keys.append(key)
        for number, elem in enumerate(val, 1):
            suffix = '_' + str(number)
            if isinstance(key, tuple):
                newkey = (key[0], str(key[1]) + suffix)
            else:
                newkey = str(key) + suffix
            expanded[newkey] = elem
    # Delete the list entries *before* inserting the expanded ones: an
    # expanded key may coincide with the key of another list entry
    # (e.g. 'a' -> 'a_1' next to a list stored under 'a_1'), and deleting
    # afterwards would throw the freshly inserted value away.
    for key in list_keys:
        del dict_[key]
    dict_.update(expanded)
def make_collector_function(workdir, read_func, *args, **kwargs):
    """ Returns a function that runs `read_func(*args,**kwargs)` in the
    directory that is the join of `workdir` and the argument to the function

    :param str workdir: Directory that is prepended to the argument of the
        returned function
    :param read_func: Callable to run inside the resulting directory
    """
    reader_in_dir = roll_in_args(read_func, *args, **kwargs)
    return read_from_dir(reader_in_dir, workdir)
# @ignored(TypeError)
def collect(dir_, read_func):
    """ Switches to `dir_` and runs `read_func`

    :param str dir_: Directory to run in
    :param read_func: Callable taking no arguments
    :returns: Whatever `read_func` returns
    """
    with cwd(dir_):
        result = read_func()
    return result
def roll_in_args(read_func, *args, **kwargs):
    """ Binds `args` and `kwargs` to `read_func` and wraps the result in
    :func:`collect`

    :returns: Callable taking a directory; calling it is the same as
        ``collect(directory, read_func=<read_func with arguments bound>)``
    """
    bound_reader = partial(read_func, *args, **kwargs)
    return partial(collect, read_func=bound_reader)
def compose2(f__, g__):
    """ Returns the composition of two callables

    The returned function passes all of its positional and keyword
    arguments to `g__` and feeds the single result to `f__`.
    """
    def composed(*args, **kwargs):
        inner_result = g__(*args, **kwargs)
        return f__(inner_result)
    return composed
@ignored(TypeError)
def join_dirs(subdirs, workdir):
    """ Joins `workdir` with the path components given by `subdirs`

    `subdirs` is first passed through `string_or_iterable`.
    NOTE(review): the decorator presumably turns a TypeError raised here
    into a ``None`` return value -- confirm against
    `runana.read_numbers.ignored`.
    """
    components = tuple(string_or_iterable(subdirs))
    return path.join(workdir, *components)
def prepend_dir(workdir):
    """ Returns a function that takes a tuple of directories and returns the
    combination of those into a path, with `workdir` prepended """
    joiner = partial(join_dirs, workdir=workdir)
    return joiner
def read_from_dir(read_func, workdir):
    """ Composes `read_func` with :func:`prepend_dir(workdir)<prepend_dir>`

    :returns: Callable that first turns its argument into a path below
        `workdir` and then calls `read_func` with that path
    """
    to_full_path = prepend_dir(workdir)
    return compose2(read_func, to_full_path)
class Seqs(dict):
    """ Sequences of related runs

    Keys are parameter names. Each value is a list of sequences, where a
    sequence is a dictionary mapping a value of that parameter to the
    index of a run; the runs in one sequence were found by
    :func:`get_indices_dict`, i.e. they differ from the first run of the
    sequence in that one parameter only.

    :param dict param_dicts: Dictionary containing dictionaries of parameters,
        in the form returned from e.g. :func:`collect_from_all`
    """

    def __init__(self, param_dicts, *args, **kwargs):
        super(Seqs, self).__init__(*args, **kwargs)
        remaining = deque(param_dicts.keys())
        for index in list(remaining):
            # Drop `index` itself, so it is only compared to later runs
            remaining.popleft()
            candidates = get_indices_dict(index, param_dicts, remaining)
            for key, indices in candidates.items():
                known_seqs = self.get(key, [])
                # Skip a candidate whose runs are all part of a sequence
                # that is already stored for this parameter
                already_covered = any(
                    indices_sub(param_dicts, indices.values(), seq.values())
                    for seq in known_seqs)
                if not already_covered:
                    self[key] = known_seqs + [indices]

    def iterator(self):
        """ Yields ``(key, position, sequence)`` for every stored sequence,
        where `position` is the place of the sequence in ``self[key]`` """
        for key, seq_lists in self.items():
            for indx, seq_list in enumerate(seq_lists):
                yield key, indx, seq_list
class ChangedParams(dict):
    """ Parameters that changed. Derived from dict

    Keys are pairs ``(index, index_compare)`` of runs, values are the
    dictionaries returned by :func:`find_diff_elements` for that pair.
    A run that shows no differences to an earlier run is put on an ignore
    list, and pairs containing an ignored run are not stored from then on.

    :param dict param_dicts: Dictionary containing dictionaries of parameters,
        in the form returned from e.g. :func:`collect_from_all`
    """

    def __init__(self, param_dicts, *args, **kwargs):
        super(ChangedParams, self).__init__(*args, **kwargs)
        duplicates = []
        indices = list(param_dicts.keys())
        # Compare every run with each of the runs that come after it
        for position, idx in enumerate(indices, 1):
            for idx_compare in indices[position:]:
                diffs = find_diff_elements(param_dicts[idx],
                                           param_dicts[idx_compare])
                if not diffs:
                    duplicates.append(idx_compare)
                elif not (idx_compare in duplicates or idx in duplicates):
                    self[(idx, idx_compare)] = diffs

    def groupby_varname(self):
        """ Groups elements according to the name of the variable.

        Returns a dictionary containing the values of the variables, and
        another with all the pairs of runs

        :returns: ``(varvalues, pairs)``. Both are keyed by the tuple of
            names of the changed variables. `varvalues` maps each run to
            the tuple of its values for those variables, `pairs` holds a
            list with one set of runs per stored pair
        """
        varvalues = {}
        pairs = {}
        for runs, changed in self.items():
            # A tuple (not a frozenset) keeps the names in the same order
            # as the values that are zipped together below
            names = tuple(changed.keys())
            per_run_values = zip(*changed.values())
            pairs.setdefault(names, []).append(set(runs))
            varvalues.setdefault(names, {}).update(
                dict(zip(runs, per_run_values)))
        return varvalues, pairs
def groupby_params(dict_w_params, params):
    """ Groups `dict_w_params` according to the given set of parameters. This
    can be used to e.g. separate physical from numerical parameters.

    :param dict dict_w_params: Dictionary containing dictionaries of
        parameters; it is not modified
    :param params: Names of the parameters to group by
    :returns: Dictionary keyed by a tuple of ``(name, value)`` pairs
        (sorted by name) of those `params` that a run contains. Each
        value maps the run to a copy of its parameters without `params`
    """
    grouped = {}
    for run, run_params in dict_w_params.items():
        rest = run_params.copy()
        picked = {}
        for name in params:
            if name in rest:
                picked[name] = rest.pop(name)
        group = tuple((name, picked[name]) for name in sorted(picked))
        grouped.setdefault(group, {})[run] = rest
    return grouped
def remove_nested_dict_keys(dict_w_params, remove_keys=[]):
    """ In-place removes the keys in `remove_keys` from every dictionary
    contained in `dict_w_params`. Keys that are not present are skipped.

    :param dict dict_w_params: Dictionary containing dictionaries
    :param remove_keys: Keys to remove from the contained dictionaries
    """
    for params in dict_w_params.values():
        for unwanted in remove_keys:
            params.pop(unwanted, None)
def select_by_key_len(connected, length=1):
    """ Returns a new dictionary with those entries of `connected` whose
    key has exactly `length` elements """
    return {names: groups for names, groups in connected.items()
            if len(names) == length}
def find_connected_components(pairs):
    """ Applies :func:`conso` to every value of `pairs`

    :param dict pairs: Dictionary whose values are lists of sets
    :returns: New dictionary with the same keys and the consolidated lists
    """
    return {names: conso(run_sets) for names, run_sets in pairs.items()}
# `math.isnan` wrapped so that it can be called with non-numeric values.
# NOTE(review): presumably a TypeError raised by `isnan` is suppressed and
# the call then returns None (falsy) -- confirm against
# `runana.read_numbers.ignored`.
type_ignore_isnan = ignored(TypeError)(isnan)
def find_diff_elements(dict1, dict2):
    """ Finds the entries in which two dictionaries differ

    A key that is missing from one of the dictionaries is reported with
    ``None`` in place of the missing value. This holds in both directions,
    so swapping the arguments only swaps the order within each pair.
    A key present in both is not reported if either of its values is NaN.

    :param dict dict1: First dictionary
    :param dict dict2: Second dictionary
    :returns: Dictionary mapping each differing key to the tuple
        ``(value in dict1, value in dict2)``
    """
    diffs = {}
    for key, value1 in dict1.items():
        if key not in dict2:
            diffs[key] = (value1, None)
            continue
        value2 = dict2[key]
        if (value1 != value2
                and not type_ignore_isnan(value1)
                and not type_ignore_isnan(value2)):
            diffs[key] = (value1, value2)
    # Keys that only exist in `dict2` are differences as well; without
    # this pass a `dict2` that merely adds keys would look identical.
    for key, value2 in dict2.items():
        if key not in dict1:
            diffs[key] = (None, value2)
    return diffs
def catch_list_values(value):
    """ Returns `value` unchanged if ``hash(value)`` works, and
    ``tuple(value)`` if hashing raises a TypeError (e.g. for a list) """
    try:
        hash(value)
    except TypeError:
        return tuple(value)
    return value
def get_indices_dict(idx, param_dicts, keys):
    """ Collects the runs that differ from run `idx` in a single parameter

    Run `idx` is compared to every run in `keys` by means of
    :func:`get_index_for_all_but_one_changed`.

    :param idx: Index of the reference run
    :param dict param_dicts: Dictionary containing dictionaries of parameters
    :param keys: Indices of the runs to compare with
    :returns: Dictionary mapping a parameter name to a dictionary
        ``{parameter value: run index}``; it contains the reference run
        and every compared run that differs in that parameter only.
        Unhashable values are converted by :func:`catch_list_values`
    """
    indices_dict = {}
    for idx_compare in keys:
        single_changes = get_index_for_all_but_one_changed(
            param_dicts[idx], param_dicts[idx_compare])
        for key, val, val_compare in single_changes:
            by_value = indices_dict.setdefault(
                key, {catch_list_values(val): idx})
            by_value[catch_list_values(val_compare)] = idx_compare
    return indices_dict
def get_index_for_all_but_one_changed(nl1, nl2):
    """ Yields ``(key, value in nl1, value in nl2)`` for every key whose
    value differs between `nl1` and `nl2` while all other entries of the
    two dictionaries are equal (see :func:`dict_eq_ignore`)

    NOTE(review): keys of `nl1` that are missing from `nl2` are presumably
    skipped through ``ignored(KeyError)`` -- confirm against
    `runana.read_numbers.ignored`.
    """
    for name, first in nl1.items():
        with ignored(KeyError):
            second = nl2[name]
            only_this_differs = (first != second
                                 and dict_eq_ignore(nl1, nl2, [name]))
            if only_this_differs:
                yield name, first, second
def dict_eq_ignore(dict1, dict2, ignore_keys=()):
    """ Checks if `dict1` and `dict2` are equal once the keys in
    `ignore_keys` have been left out of both """
    return (copy_w_ignore_keys(dict1, ignore_keys)
            == copy_w_ignore_keys(dict2, ignore_keys))
def copy_w_ignore_keys(dict_, ignore_keys=()):
    """ Returns a new dictionary with the entries of `dict_` whose key is
    not in `ignore_keys` """
    return {name: val for name, val in dict_.items()
            if name not in ignore_keys}
def indices_sub(dicts, indices1, indices2):
    """ Checks if every dictionary selected from `dicts` by `indices1` is
    also among the dictionaries selected by `indices2`

    :param dict dicts: Dictionary containing dictionaries
    :param indices1: Keys of `dicts` forming the candidate subset
    :param indices2: Keys of `dicts` forming the reference set
    """
    candidates = [dicts[index] for index in indices1]
    reference = [dicts[index] for index in indices2]
    return issubset(reference, candidates)
def issubset(list1, list2):
    """ Checks if every element of `list2` is contained in `list1`

    Note the direction: it is `list2` that is tested for being a subset
    of `list1`. Membership is tested with ``in``, so the elements do not
    need to be hashable. An empty `list2` always gives ``True``.

    :param list1: The containing collection
    :param list2: The elements to look for
    :returns: ``True`` if no element of `list2` is missing from `list1`
    """
    # `all` stops at the first missing element
    return all(elem in list1 for elem in list2)
# This function was inspired by nmltab
def dictdiff(alldicts):
    """ In-place removes all key:value pairs that are shared across all dicts

    A key is removed only if every contained dictionary has it and every
    one of them holds a value equal to the value that `superset` reports
    for that key.

    :param dict alldicts: a dictionary containing dictionaries"""
    superdict = superset(alldicts)
    for key in superdict:
        if not all(key in dict_ for dict_ in alldicts.values()):
            continue
        if not all(superdict[key] == dict_[key]
                   for dict_ in alldicts.values()):
            continue
        for dict_ in alldicts.values():
            del dict_[key]
# From: https://rosettacode.org/wiki/Set_consolidation#Python
def consolidate(sets):
    """ Merges all sets that share elements

    Works on the set objects that are passed in: a set that overlaps a
    later one is merged into that later set and then emptied.

    :param sets: Iterable of sets; empty sets are left out
    :returns: List of the remaining non-empty, merged sets
    """
    groups = [group for group in sets if group]
    for position, current in enumerate(groups):
        if not current:
            # Already merged into another set and emptied
            continue
        for later in groups[position + 1:]:
            if current.isdisjoint(later):
                continue
            later.update(current)
            current.clear()
            # Carry on with the merged set, it may overlap further sets
            current = later
    return [group for group in groups if group]
# From: https://rosettacode.org/wiki/Set_consolidation#Python
def conso(s):
    """ Merges all sets in the sequence `s` that share elements

    The sets are processed from the last to the first. A set absorbs every
    already consolidated set that it overlaps, so sets from `s` are
    updated in place. The work is done in a loop rather than by recursing
    once per element, so the length of `s` is not limited by the
    interpreter's recursion limit.

    :param s: Sequence (supporting ``len`` and slicing) of sets
    :returns: `s` itself if it holds fewer than two sets, otherwise a new
        list of pairwise disjoint sets
    """
    if len(s) < 2:
        return s
    consolidated = s[-1:]
    for head in reversed(s[:-1]):
        merged = [head]
        for group in consolidated:
            if head.intersection(group):
                head.update(group)
            else:
                merged.append(group)
        consolidated = merged
    return consolidated
def groupby_n_var_params(dict_w_params, n_var_params=1):
    """ Convenience function for finding sets of data for which
    `n_var_params` parameters are changing

    `dict_w_params` and the dictionaries it contains are left untouched.

    :param dict dict_w_params: Dictionary containing dictionaries of
        parameters
    :param int n_var_params: Number of parameters that differ between
        the runs of a set
    :returns: ``(nseqs, varvals)``. `nseqs` maps a tuple of
        `n_var_params` parameter names to a list of sets of connected
        runs, `varvals` is the first dictionary returned from
        :func:`ChangedParams.groupby_varname`
    """
    # :func:`dictdiff` deletes entries from the nested dictionaries, so a
    # shallow copy of the outer dictionary is not enough to protect the
    # caller's data; every parameter dictionary is copied as well.
    params_copy = {index: params.copy()
                   for index, params in dict_w_params.items()}
    dictdiff(params_copy)
    changedsparams = ChangedParams(params_copy)
    varvals, pairs = changedsparams.groupby_varname()
    connected = find_connected_components(pairs)
    nseqs = select_by_key_len(connected, length=n_var_params)
    return nseqs, varvals