"""
This module implements an inversion of control framework. It allows
dependencies among functions and classes to be declared with decorators and the
resulting dependency graphs to be executed.
A decorator used to declare dependencies is called a :class:`ComponentType`, a
decorated function or class is called a component, and a collection of
interdependent components is called a graph.
In the example below, ``needs`` is a :class:`ComponentType`, ``one``, ``two``,
and ``add`` are components, and the relationship formed by their dependencies
is a graph.

.. code-block:: python

    from insights import dr

    class needs(dr.ComponentType):
        pass

    @needs()
    def one():
        return 1

    @needs()
    def two():
        return 2

    @needs(one, two)
    def add(a, b):
        return a + b

    results = dr.run(add)

Once all components have been imported, the graphs they form can be run. To
execute a graph, ``dr`` sorts its components into an order that guarantees
dependencies are tried before dependents. Components that raise exceptions are
considered invalid, and their dependents will not be executed. If a component
is skipped because of a missing dependency, its dependents also will not be
executed.
During evaluation, results are accumulated into an object called a
:class:`Broker`, which is just a fancy dictionary. Brokers can be inspected
after a run for results, exceptions, tracebacks, and execution times. You also
can register callbacks with a broker that get invoked after the attempted
execution of every component, so you can inspect it during an evaluation
instead of at the end.
"""
from __future__ import print_function
import inspect
import logging
import json
import os
import pkgutil
import re
import six
import sys
import time
import traceback
from collections import defaultdict
from functools import reduce as _reduce
from insights.contrib import importlib
from insights.contrib.toposort import toposort_flatten
from insights.core.blacklist import BLACKLISTED_SPECS
from insights.core.context import SerializedArchiveContext
from insights.core.exceptions import BlacklistedSpec, MissingRequirements, SkipComponent
from insights.util import defaults, enum, KeyPassingDefaultDict
# Module-level logger named after this module.
log = logging.getLogger(__name__)

# Names of the groups a component can belong to (see ``ComponentType.group``).
GROUPS = enum("single", "cluster")

# component -> module name, written by ``_register_component``.
MODULE_NAMES = {}
# component -> last dotted segment of its module name, written by
# ``_register_component``.
BASE_MODULE_NAMES = {}

# component type -> callbacks registered with the module-level ``add_observer``.
TYPE_OBSERVERS = defaultdict(set)

# component type -> components registered under it (see ``_register_component``).
COMPONENTS_BY_TYPE = defaultdict(set)

# component -> set of components it depends on.
DEPENDENCIES = defaultdict(set)
# component -> set of components that depend on it (see ``add_dependent``).
DEPENDENTS = defaultdict(set)

# group -> component -> set of dependencies.
COMPONENTS = defaultdict(lambda: defaultdict(set))

# component -> the ``ComponentType`` instance (delegate) that decorated it.
DELEGATES = {}

# Components flagged through ``mark_hidden``.
HIDDEN = set()
# component -> components whose presence in a broker makes it skip
# (see ``ComponentType.process``).
IGNORE = defaultdict(set)
# component -> enabled flag; components never set explicitly default to True.
ENABLED = defaultdict(lambda: True)
[docs]
def set_enabled(component, enabled=True):
    """
    Switch evaluation of a component on or off.

    A disabled component is not executed by :func:`run_components`, so
    components that require it will not execute either.

    When ``component`` is given as a fully qualified name string it is
    resolved with :func:`get_component`, which imports the owning module as
    a side effect.

    Args:
        component (str or callable): fully qualified name of the component or
            the component object itself.
        enabled (bool): whether the component is enabled for evaluation.

    Returns:
        None
    """
    target = component
    if isinstance(target, six.string_types):
        target = get_component(target)

    # A name that could not be resolved (falsy result) is silently ignored.
    if not target:
        return
    ENABLED[target] = enabled
[docs]
def is_enabled(component):
    """
    Report whether a component is enabled for evaluation.

    Args:
        component (callable): The component to check. The component must
            already be loaded.

    Returns:
        True if the component is enabled. False otherwise. Components that
        were never passed to :func:`set_enabled` report True.
    """
    enabled = ENABLED[component]
    return enabled
[docs]
def get_delegate(component):
    """
    Return the delegate registered for ``component`` in ``DELEGATES``, or
    None when the component has not been registered.
    """
    delegate = DELEGATES.get(component, None)
    return delegate
[docs]
def add_ignore(c, i):
    """
    Record ``i`` in the ignore set of component ``c``.

    :meth:`ComponentType.process` raises ``SkipComponent`` for ``c`` when any
    member of that set is present in the broker.
    """
    ignored = IGNORE[c]
    ignored.add(i)
[docs]
def hashable(v):
    """
    Return True if ``hash(v)`` succeeds, False otherwise.

    Only ``Exception`` subclasses are treated as "not hashable"; interpreter
    level signals such as ``KeyboardInterrupt`` are allowed to propagate
    instead of being reported as an unhashable value.
    """
    try:
        hash(v)
    except Exception:
        return False
    return True
def _get_from_module(name):
mod, _, n = name.rpartition(".")
if mod not in sys.modules:
importlib.import_module(mod)
return getattr(sys.modules[mod], n)
def _get_from_class(name):
    """
    Resolve ``package.module.Class.attr``: the owner (everything before the
    last dot) is looked up with :func:`_get_from_module`, then the final
    segment is read from it.
    """
    owner_path, _, member = name.rpartition(".")
    owner = _get_from_module(owner_path)
    return getattr(owner, member)
def _import_component(name):
    """
    Returns a class, function, or class method specified by the fully qualified
    name.

    The name is first treated as ``module.attr`` and then as
    ``module.Class.attr``. Any failure of a strategy moves on to the next
    one; None is returned when neither works.
    """
    for resolve in (_get_from_module, _get_from_class):
        try:
            found = resolve(name)
        except BaseException:
            # Best effort: every error, of any kind, means "try the next way".
            continue
        return found
    return None
# Lookup table from fully qualified name to imported component, used by
# get_component. Presumably KeyPassingDefaultDict calls
# _import_component(name) on a missing key and stores the result - confirm
# against insights.util.
COMPONENT_IMPORT_CACHE = KeyPassingDefaultDict(_import_component)
[docs]
def get_component(name):
    """
    Returns the class or function specified, importing it if necessary.

    The lookup goes through ``COMPONENT_IMPORT_CACHE``.
    """
    component = COMPONENT_IMPORT_CACHE[name]
    return component
def _find_component(name):
    """
    Return the first registered component whose :func:`get_name` equals
    ``name``, or None when no registered component matches.
    """
    matches = (candidate for candidate in DELEGATES if get_name(candidate) == name)
    return next(matches, None)
# Lookup table from fully qualified name to registered component, used by
# get_component_by_name. Presumably KeyPassingDefaultDict calls
# _find_component(name) on a missing key and stores the result - confirm
# against insights.util.
COMPONENTS_BY_NAME = KeyPassingDefaultDict(_find_component)
[docs]
def get_component_by_name(name):
    """
    Find an already loaded component from its fully qualified name.

    Returns None if the component hasn't been loaded. The lookup goes
    through ``COMPONENTS_BY_NAME``.
    """
    component = COMPONENTS_BY_NAME[name]
    return component
[docs]
@defaults(None)
def get_component_type(component):
    """
    Return the ``type`` attribute of the component's delegate.

    NOTE(review): ``defaults(None)`` presumably turns a failure (for example
    no delegate registered) into a None result - confirm against
    insights.util.
    """
    delegate = get_delegate(component)
    return delegate.type
[docs]
def get_components_of_type(_type):
    """
    Return the set of components recorded for ``_type`` in
    ``COMPONENTS_BY_TYPE``, or None when the type has no entry.
    """
    members = COMPONENTS_BY_TYPE.get(_type, None)
    return members
[docs]
@defaults(None)
def get_group(component):
    """
    Return the ``group`` attribute of the component's delegate.

    NOTE(review): ``defaults(None)`` presumably turns a failure (for example
    no delegate registered) into a None result - confirm against
    insights.util.
    """
    delegate = get_delegate(component)
    return delegate.group
[docs]
def add_dependent(component, dep):
    """
    Record in ``DEPENDENTS`` that ``dep`` depends on ``component``.
    """
    dependents = DEPENDENTS[component]
    dependents.add(dep)
[docs]
def get_dependents(component):
    """
    Return the set of components recorded as depending on ``component``.

    A new empty set is returned for unknown components; no entry is created
    in ``DEPENDENTS``.
    """
    if component in DEPENDENTS:
        return DEPENDENTS[component]
    return set()
[docs]
@defaults(set())
def get_dependencies(component):
    """
    Return the dependencies reported by the component's delegate.

    NOTE(review): ``defaults(set())`` presumably supplies the set built at
    decoration time when the lookup fails (for example no delegate) - confirm
    against insights.util, and whether that single set object is shared
    between calls.
    """
    delegate = get_delegate(component)
    return delegate.get_dependencies()
[docs]
def add_dependency(component, dep):
    """
    Ask the component's delegate to add ``dep`` as a dependency.
    """
    delegate = get_delegate(component)
    delegate.add_dependency(dep)
[docs]
def get_name(component):
    """
    Attempt to get the string name of component, including module and class if
    applicable.

    Callables are named ``<module>.<qualname>``, falling back to ``__name__``
    when ``__qualname__`` is unavailable. Callables that expose no usable
    name or module (for example ``functools.partial`` objects) and
    non-callables are rendered with ``str``.

    Args:
        component: any object, typically a component function or class.

    Returns:
        str: the best available name for ``component``.
    """
    if not callable(component):
        return str(component)

    # Look the attributes up lazily: evaluating ``component.__name__`` up
    # front raises AttributeError for callables that only define __call__.
    name = getattr(component, "__qualname__", None)
    if name is None:
        name = getattr(component, "__name__", None)
    module = getattr(component, "__module__", None)
    if name is None or module is None:
        return str(component)
    return '.'.join([module, name])
[docs]
def get_simple_name(component):
    """
    Return the unqualified name of ``component``.

    Callables yield their ``__name__``; callables without one (for example
    ``functools.partial`` objects) and non-callables are rendered with
    ``str``.
    """
    if callable(component):
        try:
            return component.__name__
        except AttributeError:
            # Callable object without a __name__: fall through to str().
            pass
    return str(component)
[docs]
def get_links(component):
    """
    Return the ``links`` attribute of the component's delegate.

    Components without a registered delegate get a new empty ``dict()``.
    """
    if component not in DELEGATES:
        return dict()
    return get_delegate(component).links
[docs]
def get_module_name(obj):
    """
    Return the name of the module ``obj`` was defined in.

    Returns:
        str or None: the module's ``__name__``, or None when
        ``inspect.getmodule`` cannot determine a module or the lookup fails.
    """
    try:
        return inspect.getmodule(obj).__name__
    except Exception:
        # Covers the AttributeError raised when getmodule() returns None.
        return None
[docs]
def get_base_module_name(obj):
    """
    Return the last dotted segment of the name of ``obj``'s module.

    Returns:
        str or None: e.g. ``"encoder"`` for an object from ``json.encoder``;
        None when the module name cannot be determined.
    """
    try:
        return get_module_name(obj).split(".")[-1]
    except Exception:
        # Covers the AttributeError raised when get_module_name() gives None.
        return None
[docs]
def mark_hidden(component):
    """
    Add a component to ``HIDDEN``.

    A ``list`` or ``set`` argument is treated as a collection of components
    and every member is added; anything else is added as a single component.
    """
    if not isinstance(component, (list, set)):
        HIDDEN.add(component)
        return
    # Build the set first so nothing is added if a member is unhashable.
    HIDDEN.update(set(component))
[docs]
def is_hidden(component):
    """
    Return True if ``component`` has been added to ``HIDDEN``.
    """
    hidden = component in HIDDEN
    return hidden
[docs]
def walk_tree(root, method=get_dependencies):
    """
    Lazily yield everything reachable from ``root`` through ``method``.

    Each item returned by ``method(root)`` is yielded and then immediately
    followed by its own descendants (depth first). ``root`` itself is not
    yielded, and an item reachable along several paths is yielded once per
    path.

    Args:
        root: the component to start from.
        method (callable): returns the iterable of neighbours of a component.
            Defaults to :func:`get_dependencies`.
    """
    for child in method(root):
        yield child
        for descendant in walk_tree(child, method=method):
            yield descendant
[docs]
def walk_dependencies(root, visitor):
    """
    Call visitor on root and on every dependency reachable from it.

    The traversal is depth first: a dependency is visited and its own
    dependencies are walked before the next sibling is visited. A component
    reachable along several paths is visited once per path.

    Args:
        root (component): component function or class
        visitor (function): signature is `func(component, parent)`. The
            call on root is `visitor(root, None)`.
    """
    def descend(parent):
        for dep in get_dependencies(parent):
            visitor(dep, parent)
            descend(dep)

    visitor(root, None)
    descend(root)
[docs]
def get_dependency_graph(component):
    """
    Build the dependency graph rooted at ``component``. The result can be
    passed to :func:`run` or :func:`run_incremental`.

    Returns:
        dict: every component reachable from ``component`` (itself included)
        mapped to the set of components it depends on.

    Raises:
        Exception: if ``component`` has no entry in ``DEPENDENCIES``.
    """
    if component not in DEPENDENCIES:
        raise Exception("%s is not a registered component." % get_name(component))

    if not DEPENDENCIES[component]:
        return {component: set()}

    edges = defaultdict(set)

    def record(child, parent):
        # The root is visited with parent=None and contributes no edge.
        if parent is not None:
            edges[parent].add(child)

    walk_dependencies(component, record)

    graph = dict(edges)

    # Components that appear only as dependencies get an empty entry.
    leaves = set().union(*graph.values()) - set(graph.keys())
    for leaf in leaves:
        graph[leaf] = set()
    return graph
[docs]
def get_dependency_specs(component):
    """
    Collect the spec names the specified `component` depends on. Only
    `requires` and `at_least_one` dependencies are followed; `optional`
    dependencies are not considered.

    Arguments:
        component (callable): The component to check. The component must
            already be loaded.

    Returns:
        list: The `requires` and `at_least_one` spec sets of the `component`.
        The return list is in the following format::

            [
                requires_1,
                requires_2,
                (at_least_one_11, at_least_one_12),
                (at_least_one_21, [req_alo22, (alo_23, alo_24)]),
            ]

    Note:
        - The 'requires_1' and 'requires_2' are `requires` specs.
          Each of them is required.
        - The 'at_least_one_11' and 'at_least_one_12' are `at_least_one`
          specs in the same at least one set.
          At least one of them is required.
        - The 'alo_23' and 'alo_24' are `at_least_one` specs and
          together with the 'req_alo22' are `requires` for the
          sub-set. This sub-set and the 'at_least_one_21' are
          `at_least_one` specs in the same at least one set.
    """
    def collect_required(comp):
        names = list()
        for dep in get_delegate(comp).requires:
            if get_name(dep).startswith("insights.specs."):
                # A spec: record its simple name once.
                simple = get_simple_name(dep)
                if simple not in names:
                    names.append(simple)
            else:
                # Not a spec: pull in whatever it needs itself.
                nested = collect_required(dep) + collect_at_least_one(dep)
                fresh = [item for item in nested if item not in names]
                names.extend(fresh)
        return names

    def collect_at_least_one(comp):
        groups = list()
        for alternatives in get_delegate(comp).at_least_one:
            group = list()
            for dep in alternatives:
                sub_required = collect_required(dep)
                sub_any = collect_at_least_one(dep)
                if sub_required and sub_any:
                    # they are mixed and in the same `requires` level
                    mixed = [item for item in sub_required + sub_any if item not in group]
                    group.append(mixed)
                else:
                    # no mixed, just add them one by one
                    single = [item for item in sub_required or sub_any if item not in group]
                    group.extend(single)
            groups.append(tuple(group))
        return groups

    return collect_required(component) + collect_at_least_one(component)
[docs]
def is_registry_point(component):
    """
    Return True when the class of ``component`` is named ``RegistryPoint``.

    The check is by class name only, so no import of the class is needed.
    """
    type_name = type(component).__name__
    return type_name == "RegistryPoint"
[docs]
def is_datasource(component):
    """
    Return True when the component's type is a class named ``datasource``.

    Components whose type cannot be determined are reported as False.
    """
    comp_type = get_component_type(component)
    if not comp_type:
        return False
    return comp_type.__name__ == "datasource"
[docs]
def get_registry_points(component, datasource=None):
    """
    Walk the dependency graph to find the spec registry points that
    correspond to the component. This is primarily used by datasources. In
    most cases only one registry point is found, but in some cases more than
    one.

    Datasources are searched through their dependents; every other component
    is searched through its dependencies. The direction chosen for the first
    component is kept for the whole walk.

    Args:
        component (callable): The component object
        datasource (Boolean): Search direction. Leave as None on the initial
            call so it is derived from ``component``; recursive calls pass it
            explicitly to avoid infinite recursion.

    Returns:
        (set): The registry points found.
    """
    if is_registry_point(component):
        return set([component])

    # Only the outermost call decides the direction.
    if datasource is None:
        datasource = is_datasource(component)

    if datasource:
        # Always search dependents for datasources
        neighbours, towards_dependents = get_dependents(component), True
    else:
        # Always search dependencies for Parsers/Combiners
        neighbours, towards_dependents = get_dependencies(component), False

    found = set()
    for dep in neighbours:
        if is_registry_point(dep):
            found.add(dep)
        else:
            found.update(get_registry_points(dep, towards_dependents))
    return found
[docs]
def get_subgraphs(graph=None):
    """
    Split a graph of possibly disconnected components into its connected
    sub-graphs and yield them one at a time.

    ``graph`` is a dictionary of dependencies: keys are components, and
    values are sets of components on which they depend. ``DEPENDENCIES`` is
    used when ``graph`` is None or empty.

    Sub-graphs are started from the keys in descending "prio" order, where a
    key's "prio" is read from one of its registry points (0 when it has none
    or the registry point defines no ``prio``).

    Yields:
        dict: component -> ``get_dependencies(component)`` for each member of
        one connected sub-graph.
    """
    graph = graph or DEPENDENCIES

    def priority(comp):
        # 0 -> no priority
        points = get_registry_points(comp) or [object]
        return getattr(next(iter(points)), 'prio', 0)

    pending = sorted(graph, key=priority, reverse=True)
    to_visit = set()
    visited = set()
    while pending:
        to_visit.add(pending.pop(0))
        # Flood fill in both directions, staying inside ``graph``.
        while to_visit:
            current = to_visit.pop()
            visited.add(current)
            to_visit.update([d for d in get_dependencies(current) if d in graph])
            to_visit.update([d for d in get_dependents(current) if d in graph])
            to_visit.difference_update(visited)

        yield dict((member, get_dependencies(member)) for member in visited)

        # Remove processed keys without changing their order
        for member in visited:
            try:
                pending.remove(member)
            except Exception:
                # e.g. the key popped above is no longer in the list
                pass
        visited.clear()
def _import(path, continue_on_error):
    """
    Import the module at dotted ``path`` and return it.

    Any error raised by the import, of any kind, is re-raised unless
    ``continue_on_error`` is true, in which case None is returned.
    """
    log.debug("Importing %s" % path)
    try:
        module = importlib.import_module(path)
    except BaseException:
        if continue_on_error:
            return None
        raise
    return module
def _load_components(path, include=".*", exclude="\\.tests", continue_on_error=True):
    """
    Import the module or package at ``path`` and, for a package, everything
    beneath it.

    Args:
        path (str): dotted name or slash separated path; a trailing ``.py``
            and trailing slashes are stripped and slashes become dots.
        include (str): regular expression a module name must match (search)
            to be imported. A falsy value includes everything.
        exclude (str): regular expression that excludes matching names
            (search). A falsy value excludes nothing.
        continue_on_error (bool): passed to :func:`_import`.

    Returns:
        int: the number of modules counted as loaded. ``path`` itself counts
        when its import succeeds; each non-package child that passes the
        filters counts whether or not its import succeeded.
    """
    if include:
        do_include = re.compile(include).search
    else:
        do_include = lambda x: True
    if exclude:
        do_exclude = re.compile(exclude).search
    else:
        do_exclude = lambda x: False

    if path.endswith(".py"):
        path = os.path.splitext(path)[0]
    path = path.rstrip("/").replace("/", ".")

    if do_exclude(path):
        return 0

    package = _import(path, continue_on_error)
    if not package:
        return 0

    # A plain module has no children to discover.
    if not hasattr(package, "__path__"):
        return 1

    loaded = 1
    prefix = package.__name__ + "."
    for _, child, is_pkg in pkgutil.iter_modules(path=package.__path__, prefix=prefix):
        if not child.startswith(prefix):
            child = prefix + child
        if is_pkg:
            loaded += _load_components(child, include, exclude, continue_on_error)
        elif do_include(child) and not do_exclude(child):
            _import(child, continue_on_error)
            loaded += 1
    return loaded
[docs]
def load_components(*paths, **kwargs):
    """
    Loads all components on the paths. Each path should be a package or module.
    All components beneath a path are loaded.

    Args:
        paths (str): A package or module to load

    Keyword Args:
        include (str): A regular expression of packages and modules to include.
            Defaults to '.*'
        exclude (str): A regular expression of packages and modules to exclude.
            Defaults to '\\\\.tests'
        continue_on_error (bool): If True, continue importing even if something
            raises an error during import. If False, raise the first error.
            Defaults to True.

    Returns:
        int: The total number of modules loaded.

    Raises:
        ImportError
    """
    return sum(_load_components(path, **kwargs) for path in paths)
[docs]
def first_of(dependencies, broker):
    """
    Return the broker value of the first dependency present in ``broker``,
    or None when none of them is present.
    """
    available = (broker[dep] for dep in dependencies if dep in broker)
    return next(available, None)
[docs]
def split_requirements(requires):
    """
    Separate a requirements sequence into plain entries and list entries.

    Returns:
        tuple: ``(req_all, req_any)`` where ``req_any`` holds the entries
        that are lists ("at least one" groups) and ``req_all`` holds all the
        others, each in their original order.
    """
    buckets = {True: [], False: []}
    # Single pass so one-shot iterables are handled too.
    for requirement in requires:
        buckets[isinstance(requirement, list)].append(requirement)
    return buckets[False], buckets[True]
[docs]
def stringify_requirements(requires):
    """
    Render requirements as a single human readable line.

    Args:
        requires: either a ``(req_all, req_any)`` tuple or a flat
            requirements sequence, which is first passed through
            :func:`split_requirements`.

    Returns:
        str: ``"All: [...] Any: [...] Any: [...]"`` with components shown by
        their :func:`get_name`.
    """
    if isinstance(requires, tuple):
        req_all, req_any = requires
    else:
        req_all, req_any = split_requirements(requires)

    all_names = [get_name(r) for r in req_all]
    any_names = [str([get_name(r) for r in group]) for group in req_any]

    all_part = "All: %s" % all_names
    any_part = " Any: ".join(any_names)
    return all_part + " Any: " + any_part
def _register_component(delegate):
    """
    Enter a decorated component into the module-level registries.

    Updates ``DEPENDENCIES``, ``COMPONENTS``, ``COMPONENTS_BY_TYPE``,
    ``DELEGATES``, ``MODULE_NAMES`` and ``BASE_MODULE_NAMES`` for
    ``delegate.component``.
    """
    component = delegate.component
    comp_type = delegate.type
    deps = delegate.get_dependencies()

    DEPENDENCIES[component] = deps
    COMPONENTS[delegate.group][component].update(deps)

    COMPONENTS_BY_TYPE[comp_type].add(component)
    # Also list the component under every already known base type.
    for known_type, members in COMPONENTS_BY_TYPE.items():
        if known_type is comp_type:
            continue
        if issubclass(comp_type, known_type):
            members.add(component)

    DELEGATES[component] = delegate
    MODULE_NAMES[component] = get_module_name(component)
    BASE_MODULE_NAMES[component] = get_base_module_name(component)
[docs]
class ComponentType(object):
    """
    ComponentType is the base class for all component type decorators.

    For Example:

    .. code-block:: python

        class my_component_type(ComponentType):
            pass

        @my_component_type(SshDConfig, InstalledRpms, [ChkConfig, UnitFiles], optional=[IPTables, IpAddr])
        def my_func(sshd_config, installed_rpms, chk_config, unit_files, ip_tables, ip_addr):
            return installed_rpms.newest("bash")

    Notice that the arguments to ``my_func`` correspond to the dependencies in
    the ``@my_component_type`` and are in the same order.

    When used, a ``my_component_type`` instance is created whose
    ``__init__`` gets passed dependencies and whose ``__call__`` gets
    passed the component to run if dependencies are met.

    Parameters to the decorator have these forms:

    ============ =============================== ==========================
    Criteria     Example Decorator Arguments     Description
    ============ =============================== ==========================
    Required     ``SshDConfig, InstalledRpms``   A regular argument
    At Least One ``[ChkConfig, UnitFiles]``      An argument as a list
    Optional     ``optional=[IPTables, IpAddr]`` A list following optional=
    ============ =============================== ==========================

    If a parameter is required, the value provided for it is guaranteed not to
    be ``None``. In the example above, ``sshd_config`` and ``installed_rpms``
    will not be ``None``.

    At least one of the arguments to parameters of an "at least one"
    list will not be ``None``. In the example, either or both of ``chk_config``
    and unit_files will not be ``None``.

    Any or all arguments for optional parameters may be ``None``.

    The following keyword arguments may be passed to the decorator:

    Attributes:
        requires (list): a list of components that all components decorated with
            this type will implicitly require. Additional components passed to
            the decorator will be appended to this list.
        optional (list): a list of components that all components decorated with
            this type will implicitly depend on optionally. Additional components
            passed as ``optional`` to the decorator will be appended to this list.
        metadata (dict): an arbitrary dictionary of information to associate
            with the component you're decorating. It can be retrieved with
            ``get_metadata``.
        tags (list): a list of strings that categorize the component. Useful for
            formatting output or sifting through results for components you care
            about.
        group: ``GROUPS.single`` or ``GROUPS.cluster``. Used to organize
            components into "groups" that run together with :func:`insights.core.dr.run`.
        cluster (bool): if ``True`` will put the component into the
            ``GROUPS.cluster`` group. Defaults to ``False``. Overrides ``group``
            if ``True``.
    """

    requires = []
    """
    a list of components that all components decorated with this type will
    implicitly require. Additional components passed to the decorator will be
    appended to this list.
    """
    optional = []
    """
    a list of components that all components decorated with this type will
    implicitly depend on optionally. Additional components passed as
    ``optional`` to the decorator will be appended to this list.
    """
    metadata = {}
    """
    an arbitrary dictionary of information to associate with the component
    you're decorating. It can be retrieved with ``get_metadata``.
    """
    tags = []
    """
    a list of strings that categorize the component. Useful for formatting
    output or sifting through results for components you care about.
    """
    group = GROUPS.single
    """
    group: ``GROUPS.single`` or ``GROUPS.cluster``. Used to organize components
    into "groups" that run together with :func:`insights.core.dr.run`.
    """

    def __init__(self, *deps, **kwargs):
        """
        This constructor is the parameterized part of a decorator.

        Positional arguments are the dependencies: a plain entry is required,
        a list entry is an "at least one" group. Keyword arguments are first
        stored verbatim as instance attributes; the well known ones
        (``requires``, ``optional``, ``metadata``, ``group``, ``cluster``,
        ``tags``) are then merged with the class level defaults.
        """
        # Must happen first: the attributes computed below overwrite any
        # keyword of the same name.
        for key, value in kwargs.items():
            setattr(self, key, value)

        cls = self.__class__
        self.component = None
        self.requires = []
        self.at_least_one = []
        self.deps = []
        self.type = cls

        # ``requires=`` is only consulted when no positional deps were given.
        declared = list(deps) or kwargs.get("requires", [])
        for dep in list(cls.requires) + declared:
            if isinstance(dep, list):
                self.at_least_one.append(dep)
                self.deps.extend(dep)
            else:
                self.requires.append(dep)
                self.deps.append(dep)

        extra_optional = kwargs.get("optional", [])
        if extra_optional and not isinstance(extra_optional, list):
            extra_optional = [extra_optional]
        self.optional = list(cls.optional)
        self.optional.extend(extra_optional)

        # ``deps`` keeps declaration order: required / at-least-one entries
        # first, optional ones last. ``invoke`` relies on this order.
        self.deps.extend(self.optional)
        self.dependencies = set(self.deps)

        self.metadata = dict(cls.metadata)
        self.metadata.update(kwargs.get("metadata", {}) or {})

        if kwargs.get("cluster", False):
            self.group = GROUPS.cluster
        else:
            self.group = kwargs.get("group", cls.group)

        self.tags = set(list(cls.tags) + list(kwargs.get("tags", []) or []))

    def __call__(self, component):
        """
        This function is the part of the decorator that receives the function
        or class.

        It copies the component's identifying attributes onto the delegate,
        records the component as a dependent of each of its dependencies,
        registers it, and returns the component unchanged.
        """
        self.component = component
        self.__name__ = component.__name__
        self.__module__ = component.__module__
        self.__doc__ = component.__doc__
        self.__qualname__ = getattr(component, "__qualname__", None)

        for dependency in self.dependencies:
            add_dependent(dependency, component)

        _register_component(self)
        return component

    def invoke(self, results):
        """
        Handles invocation of the component. The default implementation invokes
        it with positional arguments based on order of dependency declaration.
        Dependencies absent from ``results`` are passed as ``None``.
        """
        return self.component(*[results.get(dep) for dep in self.deps])

    def get_missing_dependencies(self, broker):
        """
        Gets required and at-least-one dependencies not provided by the broker.

        Returns:
            tuple or None: ``(missing_required, missing_at_least_one)`` when
            anything is missing, otherwise None. The second element lists the
            "at least one" groups of which no member is in the broker.
        """
        absent_required = [req for req in self.requires if req not in broker]
        absent_groups = [
            group for group in self.at_least_one
            if not set(group).intersection(broker)
        ]
        if not (absent_required or absent_groups):
            return None
        return (absent_required, absent_groups)

    def process(self, broker):
        """
        Ensures dependencies have been met before delegating to `self.invoke`.

        Raises:
            SkipComponent: if any component in this component's ignore set is
                present in the broker.
            MissingRequirements: if required or at-least-one dependencies are
                not in the broker.
        """
        for ignored in IGNORE.get(self.component, []):
            if ignored in broker:
                raise SkipComponent()

        missing = self.get_missing_dependencies(broker)
        if missing:
            raise MissingRequirements(missing)
        return self.invoke(broker)

    def get_dependencies(self):
        """
        Return the set of all dependencies (required, at-least-one, optional).
        """
        return self.dependencies

    def add_dependency(self, dep):
        """
        Add ``dep`` to the first "at least one" group of this component and
        update the module-level registries accordingly.

        An ``IndexError`` is raised when the component declares no
        "at least one" group.
        """
        group = self.group
        first_group = self.at_least_one[0]
        first_group.append(dep)
        self.deps.append(dep)
        self.dependencies.add(dep)
        add_dependent(dep, self.component)

        DEPENDENCIES[self.component].add(dep)
        COMPONENTS[group][self.component].add(dep)
[docs]
class Broker(object):
    """
    The Broker is a fancy dictionary that keeps up with component instances as
    a graph is evaluated. It's the state of the evaluation. Once a graph has
    executed, the broker will contain everything about the evaluation:
    component instances, timings, exceptions, and tracebacks.

    You can either inspect the broker at the end of an evaluation, or you can
    register callbacks with it, and they'll get invoked after each component
    is called.

    Attributes:
        instances (dict): the component instances with components as keys.
        missing_requirements (dict): components that didn't have their dependencies
            met. Values are a two-tuple. The first element is the list of
            required dependencies that were missing. The second element is the
            list of "at least one" dependencies that were missing. For more
            information on dependency types, see the :class:`ComponentType`
            docs.
        exceptions (defaultdict(list)): Components that raise any type of
            exception except :class:`SkipComponent` during evaluation. The key
            is the component, and the value is a list of exceptions. It's a
            list because some components produce multiple instances.
        tracebacks (dict): keys are exceptions and values are their text
            tracebacks.
        exec_times (dict): component -> float dictionary where values are the
            number of seconds the component took to execute. Calculated using
            :func:`time.time`. For components that produce multiple instances,
            the execution time here is the sum of their individual execution
            times.
        store_skips (bool): Whether to store skips in the broker or not.
    """

    def __init__(self, seed_broker=None):
        """
        Create a broker, optionally seeded from another one.

        Args:
            seed_broker (Broker): when given, its instances are copied into a
                new dict and its observer table is reused. Without a seed the
                observers start from the globally registered
                ``TYPE_OBSERVERS``.
        """
        if seed_broker:
            self.instances = dict(seed_broker.instances)
        else:
            self.instances = {}
        self.missing_requirements = {}
        self.exceptions = defaultdict(list)
        self.tracebacks = {}
        self.exec_times = {}
        self.store_skips = False

        self.observers = defaultdict(set)
        if seed_broker is not None:
            # The observer sets themselves are shared with the seed broker.
            self.observers.update(seed_broker.observers)
        else:
            self.observers[ComponentType] = set()
            for component_type, callbacks in TYPE_OBSERVERS.items():
                self.observers[component_type] |= set(callbacks)

    def observer(self, component_type=ComponentType):
        """
        You can use ``@broker.observer()`` as a decorator to your callback
        instead of :func:`Broker.add_observer`.
        """
        def register(func):
            self.add_observer(func, component_type)
            return func
        return register

    def add_observer(self, o, component_type=ComponentType):
        """
        Add a callback that will get invoked after each component is called.

        Args:
            o (func): the callback function

        Keyword Args:
            component_type (ComponentType): the :class:`ComponentType` to observe.
                The callback will fire any time an instance of the class or its
                subclasses is invoked.

        The callback should look like this:

        .. code-block:: python

            def callback(comp, broker):
                value = broker.get(comp)
                # do something with value
                pass
        """
        callbacks = self.observers[component_type]
        callbacks.add(o)

    def fire_observers(self, component):
        """
        Invoke every observer registered for the component's type or one of
        its base types. Exceptions raised by an observer are logged and do
        not stop the remaining observers.
        """
        comp_type = get_component_type(component)
        if not comp_type:
            return

        for observed_type, callbacks in self.observers.items():
            if not issubclass(comp_type, observed_type):
                continue
            for callback in callbacks:
                try:
                    callback(component, self)
                except Exception as e:
                    log.exception(e)

    def add_exception(self, component, ex, tb=None):
        """
        Record a failure for ``component``.

        ``MissingRequirements`` is stored in ``missing_requirements``; any
        other exception is appended to ``exceptions[component]`` and its
        traceback text is stored in ``tracebacks`` keyed by the exception.
        """
        if isinstance(ex, MissingRequirements):
            self.missing_requirements[component] = ex.requirements
            return
        self.exceptions[component].append(ex)
        self.tracebacks[ex] = tb

    def __iter__(self):
        return iter(self.instances)

    def keys(self):
        """ Return the components that have a value in the broker. """
        return self.instances.keys()

    def items(self):
        """ Return the (component, value) pairs held by the broker. """
        return self.instances.items()

    def values(self):
        """ Return the values held by the broker. """
        return self.instances.values()

    def get_by_type(self, _type):
        """
        Return all of the instances of :class:`ComponentType` ``_type``.

        Only components whose type is exactly ``_type`` are included.
        """
        return dict(
            (component, value) for component, value in self.items()
            if get_component_type(component) is _type
        )

    def __contains__(self, component):
        return component in self.instances

    def __setitem__(self, component, instance):
        # A value can be stored only once per component.
        if component in self.instances:
            raise KeyError("Already exists in broker with key: %s" % get_name(component))
        self.instances[component] = instance

    def __delitem__(self, component):
        # Deleting an unknown component is a no-op.
        self.instances.pop(component, None)

    def __getitem__(self, component):
        if component not in self.instances:
            raise KeyError("Unknown component: %s" % get_name(component))
        return self.instances[component]

    def get(self, component, default=None):
        """
        Return the value stored for ``component``, or ``default`` when the
        broker has none.
        """
        try:
            value = self[component]
        except KeyError:
            return default
        return value

    def print_component(self, component_type):
        """
        Print, as one JSON document, the values of all components of
        ``component_type`` keyed by component name and ordered by that name.
        """
        components = sorted(self.get_by_type(component_type), key=get_name)
        print(json.dumps(dict((get_name(c), self[c]) for c in components)))
[docs]
def get_missing_requirements(func, requires, d):
    """
    .. deprecated:: 1.x

    Return the requirements of ``func`` that are absent from mapping ``d``.

    Returns:
        tuple or None: ``(missing_all, missing_any)`` when something is
        missing, otherwise None. None is also returned when ``requires`` is
        empty.

    Raises:
        SkipComponent: if anything in the ignore set of ``func`` is in ``d``.
    """
    if not requires:
        return None

    for ignored in IGNORE.get(func, []):
        if ignored in d:
            raise SkipComponent()

    req_all, req_any = split_requirements(requires)
    present = set(d.keys())
    missing_all = [r for r in req_all if r not in present]
    missing_any = [group for group in req_any if set(group).isdisjoint(present)]
    if missing_all or missing_any:
        return missing_all, missing_any
    return None
[docs]
def add_observer(o, component_type=ComponentType):
    """
    Register a callback globally, in ``TYPE_OBSERVERS``, to be invoked after
    each component is called. Brokers created without a seed broker pick up
    the callbacks registered here.

    Args:
        o (func): the callback function

    Keyword Args:
        component_type (ComponentType): the :class:`ComponentType` to observe.
            The callback will fire any time an instance of the class or its
            subclasses is invoked.

    The callback should look like this:

    .. code-block:: python

        def callback(comp, broker):
            value = broker.get(comp)
            # do something with value
            pass
    """
    callbacks = TYPE_OBSERVERS[component_type]
    callbacks.add(o)
[docs]
def observer(component_type=ComponentType):
    """
    Decorator form of the module-level :func:`add_observer`: the decorated
    callback is registered for ``component_type`` and returned unchanged.
    """
    def register(func):
        add_observer(func, component_type)
        return func
    return register
[docs]
def run_order(graph):
    """
    Returns components in an order that satisfies their dependency
    relationships.

    Args:
        graph (dict): component -> set of components it depends on.
    """
    ordering = toposort_flatten(graph, sort=False)
    return ordering
[docs]
def determine_components(components):
    """
    Normalize the ``components`` argument of :func:`run` into a dependency
    graph.

    Accepted inputs, checked in this order:

    - a dict: returned as is (already a graph)
    - a registered component type: expanded to its components, then treated
      as a collection
    - a list or set of components: the merge of their dependency graphs
    - a single registered component: its dependency graph
    - a group: the graph stored for it in ``COMPONENTS``

    Returns:
        dict or None: the graph, or None when the input is not recognized.
    """
    if isinstance(components, dict):
        return components

    if hashable(components) and components in COMPONENTS_BY_TYPE:
        components = get_components_of_type(components)

    if isinstance(components, (list, set)):
        merged = {}
        for member in components:
            merged.update(get_dependency_graph(member))
        return merged

    if hashable(components):
        if components in DELEGATES:
            return get_dependency_graph(components)
        if components in COMPONENTS:
            return COMPONENTS[components]
    return None
# Underscore-prefixed alias bound to the same function object as
# determine_components.
_determine_components = determine_components
[docs]
def run_components(ordered_components, components, broker):
    """
    Runs a list of preordered components using the provided broker.

    This function allows callers to order components themselves and cache the
    result so they don't incur the toposort overhead on every run.

    Args:
        ordered_components (iterable): components in execution order.
        components: container of the components that may be executed; only
            membership is tested.
        broker (Broker): receives results, exceptions and timings.

    Returns:
        Broker: the same broker, after evaluation.
    """
    for component in ordered_components:
        started = time.time()
        try:
            # Evaluated left to right; is_enabled() is consulted last.
            should_run = (
                component not in broker
                and component in components
                and component in DELEGATES
                and is_enabled(component)
            )
            if should_run:
                log.info("Trying %s" % get_name(component))
                broker[component] = DELEGATES[component].process(broker)
        except BlacklistedSpec as bs:
            # Remember the short names of the affected registry points.
            for point in get_registry_points(component):
                BLACKLISTED_SPECS.append(str(point).split('.')[-1])
            broker.add_exception(component, bs, traceback.format_exc())
        except MissingRequirements as mr:
            if log.isEnabledFor(logging.DEBUG):
                name = get_name(component)
                reqs = stringify_requirements(mr.requirements)
                log.debug("%s missing requirements %s" % (name, reqs))
            broker.add_exception(component, mr)
        except SkipComponent as sc:
            # Skips are recorded only when the broker asks for them.
            if broker.store_skips:
                log.debug(sc)
                broker.add_exception(component, sc, traceback.format_exc())
        except Exception as ex:
            log.debug(ex)
            tb = traceback.format_exc()
            broker.add_exception(component, ex, tb)
            # Attribute the failure to the related registry points as well.
            for reg_spec in get_registry_points(component):
                broker.add_exception(reg_spec, ex, tb)
        finally:
            # Timed and announced for every component, executed or not.
            broker.exec_times[component] = time.time() - started
            broker.fire_observers(component)

    return broker
[docs]
def run(components=None, broker=None):
    """
    Executes components in an order that satisfies their dependency
    relationships.

    Keyword Args:
        components: Can be one of a dependency graph, a single component, a
            component group, or a component type. If it's anything other than a
            dependency graph, the appropriate graph is built for you before
            evaluation. Defaults to the ``GROUPS.single`` group.
        broker (Broker): Optionally pass a broker to use for evaluation. One is
            created by default, but it's often useful to seed a broker with an
            initial dependency.

    Returns:
        Broker: The broker after evaluation.
    """
    components = components or COMPONENTS[GROUPS.single]
    components = determine_components(components)
    broker = broker or Broker()

    # If a SerializedArchiveContext then data found in the archive's
    # ./meta_data directory are prepopulated in the broker as Specs so
    # no need to collect them again
    if broker.get(SerializedArchiveContext) is not None:
        # Prune a shallow copy: the graph may be the caller's own dict or the
        # module-level registry, neither of which should lose entries.
        components = dict(components)
        for comp in list(components):
            if comp in broker:
                # ``comp`` may itself have been pruned by an earlier
                # iteration, hence the tolerant lookup.
                for dep in components.get(comp, ()):
                    components.pop(dep, None)

    return run_components(run_order(components), components, broker)
[docs]
def generate_incremental(components=None, broker=None):
    """
    Yield a ``(graph, broker)`` pair for each connected sub-graph of
    ``components``.

    ``components`` accepts the same forms as :func:`run` and defaults to the
    ``GROUPS.single`` group. The given ``broker`` is yielded with every
    sub-graph; when it is falsy a new :class:`Broker` is created per
    sub-graph.
    """
    selected = components or COMPONENTS[GROUPS.single]
    full_graph = determine_components(selected)
    for subgraph in get_subgraphs(full_graph):
        yield subgraph, broker or Broker()
[docs]
def run_incremental(components=None, broker=None):
    """
    Executes components in an order that satisfies their dependency
    relationships. Disjoint subgraphs are executed one at a time and the
    broker holding the results of each is yielded. A broker passed here is
    handed to :func:`generate_incremental`, which supplies the broker used
    for each subgraph.

    Keyword Args:
        components: Can be one of a dependency graph, a single component, a
            component group, or a component type. If it's anything other than a
            dependency graph, the appropriate graph is built for you before
            evaluation.
        broker (Broker): Optionally pass a broker to use for evaluation. One is
            created by default, but it's often useful to seed a broker with an
            initial dependency.

    Yields:
        Broker: the broker used to evaluate each subgraph.
    """
    for subgraph, graph_broker in generate_incremental(components, broker):
        yield run(subgraph, broker=graph_broker)
[docs]
def run_all(components=None, broker=None, pool=None):
    """
    Run every connected sub-graph of ``components`` and return the resulting
    brokers as a list.

    Args:
        components: same forms as accepted by :func:`run`.
        broker (Broker): optional broker, passed to
            :func:`generate_incremental`.
        pool: optional executor-like object exposing ``submit(fn, *args)``
            that returns objects with ``result()``. When given, one
            :func:`run` call per sub-graph is submitted to it and all results
            are collected afterwards.
    """
    if not pool:
        return list(run_incremental(components=components, broker=broker))

    futures = [
        pool.submit(run, subgraph, graph_broker)
        for subgraph, graph_broker in generate_incremental(components, broker)
    ]
    return [future.result() for future in futures]