Source code for insights.cleaner.ip

"""
IP Obfuscation
==============
The following cleaners are included in this module:

IPv4 Obfuscation
----------------

IPv6 Obfuscation
----------------

"""

import hashlib
import logging
import os
import re
import socket
import struct

from insights.cleaner.utilities import write_report

logger = logging.getLogger(__name__)


class IPv4(object):
    """
    Class for obfuscating IPv4.

    Every distinct IPv4 address seen is mapped to a sequentially allocated
    replacement address starting at ``_start_ip``.  The mapping is kept in
    ``_ip_db`` for the lifetime of the instance so that the same original
    address is always replaced by the same obfuscated address.
    """

    def __init__(self):
        # - IP obfuscate information
        # IP database, FORMAT: {obfuscated_ip_as_int: original_ip_as_int}
        self._ip_db = dict()
        # first address handed out; later ones are "highest used + 1"
        self._start_ip = '10.230.230.1'
        # addresses that are never obfuscated
        self._ignore_list = ["127.0.0.1"]
        # Dotted quad with every octet limited to 0-255 (first octet 1-255).
        # Group 1 of each match is the complete address.
        self.pattern = (
            r"(((\b25[0-5]|\b2[0-4][0-9]|\b1[0-9][0-9]|\b[1-9][0-9]|\b[1-9]))"
            r"(\.(\b25[0-5]|\b2[0-4][0-9]|\b1[0-9][0-9]|\b[1-9][0-9]|\b[0-9])){3})"
        )

    def _ip2int(self, ipstr):
        # converts a dotted decimal IP address into an integer that can be incremented
        return struct.unpack('!I', socket.inet_aton(ipstr))[0]

    def _int2ip(self, num):
        # converts an integer stored in the IP database into a dotted decimal IP
        return socket.inet_ntoa(struct.pack('!I', num))

    def _ip2db(self, ip):
        '''
        Add an IP address to the IP database and return the obfuscated
        entry, or return the existing obfuscated entry for a known address.

        FORMAT of the database (both sides stored as integers):
        {$obfuscated_ip: $original_ip,}
        '''
        ip_num = self._ip2int(ip)
        db = self._ip_db

        # the entry already existed
        for obfuscated_num, original_num in db.items():
            if original_num == ip_num:
                return self._int2ip(obfuscated_num)

        # the entry did not already exist: allocate the next free address
        if db:
            new_ip = max(db) + 1
        else:
            new_ip = self._ip2int(self._start_ip)
        db[new_ip] = ip_num
        return self._int2ip(new_ip)

    def parse_line(self, line, **kwargs):
        '''
        Substitute an obfuscated IP for each instance of an IPv4 address
        found in `line` and return the altered line.

        Keyword arguments:
            width (bool): when true, try to keep the column layout of the
                line by adding or removing spaces after the replaced
                address to make up for the difference in length.

        Raises:
            Exception: "SubIPError" when the substitution fails.
        '''

        def _sub_ip(line, ip):
            new_ip = self._ip2db(ip)
            logger.debug("Obfuscating IPv4 - %s > %s", ip, new_ip)
            return line.replace(ip, new_ip)

        def _sub_ip_keep_width(line, ip):
            new_ip = self._ip2db(ip)
            logger.debug("Obfuscating IPv4 - %s > %s", ip, new_ip)
            if ip not in line:
                # nothing to replace, hence nothing to re-align
                return line
            line = line.replace(ip, new_ip)
            shrink = len(ip) - len(new_ip)
            if shrink == 0:
                return line

            # Only the first occurrence is re-aligned.  Walk to the end of
            # the token that holds it (e.g. shift past a port specification)
            # without ever indexing past the end of the line.
            end = len(line)
            token_start = line.index(new_ip) + len(new_ip)
            idx = token_start
            while idx < end and line[idx] != " ":
                idx += 1

            if shrink > 0:
                # The new IP is shorter: pad with spaces after the token.
                if idx == end:
                    # Token runs to the end of the line: keep a trailing
                    # line terminator last, otherwise simply append.
                    while idx > token_start and line[idx - 1] in "\r\n":
                        idx -= 1
                return line[:idx] + shrink * " " + line[idx:]

            # The new IP is longer: give back spaces that follow the token.
            # Never delete non-space text, and keep one separating space
            # when more content follows on the line.
            gap_end = idx
            while gap_end < end and line[gap_end] == " ":
                gap_end += 1
            removable = gap_end - idx
            if gap_end < end and line[gap_end] not in "\r\n":
                removable -= 1
            cut = max(0, min(-shrink, removable))
            return line[:idx] + line[idx + cut:]

        if not line:
            return line

        ips = []
        try:
            # Collect each address once, in order of first appearance, so a
            # repeated address is not re-aligned a second time.
            seen = set()
            for match in re.findall(self.pattern, line):
                if match[0] not in seen:
                    seen.add(match[0])
                    ips.append(match[0])

            keep_width = kwargs.get('width', False)
            # Longest first, so that an address which is a substring of a
            # longer one does not clobber the longer one.
            for ip in sorted(ips, key=len, reverse=True):
                if ip in self._ignore_list:
                    continue
                if keep_width:
                    line = _sub_ip_keep_width(line, ip)
                else:
                    line = _sub_ip(line, ip)
            return line
        except Exception as e:  # pragma: no cover
            logger.warning(e)
            raise Exception('SubIPError: Unable to Substitute IPv4 Address - %s' % (ips,))

    def mapping(self):
        """
        Return the recorded substitutions as a list of
        ``{'original': ..., 'obfuscated': ...}`` dicts in dotted notation.
        """
        return [
            {'original': self._int2ip(original), 'obfuscated': self._int2ip(obfuscated)}
            for obfuscated, original in self._ip_db.items()
        ]

    def generate_report(self, report_dir, archive_name):
        """
        Write the obfuscated/original pairs as CSV lines to
        ``<report_dir>/<archive_name>-ipv4.csv`` via `write_report`.

        Raises:
            Exception: "CreateReport Error" when the report lines cannot
                be assembled.
        """
        try:
            ip_report_file = os.path.join(report_dir, "%s-ipv4.csv" % archive_name)
            logger.info('Creating IPv4 Report - %s', ip_report_file)
            lines = ['Obfuscated IPv4,Original IPv4']
            for obfuscated, original in self._ip_db.items():
                lines.append('{0},{1}'.format(self._int2ip(obfuscated), self._int2ip(original)))
        except Exception as e:  # pragma: no cover
            logger.exception(e)
            raise Exception('CreateReport Error: Error Creating IPv4 Report')

        write_report(lines, ip_report_file)
        logger.info('Completed IPv4 Report.')
class IPv6(object):
    """
    Class for obfuscating IPv6.

    Every non-empty group of an address is replaced by hex digits taken
    from the SHA-1 digest of that group (leading zeros are kept), so the
    obfuscated address has the same textual layout as the original.  The
    substitutions are kept in ``_ipv6_db`` for the lifetime of the instance.
    """

    def __init__(self):
        # IPv6 database, FORMAT: {original_ip: obfuscated_ip}
        self._ipv6_db = dict()
        # Ignore list for IPv6: regexes; a match containing any of them is skipped
        self._ignore_list = [r'\s+']  # ignore whitespace
        # IPv6 pattern, stolen from sos
        # FIXME:
        # This pattern is not perfect, e.g. it cannot match "::1" perfectly.
        # Hence, we add the above ignore list to avoid the rough match that
        # includes whitespace.
        # NOTE(review): inside a raw string "\\" is two backslash characters,
        # which the regex engine reads as one escaped literal backslash, so
        # the look-around character classes probably do not describe the
        # set that was intended - confirm before changing the pattern.
        self.pattern = (
            r"(?<![:\\.\\-a-z0-9])((([0-9a-f]{1,4})(:[0-9a-f]{1,4}){7})|"
            r"(([0-9a-f]{1,4}(:[0-9a-f]{0,4}){0,5}))([^.])::(([0-9a-f]{1,4}"
            r"(:[0-9a-f]{1,4}){0,5})?))(/\d{1,3})?(?![:\\a-z0-9])"
        )

    def _ip2db(self, ip):
        '''
        Add an IPv6 address to the IPv6 database and return the obfuscated
        address.  Returns None when `ip` is itself an obfuscated address
        that is already stored in the database.

        FORMAT: {$original_ip: $obfuscated_ip,}

        Raises:
            Exception: "SubIPv6Error" when the address cannot be processed.
        '''

        def obfuscate_hex(_hex):
            # empty group (from "::") stays empty
            if not _hex:
                return ''
            significant = _hex.lstrip('0').lower()
            # all-zero group stays as it is
            if not significant:
                return '0' * len(_hex)
            raw = significant.encode('utf-8')
            digest = hashlib.sha1(raw).hexdigest()[: len(raw)]
            # put the stripped leading zeros back to keep the width
            return '0' * (len(_hex) - len(significant)) + digest

        try:
            if ip in self._ipv6_db:
                return self._ipv6_db[ip]
            if ip in self._ipv6_db.values():  # pragma: no cover
                # avoid nested obfuscating
                return None
            obfuscated = ':'.join(obfuscate_hex(h) for h in ip.split(':'))
            self._ipv6_db[ip] = obfuscated
            return obfuscated
        except Exception as e:  # pragma: no cover
            logger.warning(e)
            # interpolate the address into the message; Exception() does not
            # apply logging-style "%s" arguments by itself
            raise Exception('SubIPv6Error: Unable to Substitute IPv6 Address - %s' % (ip,))

    def parse_line(self, line, **kwargs):
        """
        Replace every IPv6 address found in `line` by its obfuscated form
        and return the altered line.  Extra keyword arguments are accepted
        and ignored.
        """

        def _sub_ip(line, ip):
            new_ip = self._ip2db(ip)
            if not new_ip:
                # it's an obfuscated IP
                return line
            logger.debug("Obfuscating IPv6 - %s > %s", ip, new_ip)
            return line.replace(ip, new_ip)

        if not line:
            return line

        for match in re.findall(self.pattern, line, re.I):
            # group 1 of the pattern is the address without the "/prefix"
            address = match[0]
            if any(re.search(ignored, address, re.I) for ignored in self._ignore_list):
                continue
            line = _sub_ip(line, address)
        return line

    def mapping(self):
        """
        Return the recorded substitutions as a list of
        ``{'original': ..., 'obfuscated': ...}`` dicts.
        """
        return [
            {'original': original, 'obfuscated': obfuscated}
            for original, obfuscated in self._ipv6_db.items()
        ]

    def generate_report(self, report_dir, archive_name):
        """
        Write the obfuscated/original pairs as CSV lines to
        ``<report_dir>/<archive_name>-ipv6.csv`` via `write_report`.

        Raises:
            Exception: "CreateReport Error" when the report lines cannot
                be assembled.
        """
        try:
            ip_report_file = os.path.join(report_dir, "%s-ipv6.csv" % archive_name)
            logger.info('Creating IPv6 Report - %s', ip_report_file)
            lines = ['Obfuscated IPv6,Original IPv6']
            for original, obfuscated in self._ipv6_db.items():
                lines.append('{0},{1}'.format(obfuscated, original))
        except Exception as e:  # pragma: no cover
            logger.exception(e)
            raise Exception('CreateReport Error: Error Creating IPv6 Report')

        write_report(lines, ip_report_file)
        logger.info('Completed IPv6 Report.')