Source code for insights.core.context

import logging
import os
import six
from contextlib import contextmanager
from insights.util import streams, subproc

# Logger named after this module.
log = logging.getLogger(__name__)
# Product classes, appended to by the ``product`` decorator.
GLOBAL_PRODUCTS = []
# The ``name`` attribute of each class in GLOBAL_PRODUCTS, in the same order.
PRODUCT_NAMES = []
# Default ``version`` value used by ``RHEL`` and ``Context`` when none is given.
DEFAULT_VERSION = ["-1", "-1"]

# Objects registered through the ``fs_root`` decorator.
FSRoots = []


def fs_root(thing):
    """
    Decorator that records ``thing`` in the module-level ``FSRoots`` list
    and hands it back unchanged.
    """
    FSRoots.append(thing)
    return thing
def product(klass):
    """
    Class decorator that registers a product class.

    The class is appended to ``GLOBAL_PRODUCTS`` and its ``name`` attribute
    to ``PRODUCT_NAMES``; the class itself is returned unchanged.
    """
    GLOBAL_PRODUCTS.append(klass)
    PRODUCT_NAMES.append(klass.name)
    return klass
def get_system(metadata, hostname):
    """
    Return the first entry of ``metadata["systems"]`` whose ``"system_id"``
    equals ``hostname``, or ``None`` when there is no such entry (or no
    ``"systems"`` key at all).
    """
    candidates = metadata.get("systems", [])
    matches = (entry for entry in candidates if entry.get("system_id") == hostname)
    return next(matches, None)
class MultiNodeProduct(object):
    """
    Base class for products in which a node carries a ``role``.

    Subclasses are expected to provide a ``parent_type`` attribute, which
    ``is_parent`` compares against the instance's role.
    """

    def __init__(self, role=None):
        self.role = role

    def __bool__(self):
        """An instance is truthy exactly when its role is truthy."""
        return True if self.role else False

    # Python 2 spelling of the truth-value hook.
    __nonzero__ = __bool__

    def is_parent(self):
        """Return whether this node's role equals the class's ``parent_type``."""
        return self.role == self.parent_type
def create_product(metadata, hostname):
    """
    Build a product instance for ``hostname`` from ``metadata``.

    The system entry for ``hostname`` is looked up with ``get_system``. If
    one exists and ``metadata["product"]`` (compared lower-cased) equals the
    ``name`` of a class in ``GLOBAL_PRODUCTS``, that class is instantiated
    and the entry's key/value pairs are copied onto the instance as
    attributes. When the entry has a ``"type"`` key, its value is also
    stored as ``role``.

    Returns the new instance, or ``None`` when there is no matching system
    entry or no matching product class.
    """
    current_system = get_system(metadata, hostname)
    if not current_system:
        return None

    # ``or ""`` also covers an explicit ``"product": None`` entry, which
    # would otherwise fail on ``.lower()``.
    product_name = (metadata.get("product") or "").lower()

    for p in GLOBAL_PRODUCTS:
        if p.name != product_name:
            continue
        instance = p()
        # Copy the entry into the instance instead of making the entry *be*
        # the instance's __dict__: assigning ``role`` below must not write
        # back into the caller's metadata, and attributes initialised by the
        # product's __init__ stay available when the entry omits them.
        instance.__dict__.update(current_system)
        if hasattr(instance, "type"):
            instance.role = instance.type
        return instance
    return None
@product
class Docker(MultiNodeProduct):
    """Product registered under the name ``docker``; its parent role is ``host``."""

    name = "docker"
    parent_type = "host"
@product
class OSP(MultiNodeProduct):
    """Product registered under the name ``osp``; its parent role is ``Director``."""

    name = "osp"
    parent_type = "Director"
@product
class RHEV(MultiNodeProduct):
    """Product registered under the name ``rhev``; its parent role is ``Manager``."""

    name = "rhev"
    parent_type = "Manager"
@product
class RHEL(object):
    """
    Product registered under the name ``rhel``.

    Holds a ``version`` and a ``release``. An instance is truthy only when
    the version differs from ``DEFAULT_VERSION`` and the release is truthy.
    """

    name = "rhel"

    def __init__(self, version=DEFAULT_VERSION, release=None):
        self.version = version
        self.release = release

    def __bool__(self):
        version_known = self.version != DEFAULT_VERSION
        release_known = bool(self.release)
        return bool(version_known) and release_known

    # Python 2 spelling of the truth-value hook.
    __nonzero__ = __bool__

    @classmethod
    def from_metadata(cls, metadata, processor_obj):
        """Return a default-constructed instance; both arguments are ignored."""
        return cls()
class Context(object):
    """
    Attribute bag built from keyword arguments.

    Recognised keywords:

    * ``version`` (default ``DEFAULT_VERSION``) and ``metadata`` (default a
      new empty dict);
    * the optional attributes listed in ``__init__``, each defaulting to
      ``None``;
    * one keyword per registered product name. When a product keyword is
      not supplied, the attribute is set to the result of
      ``create_product(self.metadata, self.hostname)``.

    ``loaded`` is always set to ``True`` and ``cmd`` to ``None``.
    """

    def __init__(self, **kwargs):
        self.version = kwargs.pop("version", DEFAULT_VERSION)
        self.metadata = kwargs.pop("metadata", {})
        self.loaded = True
        self.cmd = None

        optional_attrs = [
            "content", "path", "hostname", "release",
            "machine_id", "target", "last_client_run", "relative_path",
            "args", "engine", "image", "container_id",
        ]
        for k in optional_attrs:
            setattr(self, k, kwargs.pop(k, None))

        # NOTE(review): this replaces any "relative_path" keyword stored by
        # the loop above -- confirm that is intended.
        self.relative_path = self.path

        for p in GLOBAL_PRODUCTS:
            if p.name in kwargs:
                setattr(self, p.name, kwargs.pop(p.name))
            else:
                setattr(self, p.name, create_product(self.metadata, self.hostname))

    def stream(self):
        """Return an iterator over ``self.content``."""
        return iter(self.content)

    def product(self):
        """
        Return the first product attribute that is actually populated.

        ``__init__`` creates an attribute for every registered product, most
        of them ``None``, so testing for mere attribute existence would
        always stop at the first product name. Product names are tried in
        registration order; ``None`` is returned when no product is set.
        """
        for pname in PRODUCT_NAMES:
            value = getattr(self, pname, None)
            if value is not None:
                return value
        return None

    def __repr__(self):
        # Each value is stringified and cut to 30 characters to keep the
        # representation short.
        return repr({k: str(v)[:30] for k, v in self.__dict__.items()})
class ExecutionContextMeta(type):
    """
    Metaclass that keeps a registry of the classes it creates.

    Every class created with this metaclass is appended to ``registry``,
    except a class named ``ExecutionContext``.
    """

    registry = []

    def __init__(cls, name, bases, dct):
        # The class named "ExecutionContext" is the only one not registered.
        if name != "ExecutionContext":
            ExecutionContextMeta.registry.append(cls)

    # Remember that contexts are tried *in reverse order* so that they
    # may be overridden by just loading a plugin.
    @classmethod
    def identify(cls, files):
        """
        Ask each registered class, newest first, whether it handles ``files``.

        Returns the ``(root, context_class)`` pair from the first class whose
        ``handles`` yields a non-``None`` context, or ``(None, None)``.
        """
        for candidate in reversed(cls.registry):
            root, ctx = candidate.handles(files)
            if ctx is None:
                continue
            return (root, ctx)
        return (None, None)
class ExecutionContext(six.with_metaclass(ExecutionContextMeta)):
    """
    Base class for execution contexts.

    A context holds a ``root`` path, a default command ``timeout`` and the
    list of ``all_files``. Subclasses that set ``marker`` can be matched
    against a list of file paths through ``handles``.
    """

    marker = None

    def __init__(self, root="/", timeout=None, all_files=None):
        self.root = root
        self.timeout = timeout
        self.all_files = all_files or []

    @classmethod
    def handles(cls, files):
        """
        Decide whether this context applies to ``files``.

        A path matches when it contains ``marker`` as a complete path
        component (or run of components); the directory holding that
        component is a candidate root. Every occurrence of the marker in a
        path is examined, so a longer name that merely starts with the marker
        earlier in the path does not hide a real match further along.

        Returns ``(root, cls)`` using the shortest candidate root (ties
        broken alphabetically), or ``(None, None)`` when ``marker`` is unset,
        ``files`` is empty, or nothing matches.
        """
        if cls.marker is None or not files:
            return (None, None)

        sep = os.path.sep
        m = sep + cls.marker.lstrip(sep)
        marker_roots = set()
        for f in files:
            i = f.find(m)
            while i != -1:
                end = i + len(m)
                # The marker must be followed by a separator or end the path.
                if end == len(f) or f[end] == sep:
                    marker_roots.add(os.path.dirname(f[:i + 1]))
                    break
                i = f.find(m, i + 1)

        if not marker_roots:
            return (None, None)

        # When several roots are found, return the one closest to the
        # filesystem root; the secondary key keeps the choice deterministic.
        closest_root = min(marker_roots, key=lambda r: (len(r), r))
        return (closest_root, cls)

    def check_output(self, cmd, timeout=None, keep_rc=False, env=None, signum=None):
        """
        Subclasses can override to provide special
        environment setup, command prefixes, etc.

        Runs ``cmd`` through ``subproc.call``. ``self.timeout`` is used when
        ``timeout`` is not given (or is falsy).
        """
        return subproc.call(cmd, timeout=timeout or self.timeout, signum=signum,
                            keep_rc=keep_rc, env=env)

    def shell_out(self, cmd, split=True, timeout=None, keep_rc=False, env=None, signum=None):
        """
        Run ``cmd`` via ``check_output`` and return its output.

        ``os.environ`` is used when ``env`` is not given. The output is split
        into lines when ``split`` is true. With ``keep_rc`` the result is a
        ``(rc, output)`` tuple, otherwise just the output.
        """
        env = env or os.environ
        rc = None
        raw = self.check_output(cmd, timeout=timeout, keep_rc=keep_rc, env=env, signum=signum)
        if keep_rc:
            rc, output = raw
        else:
            output = raw

        if split:
            output = output.splitlines()

        return (rc, output) if keep_rc else output

    @contextmanager
    def stream(self, *args, **kwargs):
        """Context manager forwarding all arguments to ``streams.stream``."""
        with streams.stream(*args, **kwargs) as s:
            yield s

    @contextmanager
    def connect(self, *args, **kwargs):
        """Context manager forwarding all arguments to ``streams.connect``."""
        with streams.connect(*args, **kwargs) as s:
            yield s

    def locate_path(self, path):
        """Return ``path`` with environment variables expanded."""
        return os.path.expandvars(path)

    def __repr__(self):
        msg = "<%s('%s', %s)>"
        return msg % (self.__class__.__name__, self.root, self.timeout)
@fs_root
class HostContext(ExecutionContext):
    """Execution context whose ``timeout`` defaults to 30 instead of ``None``."""

    def __init__(self, root='/', timeout=30, all_files=None):
        base = super(HostContext, self)
        base.__init__(root=root, timeout=timeout, all_files=all_files)
@fs_root
class HostArchiveContext(ExecutionContext):
    """Execution context matched by the ``insights_commands`` marker."""

    marker = "insights_commands"
@fs_root
class SerializedArchiveContext(ExecutionContext):
    """Execution context matched by the ``insights_archive.txt`` marker."""

    marker = "insights_archive.txt"
@fs_root
class SosArchiveContext(ExecutionContext):
    """Execution context matched by the ``sos_commands`` marker."""

    marker = "sos_commands"
@fs_root
class ClusterArchiveContext(ExecutionContext):
    """Execution context that defines no marker of its own."""
@fs_root
class DockerImageContext(ExecutionContext):
    """Execution context that defines no marker of its own."""
@fs_root
class JBossContext(HostContext):
    """``HostContext`` subclass that adds nothing of its own."""
@fs_root
class JDRContext(ExecutionContext):
    """Execution context matched by the ``JBOSS_HOME`` marker."""

    marker = "JBOSS_HOME"

    def locate_path(self, path):
        """
        Replace each literal ``$JBOSS_HOME`` in ``path`` with ``JBOSS_HOME``
        and pass the result to the parent class's ``locate_path``.
        """
        rewritten = path.replace("$JBOSS_HOME", "JBOSS_HOME")
        return super(JDRContext, self).locate_path(rewritten)
@fs_root
class InsightsOperatorContext(ExecutionContext):
    """
    Context for insights-operator archives, matched by the
    ``config/featuregate`` marker.
    """

    marker = "config/featuregate"
@fs_root
class MustGatherContext(ExecutionContext):
    """
    Context for must-gather archives, matched by the
    ``cluster-scoped-resources`` marker.
    """

    marker = "cluster-scoped-resources"
class OpenStackContext(ExecutionContext):
    """
    Execution context tied to a ``hostname``.

    The base class is initialised with its defaults; the hostname is stored
    on the instance.
    """

    def __init__(self, hostname):
        base = super(OpenStackContext, self)
        base.__init__()
        self.hostname = hostname