Source code for insights.specs.datasources.rpm

"""
Custom datasource for RPM command
"""

import grp
import pwd
import signal

from collections import defaultdict

from insights.core.context import HostContext
from insights.core.exceptions import SkipComponent
from insights.core.filters import get_filters
from insights.core.plugins import datasource
from insights.core.spec_factory import DatasourceProvider, simple_command
from insights.specs import Specs


def _make_rpm_formatter(fmt=None):
    """function: Returns function that will format output of rpm query command"""
    if fmt is None:
        fmt = [
            '"name":"%{NAME}"',
            '"epoch":"%{EPOCH}"',
            '"version":"%{VERSION}"',
            '"release":"%{RELEASE}"',
            '"arch":"%{ARCH}"',
            '"installtime":"%{INSTALLTIME:date}"',
            '"buildtime":"%{BUILDTIME}"',
            '"vendor":"%{VENDOR}"',
            '"buildhost":"%{BUILDHOST}"',
            '"sigpgp":"%{SIGPGP:pgpsig}"',
        ]
    return r"\{" + ",".join(fmt) + r"\}\n"


_rpm_format = _make_rpm_formatter()
"""Query format for specs `installed_rpms` and `container_installed_rpms`"""


class LocalSpecs(Specs):
    """
    Local spec used only by the rpm_pkgs datasource.
    """

    rpm_args = simple_command(
        'rpm -qa --nosignature --qf="[%{=NAME}; %{=NEVRA}; %{FILENAMES}; %{FILEMODES:perms}; %{FILEUSERNAME}; %{FILEGROUPNAME}; %{=VENDOR}\n]"',
        signum=signal.SIGTERM,
    )


def get_shells():
    """
    Returns all full pathnames of valid login shells without nologins.
    """
    with open("/etc/shells") as file:
        return set(line.strip() for line in file if "nologin" not in line)


def get_users():
    """
    Returns all users with shell specified in get_shells() except for root.
    """
    shells = get_shells()
    users = set()

    for user in pwd.getpwall():
        name = user[0]
        shell = user[6]

        if name == "root" or (shell not in shells):
            continue
        users.add(name)
    return users


def get_groups(users):
    """
    Returns all groups for users specified in get_users().
    Every user has at least one group with its name.
    """
    groups = set()

    for group in grp.getgrall():
        group_name = group[0]
        user_list = group[3]

        if group_name in users:
            groups.add(group_name)
        else:
            for user in user_list:
                if user.strip() in users:
                    groups.add(group_name)
                    break
    return groups


@datasource(LocalSpecs.rpm_args, HostContext)
def pkgs_with_writable_dirs(broker):
    r"""
    Custom datasource for CVE-2021-35937, CVE-2021-35938, and CVE-2021-35939.

    It collects packages from the ``rpm -qa --nosignature --qf="[%{=NAME}; %{=NEVRA}; %{FILENAMES};
    %{FILEMODES:perms}; %{FILEUSERNAME}; %{FILEGROUPNAME}; %{=VENDOR}\n]" command``.

    The output is a sorted list of all packages, which have at least one directory with files
    inside, and this directory is writable by a specific user/group or the others.

    Raises:
        SkipComponent: Raised if no data is available

    Returns:
        List[str]: Sorted list of strings, where every string contains `name`, `nevra`
        and `vendor` for a given package, and the pipe is used as a separator.
        E.g. ["httpd-core|httpd-core-2.4.53-7.el9.x86_64|Red Hat, Inc."]
    """
    content = broker[LocalSpecs.rpm_args].content

    if not content or "command not found" in content[0]:
        raise SkipComponent

    users = get_users()
    groups = get_groups(users)

    dir_package = defaultdict(set)
    dirs = set()

    for line in content:
        pkg_name, nevra, path_name, perms, user, group, vendor = line.split("; ")

        if perms[0] == "d":
            user_w = user in users and perms[2] == "w"
            group_w = group in groups and perms[5] == "w"
            others_w = perms[8] == "w"

            if user_w or group_w or others_w:
                # Stores a writeable directory with its package
                dir_package[path_name].add("{0}|{1}|{2}".format(pkg_name, nevra, vendor))
        else:
            # Stores a file directory
            dirs.add(path_name.rsplit('/', 1)[0])

    # Stores a package if its directory has files inside
    packages = set()
    for dir_path in dir_package:
        if dir_path in dirs:
            packages.update(dir_package[dir_path])

    if packages:
        return DatasourceProvider(
            content=sorted(packages),
            relative_path="insights_datasources/rpm_pkgs",
            ds=Specs.rpm_pkgs,
            ctx=broker.get(HostContext),
            cleaner=broker.get("cleaner"),
        )


[docs] @datasource(HostContext) def rpm_v_pkg_list(broker): """ Custom datasources for ``/bin/rpm -V <package>`` commands Return a list according to the spec filters. """ filters = sorted(get_filters(Specs.rpm_V_package_list)) if filters: return filters raise SkipComponent