import json
import logging
import uuid
from abc import ABC, abstractmethod
from dataclasses import dataclass
from datetime import datetime
from io import BytesIO
from json import JSONDecodeError
from typing import Dict, Union, Tuple
from urllib.parse import urljoin

import requests
from django.utils.translation import gettext_lazy

from company.models.company import Company
from console.settings.base import NCIRCC_DOMAIN_NAME, NCIRCC_CERT_VERIFY
from core.utils import dtnow
from ncircc.enums.notifications import NotificationCategoryEnum, NotificationStatusEnum
from ncircc.models.notification import Notification

_log = logging.getLogger(__name__)


class NICRCCUpdateException(Exception):
    """Error raised when a notification update cannot be performed."""


class NOCRCCCreateException(Exception):
    """Error raised when a notification cannot be created."""


@dataclass
class NotificationCreateResponseSuccess:
    """Data describing a successfully created notification."""

    uuid: uuid.UUID
    identifier: str
    create_time: datetime


@dataclass
class NotificationUpdateResponseSuccess:
    """Data describing a successfully updated notification."""

    uuid: uuid.UUID
    identifier: str
    update_time: datetime


class NotificationSenderABC(ABC):
    """Abstract base for services that exchange notification data with NCIRCC.

    Subclasses are expected to set ``company`` to an object exposing an
    ``api_key`` attribute before ``get_headers`` is called.
    """

    # Endpoint path, relative to the configured NCIRCC domain.
    path = 'api/v2/incidents'
    # Placeholder; concrete services assign the company in ``__init__``.
    company = None

    def get_headers(self) -> Dict[str, str]:
        """Build the HTTP headers: the company API token plus a JSON content type."""
        return {
            'x-token': self.company.api_key,
            'Content-Type': 'application/json',
        }

    @abstractmethod
    def get_payload(self) -> Dict[str, Union[str, bool]]:
        """Return the data to send; must be implemented by subclasses."""

    @abstractmethod
    def send(self) -> Tuple[str, int]:
        """Perform the request and return a ``(message, status)`` pair."""
def _first_data_item(body):
    """Return the first element of ``body['data']``, or ``None`` if the body is malformed.

    Covers a non-dict body, a missing/empty/null ``data`` value and a
    non-sequence ``data`` value, so callers only need a single ``None`` check.
    """
    try:
        return body.get('data', [])[0]
    except (AttributeError, IndexError, KeyError, TypeError):
        return None


def _error_message(body) -> str:
    """Return ``body['value']['message']``, falling back to ``'Error'`` when absent or malformed."""
    try:
        return body.get('value', {}).get('message', 'Error')
    except AttributeError:
        return 'Error'


class NotificationSenderCreateService(NotificationSenderABC):
    """Prepare a notification and send it to NCIRCC."""

    def __init__(self, notification_pk: int) -> None:
        """Load the notification, its incident and the company.

        Raises:
            NOCRCCCreateException: if the notification, its incident or the
                company is missing.
        """
        self.notification = Notification.objects.filter(pk=notification_pk).first()
        # ``first()`` returns None for an unknown pk; fail with the domain
        # exception instead of an AttributeError on ``.incident`` below.
        if self.notification is None:
            _log.error('No notification with pk=%s', notification_pk)
            raise NOCRCCCreateException('No notification.')
        self.incident = self.notification.incident
        self.company = Company.objects.first()
        if self.incident is None:
            _log.error('Not incident')
            raise NOCRCCCreateException('Not incident.')
        if self.company is None:
            _log.error('No Company.')
            raise NOCRCCCreateException(gettext_lazy('No Company.'))

    def get_payload(self) -> Dict[str, Union[str, bool]]:
        """Return the request body describing the notification."""
        events = self.incident.events
        # Timestamps are cut at the first '.' (drops the fractional part);
        # both are None when the incident has no events.
        detect_time = events[0].get('event_first', '').split('.')[0] if events else None
        # NOTE(review): the last event also reads 'event_first' - confirm this is intended.
        end_time = events[-1].get('event_first', '').split('.')[0] if events else None

        payload = {
            'category': self.notification.category,  # category
            'type': self.notification.type,  # information-security event type
            'activitystatus': self.notification.activity_status,  # response status
            'tlp': self.notification.tlp,  # confidentiality status
            'affectedsystemname': self.notification.affected_system_name,  # controlled resource name
            'affectedsystemcategory': self.notification.affected_system_category,  # CII object category
            'eventdescription': self.notification.event_description,  # event description
            'affectedsystemconnection': self.notification.affected_system_connection,  # network connection
            'assistance': self.notification.assistance,  # GosSOPKA assistance
            'city': self.company.city,
            'location': self.company.location.code,
            'affectedsystemfunction': self.company.affected_system_function,
            'ownername': self.company.name,
            'detectiontool': 'InfoWatch ARMA',
            'detecttime': detect_time,  # taken from the first event
            'endtime': end_time,  # taken from the last event
        }
        impact_categories = {
            NotificationCategoryEnum.INCIDENT.value,
            NotificationCategoryEnum.ATTACK.value,
        }
        if self.notification.category in impact_categories:
            payload['integrityimpact'] = self.notification.integrity_impact
            payload['availabilityimpact'] = self.notification.availability_impact
            payload['confidentialityimpact'] = self.notification.confidentiality_impact
            payload['customimpact'] = self.notification.custom_impact
        else:
            payload['vulnerabilityid'] = self.notification.vulnerability_id
            payload['productcategory'] = self.notification.product_category
        return payload

    def _save(self, data: NotificationCreateResponseSuccess) -> None:
        """Store the identifiers and times returned by NCIRCC on the notification."""
        self.notification.uuid = data.uuid
        self.notification.update_time = data.create_time
        self.notification.identifier = data.identifier
        self.notification.sending_time = dtnow()
        self.notification.save()

    def send(self) -> Tuple[str, int]:
        """Send the notification to NCIRCC and save the returned data.

        Returns:
            ``('Success', 201)`` on success, otherwise ``(message, 400)``.
        """
        url = urljoin(NCIRCC_DOMAIN_NAME, self.path)
        payload = self.get_payload()
        headers = self.get_headers()
        try:
            response = requests.post(url, json=payload, headers=headers, verify=NCIRCC_CERT_VERIFY)
        except requests.exceptions.RequestException as err:
            _log.error('Error: %s', err)
            return str(err), 400
        try:
            body = response.json()
        except (ValueError, AttributeError) as err:
            # ValueError is the common base of every JSON decode error that
            # ``Response.json()`` can raise, whichever decoder is in use.
            _log.error('Error: %s', err)
            return f'Error: {err}', 400

        if response.status_code != 201:
            _log.warning('error')
            message = body.get('error', 'Error') if isinstance(body, dict) else 'Error'
            return message, 400

        _log.debug('success')
        item = _first_data_item(body)
        if item is None:
            _log.error('Error getting data %s', body)
            return 'error', 400
        try:
            created = NotificationCreateResponseSuccess(**item)
        except TypeError as err:
            # Item is not a mapping, or its keys do not match the dataclass fields.
            _log.error('Error getting data %s: %s', body, err)
            return 'error', 400
        self._save(created)
        return 'Success', 201


class NotificationUpdateSenderService(NotificationSenderABC):
    """Send updated notification data to NCIRCC."""

    def __init__(self, notification_pk: int) -> None:
        """Load the notification and the company and check the update is allowed.

        Raises:
            NICRCCUpdateException: if the notification or the company is
                missing, or the notification status is not
                ``ADDITION_REQUIRED``.
        """
        self.notification = Notification.objects.filter(pk=notification_pk).first()
        # ``first()`` returns None for an unknown pk; fail with the domain
        # exception instead of an AttributeError on ``.incident`` below.
        if self.notification is None:
            _log.error('No notification with pk=%s', notification_pk)
            raise NICRCCUpdateException('No notification.')
        self.incident = self.notification.incident
        self.company = Company.objects.first()
        if self.notification.notification_status != NotificationStatusEnum.ADDITION_REQUIRED.value:
            _log.error(
                'Update is possible only with status `Addition required` currently status: [%s]',
                self.notification.notification_status,
            )
            raise NICRCCUpdateException(gettext_lazy('Update is possible only with status `Addition required`'))
        # Headers and payload both read from the company, so fail early.
        if self.company is None:
            _log.error('No Company.')
            raise NICRCCUpdateException(gettext_lazy('No Company.'))

    def get_payload(self) -> Dict[str, Union[str, bool]]:
        """Return the request body for the update."""
        payload = {
            'uuid': str(self.notification.uuid),
            'company': self.company.name,
            'status': NotificationStatusEnum.CHECK_NCIRCC.value,
            'eventdescription': self.notification.event_description,
            'city': self.company.city,
            'activitystatus': self.notification.activity_status,
        }
        # NOTE(review): impact fields are added for INCIDENT only here, while the
        # create service also adds them for ATTACK - confirm this is intended.
        if self.notification.category == NotificationCategoryEnum.INCIDENT.value:
            payload['integrityimpact'] = self.notification.integrity_impact
            payload['availabilityimpact'] = self.notification.availability_impact
            payload['confidentialityimpact'] = self.notification.confidentiality_impact
            payload['customimpact'] = self.notification.custom_impact
        return payload

    def _save(self, data: NotificationUpdateResponseSuccess) -> None:
        """Store the update time returned by NCIRCC and the local sending time."""
        self.notification.update_time = data.update_time
        self.notification.sending_time = dtnow()
        self.notification.save()

    def send(self) -> Tuple[str, int]:
        """Send the update to NCIRCC and save the returned update time.

        Returns:
            ``('Success', 200)`` on success, otherwise ``(message, 400)``.
        """
        url = urljoin(NCIRCC_DOMAIN_NAME, self.path)
        headers = self.get_headers()
        payload = self.get_payload()
        try:
            response = requests.post(url, json=payload, headers=headers, verify=NCIRCC_CERT_VERIFY)
        except requests.exceptions.RequestException as err:
            _log.error('Error: %s', err)
            return str(err), 400
        try:
            body = response.json()
        except (ValueError, AttributeError) as err:
            # ValueError covers every JSON decode error ``Response.json()`` can raise.
            _log.error('Error: %s', err)
            return f'Error: {err}', 400

        if response.status_code != 200:
            message = _error_message(body)
            _log.error('Error: %s', message)
            return message, 400

        item = _first_data_item(body)
        if item is None:
            _log.error('Error')
            return 'Error', 400
        try:
            updated = NotificationUpdateResponseSuccess(**item)
        except TypeError as err:
            # Item is not a mapping, or its keys do not match the dataclass fields.
            _log.error('Error: %s', err)
            return 'Error', 400
        self._save(updated)
        return 'Success', 200


class NotificationGetterUpdatedStatusServices(NotificationSenderABC):
    """Fetch the current notification status from NCIRCC and store it locally."""

    def __init__(self, notification_pk: int) -> None:
        """Load the company and the notification with the given primary key."""
        self.company = Company.objects.first()
        self.notification = Notification.objects.get(pk=notification_pk)

    def get_payload(self) -> Dict[str, Union[str, bool]]:
        """Return an empty payload; this service only sends query parameters."""
        return {}

    def get_params(self) -> tuple:
        """Return query params: a filter on the notification uuid and the requested fields."""
        data_filter = [{
            'property': 'uuid',
            'operator': '=',
            'value': str(self.notification.uuid),
        }]
        return (
            ('filter', json.dumps(data_filter)),
            ('fields', ['status', 'updated']),
        )

    def _update(self, status: str, updated: str) -> None:
        """Apply the received status and update time to the notification and save it.

        The status is applied only when it differs from the stored one and is
        one of the known status values; the update time only when not None.
        """
        _log.debug('start updating: %s, %s', status, updated)
        status_set = {
            NotificationStatusEnum.CHECK_NCIRCC.value,
            NotificationStatusEnum.CREATED.value,
            NotificationStatusEnum.REGISTERED.value,
            NotificationStatusEnum.ADDITION_REQUIRED.value,
            NotificationStatusEnum.DECISION.value,
            NotificationStatusEnum.ARCHIVED.value,
        }
        if self.notification.notification_status != status and status in status_set:
            self.notification.notification_status = status
        if updated is not None:
            self.notification.update_time = updated
        # NOTE(review): nesting was reconstructed from flattened source - confirm
        # that save() is meant to run even when the status did not change.
        self.notification.save()

    def send(self) -> Tuple[str, int]:
        """Request the status from NCIRCC and update the notification.

        Returns:
            ``('Success', 200)`` on success, otherwise ``(message, 400)``.
        """
        url = urljoin(NCIRCC_DOMAIN_NAME, self.path)
        params = self.get_params()
        headers = self.get_headers()
        try:
            response = requests.get(url, headers=headers, params=params, verify=NCIRCC_CERT_VERIFY)
        except requests.exceptions.RequestException as err:
            _log.error('Error: %s', err)
            return str(err), 400
        try:
            body = response.json()
        except (ValueError, AttributeError) as err:
            # ValueError covers every JSON decode error ``Response.json()`` can raise.
            _log.error('Error: %s', err)
            return f'Error: {err}', 400

        if response.status_code != 200:
            message = _error_message(body)
            _log.error('Error: %s', message)
            return message, 400

        try:
            result = body.get('data', {}).get('result')[0]
            status_name = result.get('status', {}).get('name')
            updated = result.get('updated')
        except (AttributeError, IndexError, KeyError, TypeError) as err:
            # TypeError: 'result' is missing (None is not subscriptable);
            # AttributeError: body, 'data' or 'status' is not a dict.
            _log.error(err)
            return 'Error', 400
        _log.debug('updated: %s;\n status_name=%s', updated, status_name)
        self._update(status_name, updated)
        return 'Success', 200


class NotificationUploadFileService(NotificationSenderABC):
    """Upload a file for a notification to NCIRCC."""

    def __init__(self, notification_pk: int, file: BytesIO, file_name: str) -> None:
        """Store the file and load the company and the notification."""
        self.file_data = file
        self.file_name = file_name
        self.company = Company.objects.first()
        self.notification = Notification.objects.get(pk=notification_pk)

    def get_headers(self) -> Dict[str, str]:
        """Return the base headers without an explicit ``Content-Type``.

        ``requests`` generates ``multipart/form-data; boundary=...`` itself when
        ``files=`` is passed, but only if the caller has not already set
        ``Content-Type``. Forcing the header drops the boundary and makes the
        body unparseable for the receiver.
        """
        headers = super().get_headers()
        headers.pop('Content-Type', None)
        return headers

    def get_payload(self) -> dict:
        """Return the multipart fields: the notification uuid and the file content."""
        return {
            'uuid': (None, str(self.notification.uuid)),
            'files': (self.file_name, self.file_data.getvalue()),
        }

    def send(self) -> Tuple[str, int]:
        """Upload the file and return ``('Success', 200)`` or ``('Error', 400)``."""
        url = urljoin(NCIRCC_DOMAIN_NAME, self.path)
        files = self.get_payload()
        headers = self.get_headers()
        try:
            response = requests.post(url, headers=headers, files=files, verify=NCIRCC_CERT_VERIFY)
        except requests.exceptions.RequestException as err:
            _log.error('Error: %s', err)
            return 'Error', 400
        if response.status_code == 200:
            # TODO: persist the data returned by NCIRCC.
            return 'Success', 200
        _log.error('Error')
        return 'Error', 400