"""
Custom datasource compliance
============================
Only when option `--compliance` is specified, the insights collection will
collect data for compliance. When it is specified, only the Insights Spec
:py:attr:`insights.specs.default.DefaultSpecs.compliance` will be
collected. To collect other specs within the collection of compliance data,
add them to the "persist" section in the "compliance_manifest"
pre-defined in the :py:mod:`insights.specs.datasources.manifests`
"""
import logging
import os
import pkgutil
import sys
import yaml
import json
from tempfile import NamedTemporaryFile
from traceback import format_exc
from insights.core import ET
from insights.core.context import HostContext
from insights.core.exceptions import SkipComponent
from insights.core.plugins import datasource
from insights.core.spec_factory import DatasourceProvider
from insights.parsers.installed_rpms import InstalledRpms
from insights.parsers.os_release import OsRelease
from insights.parsers.redhat_release import RedhatRelease
from insights.specs.datasources.compliance import (
ComplianceClient,
SSG_PACKAGE,
REQUIRED_PACKAGES,
constants,
)
logger = logging.getLogger(__name__)
[docs]
@datasource(HostContext)
def compliance_enabled(broker):
insights_config = broker.get('client_config')
# Run only if `--compliance` option is enabled to insights-client
if insights_config and hasattr(insights_config, 'compliance') and insights_config.compliance:
return True
raise SkipComponent("Collect compliance data only --compliance is specified")
[docs]
@datasource(HostContext)
def compliance_policies_enabled(broker):
insights_config = broker.get('client_config')
# Run only if `--compliance-policies` option is enabled to insights-client
if (
insights_config
and hasattr(insights_config, 'compliance_policies')
and insights_config.compliance_policies
):
return True
raise SkipComponent("Run only when --compliance-policies is specified")
[docs]
@datasource(HostContext)
def compliance_assign_enabled(broker):
insights_config = broker.get('client_config')
# Run only if `--compliance-assign` option is enabled to insights-client
if (
insights_config
and hasattr(insights_config, 'compliance_assign')
and insights_config.compliance_assign
):
return True
raise SkipComponent("Run only when --compliance-assign is specified")
[docs]
@datasource(HostContext)
def compliance_unassign_enabled(broker):
insights_config = broker.get('client_config')
# Run only if `--compliance-unassign` option is enabled to insights-client
if (
insights_config
and hasattr(insights_config, 'compliance_unassign')
and insights_config.compliance_unassign
):
return True
raise SkipComponent("Run only when --compliance-unassign is specified")
[docs]
@datasource(
HostContext,
[OsRelease, RedhatRelease],
optional=[
compliance_assign_enabled,
compliance_enabled,
compliance_policies_enabled,
compliance_unassign_enabled,
],
)
def os_version(broker):
os_release = None
if OsRelease in broker:
os_release = broker[OsRelease].get("VERSION_ID")
if not os_release and RedhatRelease in broker:
os_release = broker[RedhatRelease].version
if os_release:
return os_release.split('.')
if any(
com in broker
for com in [
compliance_assign_enabled,
compliance_enabled,
compliance_policies_enabled,
compliance_unassign_enabled,
]
):
sys.exit(constants.sig_kill_bad)
raise SkipComponent
[docs]
@datasource(
HostContext,
InstalledRpms,
optional=[
compliance_assign_enabled,
compliance_enabled,
compliance_policies_enabled,
compliance_unassign_enabled,
],
)
def package_check(broker):
rpms = broker[InstalledRpms]
missed = [rpm for rpm in REQUIRED_PACKAGES if rpm not in rpms]
if missed:
if any(
com in broker
for com in [
compliance_assign_enabled,
compliance_enabled,
compliance_policies_enabled,
compliance_unassign_enabled,
]
):
msg = 'Missing required packages for compliance scanning. Please ensure the following packages are installed: {0}\n'.format(
', '.join(missed)
)
logger.error(msg)
sys.exit(constants.sig_kill_bad)
raise SkipComponent
return rpms.newest(SSG_PACKAGE).version
@datasource(compliance_enabled, os_version, package_check, HostContext, timeout=0)
def compliance(broker):
"""
Collect compliance data when '--compliance' is specified.
"""
try:
insights_config = broker.get('client_config')
compliance = ComplianceClient(
os_version=broker[os_version], ssg_version=broker[package_check], config=insights_config
)
# Preparations
policies = compliance.get_system_policies()
if not policies:
logger.error(
"System is not associated with any policies. Assign policies using the Compliance web UI.\n"
)
sys.exit(constants.sig_kill_bad)
results_need_repair = compliance.results_need_repair()
compliance_result = list()
# OSCAP scan
for policy in policies:
profile_ref_id = policy['ref_id']
file_name = 'oscap_results-{0}'.format(profile_ref_id)
results_file = NamedTemporaryFile(prefix=file_name, suffix='.xml', delete=True)
tailoring_file = compliance.download_tailoring_file(policy)
compliance.run_scan(
profile_ref_id,
compliance.find_scap_policy(profile_ref_id),
results_file.name,
tailoring_file_path=tailoring_file,
)
# TODO: align with core collection
obfuscation_list = insights_config.obfuscation_list
if obfuscation_list:
tree = ET.parse(results_file.name)
# Retrieve the list of xpaths that need to be obfuscated
xpaths = yaml.load(
pkgutil.get_data('insights', 'compliance_obfuscations.yaml'),
Loader=yaml.SafeLoader,
)
if 'ipv4' in obfuscation_list or 'ipv6' in obfuscation_list:
# Obfuscate IP addresses in the XCCDF report
compliance.obfuscate(tree, xpaths['obfuscate_ip'])
if 'hostname' in obfuscation_list:
# Obfuscate the hostname in the XCCDF report
compliance.obfuscate(tree, xpaths['obfuscate_hostname'])
# Overwrite the results file with the obfuscations
tree.write(results_file.name)
if tailoring_file:
os.remove(tailoring_file)
# Store result as DatasourceProvider
with open(results_file.name, 'r') as f:
content = [l.rstrip("\n") for l in f]
if results_need_repair:
content = compliance.repair_results(results_file.name, content)
compliance_result.append(
# self obfuscation is done above, cleaner is not required
DatasourceProvider(
content=content, # The actual result content here
relative_path='{0}.xml'.format(file_name),
)
)
if compliance_result:
return compliance_result
except Exception as err:
err_msg = "Unexpected exception in compliance: {0}".format(str(err))
logger.error(err_msg)
logger.debug(format_exc())
logger.error("No scan results were produced.")
sys.exit(constants.sig_kill_bad)
@datasource(compliance_policies_enabled, os_version, package_check, HostContext, timeout=0)
def compliance_policies(broker):
"""
Run when '--compliance-policies' is specified.
"""
try:
insights_config = broker.get('client_config')
compliance = ComplianceClient(
os_version=broker[os_version], ssg_version=broker[package_check], config=insights_config
)
# --compliance-policies was called
result = compliance.assignable_policies()
sys.exit(result)
except Exception as err:
err_msg = "Unexpected exception in compliance: {0}".format(str(err))
logger.error(err_msg)
logger.debug(format_exc())
sys.exit(constants.sig_kill_bad)
@datasource(compliance_assign_enabled, os_version, package_check, HostContext, timeout=0)
def compliance_assign(broker):
"""
Run when '--compliance-assign ID' is specified.
"""
try:
insights_config = broker.get('client_config')
compliance = ComplianceClient(
os_version=broker[os_version], ssg_version=broker[package_check], config=insights_config
)
# --compliance-assign was called
result = compliance.policy_link(insights_config.compliance_assign, 'patch')
sys.exit(result)
except Exception as err:
err_msg = "Unexpected exception in compliance: {0}".format(str(err))
logger.error(err_msg)
logger.debug(format_exc())
sys.exit(constants.sig_kill_bad)
@datasource(compliance_unassign_enabled, os_version, package_check, HostContext, timeout=0)
def compliance_unassign(broker):
"""
Run when '--compliance-unassign ID' is specified.
"""
try:
insights_config = broker.get('client_config')
compliance = ComplianceClient(
os_version=broker[os_version], ssg_version=broker[package_check], config=insights_config
)
# --compliance-unassign was called
result = compliance.policy_link(insights_config.compliance_unassign, 'delete')
sys.exit(result)
except Exception as err:
err_msg = "Unexpected exception in compliance: {0}".format(str(err))
logger.error(err_msg)
logger.debug(format_exc())
sys.exit(constants.sig_kill_bad)
@datasource(os_version, package_check, HostContext, timeout=0)
def compliance_advisor_rule_enabled(broker):
try:
result = {}
insights_config = broker.get('client_config')
compliance = ComplianceClient(
os_version=broker[os_version], ssg_version=broker[package_check], config=insights_config
)
policies = compliance.get_system_policies()
if not policies:
raise SkipComponent
result['enabled_policies'] = policies
tailoring_policies = []
for policy in policies:
tailoring_content = compliance.fetch_tailoring_content(policy)
if tailoring_content:
tailoring_policy = dict(ref_id=policy['ref_id'])
xml_root = ET.fromstring(tailoring_content)
pre_tag = xml_root.tag.split("Tailoring")[0]
profile_select_tag = pre_tag + 'Profile/' + pre_tag + 'select'
profile_select_info = xml_root.findall(profile_select_tag)
tailoring_policy['check_items'] = [item.attrib for item in profile_select_info]
tailoring_policies.append(tailoring_policy)
if tailoring_policies:
result['tailoring_policies'] = tailoring_policies
return DatasourceProvider(
content=json.dumps(result),
relative_path='insights_datasources/compliance_enabled_policies',
)
except Exception:
raise SkipComponent