"""
InstalledRpms - Command ``rpm -qa``
===================================
The ``InstalledRpms`` class parses the output of the ``rpm -qa`` command.
Each line is parsed and stored in an ``InstalledRpm`` object. The ``rpm -qa``
command may output data in different formats and each format can be
handled by the parsing routines of this class. The basic format is one package
string per line, optionally followed by extra fields, as shown in the sample input.
Sample input data::
a52dec-0.7.4-18.el7.nux.x86_64 Tue 14 Jul 2015 09:25:38 AEST 1398536494
aalib-libs-1.4.0-0.22.rc5.el7.x86_64 Tue 14 Jul 2015 09:25:40 AEST 1390535634
abrt-2.1.11-35.el7.x86_64 Wed 09 Nov 2016 14:52:01 AEDT 1446193355
...
kernel-3.10.0-230.el7synaptics.1186112.1186106.2.x86_64 Wed 20 May 2015 11:24:00 AEST 1425955944
kernel-3.10.0-267.el7.x86_64 Sat 24 Oct 2015 09:56:17 AEDT 1434466402
kernel-3.10.0-327.36.3.el7.x86_64 Wed 09 Nov 2016 14:53:25 AEDT 1476954923
kernel-headers-3.10.0-327.36.3.el7.x86_64 Wed 09 Nov 2016 14:20:59 AEDT 1476954923
kernel-tools-3.10.0-327.36.3.el7.x86_64 Wed 09 Nov 2016 15:09:42 AEDT 1476954923
kernel-tools-libs-3.10.0-327.36.3.el7.x86_64 Wed 09 Nov 2016 14:52:13 AEDT 1476954923
kexec-tools-2.0.7-38.el7_2.1.x86_64 Wed 09 Nov 2016 14:48:21 AEDT 1452845178
...
zlib-1.2.7-15.el7.x86_64 Wed 09 Nov 2016 14:21:19 AEDT 1431443476
zsh-5.0.2-14.el7_2.2.x86_64 Wed 09 Nov 2016 15:13:19 AEDT 1464185248
Examples:
>>> type(rpms)
<class 'insights.parsers.installed_rpms.InstalledRpms'>
>>> 'openjpeg-libs' in rpms
True
>>> rpms.corrupt
False
>>> rpms.get_max('openjpeg-libs')
0:openjpeg-libs-1.3-9.el6_3
>>> type(rpms.get_max('openjpeg-libs'))
<class 'insights.parsers.installed_rpms.InstalledRpm'>
>>> rpms.get_min('openjpeg-libs')
0:openjpeg-libs-1.3-9.el6_3
>>> rpm = rpms.get_max('openssh-server')
>>> rpm
0:openssh-server-5.3p1-104.el6
>>> type(rpm)
<class 'insights.parsers.installed_rpms.InstalledRpm'>
>>> rpm.package
'openssh-server-5.3p1-104.el6'
>>> rpm.nvr
'openssh-server-5.3p1-104.el6'
>>> rpm.source
>>> rpm.name
'openssh-server'
>>> rpm.version
'5.3p1'
>>> rpm.release
'104.el6'
>>> rpm.arch
'x86_64'
>>> rpm.epoch
'0'
>>> from insights.parsers.installed_rpms import InstalledRpm
>>> rpm2 = InstalledRpm.from_package('openssh-server-6.0-100.el6.x86_64')
>>> rpm == rpm2
False
>>> rpm > rpm2
False
>>> rpm < rpm2
True
"""
import json
import re
from collections import defaultdict
import six
import warnings
from ..util import rsplit
from .. import parser, get_active_lines, CommandParser
from .rpm_vercmp import rpm_version_compare
from insights.specs import Specs
# This list of architectures is taken from PDC (Product Definition Center):
# https://pdc.fedoraproject.org/rest_api/v1/arches/
# InstalledRpm._parse_package() only treats the trailing segment of a package
# string as an architecture when that segment appears in this list.
KNOWN_ARCHITECTURES = [
# Common architectures
'x86_64',
'i386',
'i486',
'i586',
'i686',
'src',
'ia64',
'ppc',
'ppc64',
's390',
's390x',
'amd64',
# NOTE(review): '(none)' is presumably the placeholder rpm emits when the
# arch tag is unset -- confirm.
'(none)',
'noarch',
# Less common
'alpha',
'alphaev4',
'alphaev45',
'alphaev5',
'alphaev56',
'alphaev6',
'alphaev67',
'alphaev68',
'alphaev7',
'alphapca56',
'arm64',
'armv5tejl',
'armv5tel',
'armv6l',
'armv7hl',
'armv7hnl',
'armv7l',
'athlon',
'armhfp',
'geode',
'ia32e',
'nosrc',
'ppc64iseries',
'ppc64le',
'ppc64p7',
'ppc64pseries',
'sh3',
'sh4',
'sh4a',
'sparc',
'sparc64',
'sparc64v',
'sparcv8',
'sparcv9',
'sparcv9v',
'aarch64',
]
"""list: List of recognized architectures.
This list is taken from the PDC (Product Definition Center) available
here https://pdc.fedoraproject.org/rest_api/v1/arches/.
"""
class RpmList(object):
    """
    Mixin class providing ``__contains__``, ``get_max``, ``get_min``,
    ``newest``, and ``oldest`` implementations for components that handle rpms.

    The host class must provide a ``packages`` attribute: a mapping from
    package name to a list of mutually comparable RPM objects.
    """

    def __contains__(self, package_name):
        """
        Checks if package name is in list of installed RPMs.

        Args:
            package_name (str): RPM package name such as 'bash'

        Returns:
            bool: True if package name is in list of installed packages, otherwise False
        """
        return package_name in self.packages

    def get_max(self, package_name):
        """
        Returns the highest version of the installed package with the given name.

        Args:
            package_name (str): Installed RPM package name such as 'bash'

        Returns:
            InstalledRpm: Installed RPM with highest version, or None when no
            package with that name is installed.
        """
        if package_name not in self.packages:
            return None
        return max(self.packages[package_name])

    def get_min(self, package_name):
        """
        Returns the lowest version of the installed package with the given name.

        Args:
            package_name (str): Installed RPM package name such as 'bash'.

        Returns:
            InstalledRpm: Installed RPM with lowest version, or None when no
            package with that name is installed.
        """
        if package_name not in self.packages:
            return None
        return min(self.packages[package_name])

    @property
    def is_hypervisor(self):
        """
        .. warning::
            This method is deprecated, please use
            :py:class:`insights.parsers.virt_what.VirtWhat` which uses the command `virt-what` to check the hypervisor type.

        bool: True if ".el[6|7]ev" exists in "vdsm".release, else False.
        """
        # stacklevel=2 attributes the warning to the code reading the
        # property rather than to this module.
        warnings.warn("`is_hypervisor` is deprecated: Use `virt_what.VirtWhat` which uses the command `virt-what` to check the hypervisor type.", DeprecationWarning, stacklevel=2)
        rpm = self.get_max("vdsm")
        # get_max() yields None when vdsm is not installed.
        return bool(rpm and rpm.release.endswith((".el6ev", ".el7ev")))

    # re-export get_max/min with more descriptive names
    newest = get_max
    oldest = get_min
@parser(Specs.installed_rpms)
class InstalledRpms(CommandParser, RpmList):
    """
    A parser for working with data containing a list of installed RPM files on the system and
    related information.

    Each input line is tried as JSON first and as a plain package line
    second; lines that fit neither format are collected in ``unparsed``.
    """

    def __init__(self, *args, **kwargs):
        # NOTE(review): the attributes are created before calling the base
        # initializer, presumably because it triggers parse_content() -- confirm.
        self.errors = []
        """list: List of input lines that indicate an error acquiring the data on the client."""
        self.unparsed = []
        """list: List of input lines that raised an exception during parsing."""
        self.packages = defaultdict(list)
        """dict (InstalledRpm): Dictionary of RPMs keyed by package name."""
        super(InstalledRpms, self).__init__(*args, **kwargs)

    def parse_content(self, content):
        """
        Populate ``packages``, ``errors`` and ``unparsed`` from the command output.

        Args:
            content (list): Lines of the ``rpm -qa`` output.
        """
        for line in get_active_lines(content, comment_char='COMMAND>'):
            if line.startswith(('error:', 'warning:')):
                self.errors.append(line)
                continue
            try:
                # Try to parse from JSON input
                rpm = InstalledRpm.from_json(line)
                self.packages[rpm.name].append(rpm)
            except Exception:
                # Deliberately broad: any failure just means "not JSON",
                # so fall back to the plain line format.
                if line.strip():
                    try:
                        rpm = InstalledRpm.from_line(line)
                        self.packages[rpm.name].append(rpm)
                    except Exception:
                        # Both ways failed; keep the line for diagnostics.
                        self.unparsed.append(line)
        # Don't want defaultdict's behavior after parsing is complete
        self.packages = dict(self.packages)

    @property
    def corrupt(self):
        """bool: True if RPM database is corrupted, else False."""
        return any('rpmdbNextIterator' in s for s in self.errors)
# Tokenizer for version strings: runs of digits, runs of lowercase letters,
# and the separators '.', '-' and '_'. The capturing group makes
# ``p.split`` return the matched tokens along with any text between them.
p = re.compile(r"(\d+|[a-z]+|\.|-|_)")


def _int_or_str(c):
    """Return ``int(c)`` when ``c`` is numeric text, otherwise ``c`` unchanged."""
    try:
        return int(c)
    except ValueError:
        return c


def vcmp(s):
    """
    Split a version string into a list of comparable components.

    Separators ('.', '_', '-') and empty fragments are dropped; numeric
    components become ``int`` and everything else stays a string.

    Args:
        s (str): Version string such as '3.10.0-327.el7'.

    Returns:
        list: Components, e.g. ``[3, 10, 0, 327, 'el', 7]``.
    """
    return [_int_or_str(c) for c in p.split(s) if c and c not in (".", "_", "-")]
def pad_version(left, right):
    """Tokenize two version strings and pad the shorter token list.

    Both arguments are split with ``vcmp``. The shorter list is then
    lengthened in place: a ``0`` is inserted wherever its component type
    differs from the longer list's component at the same index, and once the
    shorter list runs out, ``0`` or ``''`` is appended to mirror each
    remaining int or string component. The algorithm attempts to align
    character components.

    Note that equal lengths are not guaranteed. When both token lists start
    with the same length, ``min`` and ``max`` select the same list, so
    nothing is padded; repeated inserts can also leave the originally
    shorter list longer than the other one.

    Args:
        left (str): First version string.
        right (str): Second version string.

    Returns:
        tuple: ``(left_components, right_components)`` after padding.

    Raises:
        Exception: If a component is neither an int nor a string.
    """
    pair = vcmp(left), vcmp(right)
    # On a length tie both names refer to pair[0].
    mn, mx = min(pair, key=len), max(pair, key=len)
    for idx, c in enumerate(mx):
        try:
            # Exact type comparison is kept as-is (int vs. string component).
            if type(c) != type(mn[idx]):
                mn.insert(idx, 0)
        except IndexError:
            # The shorter list is exhausted: mirror the component type.
            if type(c) is int:
                mn.append(0)
            elif isinstance(c, six.string_types):
                mn.append('')
            else:
                raise Exception("pad_version failed (%s) (%s)" % (left, right))
    return pair
class InstalledRpm(object):
    """
    Class for holding information about one installed RPM.

    This class is usually created from dictionary with following structure::

        {
            'name': 'package name',
            'version': 'package version',
            'release': 'package release',
            'arch': 'package architecture'
        }

    It may also contain supplementary information from SOS report or epoch
    information from JSON.

    Factory methods are provided such as ``from_package`` to create an object from a
    short package string::

        kernel-devel-3.10.0-327.36.1.el7.x86_64

    ``from_json`` to create an object from JSON::

        {"name": "kernel-devel",
         "version": "3.10.0",
         "release": "327.36.1.el7",
         "arch": "x86_64"}

    and ``from_line`` to create an object from a long package string:

    .. code:: python

        ('kernel-devel-3.10.0-327.36.1.el7.x86_64'
         '                                '
         'Wed May 18 14:16:21 2016' '\t'
         '1410968065' '\t'
         'Red Hat, Inc.' '\t'
         'hs20-bc2-4.build.redhat.com' '\t'
         '8902150305004...b3576ff37da7e12e2285358267495ac48a437d4eefb3213' '\t'
         'RSA/8, Mon Aug 16 11:14:17 2010, Key ID 199e2f91fd431d51')
    """

    PRODUCT_SIGNING_KEYS = [
        'F76F66C3D4082792', '199e2f91fd431d51', '5326810137017186',
        '45689c882fa658e0', '219180cddb42a60e', '7514f77d8366b0d9',
        'fd372689897da07a', '938a80caf21541eb',
        '08b871e6a5787476',
        'E191DDB2C509E861'
    ]
    """
    list: List of package-signing keys. Should be updated timely according to
    https://access.redhat.com/security/team/key/
    """

    SOSREPORT_KEYS = [
        'installtime', 'buildtime', 'vendor', 'buildserver', 'pgpsig', 'pgpsig_short'
    ]
    """list: List of keys for SOS Report RPM information."""

    # Fields inspected for a signing key, in priority order; the first field
    # present in the input (even if empty) is the one that gets checked.
    _SIGNATURE_FIELDS = ('sigpgp', 'rsaheader', 'pgpsig_short', 'pgpsig', 'vendor')

    def __init__(self, data):
        """
        Args:
            data (dict or str): Dictionary of RPM fields, or a package string
                that is parsed with ``_parse_package``. Every key of the
                dictionary becomes an attribute of the object.
        """
        self.name = None
        """str: RPM package name."""
        self.version = None
        """str: RPM package version."""
        self.release = None
        """str: RPM package release."""
        self.arch = None
        """str: RPM package architecture."""
        self.redhat_signed = None
        """bool: True when RPM package is signed by Red Hat, False when RPM package is not signed by Red Hat,
        None when no sufficient info to determine"""

        if isinstance(data, six.string_types):
            data = self._parse_package(data)

        for k, v in data.items():
            setattr(self, k, v)

        # A missing epoch and the '(none)' placeholder both mean epoch 0.
        epoch = data.get('epoch', '(none)')
        self.epoch = '0' if epoch == '(none)' else epoch

        signature = ''
        for field in self._SIGNATURE_FIELDS:
            if field in data:
                signature = data[field]
                break
        if signature:
            self.redhat_signed = self._is_redhat_signed(signature)

    @classmethod
    def _is_redhat_signed(cls, signature):
        """
        Helper method telling whether signature text mentions a known signing key.

        The comparison ignores case because ``PRODUCT_SIGNING_KEYS`` mixes
        upper- and lower-case hex digits.

        Args:
            signature (str): Signature-related text such as the 'pgpsig' field.

        Returns:
            bool: True if any product signing key occurs in ``signature``.
        """
        if not hasattr(signature, 'lower'):
            # Non-text value: keep the plain membership test.
            return any(key in signature for key in cls.PRODUCT_SIGNING_KEYS)
        lowered = signature.lower()
        return any(key.lower() in lowered for key in cls.PRODUCT_SIGNING_KEYS)

    @classmethod
    def from_package(cls, package_string):
        """
        The object of this class is usually created from dictionary. Alternatively it can be
        created from package string.

        Args:
            package_string (str): package string in the following format (shown as Python string)::

                'kernel-devel-3.10.0-327.36.1.el7.x86_64'
        """
        return cls(cls._parse_package(package_string))

    @classmethod
    def from_json(cls, json_line):
        """
        The object of this class is usually created from dictionary. Alternatively it can be
        created from JSON line.

        Args:
            json_line (str): JSON string in the following format (shown as Python string)::

                '{"name": "kernel-devel", "version": "3.10.0", "release": "327.36.1.el7", "arch": "x86_64"}'
        """
        return cls(json.loads(json_line))

    @classmethod
    def from_line(cls, line):
        """
        The object of this class is usually created from dictionary. Alternatively it can be
        created from package line.

        Args:
            line (str): package line in the following format (shown as Python string):

                .. code-block:: python

                    ('kernel-devel-3.10.0-327.36.1.el7.x86_64'
                     '                                '
                     'Wed May 18 14:16:21 2016' '\t'
                     '1410968065' '\t'
                     'Red Hat, Inc.' '\t'
                     'hs20-bc2-4.build.redhat.com' '\t'
                     '8902150305004...b3576ff37da7e12e2285358267495ac48a437d4eefb3213' '\t'
                     'RSA/8, Mon Aug 16 11:14:17 2010, Key ID 199e2f91fd431d51')
        """
        return cls(cls._parse_line(line))

    @staticmethod
    def _arch_sep(package_string):
        """
        Helper method for finding if arch separator is '.' or '-'

        Args:
            package_string (str): dash separated package string such as 'bash-4.2.39-3.el7'.

        Returns:
            str: arch separator
        """
        # Whichever of the two characters occurs last is taken as the separator.
        return '.' if package_string.rfind('.') > package_string.rfind('-') else '-'

    @classmethod
    def _parse_package(cls, package_string):
        """
        Helper method for parsing package string.

        Args:
            package_string (str): dash separated package string such as 'bash-4.2.39-3.el7'

        Returns:
            dict: dictionary containing 'name', 'version', 'release', 'arch' and 'epoch' keys
        """
        pkg, arch = rsplit(package_string, cls._arch_sep(package_string))
        if arch not in KNOWN_ARCHITECTURES:
            # The trailing segment is not an architecture: it belongs to the
            # release, so the whole string is name-version-release.
            pkg, arch = (package_string, None)
        pkg, release = rsplit(pkg, '-')
        name, version = rsplit(pkg, '-')
        epoch, version = version.split(':', 1) if ":" in version else ['0', version]
        # oracleasm packages have a dash in their version string, fix that
        if name.startswith('oracleasm') and name.endswith('.el5'):
            name, version2 = name.split('-', 1)
            version = version2 + '-' + version
        return {
            'name': name,
            'version': version,
            'release': release,
            'arch': arch,
            'epoch': epoch
        }

    @classmethod
    def _parse_line(cls, line):
        """
        Helper method for parsing package line with or without SOS report information.

        Args:
            line (str): package line with or without SOS report information

        Returns:
            dict: dictionary containing 'name', 'version', 'release' and 'arch' keys plus
            additionally 'installtime', 'buildtime', 'vendor', 'buildserver', 'pgpsig',
            'pgpsig_short' if these are present.

        Raises:
            IndexError: If the line carries more tab-separated fields than
                there are ``SOSREPORT_KEYS``.
        """
        try:
            pkg, rest = line.split(None, 1)
        except ValueError:
            # No whitespace: the line holds only the package string.
            return cls._parse_package(line.strip())
        rpm = cls._parse_package(pkg)
        for i, value in enumerate(rest.split('\t')):
            rpm[cls.SOSREPORT_KEYS[i]] = value
        return rpm

    @property
    def package(self):
        """str: Package `name-version-release` string."""
        return u'{0}-{1}-{2}'.format(self.name,
                                     self.version,
                                     self.release)

    @property
    def package_with_epoch(self):
        """
        str: Package string in the format::

            name-epoch:version-release
        """
        return u'{0}-{1}:{2}-{3}'.format(self.name,
                                         self.epoch,
                                         self.version,
                                         self.release)

    @property
    def nvr(self):
        """str: Package `name-version-release` string."""
        return self.package

    @property
    def nvra(self):
        """
        str: Package `name-version-release.arch` string. Raises ``TypeError``
        when ``arch`` is None.
        """
        return ".".join([self.package, self.arch])

    @property
    def nevra(self):
        """
        str: Package string in the format::

            name-epoch:version-release.arch

        Raises ``TypeError`` when ``arch`` is None.
        """
        return ".".join([self.package_with_epoch, self.arch])

    @property
    def source(self):
        """
        InstalledRpm: Returns source RPM of this RPM object, or None when the
        object has no ``srpm`` attribute.
        """
        if not hasattr(self, 'srpm'):
            return None
        rpm = self.from_package(self.srpm)
        # Source RPMs don't have epoch for some reason
        rpm.epoch = self.epoch
        return rpm

    def __getitem__(self, item):
        """
        Allows to use `rpm["element"]` instead of `rpm.element`. Dot notation should be preferred,
        however item access is useful when the attribute name is only known at runtime
        or is not a valid Python identifier.
        """
        return getattr(self, item)

    def __str__(self):
        return '{0}:{1}'.format(self.epoch, self.package)

    def __unicode__(self):
        return str(self)

    def __repr__(self):
        return str(self)

    def __eq__(self, other):
        if not isinstance(other, InstalledRpm):
            return False
        # Comparing different packages is treated as a programming error.
        if self.name != other.name:
            raise ValueError('Cannot compare packages with differing names {0} != {1}'
                             .format(self.name, other.name))
        return rpm_version_compare(self, other) == 0

    def __lt__(self, other):
        if not isinstance(other, InstalledRpm):
            return False
        if self == other:
            return False
        return rpm_version_compare(self, other) < 0

    def __ne__(self, other):
        return not self == other

    def __gt__(self, other):
        return isinstance(other, InstalledRpm) and other.__lt__(self)

    def __ge__(self, other):
        return isinstance(other, InstalledRpm) and not self.__lt__(other)

    def __le__(self, other):
        return isinstance(other, InstalledRpm) and not other.__lt__(self)

    def __hash__(self):
        # Python 3 requires hash implementation to have hashable object.
        try:
            # Just NVR is not enough for uniqueness. Try NVRA first.
            value = self.nvra
        except TypeError:
            # nvra cannot be built when arch is None; fall back to NVR.
            value = self.nvr
        return hash(value)
# re-exports
# Module-level aliases: ``from_package`` is the InstalledRpm factory that
# builds an object from a package string, while ``Rpm`` and ``Installed``
# are alternative names for the InstalledRpm and InstalledRpms classes.
from_package = InstalledRpm.from_package
Rpm = InstalledRpm
Installed = InstalledRpms