from __future__ import print_function
import copy
import importlib
import inspect
import itertools
import json
import logging
from collections import defaultdict
from functools import wraps
from io import StringIO
from operator import eq
from yaml import SafeLoader
import insights
from insights import apply_filters
from insights.core import dr, filters, spec_factory, _PatchedSafeLoader # noqa: F401
from insights.core.context import Context
from insights.core.plugins import make_none
from insights.specs import Specs
# Use yaml.SafeLoader when testing plugins, as the yaml.CSafeLoader
# does not work well with coverage tests.
insights.core._PatchedSafeLoader = SafeLoader
# We intercept the add_filter call during integration testing so we can ensure
# that rules add filters to datasources that *should* be filterable.
# ADDED_FILTERS maps the name of the calling module to the set of components
# that module added filters to.
ADDED_FILTERS = defaultdict(set)
# Keep references to the unpatched functions: the interceptors delegate to
# these names, so they must be captured before the module attributes are
# replaced further down.
add_filter = filters.add_filter
find = spec_factory.find
def _intercept_add_filter(func):
    """
    Build a replacement for ``add_filter`` that remembers who added filters.

    The replacement forwards the call to the module-level ``add_filter``
    reference and then stores every filterable registry point of the
    component in ``ADDED_FILTERS`` under the name of the calling module.

    Arguments:
        func: The function being replaced. Only its metadata is used; it is
            copied onto the replacement by ``functools.wraps``.

    Returns:
        The recording replacement function.
    """

    @wraps(func)
    def inner(component, pattern, max_match=filters.MAX_MATCH):
        outcome = add_filter(component, pattern, max_match=max_match)
        # Stack entry 0 is this wrapper, entry 1 is the code that called it.
        caller_frame = inspect.stack()[1][0]
        caller_name = caller_frame.f_globals.get("__name__")
        recorded = ADDED_FILTERS[caller_name]
        recorded |= {
            point for point in dr.get_registry_points(component) if point.filterable
        }
        return outcome

    return inner
def _intercept_find(func):
@wraps(func)
def inner(ds, pattern):
ret = find(ds, pattern)
calling_module = inspect.stack()[1][0].f_globals.get("__name__")
ADDED_FILTERS[calling_module].add(ds)
return ret
return inner
# Install the interceptors so that filters added through these entry points
# are recorded in ADDED_FILTERS.
filters.add_filter = _intercept_add_filter(filters.add_filter)
insights.add_filter = _intercept_add_filter(insights.add_filter)
spec_factory.find = _intercept_find(spec_factory.find)
logger = logging.getLogger(__name__)
# Generator functions registered by the archive_provider decorator.
ARCHIVE_GENERATORS = []
# NOTE(review): the heartbeat identifiers are not used in the visible part of
# this module - confirm their consumers.
HEARTBEAT_ID = "99e26bb4823d770cc3c11437fe075d4d1a4db4c7500dad5707faed3b"
HEARTBEAT_NAME = "insights-heartbeat-9cd6f607-6b28-44ef-8481-62b0e7773614"
# Default values of the `release` and `hostname` arguments of context_wrap.
DEFAULT_RELEASE = "Red Hat Enterprise Linux Server release 7.2 (Maipo)"
DEFAULT_HOSTNAME = "hostname.example.com"
# The value make_none() returns; results are compared against it to detect a
# "none" response.
MAKE_NONE_RESULT = make_none()
# Sentinel meaning "no expected value was supplied", so that None itself stays
# usable as an expected value in run_test.
_UNDEFINED = object()
def _beautify_deep_compare_diff(result, expected):
if not (isinstance(result, dict) and isinstance(expected, dict)):
return result
if result.get('type') == 'skip':
return result
expected_keys = set(expected.keys())
result_keys = set(result.keys())
common_keys = set.intersection(result_keys, expected_keys)
diff = []
for k in result_keys - common_keys:
diff.append('\tkey "{0}" not in Expected;'.format(k))
for k in expected_keys - common_keys:
diff.append('\tkey "{0}" not in Result;'.format(k))
for k in common_keys:
if not eq(result[k], expected[k]):
diff.append(
'\tkey "{0}" unequal values:\n\t\tExpected: {1}\n\t\tResult : {2}'.format(
k, expected[k], result[k]
)
)
if not diff:
diff.append('\tUnrecognized unequal values in result layer one;')
diff.append('Result: "{0}"'.format(result))
return '\n' + '\n'.join(diff)
[docs]
def deep_compare(result, expected):
    """
    Deep compare rule reducer results when testing.

    .. note::
        "[None, XX]" is a special format of the `expected` for this method to
        check the missing dependencies. ``XX`` is either one name or a
        collection of names, each of which must appear in
        ``result['details']``. Only a list or a tuple is recognised in this
        role; a set cannot be indexed and has no defined first element.

    Arguments:
        result: The value produced by the component under test.
        expected: The value the test expects, or the special "[None, XX]" form.

    Raises:
        AssertionError: When ``result`` does not match ``expected``.
    """
    logger.debug("--Comparing-- (%s) %s to (%s) %s", type(result), result, type(expected), expected)
    missing = None
    # Sets are deliberately not accepted here: `expected[0]` raises TypeError
    # on a set, so a two-element set is compared as an ordinary value below.
    if isinstance(expected, (tuple, list)) and len(expected) == 2 and expected[0] is None:
        expected, missing = expected
    # This case ensures that when rules return a make_none() response, all of the older
    # CI tests that are looking for None instead of make_none() will still pass
    if result is None or (isinstance(result, dict) and result.get("type") == "none"):
        assert expected is None or expected == MAKE_NONE_RESULT, result
        return
    if isinstance(result, dict) and expected is None:
        # checking the missing component (RHINRULE-283)
        if missing:
            assert "MISSING_REQUIREMENTS" == result['reason'], result['reason']
            for mis in [missing] if isinstance(missing, str) else missing:
                assert mis in result['details'], '"{0}" not in "{1}"'.format(mis, result['details'])
        # With nothing expected, the only acceptable dict result is a skip.
        assert result["type"] == "skip", result
        return
    assert eq(result, expected), _beautify_deep_compare_diff(result, expected)
# Exemptions from the filter check performed by run_test.
# Key: the name of a component as it appears in the module under test.
# Value: fully qualified names of the parsers/specs that component relies on.
# When the module under test exposes the key, the registry points of each
# listed component whose final name segment is not itself present in that
# module are removed from the set of datasources that must have filters.
# NOTE(review): presumably the keyed components add these filters themselves -
# confirm.
COMPONENT_FILTERED_PARSERS = {
    'BootLoaderEntries': ['insights.specs.Specs.ls_lan_filtered'],
    'CloudInstance': ['insights.parsers.subscription_manager.SubscriptionManagerFacts'],
    'CloudProvider': [
        'insights.parsers.rhsm_conf.RHSMConf',
        'insights.parsers.subscription_manager.SubscriptionManagerFacts',
    ],
    'Grubby': ['insights.specs.Specs.ls_lan_filtered'],
    'GrubConf': ['insights.specs.Specs.ls_lan_filtered'],
    'IsUEFIBoot': ['insights.specs.Specs.ls_lan_filtered'],
    'OSRelease': ['insights.parsers.dmesg.DmesgLineList'],
    'Sap': ['insights.parsers.saphostctrl.SAPHostCtrlInstances'],
    'SELinux': ['insights.specs.Specs.ls_lan_filtered'],
}
[docs]
def run_test(component, input_data, expected=_UNDEFINED, return_make_none=False, do_filter=True):
    """
    Run a component against prepared input data and optionally check the result.

    Arguments:
        component: The insights component to test.
        input_data: The test data prepared for testing the component.
        expected: The expected result to compare against. When omitted, no
            comparison is made.
        return_make_none: Only consulted when `expected` is omitted. If False,
            a result equal to make_none() is returned as None.
        do_filter: Whether to verify that the module of the component added
            filters to every filterable datasource it depends on.
            - it's not required to check the filters for sosreport

    Returns:
        The value the broker holds for `component`, or None as described for
        `return_make_none`.

    Raises:
        Exception: When the filter check is active and filters are missing.
        AssertionError: When `expected` is given and does not match.
    """

    def get_filtered_specs(module):
        """Collect the registry points exempted by COMPONENT_FILTERED_PARSERS."""
        names_in_module = dir(importlib.import_module(module))
        exempt = set()
        for comp, parsers in COMPONENT_FILTERED_PARSERS.items():
            if comp not in names_in_module:
                continue
            for parser_name in parsers:
                if parser_name.split('.')[-1] in names_in_module:
                    # The parser is imported again in the rule: no exemption
                    continue
                parser = dr.get_component_by_name(parser_name)
                exempt.update(dr.get_registry_points(parser))
        return exempt

    if do_filter and filters.ENABLED:
        mod = dr.get_module_name(component)
        # Filters added by the parent package count as well.
        parent_mod = mod.rpartition('.')[0]
        registry_points = dr.get_registry_points(component)
        exempt = get_filtered_specs(mod)
        filterable = {d for d in registry_points if dr.get_delegate(d).filterable} - exempt
        already_added = ADDED_FILTERS.get(mod, set()) | ADDED_FILTERS.get(parent_mod, set())
        missing_filters = filterable - already_added
        if missing_filters:
            names = ", ".join(dr.get_name(m) for m in missing_filters)
            raise Exception("%s must add filters to %s" % (mod, names))

    broker = run_input_data(component, input_data)
    result = broker.get(component)
    if expected is not _UNDEFINED:
        deep_compare(result, expected)
        return result
    if result == MAKE_NONE_RESULT and not return_make_none:
        # Convert make_none() result to None as default unless
        # make_none explicitly requested
        return None
    return result
[docs]
def integrate(input_data, component):
    """
    Run `component` against `input_data` using the default `run_test` options.

    The two arguments are taken in the opposite order to `run_test`.

    Returns:
        Whatever `run_test` returns for the component.
    """
    return run_test(component=component, input_data=input_data)
[docs]
def context_wrap(
    lines,
    path="path",
    hostname=DEFAULT_HOSTNAME,
    release=DEFAULT_RELEASE,
    version="-1.-1",
    machine_id="machine_id",
    strip=True,
    split=True,
    filtered_spec=None,
    **kwargs,
):
    """
    Wrap test content in a `Context` object.

    Arguments:
        lines: The content, either one string or an iterable of lines.
        path: Used for both `path` and `relative_path` of the context.
        hostname: Hostname stored in the context.
        release: Release string stored in the context.
        version: Dotted version string; stored split on ".".
        machine_id: Machine id stored in the context.
        strip: Strip surrounding whitespace (only when `lines` is a string).
        split: Split into lines (only when `lines` is a string).
        filtered_spec: When given and present in `filters.FILTERS`, only the
            lines containing at least one of its filters are kept.
        **kwargs: Passed through to `Context` unchanged.

    Returns:
        The constructed `Context`.
    """
    if isinstance(lines, str):
        lines = lines.strip() if strip else lines
        lines = lines.splitlines() if split else lines

    if filtered_spec is not None and filtered_spec in filters.FILTERS:
        patterns = filters.FILTERS[filtered_spec]
        lines = [line for line in lines if any(p in line for p in patterns)]

    return Context(
        content=lines,
        path=path,
        hostname=hostname,
        release=release,
        version=version.split("."),
        machine_id=machine_id,
        relative_path=path,
        **kwargs,
    )
# NOTE(review): neither input_data_cache nor counter is used in the visible
# part of this module - confirm their consumers.
input_data_cache = {}
counter = itertools.count()
# Helper constants for when it's necessary to test for a specific RHEL major
# version, e.g. RHEL6, but the minor version isn't important.
RHEL4 = "Red Hat Enterprise Linux AS release 4 (Nahant Update 9)"
RHEL5 = "Red Hat Enterprise Linux Server release 5.11 (Tikanga)"
RHEL6 = "Red Hat Enterprise Linux Server release 6.5 (Santiago)"
RHEL7 = "Red Hat Enterprise Linux Server release 7.0 (Maipo)"
RHEL8 = "Red Hat Enterprise Linux release 8.0 (Ootpa)"
[docs]
def redhat_release(major, minor=""):
    """
    Construct a redhat-release string for a RHEL major and minor version.

    Major releases 4, 5, 6, 7 and 8 are handled; the string produced for
    major release 8 carries the "Beta" marker.

    Arguments:
        major: RHEL major number. Accepts str, int or float (as major.minor)
        minor: RHEL minor number. Optional and accepts str or int

    For example, to construct a redhat-release for::

        RHEL4U9:  redhat_release('4.9') or (4.9) or (4, 9)
        RHEL5 GA: redhat_release('5') or (5.0) or (5, 0) or (5)
        RHEL6.6:  redhat_release('6.6') or (6.6) or (6, 6)
        RHEL7.1:  redhat_release('7.1') or (7.1) or (7, 1)

    Limitation with float args: (x.10) will be parsed as minor = 1

    Returns:
        The redhat-release string.

    Raises:
        Exception: When the major version is not one of 4, 5, 6, 7 or 8.
    """
    # Bring both parts into string form; a "major.minor" value overrides minor.
    if isinstance(major, float):
        major, minor = str(major).split(".")
    elif isinstance(major, int):
        major = str(major)
    elif isinstance(major, str) and "." in major:
        major, minor = major.split(".")
    if isinstance(minor, int):
        minor = str(minor)

    if major == "4":
        # RHEL 4 names its minor releases "Update N"; update 0 is left out.
        if minor == "0":
            minor = ""
        elif minor:
            minor = " Update %s" % minor
        return "Red Hat Enterprise Linux AS release %s (Nahant%s)" % (major, minor)

    if major == "8":
        return "Red Hat Enterprise Linux release %s.%s Beta (Ootpa)" % (major, minor or "0")

    server_template = "Red Hat Enterprise Linux Server release %s%s (%s)"
    if major == "5":
        # RHEL 5 GA is written without a minor number.
        if minor == "0":
            minor = ""
        elif minor:
            minor = "." + minor
        return server_template % (major, minor, "Tikanga")

    if major in ("6", "7"):
        codename = "Santiago" if major == "6" else "Maipo"
        return server_template % (major, "." + (minor or "0"), codename)

    raise Exception("invalid major version: %s" % major)
[docs]
def archive_provider(component, test_func=deep_compare, stride=1):
    """
    Decorator that registers generator functions yielding
    ``(input_data, expected)`` pairs.

    The decorated function is replaced by a generator that yields
    ``(component, test_func, input_data, expected)`` for every `stride`-th
    pair, and that generator is appended to ``ARCHIVE_GENERATORS``.

    Parameters
    ----------
    component:
        The component to be tested.
    test_func: function
        A comparison function with the parameters (result, expected).
        Defaults to deep_compare().
    stride: int
        Yield every `stride`-th pair rather than the full set. The value is
        also the default of the generator's own `stride` argument and is
        exposed as its `stride` attribute.
    """

    def decorator(func):
        @wraps(func)
        def generator(stride=stride):
            every_nth = itertools.islice(func(), None, None, stride)
            for input_data, expected in every_nth:
                yield component, test_func, input_data, expected

        generator.stride = stride
        ARCHIVE_GENERATORS.append(generator)
        return generator

    return decorator