import csv import json import os import shutil from unittest import mock import datetime import pytest from django.conf import settings from correlation.models import Rule from devices.models.device import Device from incident.models import IncidentRecommendations from storage.export import export, ExportException, ExportToCSV, ExportToJSON from storage.models import DataStorage, get_storage_path TMP_DIR_EXPORT = '/tmp/export' @pytest.mark.django_db class TestExport: @pytest.fixture(autouse=True) def setup_tests(self, django_user_model): self.admin_user = django_user_model.objects.get(username='admin') os.makedirs(TMP_DIR_EXPORT, exist_ok=True) yield shutil.rmtree(TMP_DIR_EXPORT) @pytest.mark.unit def test_export_csv_task(self, api_client): test_csv_device1 = Device.objects.create(ip='2.2.2.2', port='2500', type='firewall', name='FIREWALL') test_csv_device2 = Device.objects.create(ip='3.3.3.3', port='9999', type='firewall', name='ENDPOINT') queryset = Device.objects.all() model_name = queryset.model._meta.model_name datastorage_id = export('csv', self.admin_user.pk, model_name, query_parameters={}) datastorage = DataStorage.objects.get(pk=datastorage_id) full_name = datastorage.get_full_path() assert os.path.exists(full_name) with open(full_name, 'r') as csv_file: reader = csv.DictReader(csv_file) parsed_file = [] for row in reader: parsed_file.append(row) assert len(parsed_file) == 2 assert parsed_file[0]['id'] == str(test_csv_device1.pk) assert parsed_file[0]['port'] == '2500' assert parsed_file[0]['name'] == 'FIREWALL' assert parsed_file[1]['id'] == str(test_csv_device2.pk) assert parsed_file[1]['port'] == '9999' assert parsed_file[1]['name'] == 'ENDPOINT' os.remove(datastorage.get_full_path()) @pytest.mark.unit def test_export_csv_task_with_parameters(self, api_client): test_csv_device1 = Device.objects.create(ip='2.2.2.2', port='2500', type='firewall', name='FIREWALL') test_csv_device2 = Device.objects.create(ip='3.3.3.3', port='9999', type='firewall', name='ENDPOINT') queryset = Device.objects.all() model_name = queryset.model._meta.model_name # export only one device - 'test_csv_device1' datastorage_id = export('csv', self.admin_user.pk, model_name, query_parameters={'name': 'FIREWALL'}) datastorage = DataStorage.objects.get(pk=datastorage_id) full_name = datastorage.get_full_path() assert os.path.exists(full_name) with open(full_name, 'r') as csv_file: reader = csv.DictReader(csv_file) parsed_file = [] for row in reader: parsed_file.append(row) assert len(parsed_file) == 1 assert parsed_file[0]['id'] == str(test_csv_device1.pk) assert parsed_file[0]['port'] == '2500' assert parsed_file[0]['name'] == 'FIREWALL' os.remove(datastorage.get_full_path()) @pytest.mark.django_db class TestExportService: @pytest.fixture(autouse=True) def setup_tests(self, django_user_model): self.test_csv_device1 = Device.objects.create(ip='2.2.2.2', port='2500', type='firewall', name='FIREWALL') self.test_csv_device2 = Device.objects.create(ip='3.3.3.3', port='9999', type='firewall', name='ENDPOINT') self.admin_user = django_user_model.objects.get(username='admin') os.makedirs(TMP_DIR_EXPORT, exist_ok=True) yield shutil.rmtree(TMP_DIR_EXPORT) @pytest.mark.unit def test_pass_invalid_export_type(self): with pytest.raises(ExportException) as exc: export('blah-blah', 1, '1', {}) assert str(exc.value) == '"blah-blah" export type not possible' @pytest.mark.unit def test_create_default_store(self): adapter = ExportToCSV(1, '1', {}) store = adapter.create_store(self.admin_user) assert store.user == self.admin_user assert store.format == 3 assert store.type == 2 assert store.size == 0 assert store.file == 'table' @pytest.mark.unit def test_get_file_path(self): adapter = ExportToCSV(1, '1', {}) store = DataStorage(type=DataStorage.Type.CSV_EXPORT, format=DataStorage.Format.CSV, user=self.admin_user, file='table', size=0) store.save() file_name = get_storage_path(store, 'export.csv') file_path = adapter.get_file_path(file_name) assert file_path == os.path.join(settings.MEDIA_ROOT, file_name) @pytest.mark.unit def test_get_export_data(self): adapter = ExportToCSV(self.admin_user.pk, 'device', {}) data = adapter.get_export_data(Device) assert len(data) == 2 device = data[0] assert device['id'] == self.test_csv_device1.pk assert device['type'] == self.test_csv_device1.type assert device['name'] == self.test_csv_device1.name assert device['description'] == self.test_csv_device1.description assert device['group'] == self.test_csv_device1.group @pytest.mark.unit @mock.patch('storage.export.MEDIA_ROOT', TMP_DIR_EXPORT) @mock.patch('storage.models.MEDIA_ROOT', TMP_DIR_EXPORT) def test_run_export(self): service = ExportToCSV(self.admin_user.pk, 'device', {}) data_storage_id = service.run_export() store = DataStorage.objects.get(id=data_storage_id) os.path.exists(store.get_full_path()) @pytest.mark.django_db class TestExportRule: @pytest.fixture(autouse=True) def setup_test(self, django_user_model): self.admin_user = django_user_model.objects.get(username='admin') recommendation_in_rule = IncidentRecommendations.objects.create(name='rec_name', description='rec_desc') recommendation_without_rule = IncidentRecommendations.objects.create(name='TEST', description='TEST') Rule.objects.create( name='Test', type=0, status=True, rev=1, sid=2, depth=datetime.timedelta(minutes=10), rule_json={ "type": "query_string", "field": "", "operands": "event_severity:>=6", }, actions_json=[{ "type": "incident", "title": "{{.source_ip}}", "comment": "", "category": "", "importance": "50", "assigned_to": "", "description": "{{.source_ip}}", "close_recommendations": [recommendation_in_rule.pk] }], ) @pytest.mark.unit @mock.patch('storage.export.MEDIA_ROOT', TMP_DIR_EXPORT) @mock.patch('storage.models.MEDIA_ROOT', TMP_DIR_EXPORT) def test_export_all_effects_recommendations(self): service = ExportToJSON(self.admin_user.pk, 'rule', {}) data_storage_id = service.run_export() store = DataStorage.objects.get(id=data_storage_id) with open(store.get_full_path(), 'r') as f: content = json.loads(f.read()) assert os.path.exists(store.get_full_path()) assert len(content['rules'][0]['actions_json'][0]['close_recommendations']) == 1 # one recommendation in rule assert len(content['close_recommendations']) == 2 # but exist two recommendations