Source code for insights.parsers.mount

"""
Mount Entries
=============

Parsers provided in this module includes:

Mount - command ``/bin/mount``
------------------------------

ProcMounts - file ``/proc/mounts``
----------------------------------

The ``Mount`` class implements parsing for the ``mount`` command output which looks like::

    /dev/mapper/rootvg-rootlv on / type ext4 (rw,relatime,barrier=1,data=ordered)
    proc on /proc type proc (rw,nosuid,nodev,noexec,relatime)
    /dev/mapper/HostVG-Config on /etc/shadow type ext4 (rw,noatime,seclabel,stripe=256,data=ordered)
    dev/sr0 on /run/media/root/VMware Tools type iso9660 (ro,nosuid,nodev,relatime,uid=0,gid=0,iocharset=utf8,mode=0400,dmode=0500,uhelper=udisks2) [VMware Tools]

The information is stored as a list of :class:`MountEntry` objects.  Each
:class:`MountEntry` object contains attributes for the following information that
are listed in the same order as in the command output:

 * ``filesystem`` - Name of filesystem or the mounted device
 * ``mount_point`` - Name of mount point for filesystem
 * ``mount_type`` - Name of filesystem type
 * ``mount_options`` -  Mount options as ``MountOpts`` object
 * ``mount_label`` - Optional label of this mount entry, empty string by default
 * ``mount_clause`` - Full raw string from command output

The :class:`MountOpts` class contains the mount options as attributes accessible
via the attribute name as it appears in the command output.  For instance the
options ``(rw,dmode=0500)`` may be accessed as ''mnt_row_info.rw`` with the
value ``True`` and ``mnt_row_info.dmode`` with the value "0500".  The ``in``
operator may be used to determine if an option is present.

MountEntry lines are also available in a ``mounts`` property, keyed on the
mount point.
"""

import os
from insights.specs import Specs
from insights.parsers import optlist_to_dict, keyword_search, ParseException, SkipException
from insights import parser, get_active_lines, CommandParser


class AttributeAsDict(object):
    def __contains__(self, item):
        return item in self.__dict__

    def __getitem__(self, item):
        return self.__dict__[item]

    def get(self, item, default=None):
        return self.__dict__.get(item, default)

    def items(self):
        for k, v in self.__dict__.items():
            yield k, v


[docs]class MountOpts(AttributeAsDict): """ An object representing the mount options found in mount or fstab entry as attributes accessible via the attribute name as it appears in the command output. For instance, the options ``(rw,dmode=0500)`` may be accessed as ``mnt_row_info.rw`` with the value ``True`` and ``mnt_row_info.dmode`` with the value "0500". The ``in`` operator may be used to determine if an option is present. """ def __init__(self, data=None): data = {} if data is None else data for k, v in data.items(): setattr(self, k, v)
[docs]class MountEntry(AttributeAsDict): """ An object representing an mount entry of ``mount`` command or ``/proc/mounts`` file. Each entry contains below fixed attributes: Attributes: filesystem (str): Name of filesystem of mounted device mount_point (str): Name of mount point for filesystem mount_type (str): Name of filesystem type mount_options (MountOpts): Mount options as :class:`MountOpts` mount_label (str): Optional label of this mount entry, an empty string by default mount_clause (str): Full raw string from command output """ def __init__(self, data=None): data = {} if data is None else data for k, v in data.items(): setattr(self, k, v)
[docs]class MountedFileSystems(CommandParser): """ Base Class for :class:`Mount` and :class:`ProcMounts`. Attributes: rows (list): List of :class:`MountEntry` objects for each row of the content. mounts (dict): Dict with the `mount_point` as the key and the :class:`MountEntry` objects as the value. Raises: SkipException: When the file is empty. ParseException: Raised when any problem parsing the command output. """ def __len__(self): return len(self.rows) def __iter__(self): for row in self.rows: yield row def __getitem__(self, idx): if isinstance(idx, int): return self.rows[idx] elif isinstance(idx, str): return self.mounts[idx] else: raise TypeError("Mounts can only be indexed by mount string or line number")
[docs] def parse_content(self, content): # No content found or file is empty if not content: raise SkipException('Empty content') self._parse_mounts(content) if '/' not in self.mounts: raise ParseException("Input for mount must contain '/' mount point.")
[docs] def get_dir(self, path): """ This finds the most specific mount path that contains the given path, by successively removing the directory or file name on the end of the path and seeing if that is a mount point. This will always terminate since / is always a mount point. Strings that are not absolute paths will return None. Arguments: path (str): The path to check. Returns: MountEntry: The mount point that contains the given path. """ while path != '': if path in self.mounts: return self.mounts[path] path = os.path.split(path)[0] return None
[docs] def search(self, **kwargs): """ Returns a list of the mounts (in order) matching the given criteria. Keys are searched for directly - see the :py:func:`insights.parsers.keyword_search` utility function for more details. If no search parameters are given, no rows are returned. Examples: >>> mounts.search(filesystem='proc')[0].mount_clause 'proc on /proc type proc (rw,nosuid,nodev,noexec,relatime)' >>> mounts.search(mount_options__contains='seclabel')[0].mount_clause '/dev/mapper/HostVG-Config on /etc/shadow type ext4 (rw,noatime,seclabel,stripe=256,data=ordered)' Arguments: **kwargs (dict): Dictionary of key-value pairs to search for. Returns: (list): The list of mount points matching the given criteria. """ return keyword_search(self.rows, **kwargs)
[docs]@parser(Specs.mount) class Mount(MountedFileSystems): """ Class of information for all output from ``mount`` command. .. note:: Please refer to its super-class :class:`MountedFileSystems` for more details. The typical output of ``mount`` command looks like:: /dev/mapper/rootvg-rootlv on / type ext4 (rw,relatime,barrier=1,data=ordered) proc on /proc type proc (rw,nosuid,nodev,noexec,relatime) /dev/mapper/HostVG-Config on /etc/shadow type ext4 (rw,noatime,seclabel,stripe=256,data=ordered) dev/sr0 on /run/media/root/VMware Tools type iso9660 (ro,nosuid,nodev,relatime,uid=0,gid=0,iocharset=utf8,mode=0400,dmode=0500,uhelper=udisks2) [VMware Tools] Examples: >>> type(mnt_info) <class 'insights.parsers.mount.Mount'> >>> len(mnt_info) 4 >>> mnt_info[3].filesystem 'dev/sr0' >>> mnt_info[3].mount_label '[VMware Tools]' >>> mnt_info[3].mount_type 'iso9660' >>> 'ro' in mnt_info[3].mount_options True >>> mnt_info['/run/media/root/VMware Tools'].filesystem 'dev/sr0' >>> mnt_info['/run/media/root/VMware Tools'].mount_label '[VMware Tools]' >>> mnt_info['/run/media/root/VMware Tools'].mount_options.ro True """ def _parse_mounts(self, content): self.rows = [] self.mounts = {} for line in get_active_lines(content): mount = {} mount['mount_clause'] = line # Get the mounted filesystem by checking the ' on ' line_sp = _customized_split(line, line, sep=' on ') mount['filesystem'] = line_sp[0] # Get the mounted point by checking the last ' type ' before the last '(' mnt_pt_sp = _customized_split(raw=line, l=line_sp[1], sep=' (', reverse=True) line_sp = _customized_split(raw=line, l=mnt_pt_sp[0], sep=' type ', reverse=True) mount['mount_point'] = line_sp[0] mount['mount_type'] = line_sp[1].split()[0] line_sp = _customized_split(raw=line, l=mnt_pt_sp[1], sep=None, check=False) mount['mount_options'] = MountOpts(optlist_to_dict(line_sp[0].strip('()'))) if len(line_sp) == 2: mount['mount_label'] = line_sp[1] entry = MountEntry(mount) self.rows.append(entry) self.mounts[mount['mount_point']] = entry
[docs]@parser(Specs.mounts) class ProcMounts(MountedFileSystems): """ Class to parse the content of ``/proc/mounts`` file. This class is required to parse the ``/proc/mounts`` file in addition to the ``/bin/mount`` command because it lists the mount points of those process's which are not present in the output of the ``/bin/mount`` command. .. note:: Please refer to its super-class :class:`MountedFileSystems` for more details. The typical content of ``/proc/mounts`` file looks like:: /dev/mapper/rootvg-rootlv / ext4 rw,relatime,barrier=1,data=ordered 0 0 proc /proc proc rw,nosuid,nodev,noexec,relatime 0 0 /dev/mapper/HostVG-Config /etc/shadow ext4 rw,noatime,seclabel,stripe=256,data=ordered 0 0 dev/sr0 /run/media/root/VMware\040Tools iso9660 ro,nosuid,nodev,relatime,uid=0,gid=0,iocharset=utf8,mode=0400,dmode=0500,uhelper=udisks2 0 0 Examples: >>> type(proc_mnt_info) <class 'insights.parsers.mount.ProcMounts'> >>> len(proc_mnt_info) 4 >>> proc_mnt_info[3].filesystem == 'dev/sr0' True >>> proc_mnt_info[3].mounted_device == 'dev/sr0' True >>> proc_mnt_info[3].mounted_device == proc_mnt_info[3].filesystem True >>> proc_mnt_info[3].mount_type == 'iso9660' True >>> proc_mnt_info[3].filesystem_type == 'iso9660' True >>> proc_mnt_info['/run/media/root/VMware Tools'].mount_label == ['0', '0'] True >>> proc_mnt_info['/run/media/root/VMware Tools'].mount_options.ro True >>> proc_mnt_info['/run/media/root/VMware Tools'].mounted_device == 'dev/sr0' True """ def _parse_mounts(self, content): self.rows = [] self.mounts = {} for line in get_active_lines(content): mount = {} mount['mount_clause'] = line # Handle the '\040' in `mount_point`, e.g. "VMware\040Tools" line_sp = line.encode().decode("unicode-escape") line_sp = _customized_split(raw=line, l=line_sp) mount['filesystem'] = mount['mounted_device'] = line_sp[0] line_sp = _customized_split(raw=line, l=line_sp[1], num=3, reverse=True) mount['mount_label'] = line_sp[-2:] line_sp = _customized_split(raw=line, l=line_sp[0], reverse=True) mount['mount_options'] = MountOpts(optlist_to_dict(line_sp[1])) line_sp = _customized_split(raw=line, l=line_sp[0], reverse=True) mount['mount_type'] = mount['filesystem_type'] = line_sp[1] mount['mount_point'] = line_sp[0] entry = MountEntry(mount) self.rows.append(entry) self.mounts[mount['mount_point']] = entry
def _customized_split(raw, l, sep=None, num=2, reverse=False, check=True): if num >= 2: if reverse is False: line_sp = l.split(sep, num - 1) else: line_sp = l.rsplit(sep, num - 1) if check and len(line_sp) < num: raise ParseException('Unable to parse: "{0}"'.format(raw)) return line_sp