308 lines
16 KiB
Python
308 lines
16 KiB
Python
import json
|
|
from datetime import datetime
|
|
from unittest.mock import patch
|
|
|
|
import pytest
|
|
|
|
from company.models.company import Company
|
|
from company.models.location import LocationCode
|
|
from incident.models import Incident
|
|
from ncircc.enums.notifications import NotificationCategoryEnum, ImpactEffect, EventTypeEnum, AffectedSystemFunction, \
|
|
TlpEnum, ActivityStatusEnum, AffectedSystemCategoryEnum, NotificationStatusEnum
|
|
from ncircc.models.notification import Notification
|
|
from ncircc.services.notification import NotificationSenderCreateService, NotificationUpdateSenderService, \
|
|
NotificationGetterUpdatedStatusServices, NICRCCUpdateException
|
|
from ncircc.tests.utils import mocked_notification_requests_post_status_201, \
|
|
mocked_notification_requests_post_status_400, mocked_notification_request_get_status_200
|
|
|
|
|
|
@pytest.mark.django_db
|
|
class TestNotificationSenderCreateService:
|
|
"""Test build data and send data to NCIRCC. Also saving data from NCITCC."""
|
|
|
|
incident = None
|
|
notification = None
|
|
company = None
|
|
|
|
@pytest.fixture(autouse=True)
|
|
def setup_test(self):
|
|
self.location, _ = LocationCode.objects.get_or_create(code='RU-MOS')
|
|
self.company = Company.objects.create(name='Test_name', api_key='test_api_key',
|
|
city='Moscow', location=self.location)
|
|
self.incident = Incident.objects.create(title='test_inc_1', importance=10, event_count=10,
|
|
events=[{'type': 'armaif_1',
|
|
'Index': 'aggregated-2022.03.23',
|
|
'sign_id': '63',
|
|
'@created': '2022-03-23T10:06:08.90934681Z',
|
|
'event_id': '81c5a5ec-7779-4c8d-a942-6e8642937b58',
|
|
'rule_tags': None,
|
|
'sign_name': 'Firewall Rule',
|
|
'source_ip': '127.0.0.1',
|
|
'@timestamp': '2022-03-23T10:06:06.697862758Z',
|
|
'event_hash': '2c34bfca72f29c21e0f136b2c53773979908a3978779978cc5c65d9ecfec28fa',
|
|
'event_last': '2022-03-23T10:06:06.703452398Z',
|
|
'source_mac': '',
|
|
'celery_done': False,
|
|
'event_count': 1,
|
|
'event_first': '2022-03-23T10:06:06.703452398Z',
|
|
'source_host': '',
|
|
'source_port': 46084,
|
|
'source_user': '',
|
|
'aggregated_id': '1036597098_2c34bfca72f29c21e0f136b2c53773979908a3978779978cc5c65d9ecfec28fa',
|
|
'device_action': 'разрешение (pass)',
|
|
'device_vendor': 'InfoWatch ARMA',
|
|
'event_src_msg': '<14>CEF:0|InfoWatch ARMA|ARMAIF|3.5|pfalert|PF rule alert|8|cs1=63 cs2=deviceInboundInterface=lo0 act=разрешение (pass) src=127.0.0.1 deviceDirection=in proto=icmp dst=127.0.0.1 spt=46084 dpt=53 rt=1648040766697 log_from=filterlog cid=None\n',
|
|
'sign_category': 'PF',
|
|
'destination_ip': '127.0.0.1',
|
|
'device_product': 'Industrial Firerwall',
|
|
'device_version': '3.5',
|
|
'event_protocol': 'icmp',
|
|
'event_severity': 8,
|
|
'event_timestamp': '2022-03-23T10:06:06.703452398Z',
|
|
'destination_host': '',
|
|
'destination_port': 53,
|
|
'destination_user': '',
|
|
'sign_subcategory': ''}])
|
|
|
|
self.notification = Notification.objects.create(
|
|
incident=self.incident,
|
|
affected_system_name='affected_system_name_test',
|
|
event_description='event_description_test',
|
|
type=EventTypeEnum.DDOS.value,
|
|
)
|
|
|
|
@pytest.mark.unit
|
|
def test_get_headers(self):
|
|
"""Test bulding headers for sending"""
|
|
notification_sender = NotificationSenderCreateService(self.notification.pk)
|
|
headers = notification_sender.get_headers()
|
|
|
|
assert isinstance(headers, dict)
|
|
assert 'x-token' in headers
|
|
assert headers.get('x-token') == self.company.api_key
|
|
assert 'Content-Type' in headers
|
|
assert headers.get('Content-Type') == 'application/json'
|
|
|
|
@pytest.mark.unit
|
|
def test_building_payload_for_incident(self):
|
|
"""Test building data for category NotificationCategoryEnum.INCIDENT"""
|
|
required_fields = {
|
|
'city', 'location', 'affectedsystemfunction',
|
|
'category', 'type', 'tlp', 'detecttime', 'eventdescription', 'activitystatus',
|
|
'affectedsystemname', 'affectedsystemcategory', 'affectedsystemconnection', 'assistance',
|
|
'integrityimpact', 'availabilityimpact', 'confidentialityimpact', 'customimpact', 'endtime',
|
|
'detectiontool',
|
|
}
|
|
self.notification.category = NotificationCategoryEnum.INCIDENT.value
|
|
self.notification.integrity_impact = ImpactEffect.NO.value
|
|
self.notification.availability_impact = ImpactEffect.LOW.value
|
|
self.notification.confidentiality_impact = ImpactEffect.HIGH.value
|
|
self.notification.custom_impact = 'custom_incident_impact_test'
|
|
self.notification.save()
|
|
|
|
notification_sender = NotificationSenderCreateService(self.notification.pk)
|
|
payload = notification_sender.get_payload()
|
|
|
|
assert isinstance(payload, dict)
|
|
for field in required_fields:
|
|
assert field in payload
|
|
|
|
assert payload['category'] == NotificationCategoryEnum.INCIDENT.value
|
|
assert payload['type'] == 'DDoS-атака'
|
|
assert payload['activitystatus'] == ActivityStatusEnum.TAKEN_ACTION.value
|
|
assert payload['tlp'] == TlpEnum.GREEN.value
|
|
assert payload['affectedsystemname'] == 'affected_system_name_test'
|
|
assert payload['affectedsystemcategory'] == AffectedSystemCategoryEnum.RESOURCE_NOT_CII.value
|
|
assert payload['eventdescription'] == 'event_description_test'
|
|
assert not payload['affectedsystemconnection']
|
|
assert not payload['assistance']
|
|
assert payload['detecttime'] == self.incident.events[0].get('event_first', '').split('.')[0]
|
|
assert payload['endtime'] == self.incident.events[-1].get('event_first', '').split('.')[0]
|
|
|
|
assert payload['city'] == self.company.city
|
|
assert payload['location'] == self.location.code
|
|
assert payload['affectedsystemfunction'] == AffectedSystemFunction.NUCLEAR_POWER.value
|
|
|
|
assert payload['integrityimpact'] == ImpactEffect.NO.value
|
|
assert payload['availabilityimpact'] == ImpactEffect.LOW.value
|
|
assert payload['confidentialityimpact'] == ImpactEffect.HIGH.value
|
|
assert payload['customimpact'] == 'custom_incident_impact_test'
|
|
assert payload['detectiontool'] == 'InfoWatch ARMA'
|
|
|
|
@pytest.mark.unit
|
|
def test_building_payload_attack(self):
|
|
"""Test building data for category NotificationCategoryEnum.ATTACK"""
|
|
required_fields = {
|
|
'city', 'location', 'affectedsystemfunction',
|
|
'category', 'type', 'tlp', 'detecttime', 'eventdescription', 'activitystatus',
|
|
'affectedsystemname', 'affectedsystemcategory', 'affectedsystemconnection', 'assistance',
|
|
'integrityimpact', 'availabilityimpact', 'confidentialityimpact', 'customimpact',
|
|
}
|
|
self.notification.category = NotificationCategoryEnum.ATTACK.value
|
|
self.notification.integrity_impact = ImpactEffect.NO.value
|
|
self.notification.availability_impact = ImpactEffect.LOW.value
|
|
self.notification.confidentiality_impact = ImpactEffect.HIGH.value
|
|
self.notification.custom_impact = 'custom_attack_impact_test'
|
|
self.notification.save()
|
|
|
|
notification_sender = NotificationSenderCreateService(self.notification.pk)
|
|
payload = notification_sender.get_payload()
|
|
|
|
for field in required_fields:
|
|
assert field in payload
|
|
|
|
@pytest.mark.unit
|
|
def test_building_payload_vulnerability(self):
|
|
"""Test building data for category NotificationCategoryEnum.VULNERABILITY"""
|
|
self.notification.type = EventTypeEnum.VULNERABLE_RESOURCE
|
|
self.notification.category = NotificationCategoryEnum.VULNERABILITY.value
|
|
self.notification.vulnerability_id = 'CVE-2020-1231'
|
|
self.notification.product_category = 'Microsoft'
|
|
self.notification.save()
|
|
|
|
required_fields = {
|
|
'city', 'location', 'affectedsystemfunction',
|
|
'category', 'type', 'tlp', 'detecttime', 'eventdescription', 'activitystatus',
|
|
'affectedsystemname', 'affectedsystemcategory', 'affectedsystemconnection', 'assistance',
|
|
'vulnerabilityid', 'productcategory',
|
|
}
|
|
|
|
notification_sender = NotificationSenderCreateService(self.notification.pk)
|
|
payload = notification_sender.get_payload()
|
|
|
|
for field in required_fields:
|
|
assert field in payload
|
|
|
|
assert payload['vulnerabilityid'] == 'CVE-2020-1231'
|
|
assert payload['productcategory'] == 'Microsoft'
|
|
|
|
@pytest.mark.unit
|
|
@patch('requests.post', side_effect=mocked_notification_requests_post_status_201)
|
|
def test_notification_sending_status_201(self, *args):
|
|
self.notification.custom_impact = 'custom_incident_impact_test'
|
|
self.notification.save()
|
|
|
|
notification_sender = NotificationSenderCreateService(self.notification.pk)
|
|
msg, status = notification_sender.send()
|
|
assert status == 201
|
|
assert msg == 'Success'
|
|
|
|
notification = Notification.objects.get(pk=self.notification.pk)
|
|
assert notification.uuid
|
|
assert notification.update_time
|
|
assert notification.identifier == '21-07-573'
|
|
assert notification.sending_time
|
|
|
|
@pytest.mark.unit
|
|
@patch('requests.post', side_effect=mocked_notification_requests_post_status_400)
|
|
def test_notification_status_400(self, *args):
|
|
notification_sender = NotificationSenderCreateService(self.notification.pk)
|
|
msg, status = notification_sender.send()
|
|
assert status == 400
|
|
assert msg == 'Передан неверный токен'
|
|
|
|
notification = Notification.objects.get(pk=self.notification.pk)
|
|
assert not notification.uuid
|
|
assert not notification.update_time
|
|
assert not notification.identifier
|
|
assert not notification.sending_time
|
|
|
|
|
|
@pytest.mark.django_db
|
|
class TestNotificationSendUpdaterServices:
|
|
|
|
@pytest.fixture(autouse=True)
|
|
def setup_test(self):
|
|
self.location, _ = LocationCode.objects.get_or_create(code='RU-MOS')
|
|
self.company = Company.objects.create(name='Test_name', api_key='test_api_key',
|
|
city='Moscow', location=self.location)
|
|
self.incident = Incident.objects.create(title='test_inc_1', importance=10, event_count=10, events='')
|
|
self.notification = Notification.objects.create(
|
|
incident=self.incident,
|
|
uuid='87e37777-0a7c-4879-888b-86f1d9cdb175',
|
|
identifier='21-07-573',
|
|
update_time=datetime.now(),
|
|
affected_system_name='affected_system_name_test',
|
|
event_description='event_description_test',
|
|
type=EventTypeEnum.DDOS.value,
|
|
notification_status=NotificationStatusEnum.ADDITION_REQUIRED.value,
|
|
)
|
|
|
|
@pytest.mark.unit
|
|
def test_payload_for_incident(self):
|
|
required_fields = {
|
|
'uuid', 'status', 'eventdescription', 'integrityimpact', 'availabilityimpact',
|
|
'confidentialityimpact', 'customimpact', 'city', 'activitystatus'
|
|
}
|
|
notification_updated = NotificationUpdateSenderService(self.notification.pk)
|
|
data = notification_updated.get_payload()
|
|
for field in required_fields:
|
|
assert field in data
|
|
|
|
@pytest.mark.unit
|
|
def test_payload_for_attack(self):
|
|
required_fields = {'uuid', 'status', 'eventdescription', 'city', 'activitystatus'}
|
|
|
|
self.notification.category = NotificationCategoryEnum.ATTACK.value
|
|
data = NotificationUpdateSenderService(self.notification.pk).get_payload()
|
|
for field in required_fields:
|
|
assert field in data
|
|
|
|
@pytest.mark.unit
|
|
def test_payload_for_vulnerability(self):
|
|
required_fields = {'uuid', 'status', 'eventdescription', 'city', 'activitystatus'}
|
|
self.notification.category = NotificationCategoryEnum.VULNERABILITY.value
|
|
data = NotificationUpdateSenderService(self.notification.pk).get_payload()
|
|
for field in required_fields:
|
|
assert field in data
|
|
|
|
@pytest.mark.unit
|
|
def test_raise_error_in_init_(self):
|
|
"""Test to Updating to NCIRRCC. Updating is possible with status `ADDITION_REQUIRED`"""
|
|
self.notification.notification_status = NotificationStatusEnum.ARCHIVED.value
|
|
self.notification.save()
|
|
with pytest.raises(NICRCCUpdateException):
|
|
NotificationUpdateSenderService(self.notification.pk)
|
|
|
|
|
|
@pytest.mark.django_db
|
|
class TestNotificationGetUpdatedDataFromNCIRCC:
|
|
|
|
@pytest.fixture(autouse=True)
|
|
def setup_test(self):
|
|
self.location, _ = LocationCode.objects.get_or_create(code='RU-MOS')
|
|
self.company = Company.objects.create(name='Test_name', api_key='test_api_key',
|
|
city='Moscow', location=self.location)
|
|
self.incident = Incident.objects.create(title='test_inc_1', importance=10, event_count=10, events='')
|
|
self.notification = Notification.objects.create(
|
|
incident=self.incident,
|
|
uuid='87e37777-0a7c-4879-888b-86f1d9cdb175',
|
|
identifier='21-07-573',
|
|
update_time=datetime.now(),
|
|
affected_system_name='affected_system_name_test',
|
|
event_description='event_description_test',
|
|
type=EventTypeEnum.DDOS.value,
|
|
)
|
|
|
|
@pytest.mark.unit
|
|
@patch('requests.get', side_effect=mocked_notification_request_get_status_200)
|
|
def test_getting_data_from_ncircc(self, *args):
|
|
getter = NotificationGetterUpdatedStatusServices(self.notification.pk)
|
|
assert self.notification.notification_status == NotificationStatusEnum.CHECK_NCIRCC.value
|
|
msg, status = getter.send()
|
|
assert status == 200
|
|
assert not self.notification.notification_status == NotificationStatusEnum.ARCHIVED.value
|
|
|
|
@pytest.mark.unit
|
|
def test_get_params_method(self):
|
|
data_filter = json.dumps([{
|
|
'property': 'uuid',
|
|
'operator': '=',
|
|
'value': str(self.notification.uuid)
|
|
}])
|
|
sender = NotificationGetterUpdatedStatusServices(self.notification.pk)
|
|
params = sender.get_params()
|
|
assert isinstance(params, tuple)
|
|
assert params[0][1] == data_filter
|
|
assert 'status' in params[1][1]
|
|
assert 'updated' in params[1][1]
|