# old_console/incident_export/services/export.py
# 2024-11-02 14:12:45 +03:00
#
# 85 lines
# 3.2 KiB
# Python

import logging
import re
from datetime import datetime
from django.conf import settings
from incident.models import Incident
from inputs.models import LogInput
from inputs.services.inputs import get_sensor
# Module-level logger, named after this module's import path.
_log = logging.getLogger(__name__)
def escaper(special_chars):
    """Build an escaping function for a given set of special characters.

    The returned callable converts its argument to ``str`` and prefixes every
    backslash and every character from *special_chars* with a backslash.
    Escapes that are already present in the input are removed first, so
    applying the function to its own output does not double-escape.

    Args:
        special_chars: Characters (in addition to the backslash) that must be
            escaped, e.g. ``'|'`` or ``'='``.

    Returns:
        A function ``escape(s) -> str``.
    """
    # re.escape keeps characters such as '^', '-' or ']' literal inside the
    # character class; interpolated raw they would negate the class or form
    # a range and cause unrelated characters to be escaped.
    char_class = r'[{}\\]'.format(re.escape(special_chars))
    strip_escaped_re = re.compile(r'\\({})'.format(char_class))
    do_escape_re = re.compile(r'({})'.format(char_class))

    def escape(s):
        # Undo existing escapes first so the second pass is idempotent.
        stripped = strip_escaped_re.sub(r'\1', str(s))
        return do_escape_re.sub(r'\\\1', stripped)

    return escape
def incident_to_cef(incident: Incident) -> str:
    """Render an incident as a single CEF (Common Event Format) message.

    The header is built from ``settings.SITE_INFO`` (vendor, product,
    version), the fixed signature id ``Incident``, the incident title and a
    severity derived from ``incident.importance``.  The extension carries the
    event count, creation time, incident id and, when available, category,
    first/last event time, description and sensor details.

    Header fields are escaped for ``|`` and extension values for ``=``
    (backslashes are escaped in both).

    Args:
        incident: The incident to export.

    Returns:
        The CEF line, prefixed with ``AMC: ``.  Every extension pair is
        followed by a single space, so the result ends with a space.
    """
    escape_header = escaper('|')
    escape_extension = escaper('=')

    def extension_value(value):
        # Keep None as None so the pair is skipped when rendering instead of
        # being emitted as the literal text "None".
        return None if value is None else escape_extension(value)

    device_vendor = escape_header(settings.SITE_INFO['vendor'])
    device_product = escape_header(settings.SITE_INFO['product'])
    device_version = escape_header(settings.SITE_INFO['version'])
    signature_id = escape_header('Incident')
    name = escape_header(incident.title)
    # CEF accepts only 0-10, so clamp out-of-range importance values.
    severity = max(0, min(10, int(incident.importance / 10)))

    # Fractional seconds are dropped before parsing.
    # NOTE(review): the parsed datetimes are naive, so .timestamp() below
    # interprets them in the server's local timezone - confirm this matches
    # how event timestamps are stored.
    timestamps = [
        datetime.strptime(event['event_timestamp'].split('.')[0], '%Y-%m-%dT%H:%M:%S')
        for event in incident.events
    ]

    # See https://community.microfocus.com/t5/ArcSight-Connectors/ArcSight-Common-Event-Format-CEF-Implementation-Standard/ta-p/1645557
    # See https://kc.mcafee.com/resources/sites/MCAFEE/content/live/CORP_KNOWLEDGEBASE/78000/KB78712/en_US/CEF_White_Paper_20100722.pdf
    extension_dict = {
        'cnt': incident.event_count,
        'rt': int(incident.created.timestamp()) * 1000,
        'cs1': escape_extension(str(incident.incident_id)[:4000]),
        'cs1Label': 'IncidentID',
    }
    if incident.category is not None:
        extension_dict['cat'] = escape_extension(incident.category.name[:1023])
    if timestamps:
        extension_dict['start'] = int(min(timestamps).timestamp()) * 1000
        extension_dict['end'] = int(max(timestamps).timestamp()) * 1000
    if incident.description:
        extension_dict['msg'] = escape_extension(incident.description[:1023])

    # Try to get sensor for exporting incident.  Sensor data is optional:
    # any failure is logged and the export continues without (or with only
    # part of) the sensor fields.
    try:
        _sensor_type, sensor = get_sensor(incident.sensor)
        # Sensor values are escaped like every other extension value so a
        # '=' or '\' in e.g. the sensor name cannot corrupt the message.
        extension_dict['cs3'] = extension_value(sensor.id)
        extension_dict['cs3Label'] = 'SensorID'
        extension_dict['cs4'] = extension_value(sensor.ip)
        extension_dict['cs4Label'] = 'SensorIP'
        extension_dict['cs5'] = escape_extension(f'{sensor.type}_{sensor.id}'.lower())
        extension_dict['cs5Label'] = 'SensorSID'
        extension_dict['cs6'] = extension_value(sensor.name)
        extension_dict['cs6Label'] = 'SensorName'
    except RuntimeError as e:
        _log.warning('Incident export warning: Bad sensor value: %s', incident.sensor, exc_info=e)
    except LogInput.DoesNotExist:
        _log.warning(
            'Incident export warning: sensor with that type and pk: %s does not exist',
            incident.sensor,
        )
    except AttributeError:
        _log.warning('Incident export warning: No sensor for log input')

    # Each pair keeps its trailing space to preserve the existing output format.
    extension_text = ''.join(
        f'{key}={value} '
        for key, value in extension_dict.items()
        if value is not None
    )
    return (
        f'AMC: CEF:0|{device_vendor}|{device_product}|{device_version}'
        f'|{signature_id}|{name}|{severity}|{extension_text}'
    )