339 lines
13 KiB
Python
339 lines
13 KiB
Python
import json
|
||
import logging
|
||
import uuid
|
||
from abc import ABC, abstractmethod
|
||
from dataclasses import dataclass
|
||
from datetime import datetime
|
||
from io import BytesIO
|
||
from json import JSONDecodeError
|
||
from typing import Dict, Union, Tuple
|
||
from urllib.parse import urljoin
|
||
|
||
import requests
|
||
from django.utils.translation import gettext_lazy
|
||
|
||
from company.models.company import Company
|
||
from console.settings.base import NCIRCC_DOMAIN_NAME, NCIRCC_CERT_VERIFY
|
||
from core.utils import dtnow
|
||
from ncircc.enums.notifications import NotificationCategoryEnum, NotificationStatusEnum
|
||
from ncircc.models.notification import Notification
|
||
|
||
# Module-level logger, named after this module.
_log = logging.getLogger(__name__)
|
||
|
||
|
||
class NICRCCUpdateException(Exception):
    """Raised when a notification cannot be updated."""
|
||
|
||
|
||
class NOCRCCCreateException(Exception):
    """Raised when a notification cannot be created."""
|
||
|
||
|
||
@dataclass
class NotificationCreateResponseSuccess:
    """Fields taken from a successful "create notification" response.

    Attributes:
        uuid: UUID of the created notification.
        identifier: identifier string of the created notification.
        create_time: creation time reported in the response.
    """

    # Field order is part of the positional constructor signature.
    uuid: uuid.UUID
    identifier: str
    create_time: datetime
|
||
|
||
|
||
@dataclass
class NotificationUpdateResponseSuccess:
    """Fields taken from a successful "update notification" response.

    Attributes:
        uuid: UUID of the updated notification.
        identifier: identifier string of the updated notification.
        update_time: update time reported in the response.
    """

    # Field order is part of the positional constructor signature.
    uuid: uuid.UUID
    identifier: str
    update_time: datetime
|
||
|
||
|
||
class NotificationSenderABC(ABC):
    """Common base for the services in this module that exchange data with NCIRCC.

    Subclasses assign ``company`` (an object exposing ``api_key``) and
    implement ``get_payload`` and ``send``.
    """

    # Relative endpoint path shared by all senders.
    path = 'api/v2/incidents'
    # Placeholder; concrete services assign the company in __init__.
    company = None

    def get_headers(self) -> Dict[str, str]:
        """Build request headers: the company's API token and a JSON content type."""
        return {
            'x-token': self.company.api_key,
            'Content-Type': 'application/json',
        }

    @abstractmethod
    def get_payload(self) -> Dict[str, Union[str, bool]]:
        """Return the data to send; concrete services must implement this."""

    @abstractmethod
    def send(self) -> Tuple[str, int]:
        """Perform the request and return a ``(message, status code)`` pair."""
|
||
|
||
|
||
class NotificationSenderCreateService(NotificationSenderABC):
    """Prepare a notification, send it to NCIRCC and store the returned identifiers."""

    def __init__(self, notification_pk: int) -> None:
        """Load the notification, its incident and the company.

        Args:
            notification_pk: primary key of the notification to send.

        Raises:
            NOCRCCCreateException: if the notification, its incident or the
                company cannot be found.
        """
        self.notification = Notification.objects.filter(pk=notification_pk).first()
        if self.notification is None:
            # first() yields None for an unknown pk; fail with the domain
            # error instead of an AttributeError on the attribute access below.
            _log.error('No notification.')
            raise NOCRCCCreateException('No notification.')
        self.incident = self.notification.incident
        self.company = Company.objects.first()
        if self.incident is None:
            _log.error('Not incident')
            raise NOCRCCCreateException('Not incident.')
        if self.company is None:
            _log.error('No Company.')
            raise NOCRCCCreateException(gettext_lazy('No Company.'))

    def get_payload(self) -> Dict[str, Union[str, bool]]:
        """Return the request body built from the notification, company and incident."""
        events = self.incident.events
        payload = {
            'category': self.notification.category,  # category
            'type': self.notification.type,  # information security event type
            'activitystatus': self.notification.activity_status,  # response status
            'tlp': self.notification.tlp,  # confidentiality status
            'affectedsystemname': self.notification.affected_system_name,  # name of the controlled resource
            'affectedsystemcategory': self.notification.affected_system_category,  # CII object category
            'eventdescription': self.notification.event_description,  # event description
            'affectedsystemconnection': self.notification.affected_system_connection,  # network connection
            'assistance': self.notification.assistance,  # GosSOPKA forces

            'city': self.company.city,
            'location': self.company.location.code,
            'affectedsystemfunction': self.company.affected_system_function,
            'ownername': self.company.name,

            'detectiontool': 'InfoWatch ARMA',
            # Taken from the first event, cut at the first '.' (presumably
            # dropping fractional seconds); None when there are no events.
            'detecttime': events[0].get('event_first', '').split('.')[0] if events else None,
            # Taken from the last event.
            # NOTE(review): this reads the last event's 'event_first' - confirm that is intended.
            'endtime': events[-1].get('event_first', '').split('.')[0] if events else None,
        }

        impact_categories = {
            NotificationCategoryEnum.INCIDENT.value,
            NotificationCategoryEnum.ATTACK.value,
        }
        if self.notification.category in impact_categories:
            payload['integrityimpact'] = self.notification.integrity_impact
            payload['availabilityimpact'] = self.notification.availability_impact
            payload['confidentialityimpact'] = self.notification.confidentiality_impact
            payload['customimpact'] = self.notification.custom_impact
        else:
            payload['vulnerabilityid'] = self.notification.vulnerability_id
            payload['productcategory'] = self.notification.product_category

        return payload

    def _save(self, data: NotificationCreateResponseSuccess) -> None:
        """Store the values returned by NCIRCC on the notification and save it."""
        self.notification.uuid = data.uuid
        self.notification.update_time = data.create_time
        self.notification.identifier = data.identifier
        self.notification.sending_time = dtnow()
        self.notification.save()

    def send(self) -> Tuple[str, int]:
        """Send the notification to NCIRCC and save the reply.

        Returns:
            ``('Success', 201)`` when NCIRCC answers 201 with a usable record,
            otherwise an error message with status 400.
        """
        url = urljoin(NCIRCC_DOMAIN_NAME, self.path)
        payload = self.get_payload()
        headers = self.get_headers()
        try:
            response = requests.post(url, json=payload, headers=headers, verify=NCIRCC_CERT_VERIFY)
        except requests.exceptions.RequestException as err:
            _log.error(f'Error: {err}')
            return str(err), 400

        try:
            data = response.json()
        except (JSONDecodeError, AttributeError) as err:
            _log.error(f'Error: {err}')
            # Plain str: an already-formatted message cannot be looked up in a
            # translation catalog, and the signature promises a str.
            return f'Error: {err}', 400

        if response.status_code != 201:
            _log.warning('error')
            return data.get('error', 'Error'), 400

        _log.debug('success')
        try:
            record = data.get('data', [])[0]
            data_for_save = NotificationCreateResponseSuccess(**record)
        except (IndexError, KeyError, TypeError):
            # Empty list, non-list 'data', or a record whose keys do not match
            # the expected fields: report it instead of crashing.
            _log.error(f'Error getting data {data}')
            return 'error', 400

        self._save(data_for_save)
        return 'Success', 201
|
||
|
||
|
||
class NotificationUpdateSenderService(NotificationSenderABC):
    """Send an update for an existing notification to NCIRCC."""

    def __init__(self, notification_pk: int) -> None:
        """Load the notification and the company and check that an update is allowed.

        Args:
            notification_pk: primary key of the notification to update.

        Raises:
            NICRCCUpdateException: if the notification or company is missing,
                or the notification status is not `Addition required`.
        """
        self.notification = Notification.objects.filter(pk=notification_pk).first()
        if self.notification is None:
            # first() yields None for an unknown pk; fail with the domain
            # error instead of an AttributeError on the attribute access below.
            _log.error('No notification.')
            raise NICRCCUpdateException('No notification.')
        self.incident = self.notification.incident
        self.company = Company.objects.first()
        if self.company is None:
            # Same guard as the create service: headers and payload read the company.
            _log.error('No Company.')
            raise NICRCCUpdateException(gettext_lazy('No Company.'))

        if self.notification.notification_status != NotificationStatusEnum.ADDITION_REQUIRED.value:
            _log.error(
                f'Update is possible only with status `Addition required` currently status: [{self.notification.notification_status}]')
            raise NICRCCUpdateException(gettext_lazy('Update is possible only with status `Addition required`'))

    def get_payload(self) -> Dict[str, Union[str, bool]]:
        """Return the update request body; impact fields are added for incidents only."""
        payload = {
            'uuid': str(self.notification.uuid),
            'company': self.company.name,
            'status': NotificationStatusEnum.CHECK_NCIRCC.value,
            'eventdescription': self.notification.event_description,
            'city': self.company.city,
            'activitystatus': self.notification.activity_status,
        }
        if self.notification.category == NotificationCategoryEnum.INCIDENT.value:
            payload['integrityimpact'] = self.notification.integrity_impact
            payload['availabilityimpact'] = self.notification.availability_impact
            payload['confidentialityimpact'] = self.notification.confidentiality_impact
            payload['customimpact'] = self.notification.custom_impact

        return payload

    def _save(self, data: NotificationUpdateResponseSuccess):
        """Store the update time from the response and the sending time, then save."""
        self.notification.update_time = data.update_time
        self.notification.sending_time = dtnow()
        self.notification.save()

    def send(self) -> Tuple[str, int]:
        """Send the update to NCIRCC and save the reply.

        Returns:
            ``('Success', 200)`` when NCIRCC answers 200 with a usable record,
            otherwise an error message with status 400.
        """
        url = urljoin(NCIRCC_DOMAIN_NAME, self.path)
        headers = self.get_headers()
        payload = self.get_payload()
        try:
            response = requests.post(url, json=payload, headers=headers, verify=NCIRCC_CERT_VERIFY)
        except requests.exceptions.RequestException as err:
            _log.error(f'Error: {err}')
            return str(err), 400

        try:
            response_data = response.json()
        except (JSONDecodeError, AttributeError) as err:
            _log.error(f'Error: {err}')
            return f'Error: {err}', 400

        if response.status_code != 200:
            msg = response_data.get('value', {}).get('message', 'Error')
            _log.error(f'Error: {msg}')
            return msg, 400

        try:
            record = response_data.get('data', [])[0]
            data_for_save = NotificationUpdateResponseSuccess(**record)
        except (IndexError, KeyError, TypeError):
            # Empty list, non-list 'data', or a record whose keys do not match
            # the expected fields: report it instead of crashing.
            _log.error('Error')
            return 'Error', 400

        self._save(data_for_save)
        return 'Success', 200
|
||
|
||
|
||
class NotificationGetterUpdatedStatusServices(NotificationSenderABC):
    """Fetch the current status of a notification from NCIRCC and store it locally."""

    def __init__(self, notification_pk: int) -> None:
        """Load the company and the notification with the given primary key."""
        self.company = Company.objects.first()
        self.notification = Notification.objects.get(pk=notification_pk)

    def get_payload(self) -> Dict[str, Union[str, bool]]:
        """Return an empty payload; this service sends query parameters only."""
        return {}

    def get_params(self) -> tuple:
        """Return query params: a filter on the notification uuid and the fields to return."""
        data_filter = [{
            'property': 'uuid',
            'operator': '=',
            'value': str(self.notification.uuid),
        }]
        params = (
            ('filter', json.dumps(data_filter)),
            ('fields', ['status', 'updated']),
        )
        return params

    def _update(self, status: str, updated: str) -> None:
        """Store a new status (and update time, when given) if it changed and is a known status."""
        _log.debug(f'start updating: {status}, {updated}')
        status_set = {
            NotificationStatusEnum.CHECK_NCIRCC.value, NotificationStatusEnum.CREATED.value,
            NotificationStatusEnum.REGISTERED.value, NotificationStatusEnum.ADDITION_REQUIRED.value,
            NotificationStatusEnum.DECISION.value, NotificationStatusEnum.ARCHIVED.value,
        }
        # NOTE(review): nesting reconstructed from flattened source - the time
        # update and save() are assumed to belong to the status-change branch.
        if self.notification.notification_status != status and status in status_set:
            self.notification.notification_status = status
            if updated is not None:
                self.notification.update_time = updated
            self.notification.save()

    def send(self) -> Tuple[str, int]:
        """Request the notification status from NCIRCC and update the local record.

        Returns:
            ``('Success', 200)`` when the status was read, otherwise an error
            message with status 400.
        """
        url = urljoin(NCIRCC_DOMAIN_NAME, self.path)
        params = self.get_params()
        headers = self.get_headers()

        try:
            response = requests.get(url, headers=headers, params=params, verify=NCIRCC_CERT_VERIFY)
        except requests.exceptions.RequestException as err:
            _log.error(f'Error: {err}')
            return str(err), 400

        try:
            data = response.json()
        except (JSONDecodeError, AttributeError) as err:
            _log.error(f'Error: {err}')
            return f'Error: {err}', 400

        if response.status_code != 200:
            msg = data.get('value', {}).get('message', 'Error')
            _log.error(f'Error: {msg}')
            return msg, 400

        try:
            # Look the first result up once and read both fields from it.
            result = data.get('data', {}).get('result')[0]
            status_name = result.get('status', {}).get('name')
            updated = result.get('updated')
        except (IndexError, KeyError, TypeError, AttributeError) as err:
            # TypeError: 'result' missing (None is not subscriptable);
            # AttributeError: 'data', the result or 'status' is not a dict.
            _log.error(err)
            return 'Error', 400

        _log.debug(f'updated: {updated};\n status_name={status_name}')
        self._update(status_name, updated)
        return 'Success', 200
|
||
|
||
|
||
class NotificationUploadFileService(NotificationSenderABC):
    """Upload a file for a notification to NCIRCC."""

    def __init__(self, notification_pk: int, file: BytesIO, file_name: str) -> None:
        """Store the file and load the company and notification.

        Args:
            notification_pk: primary key of the notification the file belongs to.
            file: in-memory file content.
            file_name: name to send the file under.
        """
        self.file_data = file
        self.file_name = file_name
        self.company = Company.objects.first()
        self.notification = Notification.objects.get(pk=notification_pk)

    def get_headers(self) -> Dict[str, str]:
        """Return the base headers without a Content-Type.

        The request is sent with ``files=``, so requests must generate the
        ``multipart/form-data; boundary=...`` header itself. A hand-set
        Content-Type is not overridden and carries no boundary, which leaves
        the body unparseable on the receiving side.
        """
        headers = super().get_headers()
        headers.pop('Content-Type', None)
        return headers

    def get_payload(self) -> dict:
        """Return the multipart parts: the notification uuid as a plain field and the file."""
        data = {
            # A None filename makes this part a plain form field.
            'uuid': (None, str(self.notification.uuid)),
            'files': (self.file_name, self.file_data.getvalue()),
        }
        return data

    def send(self) -> Tuple[str, int]:
        """Post the file to NCIRCC.

        Returns:
            ``('Success', 200)`` when NCIRCC answers 200, otherwise ``('Error', 400)``.
        """
        url = urljoin(NCIRCC_DOMAIN_NAME, self.path)
        files = self.get_payload()
        headers = self.get_headers()
        try:
            response = requests.post(url, headers=headers, files=files, verify=NCIRCC_CERT_VERIFY)
        except requests.exceptions.RequestException as err:
            _log.error(f'Error: {err}')
            return 'Error', 400

        if response.status_code == 200:
            # TODO: save the data
            return 'Success', 200

        _log.error('Error')
        return 'Error', 400
|
||
|