# File-viewer metadata captured with this listing: 207 lines, 8.7 KiB, Python.
import logging

import pytest

from console.settings.base import SITE_INFO
from core.utils import dtnow
from devices.models.sensor import ArmaSensor
from incident.models import Incident, IncidentCategory
from inputs.enums import SensorType, SensorScheme
from incident_export.services.export import incident_to_cef, escaper
from inputs.enums import LogInputType
from inputs.models import LogInput, LogInputArmaIF


# (input, expected) pairs fed to ``escaper('|')`` in ``test_header_escape``.
# All literals are raw strings, so every backslash below is a literal character.
cef_header_data = [
    # A pipe is expected to come back prefixed with a backslash.
    (r"aaa|", r"aaa\|"),
    # Input and expected output are identical: ``aaa`` followed by two backslashes.
    # NOTE(review): confirm this is the intended expectation for backslash handling.
    (r"aaa\\", r"aaa\\"),
]

# (input, expected) pairs fed to ``escaper('=')`` in ``test_body_escape``.
# All literals are raw strings, so every backslash below is a literal character.
cef_body_data = [
    # Input and expected output are identical: ``aaa`` followed by two backslashes.
    # NOTE(review): confirm this is the intended expectation for backslash handling.
    (r"aaa\\", r"aaa\\"),
    # An equals sign is expected to come back prefixed with a backslash.
    (r"aaa=", r"aaa\="),
]

# Three sample event dictionaries used as the ``events`` payload of every
# incident created in this module. They share the same set of keys and differ
# in ids, timestamps, hashes, counts and (for the first one) the source IP.
EVENTS = [
    {
        "type": "armaif_1",
        "sign_id": "429496728",
        "@created": "2020-12-17T11:18:39.635302026Z",
        "event_id": "f0cc19bb-27f8-4616-8bd3-ba01db9e378d",
        "rule_tags": "null",
        "sign_name": "test",
        "source_ip": "192.168.56.200",
        "@timestamp": "2020-12-17T11:18:27.217Z",
        "event_hash": "57732e01a1e69a79100b6b5dc75c210c4cf36ca8db91614fa47b5b995f08b615",
        "event_last": "2020-12-17T11:18:51.317Z",
        "event_count": 1909,
        "event_first": "2020-12-17T11:18:27.142Z",
        "source_host": "",
        "source_port": 80,
        "source_user": "",
        "aggregated_id": "3896014033_57732e01a1e69a79100b6b5dc75c210c4cf36ca8db91614fa47b5b995f08b615",
        "device_action": "suricataalert",
        "device_vendor": "armaif",
        "event_src_msg": "<14>CEF:0|armaif|Suricata|2.0|429496728|suricataalert|8|unixdate=1608203907.142307 log_from=suricata cid=28775 gid=1 signature=429496728 rev=1 msg=test classification=null priority=3 proto=TCP ip_src=192.168.56.200 port_src=80 ip_dst=10.20.30.1 port_dst=34568 mechanic=IDS",
        "sign_category": "IDS",
        "destination_ip": "10.20.30.1",
        "device_product": "Suricata",
        "device_version": "2.0",
        "event_protocol": "TCP",
        "event_severity": "8",
        "event_timestamp": "2020-12-17T11:18:27.142Z",
        "destination_host": "",
        "destination_port": 34568,
        "destination_user": "",
        "sign_subcategory": "null",
    },
    {
        "type": "armaif_1",
        "sign_id": "429496728",
        "@created": "2020-12-17T11:16:19.058354413Z",
        "event_id": "3a486dcd-cd3d-4acf-8ebf-e47d92b66a14",
        "rule_tags": "null",
        "sign_name": "test",
        "source_ip": "192.168.56.100",
        "@timestamp": "2020-12-17T11:16:13.388Z",
        "event_hash": "707fdfbbb1385f20f74c71047bb40c3a4ef6ef8220b715450c3085cce81e28b3",
        "event_last": "2020-12-17T11:16:14.368Z",
        "event_count": 1109,
        "event_first": "2020-12-17T11:16:13.388Z",
        "source_host": "",
        "source_port": 80,
        "source_user": "",
        "aggregated_id": "264127437_707fdfbbb1385f20f74c71047bb40c3a4ef6ef8220b715450c3085cce81e28b3",
        "device_action": "suricataalert",
        "device_vendor": "armaif",
        "event_src_msg": "<14>CEF:0|armaif|Suricata|2.0|429496728|suricataalert|8|unixdate=1608203773.388082 log_from=suricata cid=28775 gid=1 signature=429496728 rev=1 msg=test classification=null priority=3 proto=TCP ip_src=192.168.56.100 port_src=80 ip_dst=10.20.30.1 port_dst=34568 mechanic=IDS",
        "sign_category": "IDS",
        "destination_ip": "10.20.30.1",
        "device_product": "Suricata",
        "device_version": "2.0",
        "event_protocol": "TCP",
        "event_severity": "8",
        "event_timestamp": "2020-12-17T11:16:13.388Z",
        "destination_host": "",
        "destination_port": 34568,
        "destination_user": "",
        "sign_subcategory": "null",
    },
    {
        "type": "armaif_1",
        "sign_id": "429496728",
        "@created": "2020-12-17T11:10:22.587668545Z",
        "event_id": "8aec4a34-6584-4d06-a88a-2de2c7214a58",
        "rule_tags": "null",
        "sign_name": "test",
        "source_ip": "192.168.56.100",
        "@timestamp": "2020-12-17T11:10:11.117Z",
        "event_hash": "707fdfbbb1385f20f74c71047bb40c3a4ef6ef8220b715450c3085cce81e28b3",
        "event_last": "2020-12-17T11:10:11.208Z",
        "event_count": 959,
        "event_first": "2020-12-17T11:10:11.117Z",
        "source_host": "",
        "source_port": 80,
        "source_user": "",
        "aggregated_id": "1975003720_707fdfbbb1385f20f74c71047bb40c3a4ef6ef8220b715450c3085cce81e28b3",
        "device_action": "suricataalert",
        "device_vendor": "armaif",
        "event_src_msg": "<14>CEF:0|armaif|Suricata|2.0|429496728|suricataalert|8|unixdate=1608203411.117516 log_from=suricata cid=28775 gid=1 signature=429496728 rev=1 msg=test classification=null priority=3 proto=TCP ip_src=192.168.56.100 port_src=80 ip_dst=10.20.30.1 port_dst=34568 mechanic=IDS",
        "sign_category": "IDS",
        "destination_ip": "10.20.30.1",
        "device_product": "Suricata",
        "device_version": "2.0",
        "event_protocol": "TCP",
        "event_severity": "8",
        "event_timestamp": "2020-12-17T11:10:11.117Z",
        "destination_host": "",
        "destination_port": 34568,
        "destination_user": "",
        "sign_subcategory": "null",
    },
]

# Keyword arguments passed to ``ArmaSensor.objects.create`` in
# ``test_correct_format_with_sensor``.
SENSOR_DATA = {
    "name": "sensor_test",
    "ip": "1.1.1.1",
    "port": 5000,
    "type": "firewall",
}


@pytest.mark.django_db
class TestCEF:
    """Tests for the CEF escaper helpers and for rendering incidents as CEF lines."""

    @pytest.fixture(autouse=True)
    def incidents(self):
        """Create two categories and three incidents that all carry ``EVENTS``.

        ``Test1`` and ``Test2`` are attached to the ``SSH`` and ``Suricata``
        categories respectively; ``Test3`` is created without a category.
        """
        now = dtnow()

        cat1 = IncidentCategory.objects.create(name='SSH')
        cat2 = IncidentCategory.objects.create(name='Suricata')

        Incident.objects.bulk_create([
            Incident(
                timestamp=now,
                title='Test1',
                importance=50,
                event_count=len(EVENTS),
                category=cat1,
                description="Event 1 description",
                events=EVENTS,
            ),
            Incident(
                timestamp=dtnow(hours=-1),
                title='Test2',
                importance=len(EVENTS),
                event_count=10,
                category=cat2,
                description="Event 2 description",
                events=EVENTS,
            ),
            Incident(
                timestamp=dtnow(days=-1),
                title='Test3',
                importance=len(EVENTS),
                event_count=10,
                description="Event 3 description",
                events=EVENTS,
            ),
        ])

    @pytest.mark.unit
    @pytest.mark.parametrize("inStr,outStr", cef_header_data)
    def test_header_escape(self, inStr, outStr):
        """``escaper('|')`` must map each header sample to its expected form."""
        header = escaper('|')
        assert header(inStr) == outStr

    @pytest.mark.unit
    @pytest.mark.parametrize("inStr,outStr", cef_body_data)
    def test_body_escape(self, inStr, outStr):
        """``escaper('=')`` must map each extension sample to its expected form."""
        extension = escaper('=')
        assert extension(inStr) == outStr

    @pytest.mark.unit
    def test_correct_format(self, incidents):
        """``incident_to_cef`` must render the ``Test1`` incident as the expected line."""
        version = SITE_INFO['version']
        # Select the incident by title: the expected line below hard-codes
        # ``Test1``, so the test must not depend on whatever ordering
        # ``first()`` would apply. ``get`` raises if the row is missing, so no
        # separate ``is not None`` check is needed.
        incident = Incident.objects.get(title='Test1')
        print(f' \nCORRECT FORMAT INCIDENT: {incident}')

        result = incident_to_cef(incident)

        # The expected line ends with a trailing space.
        expected = (
            f"AMC: CEF:0|InfoWatch ARMA|ARMAMC|{version}|Incident|Test1|5|"
            f"cnt=3 rt={int(incident.created.timestamp()) * 1000} "
            f"cs1={str(incident.incident_id)} cs1Label=IncidentID cat=SSH "
            "start=1608203411000 end=1608203907000 msg=Event 1 description "
        )
        assert result == expected

    @pytest.mark.unit
    def test_correct_format_with_sensor(self, incidents):
        """``incident_to_cef`` must append the sensor fields for a sensor-bound incident."""
        version = SITE_INFO['version']
        test_sensor = ArmaSensor.objects.create(**SENSOR_DATA)
        test_input = LogInput.objects.create(
            label='test_input',
            type=LogInputType.ARMAIF,
        )
        armaif = LogInputArmaIF.objects.create(
            input=test_input,
            port=5000,
            sensor=test_sensor,
        )
        Incident.objects.create(
            timestamp=dtnow(),
            title='Test4',
            importance=10,
            event_count=10,
            description="Incident 4 description",
            sensor=f'armaif_{armaif.pk}',
            events=EVENTS,
        )
        # Re-read the row from the database; ``get`` raises if it is missing,
        # so a follow-up ``is not None`` assertion would never fail.
        incident_with_sensor = Incident.objects.get(title='Test4')
        print(f' \nCEF FORMAT INCIDENT: {incident_with_sensor.sensor}')

        result = incident_to_cef(incident_with_sensor)

        # The expected line ends with a trailing space.
        expected = (
            f"AMC: CEF:0|InfoWatch ARMA|ARMAMC|{version}|Incident|Test4|1|"
            f"cnt=10 rt={int(incident_with_sensor.created.timestamp()) * 1000} "
            f"cs1={str(incident_with_sensor.incident_id)} cs1Label=IncidentID "
            "start=1608203411000 end=1608203907000 msg=Incident 4 description "
            f"cs3={test_sensor.pk} cs3Label=SensorID cs4=1.1.1.1 cs4Label=SensorIP "
            f"cs5=firewall_{test_sensor.pk} cs5Label=SensorSID "
            "cs6=sensor_test cs6Label=SensorName "
        )
        assert result == expected