Source code for insights.cleaner.filters

"""
Filtering
=========
"""

import logging

logger = logging.getLogger(__name__)


[docs] class AllowFilter(object): """ Class for filtering per allow list. """
[docs] def parse_line(self, line, **kwargs): # filter line as per the allow list specified by plugins if not line: return line allowlist = kwargs.get('allowlist', {}) if allowlist: for a_key in list(allowlist.keys()): # copy keys to avoid RuntimeError # keep line when any filter match # FIXME: # Considering performance, didn't handle multiple filters in one same line if a_key in line: allowlist[a_key] -= 1 # stop checking it when enough lines contain the key were found allowlist.pop(a_key) if allowlist[a_key] == 0 else None return line
# discard line when none filters found
[docs] def generate_report(self, report_dir, archive_name): pass # pragma: no cover
[docs] @staticmethod def filter_content(lines, allowlist): """ Filter content based on allowlist. When a key of allowlist is found in a line, it is added to the result list. The key is removed from the allowlist when enough lines containing the key were found. When the allowlist is empty, an empty result is returned. The lines are processed in reverse order. But the processed result is returned in the original order. :param lines: list of lines :param allowlist: dictionary of allowlist :return: list of lines """ allowlist = dict(allowlist) # copy it to avoid write back result = [] for idx in range(len(lines) - 1, -1, -1): for a_key in list(allowlist.keys()): # copy keys to avoid RuntimeError if a_key in lines[idx]: allowlist[a_key] -= 1 # stop checking it when enough lines contain the key were found allowlist.pop(a_key) if allowlist[a_key] == 0 else None result.append(lines[idx]) # stop checking other keys when one key is found # it's sometimes not fair to other keys, but it's # the best we can do for performance break # Return the result in right order result.reverse() return result