Source code for insights.shell

# -*- coding: utf-8 -*-
from __future__ import print_function
import argparse
import logging
import importlib
import inspect
import os
import re
import six
import traceback
import yaml

from collections import defaultdict
from contextlib import contextmanager

import IPython
from pygments.console import ansiformat
from traitlets.config.loader import Config

from insights.core.context import SerializedArchiveContext
from insights.core.serde import Hydration
from insights.parsr.query import *  # noqa
from insights.parsr.query import eq, matches, make_child_query as q  # noqa
from insights.parsr.query.boolean import FALSE, TRUE

from insights import (
    apply_configs,
    apply_default_enabled,
    create_context,
    datasource,
    dr,
    extract,
    load_default_plugins,
    load_packages,
    parse_plugins,
)
from insights.core import plugins
from insights.core.context import HostContext
from insights.core.spec_factory import ContentProvider, RegistryPoint
from insights.formats import render
from insights.formats.text import render_links

Loader = getattr(yaml, "CSafeLoader", yaml.SafeLoader)

RULE_COLORS = {"fail": "brightred", "pass": "blue", "info": "magenta", "skip": "yellow"}


@contextmanager
def _create_new_broker(path=None):
    """
    Create a broker and populate it by evaluating the path with all
    registered datasources.

    Args:
        path (str): path to the archive or directory to analyze. ``None`` will
            analyze the current system.
    """
    datasources = dr.get_components_of_type(datasource)

    def make_broker(ctx):
        broker = dr.Broker()
        broker[ctx.__class__] = ctx

        if isinstance(ctx, SerializedArchiveContext):
            h = Hydration(ctx.root)
            broker = h.hydrate(broker=broker)

        dr.run(datasources, broker=broker)

        del broker[ctx.__class__]
        return broker

    if path:
        if os.path.isdir(path):
            ctx = create_context(path)
            yield (path, make_broker(ctx))
        else:
            with extract(path) as e:
                ctx = create_context(e.tmp_dir)
                yield (e.tmp_dir, make_broker(ctx))
    else:
        yield (os.curdir, make_broker(HostContext()))


# contextlib.ExitStack isn't available in all python versions
# so recurse to victory.
def with_brokers(archives, callback):
    brokers = []

    def inner(paths):
        if paths:
            path = paths.pop()
            with _create_new_broker(path) as ctx:
                brokers.append(ctx)
                inner(paths)
        else:
            callback(brokers)

    if archives:
        inner(list(reversed(archives)))
    else:
        with _create_new_broker() as ctx:
            callback([ctx])
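
# Usage sketch (archive path and callback name are illustrative): with_brokers
# builds one broker per archive and hands the callback a list of
# (path, broker) tuples, e.g.
#
#     def report(results):
#         for path, broker in results:
#             print(path, len(broker.instances))
#
#     with_brokers(["sosreport.tar.xz"], report)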


def _get_available_models(broker, group=dr.GROUPS.single):
    """
    Given a broker populated with datasources, return everything that could
    run based on them.
    """
    state = set(broker.instances.keys())
    models = {}

    for comp in dr.run_order(dr.COMPONENTS[group]):
        if comp in dr.DELEGATES and not plugins.is_datasource(comp):
            if dr.DELEGATES[comp].get_missing_dependencies(state):
                continue

            if plugins.is_type(
                comp, (plugins.rule, plugins.condition, plugins.incident)
            ):
                name = "_".join(
                    [dr.get_base_module_name(comp), dr.get_simple_name(comp)]
                )
            else:
                name = dr.get_simple_name(comp)

            if name in models:
                prev = models[name]
                models[dr.get_name(prev).replace(".", "_")] = prev

                del models[name]
                name = dr.get_name(comp).replace(".", "_")

            models[name] = comp
            state.add(comp)

    return models


class Models(dict):
    u"""
    Represents all components that may be available given the data being
    analyzed. Use models.find() to see them. Tab complete attributes to
    access them. Use help(models) for more info.

    Start the shell with the environment variable INSIGHTS_FILTERS_ENABLED=False
    to disable filtering that may cause unexpected missing data.

    Examples:

        >>> models.find("rpm")
        InstalledRpms (insights.parsers.installed_rpms.InstalledRpms)

        >>> rpms = models.InstalledRpms
        >>> rpms.newest("bash")
        0:bash-4.1.2-48.el6

        >>> models.find("(?i)yum")  # Prefix "(?i)" ignores case.
        YumConf (insights.parsers.yum_conf.YumConf, parser)
        YumLog (insights.parsers.yumlog.YumLog, parser)
        YumRepoList (insights.parsers.yum.YumRepoList, parser)
        └╌╌Incorrect line: 'repolist: 33296'
        YumReposD (insights.parsers.yum_repos_d.YumReposD, parser)

        >>> models.show_trees("rpm")
        insights.parsers.installed_rpms.InstalledRpms (parser)
        ┊   insights.specs.Specs.installed_rpms (unfiltered / lines)
        ┊   ┊╌╌╌╌╌TextFileProvider("'/home/csams/Downloads/archives/sosreport-example-20191225000000/sos_commands/rpm/package-data'")
        ┊   ┊   insights.specs.insights_archive.InsightsArchiveSpecs.installed_rpms (unfiltered / lines)
        ┊   ┊   ┊   insights.specs.insights_archive.InsightsArchiveSpecs.all_installed_rpms (unfiltered / lines)
        ┊   ┊   ┊   ┊   insights.core.context.HostArchiveContext ()
        ┊   ┊   insights.specs.sos_archive.SosSpecs.installed_rpms (unfiltered / lines)
        ┊   ┊   ┊╌╌╌╌╌TextFileProvider("'/home/csams/Downloads/archives/sosreport-example-20191225000000/sos_commands/rpm/package-data'")
        ┊   ┊   ┊   insights.core.context.SosArchiveContext ()
        ┊   ┊   insights.specs.default.DefaultSpecs.installed_rpms (unfiltered / lines)
        ┊   ┊   ┊   insights.specs.default.DefaultSpecs.docker_installed_rpms (unfiltered / lines)
        ┊   ┊   ┊   ┊   insights.core.context.DockerImageContext ()
        ┊   ┊   ┊   insights.specs.default.DefaultSpecs.host_installed_rpms (unfiltered / lines)
        ┊   ┊   ┊   ┊   insights.core.context.HostContext ()
    """

    def __init__(self, broker, models, cwd, tmp, cov):
        self._requested = set()
        self._broker = broker
        self._cwd = cwd
        self._tmp = tmp
        self._cov = cov
        super(Models, self).__init__(models)

    def __dir__(self):
        """ Enables IPython autocomplete. """
        return sorted(set(list(self.keys()) + dir(Models)))

    def _ipython_key_completions_(self):
        """ For autocomplete of keys when accessing models as a dict. """
        return sorted(self.keys())

    def __str__(self):
        return "{} components possibly available".format(len(self))

    def _get_color(self, comp):
        if comp in self._broker:
            if plugins.is_type(comp, plugins.rule) and self._broker[comp].get("type") == "skip":
                return "yellow"
            return "green"
        elif comp in self._broker.exceptions:
            return "brightred"
        elif comp in self._broker.missing_requirements:
            return "yellow"
        else:
            return ""

    def _dump_diagnostics(self, comp):
        results = []
        results.append("Dependency Tree")
        results.append("===============")
        results.extend(self._show_tree(comp))
        results.append("")
        results.append("Missing Dependencies")
        results.append("====================")
        results.extend(self._show_missing(comp))
        results.append("")
        results.append("Exceptions")
        results.append("==========")
        results.extend(self._show_exceptions(comp))
        IPython.core.page.page(six.u(os.linesep.join(results)))

    def evaluate_all(self, match=None, ignore="spec"):
        """
        Evaluate all components that match.

        Args:
            match (str, optional): regular expression for matching against
                the fully qualified name of components to keep.
            ignore (str, optional): regular expression for searching against
                the fully qualified name of components to ignore.
        """
        match, ignore = self._desugar_match_ignore(match, ignore)

        tasks = []
        for c in self.values():
            name = dr.get_name(c)
            if match.test(name) and not ignore.test(name):
                if not any(
                    [
                        c in self._broker.instances,
                        c in self._broker.exceptions,
                        c in self._broker.missing_requirements,
                    ]
                ):
                    tasks.append(c)

        if not tasks:
            return

        dr.run(tasks, broker=self._broker)
        self.show_timings(match, ignore)

    def evaluate(self, name):
        """
        Evaluate a component and return its result. Prints diagnostic
        information in the case of failure. This function is useful when a
        component's name contains characters that aren't valid for python
        identifiers so you can't access it with ``models.<name>``.

        Args:
            name (str): the name of the component as shown by
                :func:`Models.find()`.
        """
        comp = self.get(name) or dr.get_component(name)
        if not comp:
            return

        if not plugins.is_rule(comp):
            self._requested.add((name, comp))

        if comp in self._broker:
            return self._broker[comp]

        if comp in self._broker.exceptions or comp in self._broker.missing_requirements:
            self._dump_diagnostics(comp)
            return

        val = dr.run(comp, broker=self._broker).get(comp)

        if comp not in self._broker:
            if comp in self._broker.exceptions or comp in self._broker.missing_requirements:
                self._dump_diagnostics(comp)
            else:
                print("{} chose to skip.".format(dr.get_name(comp)))
        return val

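    # Usage sketch (component name is illustrative): evaluate by fully
    # qualified name when the simple name isn't a valid attribute, e.g.
    #
    #     models.evaluate("insights.parsers.installed_rpms.InstalledRpms")
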
    def __getattr__(self, name):
        return self.evaluate(name)

    # TODO: lot of room for improvement here...
    def make_rule(self, path=None, overwrite=False, pick=None):
        """
        Attempt to generate a rule based on models used so far.

        Args:
            path (str): path to store the rule.
            overwrite (bool): whether to overwrite an existing file.
            pick (str): Optionally specify which lines or line ranges to use
                for the rule body. "1 2 3" gets lines 1, 2, and 3. "1 3-5 7"
                gets line 1, lines 3 through 5, and line 7.
        """
        import IPython
        ip = IPython.get_ipython()
        ignore = [
            r"=.*models\.",
            r"^(%|!|help)",
            r"make_rule",
            r"models\.(show|find).*",
            r".*\?$",
            r"^(clear|pwd|cd *.*|ll|ls)$",
        ]

        if pick:
            lines = [r[2] for r in ip.history_manager.get_range_by_str(pick)]
        else:
            lines = []
            for r in ip.history_manager.get_range():
                l = r[2]
                if any(re.search(i, l) for i in ignore):
                    continue
                elif l.startswith("models."):
                    l = l[7:]
                lines.append(l)

        # the user asked for these models during the session.
        requested = sorted(self._requested, key=lambda i: i[0])

        # figure out what we need to import for filtering.
        filterable = defaultdict(list)
        for _, c in requested:
            for d in dr.get_dependency_graph(c):
                try:
                    if isinstance(d, RegistryPoint) and d.filterable:
                        n = d.__qualname__
                        cls = n.split(".")[0]
                        filterable[dr.get_module_name(d)].append((cls, n))
                except:
                    pass

        model_names = [dr.get_simple_name(i[1]) for i in requested]
        var_names = [i[0].lower() for i in requested]

        imports = [
            "from insights import rule, make_fail, make_info, make_pass # noqa",
            "from insights.parsr.query import * # noqa",
            "from insights.parsr.query import make_child_query as q # noqa",
            "",
        ]

        for _, c in requested:
            mod = dr.get_module_name(c)
            name = dr.get_simple_name(c)
            imports.append("from {} import {}".format(mod, name))

        seen = set()
        if filterable:
            imports.append("")

        for k in sorted(filterable):
            for (cls, n) in filterable[k]:
                if (k, cls) not in seen:
                    imports.append("from {} import {}".format(k, cls))
                    seen.add((k, cls))

        seen = set()
        filters = []
        for k in sorted(filterable):
            for i in filterable[k]:
                if i not in seen:
                    filters.append("add_filter({}, ...)".format(i[1]))
                    seen.add(i)

        if filters:
            imports.append("from insights.core.filters import add_filter")

        filter_stanza = os.linesep.join(filters)
        import_stanza = os.linesep.join(imports)
        decorator = "@rule({})".format(", ".join(model_names))
        func_decl = "def report({}):".format(", ".join(var_names))
        body = os.linesep.join(["    " + x for x in lines]) if lines else "    pass"

        res = import_stanza
        if filter_stanza:
            res += (os.linesep * 2) + filter_stanza

        res += (os.linesep * 3) + decorator + os.linesep + func_decl + os.linesep + body

        if path:
            if not path.startswith("/"):
                realpath = os.path.realpath(path)
                if not realpath.startswith(self._cwd):
                    path = os.path.join(self._cwd, path)

            if os.path.exists(path) and not overwrite:
                print(
                    "{} already exists. Use overwrite=True to overwrite.".format(path)
                )
                return

            if not os.path.exists(path) or overwrite:
                with open(path, "w") as f:
                    f.write(res)
                ip.magic("edit -x {}".format(path))
                print("Saved to {}".format(path))
        else:
            IPython.core.page.page(ip.pycolorize(res))

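    # Usage sketch (file name is hypothetical): after exercising a few models
    # in the session, generate a starter rule from history lines 1 and 3-5:
    #
    #     models.make_rule("my_rule.py", pick="1 3-5")
    #
    # Without ``path`` the generated source is only paged to the screen.
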
    def _desugar_match_ignore(self, match, ignore):
        if match is None:
            match = TRUE
        elif isinstance(match, str):
            if match in self:
                match = eq(dr.get_name(self[match]))
            else:
                match = matches(match)

        if ignore is None:
            ignore = FALSE
        elif isinstance(ignore, str):
            ignore = matches(ignore)

        return (match, ignore)

    def _show_missing(self, comp):
        results = []
        try:
            req, alo = self._broker.missing_requirements[comp]
            name = dr.get_name(comp)
            results.append(name)
            results.append("-" * len(name))

            if req:
                results.append("Requires:")
                for r in req:
                    results.append("    {}".format(dr.get_name(r)))
            if alo:
                if req:
                    results.append("")
                results.append("Requires At Least One From Each List:")
                for r in alo:
                    results.append("[")
                    for i in r:
                        results.append("    {}".format(dr.get_name(i)))
                    results.append("]")
        except:
            pass
        return results

    def show_requested(self):
        """ Show the components you've worked with so far. """
        results = []
        for name, comp in sorted(self._requested):
            results.append(
                ansiformat(
                    self._get_color(comp), "{} {}".format(name, dr.get_name(comp))
                )
            )
        IPython.core.page.page(six.u(os.linesep.join(results)))

    def reset_requested(self):
        """ Reset requested state so you can work on a new rule. """
        IPython.get_ipython().history_manager.reset()
        self._requested.clear()

    def show_source(self, comp):
        """
        Show source for the given module, class, or function. Also accepts
        the string names, with the side effect that the component will be
        imported.
        """
        try:
            if isinstance(comp, six.string_types):
                comp = self.get(comp) or dr.get_component(comp) or importlib.import_module(comp)
            comp = inspect.getmodule(comp)
            ip = IPython.get_ipython()
            if self._cov:
                path, runnable, excluded, not_run, _ = self._cov.analysis2(comp)
                runnable, not_run = set(runnable), set(not_run)
                src = ip.pycolorize(inspect.getsource(comp)).splitlines()
                width = len(str(len(src)))
                template = "{0:>%s}" % width
                results = []

                file_line = "{} {}".format(
                    ansiformat("red", "File:"), os.path.realpath(path)
                )
                explain_line = "{} numbered lines have executed. python standard libs are excluded.".format(
                    ansiformat("*brightgreen*", "Green")
                )
                results.append(file_line)
                results.append(explain_line)
                results.append("")

                for i, line in enumerate(src, start=1):
                    prefix = template.format(i)
                    if i in runnable and i not in not_run:
                        color = "*brightgreen*"
                    else:
                        color = "gray"
                    results.append("{} {}".format(ansiformat(color, prefix), line))
                IPython.core.page.page(six.u(os.linesep.join(results)))
            else:
                ip.inspector.pinfo(comp, detail_level=1)
        except:
            traceback.print_exc()

    def _get_type_name(self, comp):
        try:
            return dr.get_component_type(comp).__name__
        except:
            return ""

    def _get_rule_value_kind(self, val):
        if val is None:
            kind = None
        elif isinstance(val, plugins.make_response):
            kind = "fail"
        elif isinstance(val, plugins.make_pass):
            kind = "pass"
        elif isinstance(val, plugins.make_info):
            kind = "info"
        elif isinstance(val, plugins._make_skip):
            kind = "skip"
        elif isinstance(val, plugins.Response):
            kind = val.response_type or ""
        else:
            kind = ""
        return kind

    def _get_rule_value(self, comp):
        try:
            val = self._broker[comp]
            if plugins.is_rule(comp):
                _type = val.__class__.__name__
                kind = self._get_rule_value_kind(val)
                color = RULE_COLORS.get(kind, "")
                return ansiformat(color, " [{}]".format(_type))
        except:
            pass
        return ""

    def _show_datasource(self, d, v, indent=""):
        try:
            filtered = "filtered" if dr.DELEGATES[d].filterable else "unfiltered"
            mode = "bytes" if dr.DELEGATES[d].raw else "lines"
        except:
            filtered = "unknown"
            mode = "unknown"

        desc = "{n} ({f} / {m})".format(n=dr.get_name(d), f=filtered, m=mode)
        color = self._get_color(d)

        results = []
        results.append(indent + ansiformat(color, desc))

        if not v:
            return results

        if not isinstance(v, list):
            v = [v]

        for i in v:
            if isinstance(i, ContentProvider):
                s = ansiformat(color, str(i))
            else:
                s = ansiformat(color, "<intermediate value>")
            results.append("{}\u250A\u254C\u254C\u254C\u254C\u254C{}".format(indent, s))
        return results

    def _show_tree(self, node, indent="", depth=None, dep_getter=dr.get_dependencies):
        if depth is not None and depth == 0:
            return []

        results = []
        color = self._get_color(node)

        if plugins.is_datasource(node):
            results.extend(
                self._show_datasource(node, self._broker.get(node), indent=indent)
            )
        else:
            _type = self._get_type_name(node)
            name = dr.get_name(node)
            suffix = self._get_rule_value(node)
            desc = ansiformat(color, "{n} ({t}".format(n=name, t=_type))
            results.append(indent + desc + suffix + ansiformat(color, ")"))

        dashes = "\u250A\u254C\u254C\u254C\u254C\u254C"
        if node in self._broker.exceptions:
            for ex in self._broker.exceptions[node]:
                results.append(indent + dashes + ansiformat(color, str(ex)))

        deps = dep_getter(node)
        next_indent = indent + "\u250A   "
        for d in deps:
            results.extend(
                self._show_tree(
                    d, next_indent, depth=depth if depth is None else depth - 1, dep_getter=dep_getter
                )
            )
        return results

    def show_trees(self, match=None, ignore="spec", depth=None, toward_dependents=False):
        """
        Show dependency trees of any components whether they're available or
        not.

        Args:
            match (str, optional): regular expression for matching against
                the fully qualified name of components to keep.
            ignore (str, optional): regular expression for searching against
                the fully qualified name of components to ignore.
            depth (int, optional): how deep into the tree to explore.
            toward_dependents (bool, optional): whether to walk the tree
                toward dependents. Default is to walk toward dependencies.
        """
        match, ignore = self._desugar_match_ignore(match, ignore)
        dep_getter = dr.get_dependents if toward_dependents else dr.get_dependencies

        graph = defaultdict(list)
        for c in dr.DELEGATES:
            name = dr.get_name(c)
            if match.test(name) and not ignore.test(name):
                graph[name].append(c)

        results = []
        for name in sorted(graph):
            for c in graph[name]:
                results.extend(self._show_tree(c, depth=depth, dep_getter=dep_getter))
                results.append("")
        IPython.core.page.page(six.u(os.linesep.join(results)))

    def show_failed(self, match=None, ignore="spec"):
        """
        Show names of any components that failed during evaluation. Ignores
        "spec" by default.

        Args:
            match (str, optional): regular expression for matching against
                the fully qualified name of components to keep.
            ignore (str, optional): regular expression for searching against
                the fully qualified name of components to ignore.
        """
        match, ignore = self._desugar_match_ignore(match, ignore)

        mid_dashes = "\u250A\u254C\u254C"
        bottom_dashes = "\u2514\u254C\u254C"
        results = []
        for comp in sorted(self._broker.exceptions, key=dr.get_name):
            name = dr.get_name(comp)
            if match.test(name) and not ignore.test(name):
                color = self._get_color(comp)
                results.append(ansiformat(color, name))
                exes = self._broker.exceptions[comp]
                last = len(exes) - 1
                for i, ex in enumerate(exes):
                    dashes = bottom_dashes if i == last else mid_dashes
                    results.append(ansiformat(color, dashes + str(ex)))
                results.append("")
        IPython.core.page.page(six.u(os.linesep.join(results)))

    def _show_exceptions(self, comp):
        name = dr.get_name(comp)
        results = [ansiformat("*brightred*", name)]
        results.append(ansiformat("*brightred*", "-" * len(name)))
        for e in self._broker.exceptions.get(comp, []):
            t = self._broker.tracebacks.get(e)
            if t:
                results.append(t)
        return results

    def show_exceptions(self, match=None, ignore="spec"):
        """
        Show exceptions that occurred during evaluation. Ignores "spec" by
        default.

        Args:
            match (str, optional): regular expression for matching against
                the fully qualified name of components to keep.
            ignore (str, optional): regular expression for searching against
                the fully qualified name of components to ignore.
        """
        match, ignore = self._desugar_match_ignore(match, ignore)

        results = []
        for comp in sorted(self._broker.exceptions, key=dr.get_name):
            name = dr.get_name(comp)
            if match.test(name) and not ignore.test(name):
                results.extend(self._show_exceptions(comp))
        IPython.core.page.page(six.u(os.linesep.join(results)))

    def show_rule_report(self, match=None, ignore=None):
        """
        Print a rule report for the matching rules.
        """
        match, ignore = self._desugar_match_ignore(match, ignore)
        results = defaultdict(dict)

        for comp, val in self._broker.items():
            name = dr.get_name(comp)
            if plugins.is_rule(comp) and match.test(name) and not ignore.test(name):
                kind = self._get_rule_value_kind(val)

                if kind:
                    body = render(comp, val)
                    links = render_links(comp)
                    results[kind][name] = os.linesep.join([body, "", links])

        report = []
        for kind in ["info", "pass", "fail"]:
            color = RULE_COLORS.get(kind, "")
            hits = results.get(kind, {})
            for name in sorted(hits):
                report.append(ansiformat(color, name))
                report.append(ansiformat(color, "-" * len(name)))
                report.append(hits[name])
                report.append("")
        IPython.core.page.page(six.u(os.linesep.join(report)))

    def show_timings(self, match=None, ignore="spec", group=dr.GROUPS.single):
        """
        Show timings for components that have successfully evaluated.

        Args:
            match (str, optional): regular expression for matching against
                the fully qualified name of components to keep.
            ignore (str, optional): regular expression for searching against
                the fully qualified name of components to ignore.
        """
        match, ignore = self._desugar_match_ignore(match, ignore)
        results = []

        total = 0.0
        for comp in dr.COMPONENTS[group]:
            name = dr.get_name(comp)
            if comp in self._broker.exec_times and match.test(name) and not ignore.test(name):
                color = self._get_color(comp)
                t = self._broker.exec_times[comp]
                total += t
                results.append((t, name, color))

        report = [ansiformat("brightmagenta", "Total: {:.10f} seconds".format(total)), ""]
        for timing, name, color in sorted(results, reverse=True):
            report.append(ansiformat(color, "{:.10f}: {}".format(timing, name)))

        IPython.core.page.page(six.u(os.linesep.join(report)))

    def find(self, match=None, ignore=None):
        """
        Find components that might be available based on the data being
        analyzed.

        Args:
            match (str, optional): regular expression for matching against
                the fully qualified name of components to keep.
            ignore (str, optional): regular expression for searching against
                the fully qualified name of components to ignore.
        """
        match, ignore = self._desugar_match_ignore(match, ignore)
        mid_dashes = "\u250A\u254C\u254C"
        bottom_dashes = "\u2514\u254C\u254C"
        results = []
        for p in sorted(self, key=str.lower):
            comp = self[p]
            name = dr.get_name(comp)
            if match.test(name) and not ignore.test(name):
                color = self._get_color(comp)
                _type = self._get_type_name(comp)
                suffix = self._get_rule_value(comp)
                desc = ansiformat(color, "{p} ({n}, {t}".format(p=p, n=name, t=_type))
                results.append(desc + suffix + ansiformat(color, ")"))
                if comp in self._broker.exceptions:
                    exes = self._broker.exceptions[comp]
                    last = len(exes) - 1
                    for i, ex in enumerate(exes):
                        dashes = bottom_dashes if i == last else mid_dashes
                        results.append(ansiformat(color, dashes + str(ex)))
        IPython.core.page.page(six.u(os.linesep.join(results)))


class Holder(dict):
    """
    This is a dictionary that holds models for multiple archives. Access each
    model set using the path to the archive as the key. See models.keys().
    """
    def _ipython_key_completions_(self):
        return self.keys()


def start_session(paths, change_directory=False, __coverage=None, kernel=False):
    __cwd = os.path.abspath(os.curdir)

    def callback(brokers):
        models = Holder()
        for i, (path, broker) in enumerate(brokers):
            avail = _get_available_models(broker)
            if paths:
                if len(paths) > 1:
                    models[paths[i]] = Models(broker, avail, __cwd, path, __coverage)
                else:
                    models = Models(broker, avail, __cwd, path, __coverage)
            else:
                models = Models(broker, avail, __cwd, path, __coverage)

        if change_directory and len(brokers) == 1:
            __working_path, _ = brokers[0]
            os.chdir(__working_path)

        # disable jedi since it won't autocomplete for objects with __getattr__
        # defined.
        IPython.core.completer.Completer.use_jedi = False
        __cfg = Config()
        __cfg.TerminalInteractiveShell.banner1 = Models.__doc__
        __ns = {}
        __ns.update(globals())
        __ns.update({"models": models})
        if kernel:
            from ipykernel import kernelapp
            kernelapp.launch_new_instance([], user_ns=__ns, config=__cfg)
        else:
            IPython.start_ipython([], user_ns=__ns, config=__cfg)

    with_brokers(paths, callback)
    if change_directory:
        os.chdir(__cwd)


def _handle_config(config):
    if config:
        with open(config) as f:
            cfg = yaml.load(f, Loader=Loader)
            load_packages(cfg.get("packages", []))
            apply_default_enabled(cfg)
            apply_configs(cfg)


def _parse_args():
    desc = "Perform interactive system analysis with insights components."
    epilog = """
        Set env INSIGHTS_FILTERS_ENABLED=False to disable filtering that may
        cause unexpected missing data.
    """.strip()
    p = argparse.ArgumentParser(description=desc, epilog=epilog)

    p.add_argument(
        "-p", "--plugins", default="", help="Comma separated list of packages to load."
    )
    p.add_argument("-c", "--config", help="The insights configuration to apply.")
    p.add_argument(
        "--no-coverage",
        action="store_true",
        help="Don't show code coverage when viewing source.",
    )
    p.add_argument(
        "--cd",
        action="store_true",
        help="Change into the expanded directory for analysis.",
    )
    p.add_argument(
        "--no-defaults", action="store_true", help="Don't load default components."
    )
    p.add_argument(
        "-v", "--verbose", action="store_true", help="Global debug level logging."
    )
    p.add_argument(
        "-k",
        "--kernel",
        action="store_true",
        default=False,
        help="Start an IPython kernel instead of an interactive session."
        " Requires the ipykernel module.",
    )

    path_desc = "Archives or paths to analyze. Leave off to target the current system."
    p.add_argument("paths", nargs="*", help=path_desc)
    return p.parse_args()


def main():
    args = _parse_args()
    logging.basicConfig(level=logging.DEBUG if args.verbose else logging.ERROR)

    cov = None
    if not args.no_coverage:
        from coverage import Coverage
        cov = Coverage(cover_pylib=False)
        cov.start()

    if not args.no_defaults:
        load_default_plugins()
        dr.load_components("insights.parsers", "insights.combiners")

    load_packages(parse_plugins(args.plugins))
    _handle_config(args.config)

    start_session(args.paths, args.cd, __coverage=cov, kernel=args.kernel)
    if cov:
        cov.stop()
        cov.erase()


if __name__ == "__main__":
    main()
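

# Usage sketch (archive names are illustrative). With insights-core installed,
# this module can be launched directly; the flags correspond to _parse_args()
# above:
#
#     python -m insights.shell sosreport.tar.xz
#     python -m insights.shell -p my_plugins.rules --cd insights-archive.tar.gz
#
# With no paths, the shell analyzes the current system.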