"""
Custom datasources relate to directory list
"""
import json
import os
from insights.core.context import HostContext
from insights.core.dr import get_name
from insights.core.exceptions import SkipComponent
from insights.core.filters import get_filters
from insights.core.plugins import datasource
from insights.core.spec_factory import DatasourceProvider
from insights.parsers.blkid import BlockIDInfo
from insights.parsers.fstab import FSTab
from insights.parsers.lvm import Pvs
from insights.specs import Specs
def _list_items(spec):
"""Return a tuple of directories according to the spec filters."""
filters = sorted(get_filters(spec))
if filters:
if len(filters) == 1 and 'R' not in get_name(spec).split('_')[-2]:
"""
.. note::
Insert a non-existing directory when there is only ONE target for
the datasource to list, to make sure the directory be outputted.
==============================================================
# list a single dir '/mnt'
$ ls -lan /mnt
total 0
drwxr-xr-x. 2 0 0 6 Jun 21 2021 .
dr-xr-xr-x. 17 0 0 224 Apr 17 16:45 ..
--------------------------------------------------------------
# list a single dir with this patch
$ ls -lan /mnt _non_existing_
ls: cannot access ' _non_existing_': No such file or directory
/mnt: # <--<< the dir is outputted here
total 0
drwxr-xr-x. 2 0 0 6 Jun 21 2021 .
dr-xr-xr-x. 17 0 0 224 Apr 17 16:45 ..
==============================================================
"""
filters.append('_non_existing_')
return filters
raise SkipComponent
def _get_fstab_mounted_device_files(fstab_mounts, blkid_info):
result = []
blk_uuid_name_map = {}
blk_label_name_map = {}
for blk in blkid_info.data:
uuid = blk.get("UUID", None)
label = blk.get("LABEL", None)
name = blk.get("NAME")
if uuid:
blk_uuid_name_map[uuid] = name
if label:
blk_label_name_map[label] = name
for record in fstab_mounts:
fs_spec = record['fs_spec']
fs_spec_pair = fs_spec.split("=", 1)
if fs_spec_pair[0] == "UUID" and fs_spec_pair[1] in blk_uuid_name_map:
blkid_name = blk_uuid_name_map.get(fs_spec_pair[1])
result.append(blkid_name)
elif fs_spec_pair[0] == "LABEL" and fs_spec_pair[1] in blk_label_name_map:
blkid_name = blk_label_name_map.get(fs_spec_pair[1])
result.append(blkid_name)
# Filter out devices like tmpfs, sysfs, proc ...
elif fs_spec.startswith('/') and "bind" not in record['fs_mntops']:
result.append(fs_spec)
return set(result)
[docs]
@datasource(HostContext)
def list_with_la(broker):
return ' '.join(_list_items(Specs.ls_la_dirs))
[docs]
@datasource(HostContext)
def list_with_la_filtered(broker):
return ' '.join(_list_items(Specs.ls_la_filtered_dirs))
[docs]
@datasource(HostContext, optional=[FSTab])
def list_with_lan(broker):
filters = set(_list_items(Specs.ls_lan_dirs))
if 'fstab_mounted.dirs' in filters and FSTab in broker:
filters.remove('fstab_mounted.dirs')
for mntp in broker[FSTab].mounted_on.keys():
mnt_point = os.path.dirname(mntp)
filters.add(mnt_point) if mnt_point else None
return ' '.join(sorted(filters))
[docs]
@datasource(HostContext)
def list_with_lan_filtered(broker):
return ' '.join(_list_items(Specs.ls_lan_filtered_dirs))
[docs]
@datasource(HostContext)
def list_with_lanL(broker):
return ' '.join(_list_items(Specs.ls_lanL_dirs))
[docs]
@datasource(HostContext)
def list_with_lanR(broker):
return ' '.join(_list_items(Specs.ls_lanR_dirs))
[docs]
@datasource(HostContext)
def list_with_lanRL(broker):
return ' '.join(_list_items(Specs.ls_lanRL_dirs))
[docs]
@datasource(HostContext)
def list_with_laRZ(broker):
return ' '.join(_list_items(Specs.ls_laRZ_dirs))
[docs]
@datasource(HostContext)
def list_with_laZ(broker):
return ' '.join(_list_items(Specs.ls_laZ_dirs))
@datasource(HostContext)
def files_dirs_number(broker):
"""Return a dict of file numbers from the spec filter"""
result = {}
for item in sorted(get_filters(Specs.files_dirs_number_filter)):
item = os.path.join(item, '') # ensure endswith "/", --> directory
if os.path.exists(item):
result[item] = dict(files_number=0, dirs_number=0)
for name in os.listdir(item):
path = os.path.join(item, name)
result[item]['files_number'] += 1 if os.path.isfile(path) else 0
result[item]['dirs_number'] += 1 if os.path.isdir(path) else 0
if result:
return DatasourceProvider(
content=json.dumps(result, sort_keys=True),
relative_path='insights_datasources/files_dirs_number',
ds=Specs.files_dirs_number,
ctx=broker.get(HostContext),
cleaner=broker.get("cleaner"),
)
raise SkipComponent
[docs]
@datasource(HostContext, optional=[FSTab, BlockIDInfo, Pvs])
def list_with_ldH(broker):
filters = set(_list_items(Specs.ls_ldH_items))
files = set(_f for _f in filters if not os.path.isdir(_f))
if 'fstab_mounted.devices' in filters and FSTab in broker and BlockIDInfo in broker:
files.remove('fstab_mounted.devices')
fstab_mounts = broker[FSTab]
blkid_info = broker[BlockIDInfo]
files.update(_get_fstab_mounted_device_files(fstab_mounts, blkid_info))
if 'pvs.devices' in filters and Pvs in broker:
files.remove('pvs.devices')
pvs_info = broker[Pvs]
files.update(set([item['PV'] for item in pvs_info]))
if files:
return ' '.join(sorted(files))
raise SkipComponent
[docs]
@datasource(HostContext)
def list_with_ldZ(broker):
filters = set(_list_items(Specs.ls_ldZ_items))
if 'fstab_mounted.dirs' in filters and FSTab in broker:
filters.remove('fstab_mounted.dirs')
for mntp in broker[FSTab].mounted_on.keys():
filters.add(mntp) if mntp.startswith('/') else None
if filters:
return ' '.join(sorted(filters))
raise SkipComponent