import logging import pytest from console.settings.base import SITE_INFO from core.utils import dtnow from devices.models.sensor import ArmaSensor from incident.models import Incident, IncidentCategory from inputs.enums import SensorType, SensorScheme from incident_export.services.export import incident_to_cef, escaper from inputs.enums import LogInputType from inputs.models import LogInput, LogInputArmaIF cef_header_data = [ (r"aaa|", r"aaa\|"), (r"aaa\\", r"aaa\\") ] cef_body_data = [ (r"aaa\\", r"aaa\\"), (r"aaa=", r"aaa\=") ] EVENTS = [ { "type": "armaif_1", "sign_id": "429496728", "@created": "2020-12-17T11:18:39.635302026Z", "event_id": "f0cc19bb-27f8-4616-8bd3-ba01db9e378d", "rule_tags": "null", "sign_name": "test", "source_ip": "192.168.56.200", "@timestamp": "2020-12-17T11:18:27.217Z", "event_hash": "57732e01a1e69a79100b6b5dc75c210c4cf36ca8db91614fa47b5b995f08b615", "event_last": "2020-12-17T11:18:51.317Z", "event_count": 1909, "event_first": "2020-12-17T11:18:27.142Z", "source_host": "", "source_port": 80, "source_user": "", "aggregated_id": "3896014033_57732e01a1e69a79100b6b5dc75c210c4cf36ca8db91614fa47b5b995f08b615", "device_action": "suricataalert", "device_vendor": "armaif", "event_src_msg": "<14>CEF:0|armaif|Suricata|2.0|429496728|suricataalert|8|unixdate=1608203907.142307 log_from=suricata cid=28775 gid=1 signature=429496728 rev=1 msg=test classification=null priority=3 proto=TCP ip_src=192.168.56.200 port_src=80 ip_dst=10.20.30.1 port_dst=34568 mechanic=IDS", "sign_category": "IDS", "destination_ip": "10.20.30.1", "device_product": "Suricata", "device_version": "2.0", "event_protocol": "TCP", "event_severity": "8", "event_timestamp": "2020-12-17T11:18:27.142Z", "destination_host": "", "destination_port": 34568, "destination_user": "", "sign_subcategory": "null" }, { "type": "armaif_1", "sign_id": "429496728", "@created": "2020-12-17T11:16:19.058354413Z", "event_id": "3a486dcd-cd3d-4acf-8ebf-e47d92b66a14", "rule_tags": "null", "sign_name": "test", "source_ip": "192.168.56.100", "@timestamp": "2020-12-17T11:16:13.388Z", "event_hash": "707fdfbbb1385f20f74c71047bb40c3a4ef6ef8220b715450c3085cce81e28b3", "event_last": "2020-12-17T11:16:14.368Z", "event_count": 1109, "event_first": "2020-12-17T11:16:13.388Z", "source_host": "", "source_port": 80, "source_user": "", "aggregated_id": "264127437_707fdfbbb1385f20f74c71047bb40c3a4ef6ef8220b715450c3085cce81e28b3", "device_action": "suricataalert", "device_vendor": "armaif", "event_src_msg": "<14>CEF:0|armaif|Suricata|2.0|429496728|suricataalert|8|unixdate=1608203773.388082 log_from=suricata cid=28775 gid=1 signature=429496728 rev=1 msg=test classification=null priority=3 proto=TCP ip_src=192.168.56.100 port_src=80 ip_dst=10.20.30.1 port_dst=34568 mechanic=IDS", "sign_category": "IDS", "destination_ip": "10.20.30.1", "device_product": "Suricata", "device_version": "2.0", "event_protocol": "TCP", "event_severity": "8", "event_timestamp": "2020-12-17T11:16:13.388Z", "destination_host": "", "destination_port": 34568, "destination_user": "", "sign_subcategory": "null" }, { "type": "armaif_1", "sign_id": "429496728", "@created": "2020-12-17T11:10:22.587668545Z", "event_id": "8aec4a34-6584-4d06-a88a-2de2c7214a58", "rule_tags": "null", "sign_name": "test", "source_ip": "192.168.56.100", "@timestamp": "2020-12-17T11:10:11.117Z", "event_hash": "707fdfbbb1385f20f74c71047bb40c3a4ef6ef8220b715450c3085cce81e28b3", "event_last": "2020-12-17T11:10:11.208Z", "event_count": 959, "event_first": "2020-12-17T11:10:11.117Z", "source_host": "", "source_port": 80, "source_user": "", "aggregated_id": "1975003720_707fdfbbb1385f20f74c71047bb40c3a4ef6ef8220b715450c3085cce81e28b3", "device_action": "suricataalert", "device_vendor": "armaif", "event_src_msg": "<14>CEF:0|armaif|Suricata|2.0|429496728|suricataalert|8|unixdate=1608203411.117516 log_from=suricata cid=28775 gid=1 signature=429496728 rev=1 msg=test classification=null priority=3 proto=TCP ip_src=192.168.56.100 port_src=80 ip_dst=10.20.30.1 port_dst=34568 mechanic=IDS", "sign_category": "IDS", "destination_ip": "10.20.30.1", "device_product": "Suricata", "device_version": "2.0", "event_protocol": "TCP", "event_severity": "8", "event_timestamp": "2020-12-17T11:10:11.117Z", "destination_host": "", "destination_port": 34568, "destination_user": "", "sign_subcategory": "null" } ] SENSOR_DATA = { "name": "sensor_test", "ip": "1.1.1.1", "port": 5000, "type": 'firewall' } @pytest.mark.django_db class TestCEF: @pytest.fixture(autouse=True) def incidents(self): now = dtnow() cat1 = IncidentCategory.objects.create(name='SSH') cat2 = IncidentCategory.objects.create(name='Suricata') # IncidentCategory.objects.bulk_create([cat1, cat2]) Incident.objects.bulk_create([ Incident(timestamp=now, title='Test1', importance=50, event_count=len(EVENTS), category=cat1, description="Event 1 description", events=EVENTS), Incident(timestamp=dtnow(hours=-1), title='Test2', importance=len(EVENTS), event_count=10, category=cat2, description="Event 2 description", events=EVENTS), Incident(timestamp=dtnow(days=-1), title='Test3', importance=len(EVENTS), event_count=10, description="Event 3 description", events=EVENTS), ]) @pytest.mark.unit @pytest.mark.parametrize("inStr,outStr", cef_header_data) def test_header_escape(self, inStr, outStr): header = escaper('|') assert header(inStr) == outStr @pytest.mark.unit @pytest.mark.parametrize("inStr,outStr", cef_body_data) def test_body_escape(self, inStr, outStr): extension = escaper('=') assert extension(inStr) == outStr @pytest.mark.unit def test_correct_format(self, incidents): version = SITE_INFO['version'] incident = Incident.objects.first() print(f' \nCORRECT FORMAT INCIDENT: {incident}') assert incident is not None result = incident_to_cef(incident) assert result == f"AMC: CEF:0|InfoWatch ARMA|ARMAMC|{version}|Incident|Test1|5|cnt=3 rt={int(incident.created.timestamp()) * 1000} cs1={str(incident.incident_id)} cs1Label=IncidentID cat=SSH start=1608203411000 end=1608203907000 msg=Event 1 description " @pytest.mark.unit def test_correct_format_with_sensor(self, incidents): version = SITE_INFO['version'] test_sensor = ArmaSensor.objects.create(**SENSOR_DATA) test_input = LogInput.objects.create(label='test_input', type=LogInputType.ARMAIF) armaif = LogInputArmaIF.objects.create(input=test_input, port=5000, sensor=test_sensor) Incident.objects.create(timestamp=dtnow(), title='Test4', importance=10, event_count=10, description="Incident 4 description", sensor=f'armaif_{armaif.pk}', events=EVENTS) incident_with_sensor = Incident.objects.get(title='Test4') assert incident_with_sensor is not None print(f' \nCEF FORMAT INCIDENT: {incident_with_sensor.sensor}') result = incident_to_cef(incident_with_sensor) assert result == f"AMC: CEF:0|InfoWatch ARMA|ARMAMC|{version}|Incident|Test4|1|cnt=10 rt={int(incident_with_sensor.created.timestamp()) * 1000} cs1={str(incident_with_sensor.incident_id)} cs1Label=IncidentID start=1608203411000 end=1608203907000 msg=Incident 4 description cs3={test_sensor.pk} cs3Label=SensorID cs4=1.1.1.1 cs4Label=SensorIP cs5=firewall_{test_sensor.pk} cs5Label=SensorSID cs6=sensor_test cs6Label=SensorName "