old_console/storage/export.py
2024-11-02 14:12:45 +03:00

226 lines
7.8 KiB
Python

import csv
import json
import logging
import os
from abc import ABC, abstractmethod
from django.conf import settings
from django.contrib.auth.models import User
from django.utils.translation import gettext_lazy
from rest_framework.fields import BuiltinSignatureError
from assets.models.assets import Asset
from assets.serializers.assets import AssetCsvExportSerializer
from correlation.models import Rule
from correlation.serializers import RuleExportSerializer
from devices.filters import DeviceFilter
from devices.models.device import Device
from devices.serializers.device import ExportToCSVDeviceSerializer
from incident.models import Incident, IncidentRecommendations, IncidentEffect
from incident.serializers.incident import IncidentsCsvExportSerializer
from storage.models import DataStorage, get_storage_path
MEDIA_ROOT = getattr(settings, 'MEDIA_ROOT')
_log = logging.getLogger(__name__)
class ExportException(Exception):
pass
class ExportService(ABC):
def __init__(self, user_pk: int, model_name: str, query_parameters: dict):
self.user_pk = user_pk
self.model_name = model_name
self.query_parameters = query_parameters
self.current_export_model = None
def create_store(self, user: User):
"""
Creating default DataStorage object for saving info
:param user: User which started export
:return: DataStorage object
"""
store = DataStorage(type=self.store_type, format=self.store_format,
user=user, file='table', size=0)
store.save()
return store
def get_file_path(self, file_name):
"""
Getting the path where the file will be created
:param file_name: File name
:return: Path
"""
file_path = os.path.join(MEDIA_ROOT, file_name)
os.makedirs(os.path.dirname(file_path), exist_ok=True)
return file_path
def get_export_data(self, model):
"""
Preparing data for export.
:param model: Model to be exported
:return: dict with data
"""
serializer, filter = self.model_mapping[model]
queryset = self.get_queryset(model, filter)
export_data = serializer(queryset, many=True).data
return export_data
def get_queryset(self, model, model_filter=None):
"""
Selection of objects that will be included in the export
:param model: Model to be exported
:param model_filter: Filter using for export
:return: queryset
"""
queryset = model.objects.order_by('pk')
if model_filter:
queryset = model_filter(self.query_parameters, queryset).qs
return queryset
@abstractmethod
def write_to_file(self, export_data, file_path):
"""
We just write data to a file
:param export_data: Data to write to file
:param file_path: File path
:return:
"""
raise NotImplementedError
def run_export(self):
"""
The main method in which the magic happens.
We receive data for export, filter, write to a file, save information to the database
:return: DataStorage pk
"""
model_name_mapping = {model._meta.model_name: model for model in self.model_mapping.keys()}
self.current_export_model = model_name_mapping[self.model_name]
user = User.objects.get(pk=self.user_pk)
_log.info(f'[{user}] request to export {self.model_name} as JSON')
store = self.create_store(user)
file_name = get_storage_path(store, f'export.{self.extension}')
file_path = self.get_file_path(file_name)
export_data = self.get_export_data(self.current_export_model)
try:
self.write_to_file(export_data, file_path)
store.description = gettext_lazy('Exported {model_name} data').format(model_name=self.model_name)
store.file = file_name
_log.info(f'{self.store_format.label} export ready in [{store.file.name}]')
store.update_crc()
except BuiltinSignatureError as err:
_log.exception(f'Following error occurred during {self.store_format.label} export: {err}')
store.delete()
raise ExportException
store.size = os.path.getsize(file_path)
store.save()
return store.pk
class ExportToJSON(ExportService):
def __init__(self, user_pk: int, model_name: str, query_parameters: dict):
super().__init__(user_pk, model_name, query_parameters)
self.store_type = DataStorage.Type.JSON_EXPORT
self.store_format = DataStorage.Format.JSON
self.extension = 'json'
self.model_mapping = {
Rule: (RuleExportSerializer, None)
}
def write_to_file(self, export_data, file_path):
with open(file_path, 'w') as file:
json.dump(export_data, file, ensure_ascii=False, indent=4)
def get_export_data(self, model):
export_data = super().get_export_data(model)
recommendations, effects = self._get_recommendations_and_effects(export_data)
new_format_data = {
'meta': {
'version': settings.PRODUCT_VERSION['version'],
'device': 'console'
},
'rules': export_data,
'close_recommendations': recommendations,
'effects': effects
}
return new_format_data
@staticmethod
def _get_recommendations_and_effects(export_data):
"""Getting all recommendations and effects from all rules."""
for rule in export_data:
for action in rule['actions_json']:
if action['type'] != 'incident':
continue
rule_recommendations = []
rule_effects = []
if "close_recommendations" in action:
for recommendation in action['close_recommendations']:
recommendation.pop('id', None)
rule_recommendations.append(recommendation['name'])
if "effects" in action:
for effect in action['effects']:
effect.pop('id', None)
rule_effects.append(effect['name'])
action['close_recommendations'] = rule_recommendations
action['effects'] = rule_effects
unique_recommendations = list(IncidentRecommendations.objects.values('name', 'description'))
unique_effects = list(IncidentEffect.objects.values('name', 'description'))
return unique_recommendations, unique_effects
class ExportToCSV(ExportService):
def __init__(self, user_pk: int, model_name: str, query_parameters: dict):
super().__init__(user_pk, model_name, query_parameters)
self.store_type = DataStorage.Type.CSV_EXPORT
self.store_format = DataStorage.Format.CSV
self.extension = 'csv'
self.model_mapping = {
Device: (ExportToCSVDeviceSerializer, DeviceFilter),
Asset: (AssetCsvExportSerializer, None),
Incident: (IncidentsCsvExportSerializer, None),
}
def write_to_file(self, export_data, file_path):
with open(file_path, 'w') as file:
headers = self.model_mapping[self.current_export_model][0].Meta.fields
writer = csv.DictWriter(file, fieldnames=headers)
writer.writeheader()
writer.writerows(export_data)
def export(export_type, user_pk, model_name, query_parameters):
export_service_mapping = {
'json': ExportToJSON,
'csv': ExportToCSV
}
try:
export_service = export_service_mapping[export_type]
except KeyError:
raise ExportException(f'"{export_type}" export type not possible')
service = export_service(user_pk, model_name, query_parameters)
export_result = service.run_export()
return export_result