"""
InstalledRpms - Command ``rpm -qa``
===================================
InstalledRpms - command ``rpm -qa``
-----------------------------------
ContainerInstalledRpms - command ``rpm -qa`` for containers
-----------------------------------------------------------
"""
import json
import re
import six
from collections import defaultdict
from insights import ContainerParser, parser, CommandParser
from insights.core.exceptions import SkipComponent
from insights.parsers.rpm_vercmp import rpm_version_compare
from insights.specs import Specs
from insights.util import rsplit
# This list of architectures is taken from PDC (Product Definition Center):
# https://pdc.fedoraproject.org/rest_api/v1/arches/
KNOWN_ARCHITECTURES = [
# Common architectures
'x86_64',
'i386',
'i486',
'i586',
'i686',
'src',
'ia64',
'ppc',
'ppc64',
's390',
's390x',
'amd64',
'(none)',
'noarch',
# Less common
'alpha',
'alphaev4',
'alphaev45',
'alphaev5',
'alphaev56',
'alphaev6',
'alphaev67',
'alphaev68',
'alphaev7',
'alphapca56',
'arm64',
'armv5tejl',
'armv5tel',
'armv6l',
'armv7hl',
'armv7hnl',
'armv7l',
'athlon',
'armhfp',
'geode',
'ia32e',
'nosrc',
'ppc64iseries',
'ppc64le',
'ppc64p7',
'ppc64pseries',
'sh3',
'sh4',
'sh4a',
'sparc',
'sparc64',
'sparc64v',
'sparcv8',
'sparcv9',
'sparcv9v',
'aarch64',
]
"""list: List of recognized architectures.
This list is taken from the PDC (Product Definition Center) available
here https://pdc.fedoraproject.org/rest_api/v1/arches/.
"""
[docs]
class RpmList(object):
"""
Mixin class providing ``__contains__``, ``get_max``, ``get_min``,
``newest``, and ``oldest`` implementations for components that handle rpms.
"""
def __contains__(self, package_name):
"""
Checks if package name is in list of installed RPMs.
.. note::
The :attr:`packages` could be empty, e.g. when rpm database corrupt.
When doing exclusion check, make sure the ``packages`` is NOT
empty, e.g.::
if rpms.packages and "pkg_name" not in rpms:
pass
Args:
package_name (str): RPM package name such as 'bash'
Returns:
bool: True if package name is in list of installed packages, otherwise False
"""
return package_name in self.packages
[docs]
def get_max(self, package_name):
"""
Returns the highest version of the installed package with the given name.
Args:
package_name (str): Installed RPM package name such as 'bash'
Returns:
InstalledRpm: Installed RPM with highest version
"""
if package_name not in self.packages:
return None
else:
return max(self.packages[package_name])
[docs]
def get_min(self, package_name):
"""
Returns the lowest version of the installed package with the given name.
Args:
package_name (str): Installed RPM package name such as 'bash'.
Returns:
InstalledRpm: Installed RPM with lowest version
"""
if package_name not in self.packages:
return None
else:
return min(self.packages[package_name])
# re-export get_max/min with more descriptive names
newest = get_max
oldest = get_min
[docs]
@parser(Specs.installed_rpms)
class InstalledRpms(CommandParser, RpmList):
"""
The ``InstalledRpms`` class parses the output of the ``rpm -qa`` command.
Each line is parsed and stored in an ``InstalledRpm`` object. The ``rpm -qa``
command may output data in different formats and each format can be
handled by the parsing routines of this class. The basic format of command is
the package and is shown in the Examples.
A parser for working with data containing a list of installed RPM files on the system
and related information.
Sample input data::
a52dec-0.7.4-18.el7.nux.x86_64 Tue 14 Jul 2015 09:25:38 AEST 1398536494
aalib-libs-1.4.0-0.22.rc5.el7.x86_64 Tue 14 Jul 2015 09:25:40 AEST 1390535634
abrt-2.1.11-35.el7.x86_64 Wed 09 Nov 2016 14:52:01 AEDT 1446193355
...
kernel-3.10.0-267.el7.x86_64 Sat 24 Oct 2015 09:56:17 AEDT 1434466402
kernel-3.10.0-327.36.3.el7.x86_64 Wed 09 Nov 2016 14:53:25 AEDT 1476954923
kernel-headers-3.10.0-327.36.3.el7.x86_64 Wed 09 Nov 2016 14:20:59 AEDT 1476954923
kernel-tools-3.10.0-327.36.3.el7.x86_64 Wed 09 Nov 2016 15:09:42 AEDT 1476954923
kernel-tools-libs-3.10.0-327.36.3.el7.x86_64 Wed 09 Nov 2016 14:52:13 AEDT 1476954923
kexec-tools-2.0.7-38.el7_2.1.x86_64 Wed 09 Nov 2016 14:48:21 AEDT 1452845178
...
zlib-1.2.7-15.el7.x86_64 Wed 09 Nov 2016 14:21:19 AEDT 1431443476
zsh-5.0.2-14.el7_2.2.x86_64 Wed 09 Nov 2016 15:13:19 AEDT 1464185248
Examples:
>>> type(rpms)
<class 'insights.parsers.installed_rpms.InstalledRpms'>
>>> 'kernel' in rpms
True
>>> rpms.corrupt
False
>>> rpms.get_max('kernel')
0:kernel-3.10.0-327.36.3.el7
>>> type(rpms.get_max('kernel'))
<class 'insights.parsers.installed_rpms.InstalledRpm'>
>>> rpms.get_min('kernel')
0:kernel-3.10.0-267.el7
>>> rpm = rpms.get_max('kernel')
>>> rpm
0:kernel-3.10.0-327.36.3.el7
>>> type(rpm)
<class 'insights.parsers.installed_rpms.InstalledRpm'>
>>> rpm.package == 'kernel-3.10.0-327.36.3.el7'
True
>>> rpm.nvr == 'kernel-3.10.0-327.36.3.el7'
True
>>> rpm.source
>>> rpm.name
'kernel'
>>> rpm.version
'3.10.0'
>>> rpm.release
'327.36.3.el7'
>>> rpm.arch
'x86_64'
>>> rpm.epoch
'0'
>>> from insights.parsers.installed_rpms import InstalledRpm
>>> rpm2 = InstalledRpm.from_package('kernel-3.10.0-267.el7')
>>> rpm == rpm2
False
>>> rpm > rpm2
True
>>> rpm < rpm2
False
"""
def __init__(self, *args, **kwargs):
self.errors = list()
"""list: List of input lines that indicate an error acquiring the data on the client."""
self.unparsed = list()
"""list: List of input lines that raised an exception during parsing."""
self.packages = dict()
"""
dict (InstalledRpm): Dictionary of RPMs keyed by package name.
.. note::
The ``packages`` could be empty, e.g. when rpm database corrupt.
When doing exclusion check, make sure the ``packages`` is NOT
empty, e.g.::
>>> if rpms.packages and "pkg_name" not in rpms.packages:
>>> pass
"""
super(InstalledRpms, self).__init__(*args, **kwargs)
[docs]
def parse_content(self, content):
if content and (not content[0].strip() or "COMMAND>" in content[0]):
content = content[1:]
if not content:
raise SkipComponent("The content of rpm command is empty!")
parse_func = InstalledRpm.from_json if any(
'"name":' in _l for _l in content) else InstalledRpm.from_line
packages = defaultdict(list)
for line in content:
if not line.strip():
continue
if line.startswith('error:') or line.startswith('warning:'):
self.errors.append(line)
else:
try:
rpm = parse_func(line)
packages[rpm.name].append(rpm)
except Exception:
self.unparsed.append(line)
self.packages = dict(packages)
@property
def corrupt(self):
"""bool: True if RPM database is corrupted, else False."""
_corrupts = [
'error: rpmdbNextIterator',
'error: rpmdb: BDB0113',
'error: db5 error',
]
return any(c in s for s in self.errors for c in _corrupts)
p = re.compile(r"(\d+|[a-z]+|\.|-|_)")
def _int_or_str(c):
try:
return int(c)
except ValueError:
return c
def vcmp(s):
return [_int_or_str(c) for c in p.split(s) if c and c not in (".", "_", "-")]
[docs]
def pad_version(left, right):
"""Returns two sequences of the same length so that they can be compared.
The shorter of the two arguments is lengthened by inserting extra zeros
before non-integer components. The algorithm attempts to align character
components."""
pair = vcmp(left), vcmp(right)
mn, mx = min(pair, key=len), max(pair, key=len)
for idx, c in enumerate(mx):
try:
a = mx[idx]
b = mn[idx]
if type(a) is not type(b):
mn.insert(idx, 0)
except IndexError:
if type(c) is int:
mn.append(0)
elif isinstance(c, six.string_types):
mn.append('')
else:
raise Exception("pad_version failed (%s) (%s)" % (left, right))
return pair
[docs]
class InstalledRpm(object):
"""
Class for holding information about one installed RPM.
This class is usually created from dictionary with following structure::
{
'name': 'package name',
'version': 'package version',
'release': 'package release',
'arch': 'package architecture'
}
It may also contain supplementary information from SOS report or epoch
information from JSON.
When comparing rpms whose epoch is not ``null``, it is necessary to create
InstalledRpm object with epoch information like following example::
InstalledRpm.from_json('{"name":"microcode_ctl","epoch":"4","version":"20200609","release":"2.20201027.1.el8_3"}'
Factory methods are provided such as ``from_package`` to create an object from a
short package string::
kernel-devel-3.10.0-327.36.1.el7.x86_64
``from_json`` to create an object from JSON::
{"name": "kernel-devel",
"version": "3.10.0",
"release": "327.36.1.el7",
"arch": "x86_64"}
and ``from_line`` to create an object from a long package string:
.. code:: python
('kernel-devel-3.10.0-327.36.1.el7.x86_64'
' '
'Wed May 18 14:16:21 2016' '\t'
'1410968065' '\t'
'Red Hat, Inc.' '\t'
'hs20-bc2-4.build.redhat.com' '\t'
'8902150305004...b3576ff37da7e12e2285358267495ac48a437d4eefb3213' '\t'
'RSA/8, Mon Aug 16 11:14:17 2010, Key ID 199e2f91fd431d51')
"""
PRODUCT_SIGNING_KEYS = [
# NOTE: All In lower cases
# RELEASE PACKAGE SIGNING
'199e2f91fd431d51', '1ac4971355a34a82', '5054e4a45a6340b3',
'e1a4bd708a828aad', 'f76f66c3d4082792', '5326810137017186',
'45689c882fa658e0', '219180cddb42a60e', '7514f77d8366b0d9',
'08dd962c1c711042',
# BETA PACKAGE SIGNING
'fd372689897da07a', '938a80caf21541eb'
# DEVELOPMENT PACKAGE SIGNING
'08b871e6a5787476',
# OTHER PRODUCTS
'e191ddb2c509e861',
# CERTIFICATES
'66e8f8a29c65f85c', '680b9144769a9f8f', '8ed29db42a2898c8'
]
"""
list: List of package-signing keys in all lower cases. Should be updated
timely according to https://access.redhat.com/security/team/key/
"""
SOSREPORT_KEYS = [
'installtime', 'buildtime', 'vendor', 'buildserver', 'pgpsig', 'pgpsig_short'
]
"""list: List of keys for SOS Report RPM information."""
def __init__(self, data):
self.name = None
"""str: RPM package name."""
self.version = None
"""str: RPM package version."""
self.release = None
"""str: RPM package release."""
self.arch = None
"""str: RPM package architecture."""
self.redhat_signed = None
"""bool: True when RPM package is signed by Red Hat, False when RPM package is not signed by Red Hat,
None when no sufficient info to determine"""
self.vendor = None
"""str: RPM package vendor. `None` when no 'vendor' info"""
if isinstance(data, six.string_types):
data = self._parse_package(data)
for k, v in data.items():
setattr(self, k, v)
self.epoch = data['epoch'] if 'epoch' in data and data['epoch'] != '(none)' else '0'
self.vendor = data['vendor'] if 'vendor' in data else None
_gpg_key_pos = data.get('sigpgp', data.get('rsaheader', data.get('pgpsig_short', data.get('pgpsig', data.get('vendor', '')))))
if _gpg_key_pos:
self.redhat_signed = any(key in _gpg_key_pos.lower()
for key in self.PRODUCT_SIGNING_KEYS)
[docs]
@classmethod
def from_package(cls, package_string):
"""
The object of this class is usually created from dictionary. Alternatively it can be
created from package string.
Args:
package_string (str): package string in the following format (shown as Python string)::
'kernel-devel-3.10.0-327.36.1.el7.x86_64'
"""
return cls(cls._parse_package(package_string))
[docs]
@classmethod
def from_json(cls, json_line):
"""
The object of this class is usually created from dictionary. Alternatively it can be
created from JSON line.
Args:
json_line (str): JSON string in the following format (shown as Python string)::
'{"name": "kernel-devel", "version": "3.10.0", "release": "327.36.1.el7", "arch": "x86_64"}'
"""
return cls(json.loads(json_line))
[docs]
@classmethod
def from_line(cls, line):
"""
The object of this class is usually created from dictionary. Alternatively it can be
created from package line.
Args:
line (str): package line in the following format (shown as Python string):
.. code-block:: python
('kernel-devel-3.10.0-327.36.1.el7.x86_64'
' '
'Wed May 18 14:16:21 2016' '\t'
'1410968065' '\t'
'Red Hat, Inc.' '\t'
'hs20-bc2-4.build.redhat.com' '\t'
'8902150305004...b3576ff37da7e12e2285358267495ac48a437d4eefb3213' '\t'
'RSA/8, Mon Aug 16 11:14:17 2010, Key ID 199e2f91fd431d51')
"""
return cls(cls._parse_line(line))
@staticmethod
def _arch_sep(package_string):
"""
Helper method for finding if arch separator is '.' or '-'
Args:
package_string (str): dash separated package string such as 'bash-4.2.39-3.el7'.
Returns:
str: arch separator
"""
return '.' if package_string.rfind('.') > package_string.rfind('-') else '-'
@classmethod
def _parse_package(cls, package_string):
"""
Helper method for parsing package string.
Args:
package_string (str): dash separated package string such as 'bash-4.2.39-3.el7'
Returns:
dict: dictionary containing 'name', 'version', 'release' and 'arch' keys
"""
pkg, arch = rsplit(package_string, cls._arch_sep(package_string))
if arch not in KNOWN_ARCHITECTURES:
pkg, arch = (package_string, None)
pkg, release = rsplit(pkg, '-')
name, version = rsplit(pkg, '-')
epoch, version = version.split(':', 1) if ":" in version else ['0', version]
# oracleasm packages have a dash in their version string, fix that
if name.startswith('oracleasm') and name.endswith('.el5'):
name, version2 = name.split('-', 1)
version = version2 + '-' + version
return {
'name': name,
'version': version,
'release': release,
'arch': arch,
'epoch': epoch
}
@classmethod
def _parse_line(cls, line):
"""
Helper method for parsing package line with or without SOS report information.
Args:
line (str): package line with or without SOS report information
Returns:
dict: dictionary containing 'name', 'version', 'release' and 'arch' keys plus
additionally 'installtime', 'buildtime', 'vendor', 'buildserver', 'pgpsig',
'pgpsig_short' if these are present.
"""
try:
pkg, rest = line.split(None, 1)
except ValueError:
rpm = cls._parse_package(line.strip())
return rpm
rpm = cls._parse_package(pkg)
rest = rest.split('\t')
for i, value in enumerate(rest):
rpm[cls.SOSREPORT_KEYS[i]] = value
return rpm
@property
def package(self):
"""str: Package `name-version-release` string."""
return u'{0}-{1}-{2}'.format(self.name,
self.version,
self.release)
@property
def package_with_epoch(self):
"""
str: Package string in the format::
name-epoch:version-release
"""
return u'{0}-{1}:{2}-{3}'.format(self.name,
self.epoch,
self.version,
self.release)
@property
def nvr(self):
"""str: Package `name-version-release` string."""
return self.package
@property
def nvra(self):
"""str: Package `name-version-release.arch` string."""
return ".".join([self.package, self.arch])
@property
def nevra(self):
"""
str: Package string in the format::
name-epoch:version-release.arch
"""
return ".".join([self.package_with_epoch, self.arch])
@property
def source(self):
"""InstalledRpm: Returns source RPM of this RPM object."""
if hasattr(self, 'srpm'):
rpm = self.from_package(self.srpm)
# Source RPMs don't have epoch for some reason
rpm.epoch = self.epoch
return rpm
def __getitem__(self, item):
"""
Allows to use `rpm["element"]` instead of `rpm.element`. Dot notation should be preferred,
however it is especially useful for values containing dash, such as "pgpsig_short".
"""
return getattr(self, item)
def __str__(self):
return '{0}:{1}'.format(self.epoch, self.package)
def __unicode__(self):
return str(self)
def __repr__(self):
return str(self)
def __eq__(self, other):
if not isinstance(other, InstalledRpm):
return False
if self.name != other.name:
raise ValueError('Cannot compare packages with differing names {0} != {1}'
.format(self.name, other.name))
return rpm_version_compare(self, other) == 0
def __lt__(self, other):
if not isinstance(other, InstalledRpm):
return False
if self == other:
return False
return rpm_version_compare(self, other) < 0
def __ne__(self, other):
return not self == other
def __gt__(self, other):
return isinstance(other, InstalledRpm) and other.__lt__(self)
def __ge__(self, other):
return isinstance(other, InstalledRpm) and not self.__lt__(other)
def __le__(self, other):
return isinstance(other, InstalledRpm) and not other.__lt__(self)
def __hash__(self):
# Python 3 requires hash implementation to have hashable object.
try:
# Just NVR is not enouch for uniqueness. Try NVRA first.
value = self.nvra
except TypeError:
value = self.nvr
return hash(value)
# re-exports
from_package = InstalledRpm.from_package
Rpm = InstalledRpm
Installed = InstalledRpms
[docs]
@parser(Specs.container_installed_rpms)
class ContainerInstalledRpms(ContainerParser, InstalledRpms):
"""
Parses the data for list of installed rpms of the running
containers which are based on RHEL images.
Sample output::
a52dec-0.7.4-18.el7.nux.x86_64 Tue 14 Jul 2015 09:25:38 AEST 1398536494
aalib-libs-1.4.0-0.22.rc5.el7.x86_64 Tue 14 Jul 2015 09:25:40 AEST 1390535634
abrt-2.1.11-35.el7.x86_64 Wed 09 Nov 2016 14:52:01 AEDT 1446193355
kernel-3.10.0-267.el7.x86_64 Sat 24 Oct 2015 09:56:17 AEDT 1434466402
kernel-3.10.0-327.36.3.el7.x86_64 Wed 09 Nov 2016 14:53:25 AEDT 1476954923
kernel-headers-3.10.0-327.36.3.el7.x86_64 Wed 09 Nov 2016 14:20:59 AEDT 1476954923
kernel-tools-3.10.0-327.36.3.el7.x86_64 Wed 09 Nov 2016 15:09:42 AEDT 1476954923
kernel-tools-libs-3.10.0-327.36.3.el7.x86_64 Wed 09 Nov 2016 14:52:13 AEDT 1476954923
kexec-tools-2.0.7-38.el7_2.1.x86_64 Wed 09 Nov 2016 14:48:21 AEDT 1452845178
zlib-1.2.7-15.el7.x86_64 Wed 09 Nov 2016 14:21:19 AEDT 1431443476
zsh-5.0.2-14.el7_2.2.x86_64 Wed 09 Nov 2016 15:13:19 AEDT 1464185248
Examples:
>>> type(container_rpms)
<class 'insights.parsers.installed_rpms.ContainerInstalledRpms'>
>>> container_rpms.container_id
'cc2883a1a369'
>>> container_rpms.image
'quay.io/rhel8'
>>> container_rpms.engine
'podman'
>>> container_rpms.get_min('kernel').package == 'kernel-3.10.0-267.el7'
True
>>> container_rpms.get_max("kernel").name
'kernel'
>>> container_rpms.get_max("kernel").version
'3.10.0'
"""
pass