import logging import re from datetime import datetime from django.conf import settings from incident.models import Incident from inputs.models import LogInput from inputs.services.inputs import get_sensor _log = logging.getLogger(__name__) def escaper(special_chars): strip_escaped_re = re.compile(r'\\([{}\\])'.format(special_chars)) do_escape_re = re.compile(r'([{}\\])'.format(special_chars)) def escape(s): stripped = strip_escaped_re.sub(r'\1', str(s)) return do_escape_re.sub(r'\\\1', stripped) return escape def incident_to_cef(incident: Incident) -> str: header = escaper('|') extension = escaper('=') device_vendor = header(settings.SITE_INFO['vendor']) device_product = header(settings.SITE_INFO['product']) device_version = header(settings.SITE_INFO['version']) signature_id = header('Incident') name = header(incident.title) severity = int(incident.importance / 10) # CEF accept only 0-10 timestamps = [] for timestamp in incident.events: tmp_timestamp = timestamp['event_timestamp'].split('.') timestamps.append(datetime.strptime(tmp_timestamp[0], '%Y-%m-%dT%H:%M:%S')) # See https://community.microfocus.com/t5/ArcSight-Connectors/ArcSight-Common-Event-Format-CEF-Implementation-Standard/ta-p/1645557 # See https://kc.mcafee.com/resources/sites/MCAFEE/content/live/CORP_KNOWLEDGEBASE/78000/KB78712/en_US/CEF_White_Paper_20100722.pdf extension_dict = { 'cnt': incident.event_count, 'rt': int(incident.created.timestamp()) * 1000, 'cs1': extension(str(incident.incident_id)[:4000]), 'cs1Label': 'IncidentID' } if incident.category is not None: extension_dict['cat'] = extension(incident.category.name[:1023]) if len(timestamps) > 0: extension_dict['start'] = int(min(timestamps).timestamp()) * 1000 extension_dict['end'] = int(max(timestamps).timestamp()) * 1000 if incident.description is not None and len(incident.description) > 0: extension_dict['msg'] = extension(incident.description[:1023]) # Try to get sensor for exporting incident try: sensor_type, sensor = get_sensor(incident.sensor) extension_dict['cs3'] = sensor.id extension_dict['cs3Label'] = 'SensorID' extension_dict['cs4'] = sensor.ip extension_dict['cs4Label'] = 'SensorIP' extension_dict['cs5'] = f'{sensor.type}_{sensor.id}'.lower() extension_dict['cs5Label'] = 'SensorSID' extension_dict['cs6'] = sensor.name extension_dict['cs6Label'] = 'SensorName' except RuntimeError as e: _log.warning(f'Incident export warning: Bad sensor value: {incident.sensor}', exc_info=e) except LogInput.DoesNotExist: _log.warning(f'Incident export warning: sensor with that type and pk: {incident.sensor} does not exist') except AttributeError: _log.warning('Incident export warning: No sensor for log input') extension = '' for key, value in extension_dict.items(): if value is not None: extension += f'{key}={value} ' result = f'AMC: CEF:0|{device_vendor}|{device_product}|{device_version}|{signature_id}|{name}|{severity}|{extension}' return result