import csv
import json
import logging
import os
from abc import ABC, abstractmethod

from django.conf import settings
from django.contrib.auth.models import User
from django.utils.translation import gettext_lazy
from rest_framework.fields import BuiltinSignatureError

from assets.models.assets import Asset
from assets.serializers.assets import AssetCsvExportSerializer
from correlation.models import Rule
from correlation.serializers import RuleExportSerializer
from devices.filters import DeviceFilter
from devices.models.device import Device
from devices.serializers.device import ExportToCSVDeviceSerializer
from incident.models import Incident, IncidentRecommendations, IncidentEffect
from incident.serializers.incident import IncidentsCsvExportSerializer
from storage.models import DataStorage, get_storage_path

MEDIA_ROOT = getattr(settings, 'MEDIA_ROOT')
_log = logging.getLogger(__name__)


class ExportException(Exception):
    """Raised when an export cannot be performed or fails part-way."""


class ExportService(ABC):
    """
    Base class for exporting model data into a file tracked by a DataStorage row.

    Concrete subclasses are expected to set, in their ``__init__``:

    * ``store_type`` / ``store_format`` - values stored on the DataStorage row
      (``store_format.label`` is also used in log messages);
    * ``extension`` - file extension of the generated file;
    * ``model_mapping`` - ``{model: (serializer_class, filter_class_or_None)}``.
    """

    def __init__(self, user_pk: int, model_name: str, query_parameters: dict):
        """
        :param user_pk: Primary key of the user who started the export
        :param model_name: ``_meta.model_name`` of the model to export
        :param query_parameters: Parameters handed to the model filter, if any
        """
        self.user_pk = user_pk
        self.model_name = model_name
        self.query_parameters = query_parameters
        # Resolved from model_name at the start of run_export().
        self.current_export_model = None

    def create_store(self, user: User):
        """
        Create and save the DataStorage object that describes this export.

        The row is created with placeholder ``file``/``size`` values that
        run_export() overwrites once the file has been written.

        :param user: User which started export
        :return: DataStorage object
        """
        store = DataStorage(
            type=self.store_type,
            format=self.store_format,
            user=user,
            file='table',
            size=0,
        )
        store.save()
        return store

    def get_file_path(self, file_name):
        """
        Build the path of the export file under MEDIA_ROOT.

        The parent directory is created if it does not exist yet.

        :param file_name: File name (relative to MEDIA_ROOT)
        :return: Path
        """
        file_path = os.path.join(MEDIA_ROOT, file_name)
        os.makedirs(os.path.dirname(file_path), exist_ok=True)
        return file_path

    def get_export_data(self, model):
        """
        Serialize the objects of ``model`` selected for export.

        :param model: Model to be exported (a key of ``model_mapping``)
        :return: Serialized data (``serializer(queryset, many=True).data``)
        """
        # Named model_filter rather than "filter" so the builtin is not shadowed.
        serializer, model_filter = self.model_mapping[model]
        queryset = self.get_queryset(model, model_filter)
        return serializer(queryset, many=True).data

    def get_queryset(self, model, model_filter=None):
        """
        Select the objects that will be included in the export.

        :param model: Model to be exported
        :param model_filter: Optional filter class applied with ``query_parameters``
        :return: queryset ordered by primary key
        """
        queryset = model.objects.order_by('pk')
        if model_filter:
            queryset = model_filter(self.query_parameters, queryset).qs
        return queryset

    @abstractmethod
    def write_to_file(self, export_data, file_path):
        """
        Write the prepared data to a file.

        :param export_data: Data to write to file
        :param file_path: File path
        :return:
        """
        raise NotImplementedError

    def run_export(self):
        """
        Run the whole export: resolve the model, create the DataStorage row,
        serialize the data, write the file and store file metadata.

        :return: DataStorage pk
        :raises KeyError: if ``model_name`` is not one of the exportable models
        :raises ExportException: if serialization or writing fails with
            BuiltinSignatureError (the DataStorage row is deleted in that case)
        """
        model_name_mapping = {model._meta.model_name: model for model in self.model_mapping}
        self.current_export_model = model_name_mapping[self.model_name]

        user = User.objects.get(pk=self.user_pk)
        # Report the real export format instead of a hard-coded "JSON".
        _log.info(f'[{user}] request to export {self.model_name} as {self.store_format.label}')

        store = self.create_store(user)
        file_name = get_storage_path(store, f'export.{self.extension}')
        file_path = self.get_file_path(file_name)

        try:
            # Serialization happens here, so it must be inside the try block:
            # otherwise a BuiltinSignatureError raised while building the data
            # would bypass the cleanup below and leave an orphan store row.
            export_data = self.get_export_data(self.current_export_model)
            self.write_to_file(export_data, file_path)
            store.description = gettext_lazy('Exported {model_name} data').format(model_name=self.model_name)
            store.file = file_name
            _log.info(f'{self.store_format.label} export ready in [{store.file.name}]')
            store.update_crc()
        except BuiltinSignatureError as err:
            _log.exception(f'Following error occurred during {self.store_format.label} export: {err}')
            store.delete()
            raise ExportException from err

        store.size = os.path.getsize(file_path)
        store.save()
        return store.pk


class ExportToJSON(ExportService):
    """Export of correlation rules into a JSON file."""

    def __init__(self, user_pk: int, model_name: str, query_parameters: dict):
        super().__init__(user_pk, model_name, query_parameters)
        self.store_type = DataStorage.Type.JSON_EXPORT
        self.store_format = DataStorage.Format.JSON
        self.extension = 'json'
        self.model_mapping = {
            Rule: (RuleExportSerializer, None),
        }

    def write_to_file(self, export_data, file_path):
        """
        Dump ``export_data`` to ``file_path`` as indented JSON.

        :param export_data: JSON-serializable data
        :param file_path: File path
        """
        # ensure_ascii=False writes non-ASCII characters verbatim, so the
        # encoding is pinned to UTF-8 instead of depending on the locale.
        with open(file_path, 'w', encoding='utf-8') as file:
            json.dump(export_data, file, ensure_ascii=False, indent=4)

    def get_export_data(self, model):
        """
        Wrap the serialized rules into the export document.

        :param model: Model to be exported
        :return: dict with ``meta``, ``rules``, ``close_recommendations``
            and ``effects`` keys
        """
        export_data = super().get_export_data(model)
        recommendations, effects = self._get_recommendations_and_effects(export_data)
        return {
            'meta': {
                'version': settings.PRODUCT_VERSION['version'],
                'device': 'console',
            },
            'rules': export_data,
            'close_recommendations': recommendations,
            'effects': effects,
        }

    @staticmethod
    def _get_recommendations_and_effects(export_data):
        """
        Flatten recommendations/effects inside rules and collect the full lists.

        For every action of type ``'incident'`` the ``close_recommendations``
        and ``effects`` entries are replaced, in place, by lists of their
        names (empty lists when the keys are absent).

        :param export_data: Serialized rules; modified in place
        :return: tuple of two lists of ``{'name', 'description'}`` dicts taken
            from all IncidentRecommendations and all IncidentEffect objects
        """
        for rule in export_data:
            for action in rule['actions_json']:
                if action['type'] != 'incident':
                    continue

                rule_recommendations = []
                rule_effects = []
                if 'close_recommendations' in action:
                    for recommendation in action['close_recommendations']:
                        recommendation.pop('id', None)
                        rule_recommendations.append(recommendation['name'])
                if 'effects' in action:
                    for effect in action['effects']:
                        effect.pop('id', None)
                        rule_effects.append(effect['name'])

                action['close_recommendations'] = rule_recommendations
                action['effects'] = rule_effects

        unique_recommendations = list(IncidentRecommendations.objects.values('name', 'description'))
        unique_effects = list(IncidentEffect.objects.values('name', 'description'))
        return unique_recommendations, unique_effects


class ExportToCSV(ExportService):
    """Export of devices, assets or incidents into a CSV file."""

    def __init__(self, user_pk: int, model_name: str, query_parameters: dict):
        super().__init__(user_pk, model_name, query_parameters)
        self.store_type = DataStorage.Type.CSV_EXPORT
        self.store_format = DataStorage.Format.CSV
        self.extension = 'csv'
        self.model_mapping = {
            Device: (ExportToCSVDeviceSerializer, DeviceFilter),
            Asset: (AssetCsvExportSerializer, None),
            Incident: (IncidentsCsvExportSerializer, None),
        }

    def write_to_file(self, export_data, file_path):
        """
        Write ``export_data`` to ``file_path`` as CSV with a header row.

        Column order is taken from ``Meta.fields`` of the serializer mapped to
        the model currently being exported.

        :param export_data: Iterable of row dicts
        :param file_path: File path
        """
        headers = self.model_mapping[self.current_export_model][0].Meta.fields
        # newline='' lets the csv module control line endings itself (as the
        # csv documentation requires); UTF-8 keeps the output independent of
        # the process locale.
        with open(file_path, 'w', newline='', encoding='utf-8') as file:
            writer = csv.DictWriter(file, fieldnames=headers)
            writer.writeheader()
            writer.writerows(export_data)


def export(export_type, user_pk, model_name, query_parameters):
    """
    Run an export of the given type and return the pk of the created store.

    :param export_type: ``'json'`` or ``'csv'``
    :param user_pk: Primary key of the user who started the export
    :param model_name: ``_meta.model_name`` of the model to export
    :param query_parameters: Parameters handed to the model filter, if any
    :return: DataStorage pk
    :raises ExportException: if ``export_type`` is not supported
    """
    export_service_mapping = {
        'json': ExportToJSON,
        'csv': ExportToCSV,
    }
    try:
        export_service = export_service_mapping[export_type]
    except KeyError:
        # The KeyError adds nothing to the message, so it is not chained.
        raise ExportException(f'"{export_type}" export type not possible') from None

    service = export_service(user_pk, model_name, query_parameters)
    return service.run_export()