old_console/ncircc/tests/test_notification_services.py
2024-11-02 14:12:45 +03:00

308 lines
16 KiB
Python

import json
from datetime import datetime
from unittest.mock import patch
import pytest
from company.models.company import Company
from company.models.location import LocationCode
from incident.models import Incident
from ncircc.enums.notifications import NotificationCategoryEnum, ImpactEffect, EventTypeEnum, AffectedSystemFunction, \
TlpEnum, ActivityStatusEnum, AffectedSystemCategoryEnum, NotificationStatusEnum
from ncircc.models.notification import Notification
from ncircc.services.notification import NotificationSenderCreateService, NotificationUpdateSenderService, \
NotificationGetterUpdatedStatusServices, NICRCCUpdateException
from ncircc.tests.utils import mocked_notification_requests_post_status_201, \
mocked_notification_requests_post_status_400, mocked_notification_request_get_status_200
@pytest.mark.django_db
class TestNotificationSenderCreateService:
    """Test building the payload, sending it to NCIRCC and saving the data NCIRCC returns."""

    incident = None
    notification = None
    company = None

    # Payload keys asserted for every notification category.
    _COMMON_FIELDS = frozenset({
        'city', 'location', 'affectedsystemfunction',
        'category', 'type', 'tlp', 'detecttime', 'eventdescription', 'activitystatus',
        'affectedsystemname', 'affectedsystemcategory', 'affectedsystemconnection', 'assistance',
    })
    # Payload keys asserted additionally for the INCIDENT and ATTACK categories.
    _IMPACT_FIELDS = frozenset({
        'integrityimpact', 'availabilityimpact', 'confidentialityimpact', 'customimpact',
    })

    @pytest.fixture(autouse=True)
    def setup_test(self):
        """Create the location, company, incident (with one event) and notification for each test."""
        self.location, _ = LocationCode.objects.get_or_create(code='RU-MOS')
        self.company = Company.objects.create(
            name='Test_name',
            api_key='test_api_key',
            city='Moscow',
            location=self.location,
        )
        self.incident = Incident.objects.create(
            title='test_inc_1',
            importance=10,
            event_count=10,
            events=[{
                'type': 'armaif_1',
                'Index': 'aggregated-2022.03.23',
                'sign_id': '63',
                '@created': '2022-03-23T10:06:08.90934681Z',
                'event_id': '81c5a5ec-7779-4c8d-a942-6e8642937b58',
                'rule_tags': None,
                'sign_name': 'Firewall Rule',
                'source_ip': '127.0.0.1',
                '@timestamp': '2022-03-23T10:06:06.697862758Z',
                'event_hash': '2c34bfca72f29c21e0f136b2c53773979908a3978779978cc5c65d9ecfec28fa',
                'event_last': '2022-03-23T10:06:06.703452398Z',
                'source_mac': '',
                'celery_done': False,
                'event_count': 1,
                'event_first': '2022-03-23T10:06:06.703452398Z',
                'source_host': '',
                'source_port': 46084,
                'source_user': '',
                'aggregated_id': '1036597098_2c34bfca72f29c21e0f136b2c53773979908a3978779978cc5c65d9ecfec28fa',
                'device_action': 'разрешение (pass)',
                'device_vendor': 'InfoWatch ARMA',
                'event_src_msg': '<14>CEF:0|InfoWatch ARMA|ARMAIF|3.5|pfalert|PF rule alert|8|cs1=63 cs2=deviceInboundInterface=lo0 act=разрешение (pass) src=127.0.0.1 deviceDirection=in proto=icmp dst=127.0.0.1 spt=46084 dpt=53 rt=1648040766697 log_from=filterlog cid=None\n',
                'sign_category': 'PF',
                'destination_ip': '127.0.0.1',
                'device_product': 'Industrial Firerwall',
                'device_version': '3.5',
                'event_protocol': 'icmp',
                'event_severity': 8,
                'event_timestamp': '2022-03-23T10:06:06.703452398Z',
                'destination_host': '',
                'destination_port': 53,
                'destination_user': '',
                'sign_subcategory': '',
            }],
        )
        self.notification = Notification.objects.create(
            incident=self.incident,
            affected_system_name='affected_system_name_test',
            event_description='event_description_test',
            type=EventTypeEnum.DDOS.value,
        )

    @pytest.mark.unit
    def test_get_headers(self):
        """Test building headers for sending."""
        notification_sender = NotificationSenderCreateService(self.notification.pk)
        headers = notification_sender.get_headers()
        assert isinstance(headers, dict)
        assert 'x-token' in headers
        assert headers.get('x-token') == self.company.api_key
        assert 'Content-Type' in headers
        assert headers.get('Content-Type') == 'application/json'

    @pytest.mark.unit
    def test_building_payload_for_incident(self):
        """Test building data for category NotificationCategoryEnum.INCIDENT."""
        required_fields = self._COMMON_FIELDS | self._IMPACT_FIELDS | {'endtime', 'detectiontool'}
        self.notification.category = NotificationCategoryEnum.INCIDENT.value
        self.notification.integrity_impact = ImpactEffect.NO.value
        self.notification.availability_impact = ImpactEffect.LOW.value
        self.notification.confidentiality_impact = ImpactEffect.HIGH.value
        self.notification.custom_impact = 'custom_incident_impact_test'
        # Persist the changes: the service is constructed from the primary key only.
        self.notification.save()
        notification_sender = NotificationSenderCreateService(self.notification.pk)
        payload = notification_sender.get_payload()
        assert isinstance(payload, dict)
        for field in required_fields:
            assert field in payload, field
        assert payload['category'] == NotificationCategoryEnum.INCIDENT.value
        assert payload['type'] == 'DDoS-атака'
        assert payload['activitystatus'] == ActivityStatusEnum.TAKEN_ACTION.value
        assert payload['tlp'] == TlpEnum.GREEN.value
        assert payload['affectedsystemname'] == 'affected_system_name_test'
        assert payload['affectedsystemcategory'] == AffectedSystemCategoryEnum.RESOURCE_NOT_CII.value
        assert payload['eventdescription'] == 'event_description_test'
        assert not payload['affectedsystemconnection']
        assert not payload['assistance']
        # Timestamps are compared without the fractional-seconds part.
        assert payload['detecttime'] == self.incident.events[0].get('event_first', '').split('.')[0]
        # NOTE(review): endtime is compared with the last event's 'event_first';
        # confirm 'event_last' is not the intended key (both are equal in this fixture).
        assert payload['endtime'] == self.incident.events[-1].get('event_first', '').split('.')[0]
        assert payload['city'] == self.company.city
        assert payload['location'] == self.location.code
        assert payload['affectedsystemfunction'] == AffectedSystemFunction.NUCLEAR_POWER.value
        assert payload['integrityimpact'] == ImpactEffect.NO.value
        assert payload['availabilityimpact'] == ImpactEffect.LOW.value
        assert payload['confidentialityimpact'] == ImpactEffect.HIGH.value
        assert payload['customimpact'] == 'custom_incident_impact_test'
        assert payload['detectiontool'] == 'InfoWatch ARMA'

    @pytest.mark.unit
    def test_building_payload_attack(self):
        """Test building data for category NotificationCategoryEnum.ATTACK."""
        required_fields = self._COMMON_FIELDS | self._IMPACT_FIELDS
        self.notification.category = NotificationCategoryEnum.ATTACK.value
        self.notification.integrity_impact = ImpactEffect.NO.value
        self.notification.availability_impact = ImpactEffect.LOW.value
        self.notification.confidentiality_impact = ImpactEffect.HIGH.value
        self.notification.custom_impact = 'custom_attack_impact_test'
        self.notification.save()
        notification_sender = NotificationSenderCreateService(self.notification.pk)
        payload = notification_sender.get_payload()
        for field in required_fields:
            assert field in payload, field

    @pytest.mark.unit
    def test_building_payload_vulnerability(self):
        """Test building data for category NotificationCategoryEnum.VULNERABILITY."""
        # Store the enum's value, consistent with every other enum assignment in this module.
        self.notification.type = EventTypeEnum.VULNERABLE_RESOURCE.value
        self.notification.category = NotificationCategoryEnum.VULNERABILITY.value
        self.notification.vulnerability_id = 'CVE-2020-1231'
        self.notification.product_category = 'Microsoft'
        self.notification.save()
        required_fields = self._COMMON_FIELDS | {'vulnerabilityid', 'productcategory'}
        notification_sender = NotificationSenderCreateService(self.notification.pk)
        payload = notification_sender.get_payload()
        for field in required_fields:
            assert field in payload, field
        assert payload['vulnerabilityid'] == 'CVE-2020-1231'
        assert payload['productcategory'] == 'Microsoft'

    @pytest.mark.unit
    @patch('requests.post', side_effect=mocked_notification_requests_post_status_201)
    def test_notification_sending_status_201(self, *args):
        """Send with requests.post mocked to answer 201 and check the stored notification is filled in."""
        self.notification.custom_impact = 'custom_incident_impact_test'
        self.notification.save()
        notification_sender = NotificationSenderCreateService(self.notification.pk)
        msg, status = notification_sender.send()
        assert status == 201
        assert msg == 'Success'
        # Re-read from the database to observe what the service persisted.
        notification = Notification.objects.get(pk=self.notification.pk)
        assert notification.uuid
        assert notification.update_time
        assert notification.identifier == '21-07-573'
        assert notification.sending_time

    @pytest.mark.unit
    @patch('requests.post', side_effect=mocked_notification_requests_post_status_400)
    def test_notification_status_400(self, *args):
        """Send with requests.post mocked to answer 400 and check nothing is stored on the notification."""
        notification_sender = NotificationSenderCreateService(self.notification.pk)
        msg, status = notification_sender.send()
        assert status == 400
        assert msg == 'Передан неверный токен'
        notification = Notification.objects.get(pk=self.notification.pk)
        assert not notification.uuid
        assert not notification.update_time
        assert not notification.identifier
        assert not notification.sending_time
@pytest.mark.django_db
class TestNotificationSendUpdaterServices:
    """Test payload building and the constructor guard of NotificationUpdateSenderService."""

    @pytest.fixture(autouse=True)
    def setup_test(self):
        """Create a notification with status ADDITION_REQUIRED together with its related objects."""
        self.location, _ = LocationCode.objects.get_or_create(code='RU-MOS')
        self.company = Company.objects.create(
            name='Test_name',
            api_key='test_api_key',
            city='Moscow',
            location=self.location,
        )
        self.incident = Incident.objects.create(title='test_inc_1', importance=10, event_count=10, events='')
        self.notification = Notification.objects.create(
            incident=self.incident,
            uuid='87e37777-0a7c-4879-888b-86f1d9cdb175',
            identifier='21-07-573',
            update_time=datetime.now(),
            affected_system_name='affected_system_name_test',
            event_description='event_description_test',
            type=EventTypeEnum.DDOS.value,
            notification_status=NotificationStatusEnum.ADDITION_REQUIRED.value,
        )

    @pytest.mark.unit
    def test_payload_for_incident(self):
        """Check the update payload keys for a notification created without an explicit category."""
        required_fields = {
            'uuid', 'status', 'eventdescription', 'integrityimpact', 'availabilityimpact',
            'confidentialityimpact', 'customimpact', 'city', 'activitystatus'
        }
        notification_updated = NotificationUpdateSenderService(self.notification.pk)
        data = notification_updated.get_payload()
        for field in required_fields:
            assert field in data, field

    @pytest.mark.unit
    def test_payload_for_attack(self):
        """Check the update payload keys for category NotificationCategoryEnum.ATTACK."""
        required_fields = {'uuid', 'status', 'eventdescription', 'city', 'activitystatus'}
        self.notification.category = NotificationCategoryEnum.ATTACK.value
        # The service is built from the primary key, so the category change
        # has to be persisted before the service is constructed.
        self.notification.save()
        data = NotificationUpdateSenderService(self.notification.pk).get_payload()
        for field in required_fields:
            assert field in data, field

    @pytest.mark.unit
    def test_payload_for_vulnerability(self):
        """Check the update payload keys for category NotificationCategoryEnum.VULNERABILITY."""
        required_fields = {'uuid', 'status', 'eventdescription', 'city', 'activitystatus'}
        self.notification.category = NotificationCategoryEnum.VULNERABILITY.value
        # Persist the category so the service, built from the primary key, can see it.
        self.notification.save()
        data = NotificationUpdateSenderService(self.notification.pk).get_payload()
        for field in required_fields:
            assert field in data, field

    @pytest.mark.unit
    def test_raise_error_in_init_(self):
        """Constructing the update service for an ARCHIVED notification raises NICRCCUpdateException.

        The fixture creates the notification with status ADDITION_REQUIRED; this
        test switches it to ARCHIVED before building the service.
        """
        self.notification.notification_status = NotificationStatusEnum.ARCHIVED.value
        self.notification.save()
        with pytest.raises(NICRCCUpdateException):
            NotificationUpdateSenderService(self.notification.pk)
@pytest.mark.django_db
class TestNotificationGetUpdatedDataFromNCIRCC:
    """Test NotificationGetterUpdatedStatusServices: request parameters and the status request."""

    @pytest.fixture(autouse=True)
    def setup_test(self):
        """Create an already-registered notification (uuid and identifier set) with its related objects."""
        self.location, _ = LocationCode.objects.get_or_create(code='RU-MOS')
        self.company = Company.objects.create(
            name='Test_name',
            api_key='test_api_key',
            city='Moscow',
            location=self.location,
        )
        self.incident = Incident.objects.create(
            title='test_inc_1',
            importance=10,
            event_count=10,
            events='',
        )
        self.notification = Notification.objects.create(
            incident=self.incident,
            uuid='87e37777-0a7c-4879-888b-86f1d9cdb175',
            identifier='21-07-573',
            update_time=datetime.now(),
            affected_system_name='affected_system_name_test',
            event_description='event_description_test',
            type=EventTypeEnum.DDOS.value,
        )

    @pytest.mark.unit
    @patch('requests.get', side_effect=mocked_notification_request_get_status_200)
    def test_getting_data_from_ncircc(self, *args):
        """Request the status with requests.get mocked to answer 200 and check the returned code."""
        status_getter = NotificationGetterUpdatedStatusServices(self.notification.pk)
        initial_status = self.notification.notification_status
        assert initial_status == NotificationStatusEnum.CHECK_NCIRCC.value
        _msg, status = status_getter.send()
        assert status == 200
        # NOTE(review): self.notification is not re-read from the database after
        # send(), so this checks the in-memory instance created by the fixture;
        # confirm whether a refreshed instance was intended here.
        assert self.notification.notification_status != NotificationStatusEnum.ARCHIVED.value

    @pytest.mark.unit
    def test_get_params_method(self):
        """Check that get_params returns a tuple holding the uuid filter and the requested fields."""
        uuid_condition = {
            'property': 'uuid',
            'operator': '=',
            'value': str(self.notification.uuid)
        }
        expected_filter = json.dumps([uuid_condition])
        service = NotificationGetterUpdatedStatusServices(self.notification.pk)
        params = service.get_params()
        assert isinstance(params, tuple)
        filter_param, fields_param = params[0], params[1]
        assert filter_param[1] == expected_filter
        assert 'status' in fields_param[1]
        assert 'updated' in fields_param[1]